1 概述
在文章ElasticSearch 基于Netty的通信原理我们介绍了ElasticSearch是如何使用Netty进行通信的,本文主要介绍ElasticSearch节点之间如何发送和接收请求。
2 请求发送
ElasticSearch中主要调用TransportService.sendRequest进行请求发送。其源码如下:
//TransportService
//node是接收该请求的目标节点
//action会随着请求一同发往接收节点,用于指示
//接收节点使用哪个requestHandler处理该请求
//handler是该请求响应返回之后处理ResponstHandler
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
try {
//首先根据节点获取到该节点的connection
Transport.Connection connection = getConnection(node);
//然后使用Connection进行请求发送
sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
//注意下面函数入参handler是请求响应返回之后进行处理的
//handler
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
try {
asyncSender.sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
//如果发送过程中发生异常,则直接调用handler
//异常处理函数
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
有的请求的接收节点就是当前节点自身,所以对此要进行特殊处理,特殊处理的逻辑就在TransportService.getConnection返回的Connection实例中,如下:
//TransportService
public Transport.Connection getConnection(DiscoveryNode node) {
//如果是接收节点就是自身,则返回Connection特殊子类
//实现localNodeConnection
if (isLocalNode(node)) {
return localNodeConnection;
} else {
//否则根据节点获取该节点对应的Connection
//这里实现也就是根据节点获取以前打开的Connection
//想看具体实现,可查看
//ConnectionManager.internalOpenConnection的调用轨迹即可
//不再展开介绍
return connectionManager.getConnection(node);
}
}
localNodeConnection定义如下,是一个内部类:
//TransportService
private final Transport.Connection localNodeConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return localNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
//直接调用sendLocalRequest发送给自身
//sendLocalRequest函数实现比较简单,直接
//调用注册的请求处理handler进行处理,然后
//向channel返回处理结果,后面不再展开介绍
sendLocalRequest(requestId, action, request, options);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
上面代码中asyncSender是TransportService的域,其在TransportService初始化如下:
//TransportService
public TransportService(...) {
...
//发送就是调用TransportService.sendRequestInternal
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
...
}
TransportService.sendRequestInternal定义如下:
//TransportService
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (connection == null) {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
//首先对传入的ResponseHandler进行封装
ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
// TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
//将封装的ResponseHandler记录到responseHandlers中
//并返回requestID,该requestID会随着请求发送到目标节点,
//目标节点处理完之后会将此requestID和响应一起返回给
//发送请求的节点,发送请求的节点会根据
//此requestID查找发送时记录的ResponseHandler
//responseHandlers采用AtomicLong维护此requestID
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
final TimeoutHandler timeoutHandler;
//如果该请求有timeout时间,则注册一个timeoutHandler
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
responseHandler.setTimeoutHandler(timeoutHandler);
} else {
timeoutHandler = null;
}
try {
if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
// the caller. It will only notify if the toStop code hasn't done the work yet.
throw new TransportException("TransportService is closed stopped can't send request");
}
if (timeoutHandler != null) {
assert options.timeout() != null;
//使用延时任务实现timeoutHandler的调用
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
//调用connection.sendRequest发送该请求,
//注意requestID,action都会被编码进报文发送
//到请求处理节点
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) {
//异常处理
...
}
}
上面还有一个需要注意的点,TransportService.responseHandlers在TransportService构造函数被初始化:
//TransportService构造函数中初始化如下
responseHandlers = transport.getResponseHandlers();
所以在TransportService.sendRequestInternal发送请求前注册响应处理Handler其实是注册到transport中了,即注册到Transport.responseHandlers中。后续响应返回之后,也直接在Transport中查找responseHandler对响应进行处理。
关于请求处理节点(接收节点)如何接受请求、对报文进行解码、根据action查找requestHandler,可以参考文章ElasticSearch TransportAction类继承层次以及ElasticSearch 基于Netty的通信原理,这里不再赘述。
3 响应接收
在文章ElasticSearch 基于Netty的通信原理中我们介绍了向Netty Pipeline中注册处理报文的Handler为Netty4MessageChannelHandler
Netty4MessageChannelHandler定义如下:
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final Netty4Transport transport;
Netty4MessageChannelHandler(Netty4Transport transport) {
this.transport = transport;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Transports.assertTransportThread();
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
final ByteBuf buffer = (ByteBuf) msg;
try {
Channel channel = ctx.channel();
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
//transport为Netty4Transport对象实例,和
//TransportService中持有的是同一个实例
transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
} finally {
buffer.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable newCause = unwrapped != null ? unwrapped : cause;
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
if (newCause instanceof Error) {
transport.onException(tcpChannel, new Exception(newCause));
} else {
transport.onException(tcpChannel, (Exception) newCause);
}
}
}
inboundMessage在Netty4Transport父类TcpTransport中实现:
//TcpTransport
public void inboundMessage(TcpChannel channel, BytesReference message) {
try {
transportLogger.logInboundMessage(channel, message);
// Message length of 0 is a ping
if (message.length() != 0) {
messageReceived(message, channel);
}
} catch (Exception e) {
onException(channel, e);
}
}
public final void messageReceived(BytesReference reference, TcpChannel channel) throws IOException {
String profileName = channel.getProfile();
InetSocketAddress remoteAddress = channel.getRemoteAddress();
int messageLengthBytes = reference.length();
final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
readBytesMetric.inc(totalMessageSize);
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
StreamInput streamIn = reference.streamInput();
boolean success = false;
try (ThreadContext.StoredContext tCtx = threadPool.getThreadContext().stashContext()) {
//对于requestID,不管是请求还是响应报文,都会有此ID
long requestId = streamIn.readLong();
//读取状态
byte status = streamIn.readByte();
Version version = Version.fromId(streamIn.readInt());
//是否进行过压缩
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && streamIn.available() > 0) {
Compressor compressor;
try {
final int bytesConsumed = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE;
compressor = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed));
} catch (NotCompressedException ex) {
//压缩异常处理
...
}
//解压缩
streamIn = compressor.streamInput(streamIn);
}
//判断是否为握手请求
final boolean isHandshake = TransportStatus.isHandshake(status);
ensureVersionCompatibility(version, getCurrentVersion(), isHandshake);
streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry);
streamIn.setVersion(version);
threadPool.getThreadContext().readHeaders(streamIn);
threadPool.getThreadContext().putTransient("_remote_address", remoteAddress);
//判断是否为请求
if (TransportStatus.isRequest(status)) {
//进行请求处理,具体的为根据action查找requestHandler
//进行处理,然后使用channel返回响应报文
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
} else {//到这里表示是响应报文
final TransportResponseHandler<?> handler;
//对握手响应进行处理
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
//根据响应报文中的requestID找到之前发送请求前记录的
//封装过的responseHandler进行处理
//找到了也会从注册的responseHandler中移除该requestID
//对应的responseHandler
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
handler = theHandler;
}
}
// ignore if its null, the service logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(streamIn, handler);
} else {
handleResponse(remoteAddress, streamIn, handler);
}
// Check the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}
}
}
success = true;
} finally {
if (success) {
IOUtils.close(streamIn);
} else {
IOUtils.closeWhileHandlingException(streamIn);
}
}
}











网友评论