美文网首页
ElasticSearch 请求发送与响应接收

ElasticSearch 请求发送与响应接收

作者: persisting_ | 来源:发表于2019-02-12 22:28 被阅读0次

1 概述

在文章ElasticSearch 基于Netty的通信原理我们介绍了ElasticSearch是如何使用Netty进行通信的,本文主要介绍ElasticSearch节点之间如何发送和接收请求。

2 请求发送

ElasticSearch中主要调用TransportService.sendRequest进行请求发送。其源码如下:

//TransportService
//node是接收该请求的目标节点
//action会随着请求一同发往接收节点,用于指示
//接收节点使用哪个requestHandler处理该请求
//handler是该请求响应返回之后处理ResponstHandler
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, 
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
    try {
        //首先根据节点获取到该节点的connection
        Transport.Connection connection = getConnection(node);
        //然后使用Connection进行请求发送
        sendRequest(connection, action, request, options, handler);
    } catch (NodeNotConnectedException ex) {
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
    }
}

//注意下面函数入参handler是请求响应返回之后进行处理的
//handler
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection,            final String action,
            final TransportRequest request,
            final TransportRequestOptions options,
            TransportResponseHandler<T> handler) {
    try {
        asyncSender.sendRequest(connection, action, request, options, handler);
    } catch (NodeNotConnectedException ex) {
        //如果发送过程中发生异常,则直接调用handler
        //异常处理函数
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
    }
}

有的请求的接收节点就是当前节点自身,所以对此要进行特殊处理,特殊处理的逻辑就在TransportService.getConnection返回的Connection实例中,如下:

//TransportService
public Transport.Connection getConnection(DiscoveryNode node) {
    //如果是接收节点就是自身,则返回Connection特殊子类
    //实现localNodeConnection
    if (isLocalNode(node)) {
        return localNodeConnection;
    } else {
        //否则根据节点获取该节点对应的Connection
        //这里实现也就是根据节点获取以前打开的Connection
        //想看具体实现,可查看
        //ConnectionManager.internalOpenConnection的调用轨迹即可
        //不再展开介绍
        return connectionManager.getConnection(node);
    }
}

localNodeConnection定义如下,是一个内部类:

//TransportService
private final Transport.Connection localNodeConnection = new Transport.Connection() {
    @Override
    public DiscoveryNode getNode() {
        return localNode;
    }

    @Override
    public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
        throws TransportException {
        //直接调用sendLocalRequest发送给自身
        //sendLocalRequest函数实现比较简单,直接
        //调用注册的请求处理handler进行处理,然后
        //向channel返回处理结果,后面不再展开介绍
        sendLocalRequest(requestId, action, request, options);
    }

    @Override
    public void addCloseListener(ActionListener<Void> listener) {
    }

    @Override
    public boolean isClosed() {
        return false;
    }

    @Override
    public void close() {
    }
};

上面代码中asyncSenderTransportService的域,其在TransportService初始化如下:

//TransportService
public TransportService(...) {
    ...
    //发送就是调用TransportService.sendRequestInternal
    this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
    ...
}

TransportService.sendRequestInternal定义如下:

//TransportService
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, 
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
    if (connection == null) {
        throw new IllegalStateException("can't send request to a null connection");
    }
    DiscoveryNode node = connection.getNode();

    Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
    //首先对传入的ResponseHandler进行封装
    ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
    // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
    //将封装的ResponseHandler记录到responseHandlers中
    //并返回requestID,该requestID会随着请求发送到目标节点,
    //目标节点处理完之后会将此requestID和响应一起返回给
    //发送请求的节点,发送请求的节点会根据
    //此requestID查找发送时记录的ResponseHandler
    //responseHandlers采用AtomicLong维护此requestID
    final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
    final TimeoutHandler timeoutHandler;
    //如果该请求有timeout时间,则注册一个timeoutHandler
    if (options.timeout() != null) {
        timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
        responseHandler.setTimeoutHandler(timeoutHandler);
    } else {
        timeoutHandler = null;
    }
    try {
        if (lifecycle.stoppedOrClosed()) {
            // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
            // the caller. It will only notify if the toStop code hasn't done the work yet.
            throw new TransportException("TransportService is closed stopped can't send request");
        }
        if (timeoutHandler != null) {
            assert options.timeout() != null;
            //使用延时任务实现timeoutHandler的调用
            timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
        }
        //调用connection.sendRequest发送该请求,
        //注意requestID,action都会被编码进报文发送
        //到请求处理节点
        connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
    } catch (final Exception e) {
        //异常处理
        ...
    }
}

上面还有一个需要注意的点,TransportService.responseHandlersTransportService构造函数被初始化:

//TransportService构造函数中初始化如下
responseHandlers = transport.getResponseHandlers();

所以在TransportService.sendRequestInternal发送请求前注册响应处理Handler其实是注册到transport中了,即注册到Transport.responseHandlers中。后续响应返回之后,也直接在Transport中查找responseHandler对响应进行处理。

关于请求处理节点(接收节点)如何接受请求、对报文进行解码、根据action查找requestHandler,可以参考文章ElasticSearch TransportAction类继承层次以及ElasticSearch 基于Netty的通信原理,这里不再赘述。

3 响应接收

在文章ElasticSearch 基于Netty的通信原理中我们介绍了向Netty Pipeline中注册处理报文的Handler为Netty4MessageChannelHandler

Netty4MessageChannelHandler定义如下:

final class Netty4MessageChannelHandler extends ChannelDuplexHandler {

    private final Netty4Transport transport;

    Netty4MessageChannelHandler(Netty4Transport transport) {
        this.transport = transport;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Transports.assertTransportThread();
        assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

        final ByteBuf buffer = (ByteBuf) msg;
        try {
            Channel channel = ctx.channel();
            Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
            //transport为Netty4Transport对象实例,和
            //TransportService中持有的是同一个实例
            transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
        } finally {
            buffer.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
        final Throwable newCause = unwrapped != null ? unwrapped : cause;
        Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
        if (newCause instanceof Error) {
            transport.onException(tcpChannel, new Exception(newCause));
        } else {
            transport.onException(tcpChannel, (Exception) newCause);
        }
    }
}

inboundMessageNetty4Transport父类TcpTransport中实现:

//TcpTransport
 public void inboundMessage(TcpChannel channel, BytesReference message) {
    try {
        transportLogger.logInboundMessage(channel, message);
        // Message length of 0 is a ping
        if (message.length() != 0) {
            messageReceived(message, channel);
        }
    } catch (Exception e) {
        onException(channel, e);
    }
}

public final void messageReceived(BytesReference reference, TcpChannel channel) throws IOException {
    String profileName = channel.getProfile();
    InetSocketAddress remoteAddress = channel.getRemoteAddress();
    int messageLengthBytes = reference.length();
    final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
    readBytesMetric.inc(totalMessageSize);
    // we have additional bytes to read, outside of the header
    boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
    StreamInput streamIn = reference.streamInput();
    boolean success = false;
    try (ThreadContext.StoredContext tCtx = threadPool.getThreadContext().stashContext()) {
        //对于requestID,不管是请求还是响应报文,都会有此ID
        long requestId = streamIn.readLong();
        //读取状态
        byte status = streamIn.readByte();
        Version version = Version.fromId(streamIn.readInt());
        //是否进行过压缩
        if (TransportStatus.isCompress(status) && hasMessageBytesToRead && streamIn.available() > 0) {
            Compressor compressor;
            try {
                final int bytesConsumed = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE;
                compressor = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed));
            } catch (NotCompressedException ex) {
                //压缩异常处理
                ...
            }
            //解压缩
            streamIn = compressor.streamInput(streamIn);
        }
        //判断是否为握手请求
        final boolean isHandshake = TransportStatus.isHandshake(status);
        ensureVersionCompatibility(version, getCurrentVersion(), isHandshake);
        streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry);
        streamIn.setVersion(version);
        threadPool.getThreadContext().readHeaders(streamIn);
        threadPool.getThreadContext().putTransient("_remote_address", remoteAddress);
        //判断是否为请求
        if (TransportStatus.isRequest(status)) {
            //进行请求处理,具体的为根据action查找requestHandler
            //进行处理,然后使用channel返回响应报文
            handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
        } else {//到这里表示是响应报文
            final TransportResponseHandler<?> handler;
            //对握手响应进行处理
            if (isHandshake) {
                handler = pendingHandshakes.remove(requestId);
            } else {
                //根据响应报文中的requestID找到之前发送请求前记录的
                //封装过的responseHandler进行处理
                //找到了也会从注册的responseHandler中移除该requestID
                //对应的responseHandler
                TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
                if (theHandler == null && TransportStatus.isError(status)) {
                    handler = pendingHandshakes.remove(requestId);
                } else {
                    handler = theHandler;
                }
            }
            // ignore if its null, the service logs it
            if (handler != null) {
                if (TransportStatus.isError(status)) {
                    handlerResponseError(streamIn, handler);
                } else {
                    handleResponse(remoteAddress, streamIn, handler);
                }
                // Check the entire message has been read
                final int nextByte = streamIn.read();
                // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
                if (nextByte != -1) {
                    throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
                        + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
                }
            }
        }
        success = true;
    } finally {
        if (success) {
            IOUtils.close(streamIn);
        } else {
            IOUtils.closeWhileHandlingException(streamIn);
        }
    }
}

相关文章

  • ElasticSearch 请求发送与响应接收

    1 概述 2 请求发送 3 响应接收 1 概述 在文章ElasticSearch 基于Netty的通信原理我们介绍...

  • Requests

    Requests库 目录一、Requests基础二、发送请求与接收响应(基本GET请求)三、发送请求与接收响应(基...

  • Postman接口测试要点+自动化测试实例

    一、postman工作原理 1.发请求 2.接受请求响应 3.接收响应显示到页面 二、测试的质量特性 做到说出大点...

  • Vapor文档学习廿八: HTTP -Response

    在接收的请求后通常要返回Response作为响应。我们做外部请求时也要接收响应对象。 Status http请求状...

  • 前端知识之HTML内容

    HTML介绍 Web服务本质 浏览器发请求 --> HTTP协议 --> 服务端接收请求 --> 服务端返回响应 ...

  • HTML基础

    HTML协议简介 Web服务本质浏览器发请求 --> HTTP协议 --> 服务端接收请求 --> 服务端返回响应...

  • HTML5布局之路

    Web服务本质 浏览器发请求 --> HTTP协议 --> 服务端接收请求 --> 服务端返回响应 --> 服务端...

  • 基本知识

    1、服务器本质 浏览器发请求 --> HTTP协议 --> 服务端接收请求 --> 服务端返回响应 --> 服务端...

  • Ajax原理

    (1)创建对象 (2)打开请求 (3)发送请求 (4)接收响应

  • HTTP协议

    一、HTTP协议 二、请求流程 搜集数据 生成http请求报文 发送请求报文 接收响应报文 解析响应报文 展现结果

网友评论

      本文标题:ElasticSearch 请求发送与响应接收

      本文链接:https://www.haomeiwen.com/subject/hflleqtx.html