MyException - 我的异常网
当前位置:我的异常网» 开源软件 » netty 默许Channel管道线-Inbound和Outbound事件处理

netty 默许Channel管道线-Inbound和Outbound事件处理

www.MyException.Cn  网友分享于:2013-08-22  浏览:0次
netty 默认Channel管道线-Inbound和Outbound事件处理
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
引言:
前面一篇文章我们看了管道线处理器的移除和替换,先来回顾一下:
   无论是根据名称,处理器句柄,还是根据类型移除通道处理器,都是首先获取对应的处理器上下文,从管道中移除对应的上下文,如果通道已经从事件循环反注册,则添加移除回调任务到管道回调任务链,否则直接创建线程(触发上下文关联的处理器handlerRemoved事件,更新上下文状态为已移除),有上下文关联的事件执行器执行。
      无论是根据名称,处理器句柄,还是根据类型替换通道处理器,都是首先获取对应的
处理器上下文,然后添加新上下文到管道中原始上下文的位置,并将原始上下文的前驱和后继同时指向新上下文,以便转发剩余的buf内容;可以简单理解为添加新上下文,移除原始上下文,注意必须先添加,后移除,因为移除操作会触发channelRead和flush事件,而这些事件处理必须在handlerAdded事件后。
今天我们来看管道线触发Inbound和Outbound事件的操作:
//DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline 

//ChannelPipeline
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

从管道线类声明来看继承了Inbound和Outbound通道Invoker,来看相关的方法实现:
先来看Inbound事件触发操作:
//通道注册
@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

//AbstractChannelHandlerContext
//触发上下文的invokeChannelRegistered方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();//获取上下文事件执行器
    //如果事件执行器在当前事务循环,则直接调用上下文invokeChannelRegistered方法
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        //否则创建一个线程执行上下文invokeChannelRegistered方法,并有有上下文事务执行器运行
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
//触发通道channelRegistered事件
private void invokeChannelRegistered() {
    //如果通道处理器已添加到管道
    if (invokeHandler()) {
        try {
	    //触发通道处理器的channelRegistered事件
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        //转发事件消息
        fireChannelRegistered();
    }
}

上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
/**
 * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
 * yet. If not return {@code false} and if called or could not detect return {@code true}.
 *确保通道处理器的handlerAdded方法已触发。
 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
 * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
 * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
 如果失败,则不会调用通道处理器的相关事件处理方法,而是转发事件。这种情况主要针对通道处理器已经添加到管道,
 但通道处理器handlerAdded方法没有被调用的情况,即通道处理器关联的上下文已经添加管道上下文链,但并没有更新上下文状态
 和触发通道处理器的handlerAdded方法。
 */
private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

2.
//转发事件消息
Override
public ChannelHandlerContext fireChannelRegistered() {
    //转发事件给上下文所属管道的下一个上下文
    invokeChannelRegistered(findContextInbound());
    return this;
}

//获取上下文所属管道的下一个Inbound上下文
private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
}

从上面可以看出,管道处理通道注册方法fireChannelRegistered,首先从
头部上下文开始,如果上下文已经添加到管道,则触发上下文关联的通道处理器的
channelRegistered事件,否则转发事件给上下文所属管道的下一个上下文。

再来看通道反注册方法
@Override
public final ChannelPipeline fireChannelUnregistered() {
    AbstractChannelHandlerContext.invokeChannelUnregistered(head);
    return this;
}

处理过程与fireChannelRegistered方法思路相同,只不过触发的通道处理器事件为channelUnregistered
,简单看一下
//AbstractChannelHandlerContext
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelUnregistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelUnregistered();
            }
        });
    }
}
private void invokeChannelUnregistered() {
    if (invokeHandler()) {
        try {
	   //
            ((ChannelInboundHandler) handler()).channelUnregistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelUnregistered();
    }
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
    invokeChannelUnregistered(findContextInbound());
    return this;
}


再来看管道处理关联通道激活事件:
//通道激活
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

//AbstractChannelHandlerContext
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
	    //触发处理器channelActive事件
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

@Override
public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound());
    return this;
}


//通道断开
@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

 static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelInactive();
     } else {
         executor.execute(new Runnable() {
             @Override
             public void run() {
                 next.invokeChannelInactive();
             }
         });
     }
 }

 private void invokeChannelInactive() {
     if (invokeHandler()) {
         try {
	     //触发处理器channelInactive事件
             ((ChannelInboundHandler) handler()).channelInactive(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelInactive();
     }
 }

 @Override
 public ChannelHandlerContext fireChannelInactive() {
     invokeChannelInactive(findContextInbound());
     return this;
 }

//IO异常
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}


//AbstractChannelHandlerContext
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeExceptionCaught(cause);
    } else {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to submit an exceptionCaught() event.", t);
                logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
            }
        }
    }
}

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
	    //触发处理器exceptionCaught事件
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                    "An exception {}" +
                    "was thrown by a user handler's exceptionCaught() " +
                    "method while handling the following exception:",
                    ThrowableUtil.stackTraceToString(error), cause);
            } else if (logger.isWarnEnabled()) {
                logger.warn(
                    "An exception '{}' [enable DEBUG level for full stacktrace] " +
                    "was thrown by a user handler's exceptionCaught() " +
                    "method while handling the following exception:", error, cause);
            }
        }
    } else {
        fireExceptionCaught(cause);
    }
}

@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    invokeExceptionCaught(next, cause);
    return this;
}


//触发用户事件
@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

//AbstractChannelHandlerContext
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
    ObjectUtil.checkNotNull(event, "event");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeUserEventTriggered(event);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeUserEventTriggered(event);
            }
        });
    }
}

private void invokeUserEventTriggered(Object event) {
    if (invokeHandler()) {
        try {
	     //触发处理器userEventTriggered事件
            ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireUserEventTriggered(event);
    }
}

@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    invokeUserEventTriggered(findContextInbound(), event);
    return this;
}


//读数据
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

//AbstractChannelHandlerContext

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    //记录消息引用对象,用于内存泄漏时,调试
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
	    //触发处理器channelRead事件
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

读数据操作过程有一点我们需要关注一下:
//记录消息引用对象,用于内存泄漏时,调试
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);

//DefaultChannelPipeline
  final Object touch(Object msg, AbstractChannelHandlerContext next) {
        //如果内存泄漏探测开启,则记录消息引用对象,否则直至返回消息对象
        return touch ? ReferenceCountUtil.touch(msg, next) : msg;
    }

//ReferenceCountUtil
/**
     * Tries to call {@link ReferenceCounted#touch(Object)} if the specified message implements
     * {@link ReferenceCounted}.  If the specified message doesn't implement {@link ReferenceCounted},
     * this method does nothing.
     */
    @SuppressWarnings("unchecked")
    public static <T> T touch(T msg, Object hint) {
        if (msg instanceof ReferenceCounted) {
            return (T) ((ReferenceCounted) msg).touch(hint);
        }
        return msg;
    }

//ReferenceCounted
/**
     * Records the current access location of this object with an additional arbitrary information for debugging
     * purposes.  If this object is determined to be leaked, the information recorded by this operation will be
     * provided to you via {@link ResourceLeakDetector}.
     */
    ReferenceCounted touch(Object hint);


//读数据完成
@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

//AbstractChannelHandlerContext
 static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelReadComplete();
     } else {
         Runnable task = next.invokeChannelReadCompleteTask;
         if (task == null) {
             next.invokeChannelReadCompleteTask = task = new Runnable() {
                 @Override
                 public void run() {
                     next.invokeChannelReadComplete();
                 }
             };
         }
         executor.execute(task);
     }
 }

 private void invokeChannelReadComplete() {
     if (invokeHandler()) {
         try {
             ((ChannelInboundHandler) handler()).channelReadComplete(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelReadComplete();
     }
 }
@Override
 public ChannelHandlerContext fireChannelReadComplete() {
     invokeChannelReadComplete(findContextInbound());
     return this;
 }

//写状态改变
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}

//AbstractChannelHandlerContext

static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelWritabilityChanged();
     } else {
         Runnable task = next.invokeChannelWritableStateChangedTask;
         if (task == null) {
             next.invokeChannelWritableStateChangedTask = task = new Runnable() {
                 @Override
                 public void run() {
                     next.invokeChannelWritabilityChanged();
                 }
             };
         }
         executor.execute(task);
     }
 }

 private void invokeChannelWritabilityChanged() {
     if (invokeHandler()) {
         try {
             ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelWritabilityChanged();
     }
 }

 @Override
 public ChannelHandlerContext fireChannelWritabilityChanged() {
     invokeChannelWritabilityChanged(findContextInbound());
     return this;
 }

从上面可以看出管道处理Inbound事件首先从头部上下文开始,直到尾部上下文,最后默认直接丢弃。

再来看Outbound事件触发操作:
//绑定socket的地址
@Override
public final ChannelFuture bind(SocketAddress localAddress) {  
   //从管道尾部开始
    return tail.bind(localAddress);
}

//AbstractChannelHandlerContext
@Override
  public ChannelFuture bind(SocketAddress localAddress) {
      return bind(localAddress, newPromise());
  }

//创建通道任务DefaultChannelPromise
/**
 * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create
 * a new {@link ChannelPromise} rather than calling the constructor explicitly.
 */
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {

    private final Channel channel;
    private long checkpoint;
    ...
    /**
     * Creates a new instance.
     *
     * @param channel
     *        the {@link Channel} associated with this future
     */
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}

//绑定socket地址
 @Override
 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     if (localAddress == null) {
         throw new NullPointerException("localAddress");
     }
     if (isNotValidPromise(promise, false)) {
        //非可写通道任务,直接返回
         // cancelled
         return promise;
     }
    //从当前上下文开始(尾部),向前找到第一个Outbound上下文,处理地址绑定事件
     final AbstractChannelHandlerContext next = findContextOutbound();
     //获取上下为事件执行器
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         //如果事件执行器线程在事件循环中,则直接委托给invokeBind
         next.invokeBind(localAddress, promise);
     } else {
         safeExecute(executor, new Runnable() {
             @Override
             public void run() {
                 next.invokeBind(localAddress, promise);
             }
         }, promise, null);
     }
     return promise;
 }
//触发通道处理器地址绑定事件
 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
     
     if (invokeHandler()) {//如果通道处理器已经添加到管道中
         try {
	    //触发Outbound通道处理器的bind事件方法
             ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
         } catch (Throwable t) {
             notifyOutboundHandlerException(t, promise);
         }
     } else {
        //否则传递绑定事件给管道中的下一个Outbound上下文
         bind(localAddress, promise);
     }
 }

  private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

再来看一下寻找Outbound处理器:
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}


从上面可以看出管道处理通道处理器地址绑定bind事件,首先从管道上下文链的尾部开始,
寻找Outbound上下文,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,
则触发通道处理器地址绑定事件#invokeBind,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;#invokeBind首先判断通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。

管道处理器其他Outbound事件的思路,基本相同,我们只展示一下代码
//连接指定远端地址
@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

//AbstractChannelHandlerContext
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return connect(remoteAddress, newPromise());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return connect(remoteAddress, null, promise);
}
@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}


//绑定本地地址,连接远端地址
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}

//AbstractChannelHandlerContext
@Override
 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
     return connect(remoteAddress, localAddress, newPromise());
 }

//断开通道连接
@Override
public final ChannelFuture disconnect() {
    return tail.disconnect();
}

//AbstractChannelHandlerContext
 @Override
 public ChannelFuture disconnect() {
     return disconnect(newPromise());
 }

@Override
public ChannelFuture disconnect(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
        // So far, UDP/IP is the only transport that has such behavior.
        if (!channel().metadata().hasDisconnect()) {
	    //如果还没有连接,则关闭通道
            next.invokeClose(promise);
        } else {
	   //否则断开通道
            next.invokeDisconnect(promise);
        }
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                if (!channel().metadata().hasDisconnect()) {
                    next.invokeClose(promise);
                } else {
                    next.invokeDisconnect(promise);
                }
            }
        }, promise, null);
    }
    return promise;
}
//触发通道处理器关闭事件
  private void invokeClose(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).close(this, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            close(promise);
        }
    }
 //触发通道处理器断开事件
 private void invokeDisconnect(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).disconnect(this, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            disconnect(promise);
        }
    }

//关闭通道
@Override
public final ChannelFuture close() {
    return tail.close();
}

//AbstractChannelHandlerContext
@Override
 public ChannelFuture close() {
     return close(newPromise());
 }
 @Override
public ChannelFuture close(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeClose(promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeClose(promise);
            }
        }, promise, null);
    }

    return promise;
}

private void invokeClose(ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).close(this, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        close(promise);
    }
}


//反注册
@Override
public final ChannelFuture deregister() {
    return tail.deregister();
}

//AbstractChannelHandlerContext
@Override
public ChannelFuture deregister() {
    return deregister(newPromise());
}
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeDeregister(promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeDeregister(promise);
            }
        }, promise, null);
    }

    return promise;
}

private void invokeDeregister(ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).deregister(this, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        deregister(promise);
    }
}

//刷新写请求数据
@Override
public final ChannelPipeline flush() {
    tail.flush();
    return this;
}

//AbstractChannelHandlerContext
@Override
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        //触发通道处理器刷新事件操作
        next.invokeFlush();
    } else {
        //获取当前上下文的刷新任务线程
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }

    return this;
}
 private void invokeFlush() {
    if (invokeHandler()) {
        invokeFlush0();
    } else {
        flush();
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

//读操作
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

//AbstractChannelHandlerContext
 @Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
       //获取上下文读任务线程
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }

    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}


//写消息
@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

//AbstractChannelHandlerContext
 @Override
 public ChannelFuture write(Object msg) {
     return write(msg, newPromise());
 }
 @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
	    //出现异常,释放消息对象
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }


//写消息,并发送
@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

//AbstractChannelHandlerContext
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}
 @Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    if (isNotValidPromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        // cancelled
        return promise;
    }

    write(msg, true, promise);

    return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
	    //如果刷新,则调用invokeWriteAndFlush
            next.invokeWriteAndFlush(m, promise);
        } else {
	    //否则调用invokeWrite
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
	    //创建写刷新任务线程
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
	    //创建写任务线程
            task = WriteTask.newInstance(next, m, promise);
        }
	//执行任务线程
        safeExecute(executor, task, promise, m);
    }
}

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}


通道处理器上下文的写WriteTask和刷新写WriteAndFlushTask任务线程定义,见附篇AbstractChannelHandlerContext部分。

下面的这些带可以通道任务参数ChannelPromise,与上面相应的方法基本相同不再说
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

@Override
public final ChannelFuture connect(
        SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}

@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
    return tail.disconnect(promise);
}

@Override
public final ChannelFuture close(ChannelPromise promise) {
    return tail.close(promise);
}

@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
    return tail.deregister(promise);
}

@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return tail.writeAndFlush(msg, promise);
}

从上面可以看,管道处理Outbound相关事件,从尾部上下文到头部上下文,到达头部时,交由
上下文所属管道关联的Channel的Unsafe处理。

总结:

      管道处理通道注册方法fireChannelRegistered,首先从头部上下文开始,如果上下文已经添加到管道,则触发上下文关联的通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个上下文;其他处罚Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的通道处理器的相应事件。管道处理Inbound事件首先从头部上下文开始,直到尾部上下文,最后默认直接丢弃。
     管道处理通道处理器地址绑定bind事件,首先从管道上下文链的尾部开始,寻找Outbound上下文,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发通道处理器地址绑定事件#invokeBind,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;#invokeBind首先判断通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。管道处理Outbound相关事件,从尾部上下文到头部上下文,到达头部时,交由
上下文所属管道关联的Channel的Unsafe处理。

附:
//DefaultChannelPipeline
//创建通道异步任务结果
@Override
public final ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel);
}

@Override
public final ChannelProgressivePromise newProgressivePromise() {
    return new DefaultChannelProgressivePromise(channel);
}

@Override
public final ChannelFuture newSucceededFuture() {
    return succeededFuture;
}

@Override
public final ChannelFuture newFailedFuture(Throwable cause) {
    return new FailedChannelFuture(channel, null, cause);
}

@Override
public final ChannelPromise voidPromise() {
    return voidPromise;
}

再来看管道的其他方法
//获取通道消息大小估算Handler
final MessageSizeEstimator.Handle estimatorHandle() {
    if (estimatorHandle == null) {
        estimatorHandle = channel.config().getMessageSizeEstimator().newHandle();
    }
    return estimatorHandle;
}

//MessageSizeEstimator
/**
 * Responsible to estimate size of a message. The size represent how much memory the message will ca. reserve in
 * memory.
 负责估算消息的大小。大小表示消息需要多少内存,以便预留内存
 */
public interface MessageSizeEstimator {
    /**
     * Creates a new handle. The handle provides the actual operations.
     */
    Handle newHandle();
    interface Handle {
        /**
         * Calculate the size of the given message.
	 计算给定消息的大小
         *
         * @param msg       The message for which the size should be calculated
         * @return size     The size in bytes. The returned size must be >= 0
         */
        int size(Object msg);
    }
}

//获取事件执行器组的事件执行器
private EventExecutor childExecutor(EventExecutorGroup group) {
     if (group == null) {
         return null;
     }
     //获取通道的SINGLE_EVENTEXECUTOR_PER_GROUP的配置项
     //是否每个事件执行器组拥有一个执行器
     Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
     if (pinEventExecutor != null && !pinEventExecutor) {
         //如果可以同用事件执行器,则返回事件执行器组的事件执行器
         return group.next();
     }
     //否则获取管道的子事件执行器
     Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
     if (childExecutors == null) {
         //如果子执行器为空,则创建四个事件执行器组
         // Use size of 4 as most people only use one extra EventExecutor.
         childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
     }
     // Pin one of the child executors once and remember it so that the same child executor
     // is used to fire events for the same channel.
     //获取分组的事件执行器
     EventExecutor childExecutor = childExecutors.get(group);
     if (childExecutor == null) {
         //如果事件执行器组的事件执行器为空,则获取下一个事件执行器
         childExecutor = group.next();
         childExecutors.put(group, childExecutor);
     }
     return childExecutor;
 }

//EventExecutorGroup
/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {


//EventExecutor
/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup {


//IdentityHashMap
/**
 * This class implements the <tt>Map</tt> interface with a hash table, using
 * reference-equality in place of object-equality when comparing keys (and
 * values).  In other words, in an <tt>IdentityHashMap</tt>, two keys
 * <tt>k1</tt> and <tt>k2</tt> are considered equal if and only if
 * <tt>(k1==k2)</tt>.  (In normal <tt>Map</tt> implementations (like
 * <tt>HashMap</tt>) two keys <tt>k1</tt> and <tt>k2</tt> are considered equal
 * if and only if <tt>(k1==null ? k2==null : k1.equals(k2))</tt>.)
 ...
  *
 * <p><b>This class is <i>not</i> a general-purpose <tt>Map</tt>
 * implementation!  While this class implements the <tt>Map</tt> interface, it
 * intentionally violates <tt>Map's</tt> general contract, which mandates the
 * use of the <tt>equals</tt> method when comparing objects.  This class is
 * designed for use only in the rare cases wherein reference-equality
 * semantics are required.</b>
 *
 * <p>A typical use of this class is <i>topology-preserving object graph
 * transformations</i>, such as serialization or deep-copying.  To perform such
 * a transformation, a program must maintain a "node table" that keeps track
 * of all the object references that have already been processed.  The node
 * table must not equate distinct objects even if they happen to be equal.
 * Another typical use of this class is to maintain <i>proxy objects</i>.  For
 * example, a debugging facility might wish to maintain a proxy object for
 * each object in the program being debugged.
  *
 * @see     System#identityHashCode(Object)
 * @see     Object#hashCode()
 * @see     Collection
 * @see     Map
 * @see     HashMap
 * @see     TreeMap
 * @author  Doug Lea and Josh Bloch
 * @since   1.4
 IdentityHashMap与普通Map的区别在于Key的相等的条件不同,一般判断key是否相等
 为if(k1==null ? k2==null : k1.equals(k2))则相等,而IdentityHashMap为k1==k2
 相同,则任务是相等,可以用于存储
 */
public class IdentityHashMap<K,V>
    extends AbstractMap<K,V>


 //获取关联通道
 @Override
 public final Channel channel() {
     return channel;
 }

 //获取通道处理对应的名称
 private String generateName(ChannelHandler handler) {
        //从通道处理器命名缓存,获取当前线程的通道处理器命名缓存
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();//获取处理器类型
        String name = cache.get(handlerType);//从缓冲获取通道处理器对应的名称
        if (name == null) {
	    //如果名称为空,则生成处理器名称,并放入缓存
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
	//简单是否存在name对应的上下文
        if (context0(name) != null) {
	    //存在,则重新命名
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

下面是通道处理器上下文的写和刷新写任务线程定义,只贴出,后面将通道处理器上下文的时候再讲.
//AbstractChannelHandlerContext
abstract static class AbstractWriteTask implements Runnable {
    //在提交时是否估算任务size
    private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
            SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
    //任务负载
    // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
    private static final int WRITE_TASK_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);

    private final Recycler.Handle<AbstractWriteTask> handle;
    private AbstractChannelHandlerContext ctx;
    private Object msg;
    private ChannelPromise promise;
    private int size;

    @SuppressWarnings("unchecked")
    private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
        this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
    }
    //初始化写任务
    protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
                               Object msg, ChannelPromise promise) {
        task.ctx = ctx;
        task.msg = msg;
        task.promise = promise;

        if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
	        //获取Outbound buf,从上下文关联通道的unsafe获取
            ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();

            // Check for null as it may be set to null if the channel is closed already
            if (buffer != null) {
                task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
                buffer.incrementPendingOutboundBytes(task.size);
            } else {
                task.size = 0;
            }
        } else {
            task.size = 0;
        }
    }

    @Override
    public final void run() {
        try {
            ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
            // Check for null as it may be set to null if the channel is closed already
            if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
                buffer.decrementPendingOutboundBytes(size);
            }
		//写消息
            write(ctx, msg, promise);
        } finally {
            // Set to null so the GC can collect them directly
            ctx = null;
            msg = null;
            promise = null;
            handle.recycle(this);
        }
    }
   //委托给上下文
    protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ctx.invokeWrite(msg, promise);
    }
}

static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {

    private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
        @Override
        protected WriteTask newObject(Handle<WriteTask> handle) {
            return new WriteTask(handle);
        }
    };
    //创建写任务
    private static WriteTask newInstance(
            AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        WriteTask task = RECYCLER.get();
	//初始化写任务
        init(task, ctx, msg, promise);
        return task;
    }

    private WriteTask(Recycler.Handle<WriteTask> handle) {
        super(handle);
    }
}

static final class WriteAndFlushTask extends AbstractWriteTask {

    private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
        @Override
        protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
            return new WriteAndFlushTask(handle);
        }
    };
    //创建写刷新任务
    private static WriteAndFlushTask newInstance(
            AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
        WriteAndFlushTask task = RECYCLER.get();
	//初始化写任务
        init(task, ctx, msg, promise);
        return task;
    }

    private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
        super(handle);
    }

    @Override
    public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        super.write(ctx, msg, promise);
	//调用上下文flush方法
        ctx.invokeFlush();
    }
}

从上面可以看出管道的写消息和写刷新消息操作,当事件执行器线程不在当前事务中时,
则创建写任务WriteTask或写刷新任务WriteAndFlushTask线程,并交由事件执行器执行。
这两任务的定义在抽象上下文的内部,两种任务的写请求实际为委托给关联上下文的invokeWrite方法,
而对于写刷新任务,在调用关联上下文的invokeWrite方法后,并调用invokeFlush方法。

文章评论

鲜为人知的编程真相
鲜为人知的编程真相
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
总结2014中国互联网十大段子
总结2014中国互联网十大段子
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
我的丈夫是个程序员
我的丈夫是个程序员
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
Java程序员必看电影
Java程序员必看电影
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
代码女神横空出世
代码女神横空出世
程序员应该关注的一些事儿
程序员应该关注的一些事儿
 程序员的样子
程序员的样子
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
那些争议最大的编程观点
那些争议最大的编程观点
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
我是如何打败拖延症的
我是如何打败拖延症的
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
每天工作4小时的程序员
每天工作4小时的程序员
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
如何成为一名黑客
如何成为一名黑客
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
一个程序员的时间管理
一个程序员的时间管理
编程语言是女人
编程语言是女人
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
旅行,写作,编程
旅行,写作,编程
程序员的鄙视链
程序员的鄙视链
漫画:程序员的工作
漫画:程序员的工作
为什么程序员都是夜猫子
为什么程序员都是夜猫子
程序员和编码员之间的区别
程序员和编码员之间的区别
中美印日四国程序员比较
中美印日四国程序员比较
程序员必看的十大电影
程序员必看的十大电影
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有