brpc: Message Processing Flow

Author: fooboo | Published 2019-11-10 23:31

I paused the brpc source analysis for a while because of other things. This post looks at how the server handles a request after the client sends it, and how the client handles the server's response. The content is not exhaustive; the focus is on the overall design, which is worth studying. It does not cover the socket/networking layer, nor the server startup process and the work done there; those will be analyzed separately later.

The analysis below uses the baidu protocol as the example.

Initialization

When a client or server starts, GlobalInitializeOrDie is called. As its name suggests, it initializes global state; pthread_once guarantees it runs only once.

void GlobalInitializeOrDie() {
    if (pthread_once(&register_extensions_once,
                     GlobalInitializeOrDieImpl) != 0) {
        //more code...
    }
}

Inside it are signal handling, logging, OpenSSL setup, naming services, load balancers, compression handlers, protocol handlers, and so on.

struct Protocol {
    typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
                                 bool read_eof, const void *arg);
    Parse parse;

    typedef void (*SerializeRequest)(
        butil::IOBuf* request_buf,
        Controller* cntl,
        const google::protobuf::Message* request);
    SerializeRequest serialize_request;

    typedef void (*PackRequest)(
        butil::IOBuf* iobuf_out,
        SocketMessage** user_message_out,
        uint64_t correlation_id,
        const google::protobuf::MethodDescriptor* method,
        Controller* controller,
        const butil::IOBuf& request_buf,
        const Authenticator* auth);
    PackRequest pack_request;

    typedef void (*ProcessRequest)(InputMessageBase* msg);
    ProcessRequest process_request;

    typedef void (*ProcessResponse)(InputMessageBase* msg);
    ProcessResponse process_response;

    typedef bool (*Verify)(const InputMessageBase* msg);
    Verify verify;
    //more code...
};

The struct above is the handler interface that every protocol implements; the purpose of each member is as declared and commented, and the concrete implementations are discussed below.
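For context, GlobalInitializeOrDieImpl fills in one such Protocol per supported protocol and registers it. Roughly like the following for baidu_std (a simplified sketch; the assignments are abbreviated from policy/baidu_rpc_protocol.cpp and global.cpp, not copied verbatim):

Protocol baidu_protocol;
baidu_protocol.parse = ParseRpcMessage;
baidu_protocol.serialize_request = SerializeRequestDefault;
baidu_protocol.pack_request = PackRpcRequest;
baidu_protocol.process_request = ProcessRpcRequest;
baidu_protocol.process_response = ProcessRpcResponse;
baidu_protocol.verify = VerifyRpcRequest;
baidu_protocol.name = "baidu_std";
// Registration makes the protocol visible to ListProtocols() used below.
if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) {
    exit(1);
}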

    std::vector<Protocol> protocols;
    ListProtocols(&protocols);
    for (size_t i = 0; i < protocols.size(); ++i) {
        if (protocols[i].process_response) {
            InputMessageHandler handler;
            // `process_response' is required at client side
            handler.parse = protocols[i].parse;
            handler.process = protocols[i].process_response;
            // No need to verify at client side
            handler.verify = NULL;
            handler.arg = NULL;
            handler.name = protocols[i].name;
            if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) {
                exit(1);
            }
        }
    }

    InputMessageHandler handler;
    std::vector<Protocol> protocols;
    ListProtocols(&protocols);
    for (size_t i = 0; i < protocols.size(); ++i) {
        if (protocols[i].process_request == NULL) {
            // The protocol does not support server-side.
            continue;
        }
        //more code...
        // `process_request' is required at server side
        handler.parse = protocols[i].parse;
        handler.process = protocols[i].process_request;
        handler.verify = protocols[i].verify;
        handler.arg = this;
        handler.name = protocols[i].name;
        if (acceptor->AddHandler(handler) != 0) {
            delete acceptor;
            return NULL;
        }
    }

The two snippets above register the message handlers on the client and server side respectively. Whether a handler is added depends on the protocol's callbacks: the client requires process_response, while the server requires process_request.

struct InputMessageHandler {
    typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
                                 bool read_eof, const void *arg);
    Parse parse;

    typedef void (*Process)(InputMessageBase* msg);
    Process process;

    typedef bool (*Verify)(const InputMessageBase* msg);
    Verify verify;

    //more code...
};

int InputMessenger::AddHandler(const InputMessageHandler& handler) {
    BAIDU_SCOPED_LOCK(_add_handler_mutex);
    if (NULL == _handlers) {
        _handlers = new (std::nothrow) InputMessageHandler[_capacity];
        if (NULL == _handlers) {
            return -1;
        }
        memset(_handlers, 0, sizeof(*_handlers) * _capacity);
        _non_protocol = false;
    }
    //more code...
    if (_handlers[index].parse == NULL) {
        // The same protocol might be added more than twice
        _handlers[index] = handler;
    } else if (_handlers[index].parse != handler.parse
               || _handlers[index].process != handler.process) {
        //more code...
    }
    //more code...
}

The code above adds a handler to the InputMessenger; some secondary code is omitted.

Client sends a request

When the client issues an RPC call, _serialize_request(&cntl->_request_buf, cntl, request) is called to serialize the body:

void SerializeRequestDefault(butil::IOBuf* buf,
                             Controller* cntl,
                             const google::protobuf::Message* request) {
    // Check sanity of request.
    if (!request) {
        //error
    }
    if (request->GetDescriptor() == SerializedRequest::descriptor()) {
        buf->append(((SerializedRequest*)request)->serialized_data());
        return;
    }
    if (!request->IsInitialized()) {
        //error
    }
    // the compression type comes from the Controller
    if (!SerializeAsCompressedData(*request, buf, cntl->request_compress_type())) {
        //error...
    }
}

bool SerializeAsCompressedData(const google::protobuf::Message& msg,
                               butil::IOBuf* buf, CompressType compress_type) {
    if (compress_type == COMPRESS_TYPE_NONE) {
        butil::IOBufAsZeroCopyOutputStream wrapper(buf);
        return msg.SerializeToZeroCopyStream(&wrapper);
    }
    const CompressHandler* handler = FindCompressHandler(compress_type);
    if (NULL != handler) {
        return handler->Compress(msg, buf);
    }
    return false;
}

The concrete compression implementations can be found in the source. Curiously, nothing here explicitly marks which protocol the RPC request uses, so how does the server parse the request correctly and dispatch it to the right handler? This is answered further below.
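As an aside, FindCompressHandler above consults a table that is also filled during global initialization. A rough sketch of how the gzip handler could be registered (simplified and approximate; the real entries live in compress.h/gzip_compress.cpp):

CompressHandler gzip_handler;
gzip_handler.Compress = policy::GzipCompress;     // Message -> compressed IOBuf
gzip_handler.Decompress = policy::GzipDecompress; // compressed IOBuf -> Message
gzip_handler.name = "gzip";
if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_handler) != 0) {
    exit(1);
}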

Next comes IssueRPC, which packs the request via _pack_request, i.e. PackRpcRequest:

void PackRpcRequest(butil::IOBuf* req_buf,
                    SocketMessage**,
                    uint64_t correlation_id,
                    const google::protobuf::MethodDescriptor* method,
                    Controller* cntl,
                    const butil::IOBuf& request_body,
                    const Authenticator* auth) {
    RpcMeta meta;
    //more code...
    ControllerPrivateAccessor accessor(cntl);
    RpcRequestMeta* request_meta = meta.mutable_request();
    if (method) {
        request_meta->set_service_name(FLAGS_baidu_protocol_use_fullname ?
                                       method->service()->full_name() :
                                       method->service()->name());
        request_meta->set_method_name(method->name());
        meta.set_compress_type(cntl->request_compress_type());
    }
    //more code...
    meta.set_correlation_id(correlation_id);
    //more code...
    // Don't use res->ByteSize() since it may be compressed
    const size_t req_size = request_body.length();
    const size_t attached_size = cntl->request_attachment().length();
    if (attached_size) {
        meta.set_attachment_size(attached_size);
    }

    SerializeRpcHeaderAndMeta(req_buf, meta, req_size + attached_size);
    req_buf->append(request_body);
    if (attached_size) {
        req_buf->append(cntl->request_attachment());
    }
}

The (abridged) code above sets the service name, method name and compress type on the meta, plus the correlation_id that ties the call back to the issuing bthread. It then serializes the header and meta with SerializeRpcHeaderAndMeta; the format is a 12-byte header: [PRPC][body_size][meta_size]. At this point the client's work is essentially done: it writes the data out with Write (analyzed later) and then calls Join(correlation_id) to wait for the response.

message RpcMeta {
    optional RpcRequestMeta request = 1;
    optional RpcResponseMeta response = 2;
    optional int32 compress_type = 3;
    optional int64 correlation_id = 4;
    optional int32 attachment_size = 5;
}

message RpcRequestMeta {
    required string service_name = 1;
    required string method_name = 2;
    optional int64 log_id = 3;
}

Server receives the request

On the server side, OnNewConnections is called when a new connection arrives, and OnNewMessages is called when data arrives on a connection. The latter is a big loop; its comments are kept below because they explain the design:

void InputMessenger::OnNewMessages(Socket* m) {
    // Notes:
    // - If the socket has only one message, the message will be parsed and
    //   processed in this bthread. nova-pbrpc and http works in this way.
    // - If the socket has several messages, all messages will be parsed (
    //   meaning cutting from butil::IOBuf. serializing from protobuf is part of
    //   "process") in this bthread. All messages except the last one will be
    //   processed in separate bthreads. To minimize the overhead, scheduling
    //   is batched(notice the BTHREAD_NOSIGNAL and bthread_flush).
    // - Verify will always be called in this bthread at most once and before
    //   any process.
    InputMessenger* messenger = static_cast<InputMessenger*>(m->user());
    const InputMessageHandler* handlers = messenger->_handlers;
    int progress = Socket::PROGRESS_INIT;

    // Notice that all *return* no matter successful or not will run last
    // message, even if the socket is about to be closed. This should be
    // OK in most cases.
    std::unique_ptr<InputMessageBase, RunLastMessage> last_msg;
    bool read_eof = false;
    while (!read_eof) {
        //more code...
        // Read.
        const ssize_t nr = m->DoRead(once_read);
        //more code...
        while (1) {
            size_t index = 8888;
            ParseResult pr = messenger->CutInputMessage(m, &index, read_eof);
            //more code...
            // This unique_ptr prevents msg to be lost before transfering
            // ownership to last_msg
            DestroyingPtr<InputMessageBase> msg(pr.message());
            QueueMessage(last_msg.release(), &num_bthread_created,
                             m->_keytable_pool);
            if (handlers[index].process == NULL) {
                continue;
            }
            m->ReAddress(&msg->_socket);
            m->PostponeEOF();
            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;
            //more code...
            if (!m->is_read_progressive()) {
                // Transfer ownership to last_msg
                last_msg.reset(msg.release());
            } else {
                QueueMessage(msg.release(), &num_bthread_created,
                                 m->_keytable_pool);
                bthread_flush();
                num_bthread_created = 0;
            }
        }  // while (1)
        //more code...
    }  // while (!read_eof)
}

Since this logic is fairly involved, only the core implementation is shown above; the next two paragraphs are the design notes from io.md in the GitHub repo.

Receiving messages

A "message" is a bounded binary string read from a connection; it may be a request from an upstream client or a response from a downstream server. brpc uses one or more EventDispatchers (EDISP for short) to wait for events on any fd. Unlike the common "IO thread" model, EDISP is not responsible for reading. The problem with IO threads is that one thread can only read one fd at a time; when several busy fds cluster in one IO thread, some reads get delayed. Multi-tenancy, complex traffic-splitting algorithms and Streaming RPC make this worse. Under high load, an occasional slow read stalls all fds in that IO thread, which hurts availability considerably.

Because of an epoll bug (still present when brpc was developed) and the relatively high cost of epoll_ctl, EDISP uses edge-triggered mode. When an event arrives, EDISP adds 1 to an atomic variable and starts a bthread to handle the data on that fd only if the value before the increment was 0. Behind the scenes, EDISP yields its pthread to the newly created bthread, giving it better cache locality so it can read the fd as soon as possible, while the bthread EDISP was running in gets stolen by another pthread and continues there; this is bthread's work-stealing scheduling. To understand exactly how that atomic variable works, read atomic instructions first and then Socket::StartInputEvent. These techniques make the contention when reading the same fd wait-free.
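The atomic gating can be sketched as follows. This is a simplified illustration of the idea behind Socket::StartInputEvent, not the real implementation (which also packs the epoll events into the counter); SocketSketch and ProcessEventSketch are stand-in names:

struct SocketSketch {            // stand-in for brpc's Socket (illustrative only)
    butil::atomic<int> nevent;   // number of pending input notifications
};

void* ProcessEventSketch(void*); // hypothetical: reads the fd until nevent drops to 0

void StartInputEventSketch(SocketSketch* s) {
    // Every edge-triggered notification adds 1; only the caller that moves the
    // counter from 0 to 1 starts a bthread to consume the fd. The consuming
    // bthread keeps reading until it brings the counter back to 0, so no event
    // is lost and no lock is taken.
    if (s->nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
        bthread_t th;
        // bthread_start_urgent runs the new bthread on the current worker right
        // away; the bthread EDISP was running in is stolen by another worker.
        bthread_start_urgent(&th, NULL, ProcessEventSketch, s);
    }
}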

Back to the message-processing flow above: after DoRead reads a chunk of data, CutInputMessage is called to cut messages out of it:

    // last chosen index of the protocol as a heuristic value to avoid
    // iterating all protocol handlers each time.
    //int _preferred_index;

ParseResult InputMessenger::CutInputMessage(
        Socket* m, size_t* index, bool read_eof) {
    const int preferred = m->preferred_index();
    const int max_index = (int)_max_index.load(butil::memory_order_acquire);
    // Try preferred handler first. The preferred_index is set on last
    // selection or by client.
    if (preferred >= 0 && preferred <= max_index
            && _handlers[preferred].parse != NULL) {
        ParseResult result =
            _handlers[preferred].parse(&m->_read_buf, m, read_eof, _handlers[preferred].arg);
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            *index = preferred;
            return result;
        }
        //more code...
    }
    for (int i = 0; i <= max_index; ++i) {
        if (i == preferred || _handlers[i].parse == NULL) {
            // Don't try preferred handler(already tried) or invalid handler
            continue;
        }
        ParseResult result = _handlers[i].parse(&m->_read_buf, m, read_eof, _handlers[i].arg);
        if (result.is_ok() ||
            result.error() == PARSE_ERROR_NOT_ENOUGH_DATA) {
            m->set_preferred_index(i);
            *index = i;
            return result;
        }
    }
    //more code...
}

Part of the code is shown above. Initially preferred is -1, so the first cut tries the protocols one by one and records which one matched. Since the message format on a single connection is usually fixed, subsequent parses no longer need the for loop to figure out the protocol. As an aside, one might wonder whether carrying the protocol type in every RPC request would also work; the trial-parse approach saves that field and pays the detection cost only once per connection. In our case the connection is identified as the baidu protocol.

The parse implementation is as follows:

ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket,
                            bool /*read_eof*/, const void*) {
    char header_buf[12];
    const size_t n = source->copy_to(header_buf, sizeof(header_buf));
    if (n >= 4) {
        void* dummy = header_buf;
        if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") {
            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
        }
    } else {
        if (memcmp(header_buf, "PRPC", n) != 0) {
            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
        }
    }
    //more code...
    source->pop_front(sizeof(header_buf));
    MostCommonMessage* msg = MostCommonMessage::Get();
    source->cutn(&msg->meta, meta_size);
    source->cutn(&msg->payload, body_size - meta_size);
    return MakeMessage(msg);
}

The code above parses the 12-byte header along with the meta/body sizes. The precise field meanings are not analyzed further: every protocol is different and the details vary with the use case. If parsing succeeds here, the connection speaks the baidu protocol and the protocol index is marked on the socket.

Once a message has been cut out, a new bthread is started to process it:

            // This unique_ptr prevents msg to be lost before transfering
            // ownership to last_msg
            DestroyingPtr<InputMessageBase> msg(pr.message());
            QueueMessage(last_msg.release(), &num_bthread_created,
                             m->_keytable_pool);

            msg->_process = handlers[index].process;
            msg->_arg = handlers[index].arg;

        if (num_bthread_created) {
            bthread_flush();
        }

static void QueueMessage(InputMessageBase* to_run_msg,
                         int* num_bthread_created,
                         bthread_keytable_pool_t* keytable_pool) {
    if (!to_run_msg) {
        return;
    }
    // Create bthread for last_msg. The bthread is not scheduled
    // until bthread_flush() is called (in the worse case).

    // TODO(gejun): Join threads.
    bthread_t th;
    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
                          BTHREAD_ATTR_PTHREAD :
                          BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
    tmp.keytable_pool = keytable_pool;
    if (bthread_start_background(
            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
        ++*num_bthread_created;
    } else {
        ProcessInputMessage(to_run_msg);
    }
}

void* ProcessInputMessage(void* void_arg) {
    InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
    msg->_process(msg);
    return NULL;
}

This eventually reaches ProcessRpcRequest. Since its request-handling logic is quite involved, only the key parts are shown:

void ProcessRpcRequest(InputMessageBase* msg_base) {
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    const Server* server = static_cast<const Server*>(msg_base->arg());
    RpcMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        //error
    }
    const RpcRequestMeta &request_meta = meta.request();
    std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
    cntl->set_request_compress_type((CompressType)meta.compress_type());
    //more code...

        butil::StringPiece svc_name(request_meta.service_name());
        if (svc_name.find('.') == butil::StringPiece::npos) {
            const Server::ServiceProperty* sp =
                server_accessor.FindServicePropertyByName(svc_name);
            if (NULL == sp) {
                //error
                break;
            }
            svc_name = sp->service->GetDescriptor()->full_name();
        }
        const Server::MethodProperty* mp =
            server_accessor.FindMethodPropertyByFullName(
                svc_name, request_meta.method_name());
        if (NULL == mp) {
            //error
            break;
        } else if (mp->service->GetDescriptor()
                   == BadMethodService::descriptor()) {
            //error
            break;
        }
        //more code...
        CompressType req_cmp_type = (CompressType)meta.compress_type();
        req.reset(svc->GetRequestPrototype(method).New());
        if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {
            //error
            break;
        }
        //more code...
        google::protobuf::Closure* done = ::brpc::NewCallback<
            int64_t, Controller*, const google::protobuf::Message*,
            const google::protobuf::Message*, const Server*,
            MethodStatus*, int64_t>(
                &SendRpcResponse, meta.correlation_id(), cntl.get(),
                req.get(), res.get(), server,
                method_status, msg->received_us());
        //more code...
            svc->CallMethod(method, cntl.release(),
                            req.release(), res.release(), done);
    //more code...
}

This is roughly the reverse of what the client did: parse the meta out of msg with ParsePbFromIOBuf; new a Controller; set various parameters (omitted here); set the thread-local data of the worker the bthread runs on; locate the service and method via FindMethodPropertyByFullName; and then decompress the request data with ParseFromCompressedData:

bool ParseFromCompressedData(const butil::IOBuf& data,
                             google::protobuf::Message* msg,
                             CompressType compress_type) {
    if (compress_type == COMPRESS_TYPE_NONE) {
        return ParsePbFromIOBuf(msg, data);
    }
    const CompressHandler* handler = FindCompressHandler(compress_type);
    if (NULL != handler) {
        return handler->Decompress(data, msg);
    }
    return false;
}

Then a done callback is set up (it does a few other things, analyzed later) and CallMethod is called, e.g. in example/echo_c++/echo.pb.cc:

void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(), protobuf_echo_2eproto::file_level_service_descriptors[0]);
  switch(method->index()) {
    case 0:
      Echo(controller,
             ::google::protobuf::down_cast<const ::example::EchoRequest*>(request),
             ::google::protobuf::down_cast< ::example::EchoResponse*>(response),
             done);
      break;
    default:
      break;
  }
}

class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {};
    virtual ~EchoServiceImpl() {};
    virtual void Echo(google::protobuf::RpcController* cntl_base,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        // This object helps you to call done->Run() in RAII style. If you need
        // to process the request asynchronously, pass done_guard.release().
        brpc::ClosureGuard done_guard(done);

        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);
        //more code...
    }
};

Here ClosureGuard uses RAII: once the guard goes out of scope, its destructor runs and calls _done->Run(). The Run() of the callback created above is essentially:

  void Run() {
    bool needs_delete = self_deleting_;  // read in case callback deletes
    (get_pointer(object_)->*method_)(arg1_);
    if (needs_delete) delete this;
  }
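ClosureGuard itself is essentially the following RAII wrapper (a simplified sketch of brpc::ClosureGuard, not a verbatim copy of the class):

class ClosureGuardSketch {
public:
    explicit ClosureGuardSketch(google::protobuf::Closure* done) : _done(done) {}
    // When the guard leaves its scope, the closure runs exactly once.
    ~ClosureGuardSketch() { if (_done) { _done->Run(); } }
    // For asynchronous handling: take the closure back so Run() is deferred.
    google::protobuf::Closure* release() {
        google::protobuf::Closure* d = _done;
        _done = NULL;
        return d;
    }
private:
    google::protobuf::Closure* _done;
};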

In the end this calls SendRpcResponse, i.e. it sends the response back.

Server sends the response

void SendRpcResponse(int64_t correlation_id,
                     Controller* cntl,
                     const google::protobuf::Message* req,
                     const google::protobuf::Message* res,
                     const Server* server,
                     MethodStatus* method_status,
                     int64_t received_us) {
    //more code...
    butil::IOBuf res_body;
    CompressType type = cntl->response_compress_type();
    if (res != NULL && !cntl->Failed()) {
        if (!res->IsInitialized()) {
            //error
        } else if (!SerializeAsCompressedData(*res, &res_body, type)) {
            //error
        } else {
            append_body = true;
        }
    }
    //more code...
    RpcMeta meta;
    RpcResponseMeta* response_meta = meta.mutable_response();
    meta.set_correlation_id(correlation_id);
    meta.set_compress_type(cntl->response_compress_type());
    //more code...
    butil::IOBuf res_buf;
    SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
    if (append_body) {
        res_buf.append(res_body.movable());
        if (attached_size) {
            res_buf.append(cntl->response_attachment().movable());
        }
    }
    //more code...
        Socket::WriteOptions wopt;
        wopt.ignore_eovercrowded = true;
        if (sock->Write(&res_buf, &wopt) != 0) {
            //error
            return;
        }
    //more code...
}

The code above replies to the client: the response body is serialized (and possibly compressed) by SerializeAsCompressedData and prefixed with the header and meta, just like the client's request, then finally written out with Write (analyzed later).

Client receives the response

Symmetrically to the server, when the socket becomes readable a message has arrived, and processing eventually goes through process_response, i.e. ProcessRpcResponse:

void ProcessRpcResponse(InputMessageBase* msg_base) {
    DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
    RpcMeta meta;
    if (!ParsePbFromIOBuf(&meta, msg->meta)) {
        //error
    }

    const bthread_id_t cid = { static_cast<uint64_t>(meta.correlation_id()) };
    Controller* cntl = NULL;
    const int rc = bthread_id_lock(cid, (void**)&cntl);
    if (rc != 0) {
        //error
        return;
    }
    //more code...
    const RpcResponseMeta &response_meta = meta.response();
    //more code...
        const CompressType res_cmp_type = (CompressType)meta.compress_type();
        cntl->set_response_compress_type(res_cmp_type);
        if (cntl->response()) {
            if (!ParseFromCompressedData(
                    *res_buf_ptr, cntl->response(), res_cmp_type)) {
                //error
            }
        }
    //more code...

    void OnResponse(CallId id, int saved_error) {
        const Controller::CompletionInfo info = { id, true };
        _cntl->OnVersionedRPCReturned(info, false, saved_error);
    }

bthread_id was analyzed in an earlier post. Here, after the response arrives, the processing bthread uses bthread_id_lock to lock the id of that RPC so only one thread touches its state, then processes the response and calls OnVersionedRPCReturned, which ends up waking the bthread that originally issued the RPC on the client.
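The correlation-id pattern can be summarized with the plain bthread_id API (a condensed sketch of the idea; the real code in Controller uses a versioned variant with extra bookkeeping):

// Client side, when the RPC starts:
//   - the CallId is a bthread_id created with the Controller as its data;
//   - Join(correlation_id) maps to bthread_id_join, which blocks the calling
//     bthread until the id is destroyed.
bthread_id_t cid;
bthread_id_create(&cid, cntl /* data */, NULL /* on_error */);
// ... send the request carrying cid.value as correlation_id, then:
bthread_id_join(cid);   // woken up once the response (or an error) has been handled

// Response side (ProcessRpcResponse), as in the code above:
//   bthread_id_lock(cid, (void**)&cntl) serializes all accesses to the same RPC,
//   so retries, timeouts and the response cannot race on the Controller.
//   OnVersionedRPCReturned eventually unlocks and destroys the id:
bthread_id_unlock_and_destroy(cid);   // this is what unblocks bthread_id_join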

Many details have been skipped above; there are so many that it is easy to get lost in them. This post only traces the overall flow and where messages go; some implementations will be analyzed gradually later, and there is still plenty left. Here is the official overview diagram:


(Figure: The full picture)
