美文网首页
Pipeline源码浅析之事件传播

Pipeline源码浅析之事件传播

作者: zZeroZz | 来源:发表于2020-05-31 19:35 被阅读0次

Outbound(bind 属于 outbound 事件,沿 pipeline 从 tail 向 head 传播)

1.AbstractBootstrap

public ChannelFuture bind(int inetPort)...

 /**
  * Submits a task to the channel's event loop that performs the bind.
  *
  * <p>When the task runs it checks the registration future: on success the bind is
  * issued on the channel, otherwise the promise is failed with the registration's cause.
  *
  * @param regFuture    future of the channel registration
  * @param channel      channel to bind
  * @param localAddress address to bind to
  * @param promise      promise completed with the outcome of the bind
  */
 private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
     channel.eventLoop().execute(new Runnable() {
         @Override
         public void run() {
             if (!regFuture.isSuccess()) {
                 // Registration did not succeed: report its cause instead of binding.
                 promise.setFailure(regFuture.cause());
                 return;
             }
             // Entry point into the pipeline: the bind is issued on the channel.
             channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
         }
     });
 }

2.AbstractChannel

   /**
    * Binds this channel by delegating to its pipeline.
    *
    * @param localAddress address to bind to
    * @param promise      promise handed to the pipeline
    * @return whatever the pipeline's {@code bind} returns
    */
   public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
       return pipeline.bind(localAddress, promise);
   }

3.DefaultChannelPipeline

 /**
  * Starts a bind operation at the tail of the pipeline.
  *
  * <p>Bind is an outbound operation: it enters at the tail context and travels through
  * the outbound handlers towards the head context.
  *
  * @param localAddress address to bind to
  * @param promise      promise handed to the tail context
  * @return whatever the tail context's {@code bind} returns
  */
 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
     // Outbound operations begin at the tail context.
     return tail.bind(localAddress, promise);
 }

4.AbstractChannelHandlerContext

/**
 * Forwards a bind operation to the next outbound context in the pipeline.
 *
 * <p>The next context's {@code invokeBind} runs directly when the calling thread is
 * already in that context's executor; otherwise it is handed to that executor via
 * {@code safeExecute}.
 *
 * @param localAddress address to bind to; must not be {@code null}
 * @param promise      promise associated with the operation
 * @return the given {@code promise}
 * @throws NullPointerException if {@code localAddress} is {@code null}
 */
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (!this.validatePromise(promise, false)) {
        // validatePromise rejected the promise (presumably already cancelled - confirm); nothing to do.
        return promise;
    }

    // Locate the next context that hosts an outbound handler.
    final AbstractChannelHandlerContext target = this.findContextOutbound();
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Not on the target's executor: hand the invocation over to it.
        safeExecute(targetExecutor, new Runnable() {
            @Override
            public void run() {
                target.invokeBind(localAddress, promise);
            }
        }, promise, (Object) null);
        return promise;
    }

    // Already on the target's executor: invoke it directly.
    target.invokeBind(localAddress, promise);
    return promise;
}

 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (this.invokeHandler()) {
            try {
               // 默认调用 ChannelOutboundHandlerAdapter,如果自定义的ChannelOutboundHandlerAdapter的子类,重写了bind等inbound事件,则调用子类重写的bind方法,但是子类的bind方法中,一定要再调用  ctx.bind(localAddress, promise),才会继续传递,否则会终止。
如果没有重写inbound,则会默认调用 AbstractChannelHandlerContext的inbound方法,这样就做到了事件传递。
                ((ChannelOutboundHandler)this.handler()).bind(this, localAddress, promise);
            } catch (Throwable var4) {
                notifyOutboundHandlerException(var4, promise);
            }
        } else {
            this.bind(localAddress, promise);
        }
    }

5.DefaultChannelPipeline.HeadContext

/**
 * Head-context handling of bind: passes the operation to {@code unsafe}.
 *
 * @param ctx          context of this handler (not used here)
 * @param localAddress address to bind to
 * @param promise      promise handed to {@code unsafe.bind}
 */
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.bind(localAddress, promise);
}

6.AbstractChannel.AbstractUnsafe

/**
 * Performs the bind for the enclosing channel and completes the promise.
 *
 * <p>Calls {@code doBind} on the enclosing channel. If that throws, the promise is
 * failed and {@code closeIfClosed()} is called. If the channel was not active before
 * but is active afterwards, {@code fireChannelActive()} is scheduled through
 * {@code invokeLater}. Finally the promise is marked successful.
 *
 * @param localAddress address to bind to
 * @param promise      promise completed with the outcome
 */
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
    this.assertEventLoop();
    if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
        return;
    }

    // Warn when SO_BROADCAST is enabled but the socket is bound to a specific address
    // by a non-root user on a non-Windows platform.
    if (Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST))
            && localAddress instanceof InetSocketAddress
            && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()
            && !PlatformDependent.isWindows()
            && !PlatformDependent.isRoot()) {
        AbstractChannel.logger.warn("A non-root user can't receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");
    }

    // Remember the state before binding so a transition to active can be detected.
    final boolean activeBefore = AbstractChannel.this.isActive();

    try {
        // Propagation ends here: the channel does the bind itself, and the outcome is
        // reported by completing the promise.
        AbstractChannel.this.doBind(localAddress);
    } catch (Throwable cause) {
        this.safeSetFailure(promise, cause);
        this.closeIfClosed();
        return;
    }

    if (!activeBefore && AbstractChannel.this.isActive()) {
        // The bind made the channel active: fire channelActive later rather than inline.
        this.invokeLater(new Runnable() {
            @Override
            public void run() {
                AbstractChannel.this.pipeline.fireChannelActive();
            }
        });
    }
    this.safeSetSuccess(promise);
}

相关文章

网友评论

      本文标题:Pipeline源码浅析之事件传播

      本文链接:https://www.haomeiwen.com/subject/hsurzhtx.html