Netty源码解析之ChannelPipeline
阅读须知
- Netty版本:4.1.14.Final
- 文章中使用/* */注释的方法会做深入分析
正文
在之前的文章中,我们经常能看到ChannelPipeline的身影,本篇文章我们就来详细分析一下ChannelPipeline。下面为ChannelPipeline的注释介绍:
ChannelHandler的列表,用于处理或拦截Channel的入站事件和出站操作。ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及pipeline中的ChannelHandler如何交互。
下图描述了事件如何在pipeline中流动:
ChannelPipeline的默认实现是DefaultChannelPipeline,在分析ServerSocketChannel相关源码的文章中,在实例化
NioServerSocketChannel时我们看到了DefaultChannelPipeline的实例化过程,在我们给出的示例demo中,我们为pipeline添加ChannelHandler时采用如下方式:
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
// ...
}
这里就是调用ChannelPipeline的addLast方法,我们来看实现:
DefaultChannelPipeline:
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
addLast的方法的作用是在此pipeline的最后位置插入ChannelHandler。
DefaultChannelPipeline:
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
DefaultChannelPipeline:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// filterName:如果指定的Handler的名称,也就是name不为空,则检查是否有重复的名称
// 如果没有给定Handler的名称,则生成Handler的名称
// 新建DefaultChannelHandlerContext作为ChannelHandler和ChannelPipeline交互的上下文
newCtx = newContext(group, filterName(name, handler), handler);
/* 在此pipeline的最后位置插入ChannelHandler */
addLast0(newCtx);
// 如果Channel尚未在event loop中注册
// 在这种情况下,我们将上下文添加到pipeline,并添加一个任务
// 一旦注册Channel,将调用ChannelHandler.handlerAdded()
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
/* 将ChannelHandler添加到实际上下文并准备好处理事件之后调用其HandlerAdded方法 */
callHandlerAdded0(newCtx);
}
});
return this;
}
}
/* 将ChannelHandler添加到实际上下文并准备好处理事件之后调用其HandlerAdded方法 */
callHandlerAdded0(newCtx);
return this;
}
DefaultChannelPipeline:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
这里就是将新加入的上下文对象插入到tail和tail.prev中间的位置。
DefaultChannelPipeline:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 调用Handler的handlerAdded方法
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
boolean removed = false;
try {
// 异常将当前上下文对象从pipeline中移除
remove0(ctx);
try {
// 调用Handler的handlerRemoved方法
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved(); // 设置handler的状态为已移除
}
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
// 调用ChannelInboundHandler的exceptionCaught方法
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
// 调用ChannelInboundHandler的exceptionCaught方法
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
入站事件
ChannelPipeline接口中以fire为前缀的方法都是入站事件,如下:
这些方法都来自于ChannelInboundInvoker,最终会调用ChannelInboundHandler的相关方法对事件进行处理。
出站操作
一些来自于ChannelOutboundInvoker接口的操作如bind、connect、disconnect、close、deregister、read、write、flush等都属于出站操作,在这些操作被调用后,会通知ChannelOutboundHandler的相关方法。
到这里ChannelPipeline的源码分析就完成了。