本篇内容主要讲解“netty服务端处理请求联合pipeline源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“netty服务端处理请求联合pipeline源码分析”吧!
两个问题
在客户端接入的时候,
NioMessageUnsafe的
read方法中
pipeline.fireChannelRead(readBuf.get(i))为什么会调用到
ServerBootstrap的内部类
ServerBootstrapAcceptor中的
channelRead()方法。
客户端
handler是什么时候被添加的?
先分析第一个问题。回到netty处理客户端请求分析_1中服务端接收到
accpet事件后,进行读取的方法
NioMessageUnsafe.read()
NioMessageUnsafe.read()
public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPipeline pipeline = pipeline(); //处理服务端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //设置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //创建jdk底层的channel //readBuf用于临时承载读到链接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器将读到的链接进行计数 allocHandle.incMessagesRead(localRead); //连接数是否超过最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历每一条客户端连接 for (int i = 0; i < size; i ++) { readPending = false; //传递事件, 将创建NioSokectChannel进行传递 //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代码省略 } finally { //代码省略 } }
重点看
pipeline.fireChannelRead(readBuf.get(i))
首先, 这里
pipeline是服务端
channel的
pipeline, 也就是
NioServerSocketChannel的
pipeline
我们学习过
pipeline之后, 对这种写法并不陌生, 就是传递
channelRead事件, 这里通过传递
channelRead事件走到了
ServerBootstrapAcceptor的
channelRead()方法, 说明在这步之前,
ServerBootstrapAcceptor作为一个
handler添加到了服务端
channel的
pipeline中, 那么这个
handler什么时候添加的呢?
我们回顾下第一章, 初始化NioServerSocketChannel的时候, 调用了ServerBootstrap的init方法 回顾下
ServerBootstrap.init的调用链路:
ServerBootstrap.bind(8899)--->
AbstractBootstrap.doBind(final SocketAddress localAddress)--->
AbstractBootstrap.initAndRegister()--->
ServerBootstrap.init(Channel channel)
ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception { //获取用户定义的选项(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //获取用户定义的属性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work线程组(4) final EventLoopGroup currentChildGroup = childGroup; //用户设置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //选项转化为Entry对象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //属性转化为Entry对象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服务端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我们重点关注第8步, 添加服务端
channel, 这里的
pipeline, 是服务服务端
channel的
pipeline, 也就是
NioServerSocketChannel绑定的
pipeline, 这里添加了一个
ChannelInitializer类型的
handler
ChannelInitializer的继承关系
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { //省略类体 }
我们看到其继承了
ChannelInboundHandlerAdapter, 说明是一个
inbound类型的
handler
这里我们可能会想到, 添加完
handler会执行
handlerAdded, 然后在
handlerAdded方法中做了添加
ServerBootstrapAcceptor这个
handler
但是, 实际上并不是这样的, 当程序执行到这里, 并没有马上执行
handlerAdded, 我们紧跟
addLast方法
最后执行到DefualtChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler)
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判断handler是否被重复添加(1) checkMultiplicity(handler); //创建一个HandlerContext并添加到列表(2) newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3) addLast0(newCtx); //是否已注册 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回调用户事件 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回调添加事件(4) callHandlerAdded0(newCtx); return this; }
首先完成了
handler的添加, 但是并没有马上执行回调
这里我们重点关注
if (!registered)这个条件判断, 其实在注册完成,
registered会变成
true, 但是走到这一步的时候
NioServerSockeChannel并没有完成注册(可以回顾第一章看注册在哪一步), 所以会进到if里并返回自身
DefualtChannelPipeline.callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; //判断是否已添加, 未添加, 进行添加, 已添加进行删除 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); //获取第一个Callback任务 PendingHandlerCallback pending = pendingHandlerCallbackHead; //如果第一个Callback任务为空 if (pending == null) { //将第一个任务设置为刚创建的任务 pendingHandlerCallbackHead = task; } else { while (pending.next != null) { pending = pending.next; } pending.next = task; } }
因我们调用这个方法的时候
added传的
true, 所以
PendingHandlerCallback task赋值为
new PendingHandlerAddedTask(ctx)
PendingHandlerAddedTask这个类, 我们从名字可以看出, 这是一个
handler添加的延迟任务, 用于执行
handler延迟添加的操作, 同样也对应一个名字为
PendingHandlerRemovedTask的类, 用于执行延迟删除
handler的操作, 这两个类都继承抽象类
PendingHandlerCallback
PendingHandlerAddedTask构造方法
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); }
进入
super(ctx)
PendingHandlerCallback构造方法
PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; }
在父类中, 保存了要添加的
context, 也就是
ChannelInitializer类型的包装类
回到callHandlerCallbackLater方法
PendingHandlerCallback pending = pendingHandlerCallbackHead;
这表示获取第一个
PendingHandlerCallback的任务, 其实
PendingHandlerCallback是一个单向链表, 自身维护一个
PendingHandlerCallback类型的
next, 指向下一个任务, 在
DefaultChannelPipeline这个类中, 定义了个
PendingHandlerCallback类型的引用
pendingHandlerCallbackHead, 用来指向延迟回调任务的中的第一个任务。
之后判断这个任务是为空, 如果是第一次添加
handler, 那么这里就是空, 所以将第一个任务赋值为我们刚创建的添加任务。
如果不是第一次添加
handler, 则将我们新创建的任务添加到链表的尾部, 因为这里我们是第一次添加, 所以第一个回调任务就指向了我们创建的添加
handler的任务。
完成这一系列操作之后,
addLast方法返归, 此时并没有完成添加操作。
而什么时候完成添加操作的呢?
回到在服务端channel注册时候的会走到AbstractChannel.register0方法 回顾下
AbstractChannel.register0的调用链路:
ServerBootstrap.bind(8899)--->
AbstractBootstrap.doBind(final SocketAddress localAddress)--->
AbstractBootstrap.initAndRegister()--->
config().group().register(channel)--->
SingleThreadEventLoop.register(final ChannelPromise promise)--->
AbstractChannel.register(EventLoop eventLoop, final ChannelPromise promise)--->
AbstractChannel.register0(ChannelPromise promise)
AbstractChannel.register0(ChannelPromise promise)
private void register0(ChannelPromise promise) { try { //做实际的注册(1) doRegister(); neverRegistered = false; registered = true; //触发事件(2) pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //触发注册成功事件(3) pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //传播active事件(4) pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //省略代码 } }
重点关注第二步pipeline.invokeHandlerAddedIfNeeded(), 这里已经通过doRegister()方法完成了实际的注册, 我们跟到该方法中
pipeline.invokeHandlerAddedIfNeeded()
final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }
这里会判断是否第一次注册, 这里返回
true, 然后会执行
callHandlerAddedForAllHandlers()方法, 我们跟进去
DefaultChannelPipeline.callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } //获取task PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { //执行添加handler方法 task.execute(); task = task.next; } }
这里拿到第一个延迟执行
handler添加的
task其实就是我们之前剖析过的, 延迟执行
handler添加的
task, 就是
PendingHandlerAddedTask对象
在
while循环中, 通过执行
execute()方法将
handler添加
进入PendingHandlerAddedTask.execute()
void execute() { //获取当前eventLoop线程 EventExecutor executor = ctx.executor(); //是当前执行的线程 if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { //添加到队列 executor.execute(this); } catch (RejectedExecutionException e) { //代码省略 } } }
再进入
callHandlerAdded0方法
callHandlerAdded0(final AbstractChannelHandlerContext ctx)
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { //省略... } }
终于在这里, 我们看到了执行回调的方法
再回到ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception { //获取用户定义的选项(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //获取用户定义的属性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work线程组(4) final EventLoopGroup currentChildGroup = childGroup; //用户设置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //选项转化为Entry对象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //属性转化为Entry对象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服务端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我们继续看第8步添加服务端
handler
因为这里的
handler是
ChannelInitializer, 所以完成添加之后会调用
ChannelInitializer的
handlerAdded方法
跟到
handlerAdded方法
ChannelInitializer.handlerAdded(ChannelHandlerContext ctx)
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //默认情况下, 会返回true if (ctx.channel().isRegistered()) { initChannel(ctx); } }
因为执行到这步服务端
channel已经完成注册, 所以会执行到
initChannel方法
ChannelInitializer.initChannel(ChannelHandlerContext ctx)
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { //这段代码是否被执行过 if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { //调用之后会删除当前节点 remove(ctx); } return true; } return false; }
我们关注
initChannel这个方法, 这个方法是在
ChannelInitializer的匿名内部来实现的, 这里我们注意, 在
initChannel方法执行完毕之后会调用
remove(ctx)删除当前节点
继续跟进initChannel方法
public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
这里首先添加用户自定义的
handler, 这里如果用户没有定义, 则添加不成功, 然后, 会调用
addLast将
ServerBootstrapAcceptor这个
handler添加了进去, 同样这个
handler也继承了
ChannelInboundHandlerAdapter, 在这个
handler中, 重写了
channelRead方法, 所以, 这就是第一个问题的答案
紧接着我们看第二个问题:
客户端handler是什么时候被添加的?
看ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //添加channelHadler, 这个channelHandler, 就是用户代码添加的ChannelInitializer child.pipeline().addLast(childHandler); //代码省略 try { //work线程注册channel childGroup.register(child).addListener(new ChannelFutureListener() { //代码省略 }); } catch (Throwable t) { forceClose(child, t); } }
这里真相可以大白了, 服务端再创建完客户端
channel之后, 将新创建的
NioSocketChannel作为参数触发
channelRead事件(可以回顾
NioMessageUnsafe.read方法, 代码这里就不贴了), 所以这里的参数
msg就是
NioSocketChannel
拿到
channel时候再将客户端的
handler添加进去, 我们回顾客户端
handler的添加过程:
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });
和服务端
channel的逻辑一样, 首先会添加
ChannelInitializer这个
handler但是没有注册所以没有执行添加
handler的回调, 将任务保存到一个延迟回调的
task中
等客户端
channel注册完毕, 会将执行添加
handler的回调, 也就是
handlerAdded方法, 在回调中执行
initChannel方法将客户端
handler添加进去, 然后删除
ChannelInitializer这个
handler
因为在服务端
channel中这块逻辑已经进行了详细的剖析, 所以这边就不在赘述, 同学们可以自己跟进去走一遍流程
这里注意, 因为每创建一个
NioSoeketChannel都会调用服务端
ServerBootstrapAcceptor的
channelRead方法, 所以这里会将每一个
NioSocketChannel的
handler进行添加。