美文网首页
Netty源码笔记(五)消息发送之writeAndFlush

Netty源码笔记(五)消息发送之writeAndFlush

作者: 李亚林1990 | 来源:发表于2019-01-22 20:54 被阅读28次

在Netty学习笔记(二)中我们介绍了服务端消息接收的处理流程,通过pipeline.fireChannelRead来回调pipeline处理器链中每一个inboundHandler的channelRead方法。
本篇我们将简单分析下消息发送的处理流程。

通常我们会在最后一个inboundHandler的channelRead方法中完成业务逻辑的处理,并将响应消息体调用ChannelHandlerContext.writeAndFlush(response)发送回远端。
相关类的继承结构:


image.png

结合Netty学习笔记(一),pipeline.addLast(ChannelHandler... handlers)最终会通过new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);来对每一个处理器进行包装,并添加到pipeline的处理器链中。
详细代码逻辑:

    //AbstractChannelHandlerContext.writeAndFlush
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        write(msg, true, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        //此处为核心代码
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
    //outbound调用顺序
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

关于findContextOutbound在Netty学习笔记(二)中已涉及,与inbound的执行顺序相反,依次调用每一个执行器的write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise),最终执行到pipeline处理器器链的HeadContext处理器。

        //HeadContext.write
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            //在此处调用unsafe实现将消息发送到远端
            unsafe.write(msg, promise);
        }

消息发送逻辑比较简单,本篇到此结束。

相关文章

网友评论

      本文标题:Netty源码笔记(五)消息发送之writeAndFlush

      本文链接:https://www.haomeiwen.com/subject/icmrjqtx.html