注:本文是阅读muduo网络库之后的理解以及自己的代码实现
IO多路复用是Reactor的核心,简单来说,我们将多个文件描述符存放于一个统一的框架之中进行管理,每当某一个文件描述符中发生了某个事件,系统就去执行对应的响应函数,这也被称为回调。如图1所示是Reactor的流程描述,fd_2
变为红色代表着该文件描述符当前存在激活事件,系统将其推入Event Callback Trigger
的卡槽中,代表着fd_2
的开关开启,系统开始执行对应的回调函数。

实现一个Reactor模式需要将其中的几个核心部件进行拆分。
首先,我们知道,对于对象,它们都拥有自己的回调函数,而对象本身,在Linux中可以用文件描述符进行表示,这包括但不限于各类文件资源、标准输入输出、SOCKET通信甚至是定时器。综上,对于这一块,我们使用一个class Channel
对一个文件描述符和它对应的回调函数进行封装,这在图1中对应了各个橙色的小卡片。
其次,整个系统应该有一个最上层的抽象描述。所谓抽象描述,是我们不去关心具体的实现,而是需要描述出这整个系统的核心特点。在这里,虽然是IO多路复用,但更抽象的来说,这个系统其实是管理所有文件描述符的所有事件(包括可读,可写,等等),并在适当时机执行指定的回调函数。所以,这实际上是一个事件循环,系统无非是对所有的事件进行轮询,探测哪些事件在当前是激活了的。所以,class EventLoop
就是我们的抽象表示。class EventLoop
的核心函数,即EventLoop::loop()
,总结一下就是:轮询所有的事件,得到激活了的事件对应的class Channel
,在这些Channel
执行各自的回调函数。
最后,便是如何进行轮询。Linux中的IO多路复用无非就是select()
,poll()
,epoll()
三个函数,这里,为了简单而又不至于使系统性能太差,我们使用poll()
。直接使用Linux底层的系统调用肯定是有悖于C++的风格的,因此我们使用class Poller
对其进行封装。那么,除了poll()
,还需要封装哪些东西呢?回忆一下poll()
,该函数需要一个struct pollfd
组成的数组,每个struct pollfd
对应了一个文件描述符,那么在我们的系统中,它也就对应着一个Channel
。
是不是有点绕晕了?没关系,这里画一张图来说明整个class EventLoop
的执行流程,如图2所示:
- ①
class EventLoop
开始执行EventLoop::loop()
,表示事件循环开始; - ② 系统将控制权交给
class Poller
,该类中封装有真正的IO多路复用底层调用; - ③ 系统执行
poll()
函数,在vector<struct pollfd>
得到更新,随后再得到对应的Active Channels
,两者之间通过文件描述符予以关联,之后再将控制权交给class EventLoop
; - ④ 所有的
Active Channels
依次执行各自的事件回调。

最后再总结一下实现过程中一些需要注意的要点:
- 唯一性:
class EventLoop
作为上层抽象,在一个IO线程中只能有一个实例; - 注册:每生成一个
class Channel
的实例,且需要对其进行监听时,则自动在EventLoop
中进行注册; - 一对一:
class Channel
和struct pollfd
是一对一的关系,class Poller
每执行一次class Poller::poll()
之后,只能得到激活的struct pollfd
,但却需要返回给class EventLoop
激活的class Channel
,因此在class Poller
中需要一个std::map<int, Channel *>
的查找树,int
指代的是文件描述符。 - 前向声明:在编写类的头文件的时候,如果两个类互相需要依赖另一个类,则为了避免依赖限制,可以使用前向声明,但这样的话,就需要注意,构造函数和析构函数的定义最好都写在.cc文件中,因为头文件中的类型尚不完整
具体实现
/* Channel.h */
#ifndef CHANNEL_H
#define CHANNEL_H
#include <functional>
// forward declaration, avoid dependency
class EventLoop;
// Dispatch of event
class Channel
{
using eventCallback = std::function<void()>;
private:
EventLoop *ownerLoop_; // ownerLoop_ owns this Channel
int fd_; // just as fd in struct pollfd
int events_;
int revents_;
int index_; // necessary in Poller, -1 as inactivated
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
eventCallback errorCallback_;
eventCallback readCallback_;
eventCallback writeCallback_;
public:
Channel(EventLoop *loop, int fd);
~Channel();
int fd() const {
return fd_;
}
int events() const {
return events_;
}
int revents() const {
return revents_;
}
int index() const {
return index_;
}
bool isNoneEvent() const {
return events_ == kNoneEvent;
}
void set_index(int idx){
index_ = idx;
}
void enableRead(){
events_ = events_ | kReadEvent;
update();
}
void enableWrite(){
events_ = events_ | kWriteEvent;
update();
}
void set_revent(int revents){
revents_ = revents;
}
void setErrorCallback(const eventCallback cb){
errorCallback_ = cb;
}
void setReadCallback(const eventCallback cb){
readCallback_ = cb;
}
void setWriteCallback(const eventCallback cb){
writeCallback_ = cb;
}
void handleEvents();
void update(); // Channel::update() ==> EventLoop::updateChannel()
// ==> Poller::updateChannel()
};
#endif /* CHANNEL_H */
/* Channel.cc */
#include "Channel.h"
#include "EventLoop.h"
#include <poll.h>
// static variables should have a initialization
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
Channel::Channel(EventLoop *loop, int fd):
ownerLoop_(loop),
fd_(fd),
events_(0),
revents_(0),
index_(-1)
{
}
Channel::~Channel() {
}
void Channel::handleEvents() {
if(revents_ & (POLLNVAL|POLLERR)){
printf("error event comes\n");
if(errorCallback_) errorCallback_();
}
if(revents_ & (POLLIN | POLLPRI | POLLRDHUP)){
printf("read event comes\n");
if(readCallback_) readCallback_();
}
if(revents_ & POLLOUT){
printf("write event comes\n");
if(writeCallback_) writeCallback_();
}
}
void Channel::update(){
ownerLoop_->updateChannel(this);
}
/* Poller.h */
#ifndef POLLER_H
#define POLLER_H
#include <vector>
#include <map>
#include <muduo/base/noncopyable.h>
class Channel;
class EventLoop;
struct pollfd;
class Poller : muduo::noncopyable
{
using ChannelVec = std::vector<Channel *>;
using ChannelMap = std::map<int, Channel *>; // fd => Channel *
private:
std::vector<struct pollfd> pollfds_;
ChannelMap channels_;
EventLoop *ownerLoop_;
public:
Poller(EventLoop *loop);
~Poller();
void updateChannel(Channel *);
void poll(int maxWaitTimeM, ChannelVec *activeChannels);
void fillActiveChannels(int activeNum, ChannelVec *activeChannels);
};
#endif /* POLLER_H */
/* Poller.cc */
#include <assert.h>
#include <poll.h>
#include "Poller.h"
#include "Channel.h"
#include <muduo/base/Logging.h>
Poller::Poller(EventLoop *loop): ownerLoop_(loop) {
}
Poller::~Poller() {
}
void Poller::updateChannel(Channel *ch){
if(ch->index() < 0){ // this channel is a new one
assert(channels_.find(ch->fd()) == channels_.end());
struct pollfd tmp;
tmp.events = ch->events();
tmp.revents = ch->revents();
tmp.fd = ch->fd();
pollfds_.push_back(tmp);
channels_[tmp.fd] = ch;
ch->set_index(pollfds_.size()-1);
} else { // it alreay stores in pollfds_, we just change the value
assert(channels_.find(ch->fd()) != channels_.end());
struct pollfd &tmp = pollfds_[ch->index()];
tmp.events = ch->events();
tmp.revents = ch->revents();
if(ch->isNoneEvent()){
tmp.fd = -1; // no event under watched, so set -1
}
}
}
void Poller::poll(int maxWaitTimeM, ChannelVec *activeChannels){
int activeNum = ::poll(pollfds_.data(), pollfds_.size(), maxWaitTimeM);
if(activeNum>0){
fillActiveChannels(activeNum, activeChannels);
} else if(!activeNum){
LOG_INFO << "No active event in " << maxWaitTimeM << " m seconds";
} else {
LOG_FATAL << "ERROR occurs when ::poll()";
}
}
void Poller::fillActiveChannels(int activeNum, ChannelVec *activeChannels){
for(const auto &tmp: pollfds_){
if(activeNum<=0) break;
if(tmp.revents>0){
assert(channels_.find(tmp.fd) != channels_.end());
--activeNum;
channels_[tmp.fd]->set_revent(tmp.revents); // revent of channel should be updated
activeChannels->push_back(channels_[tmp.fd]);
}
}
}
/* EventLoop.h */
#ifndef EVENTLOOP_H
#define EVENTLOOP_H
#include <vector>
#include <memory>
class Channel;
class Poller;
// core of Reactor
class EventLoop
{
using ChannelVec = std::vector<Channel *>;
private:
ChannelVec activeChannels_;
bool quit_; // should be atomatic
bool isLoopping_;
const pid_t threadId_; // notes of IO thread
int maxWaitTimeM;
std::unique_ptr<Poller> pollerPtr_;
void assertInLoopThread();
public:
EventLoop();
~EventLoop();
// precondition:
// 1. should be IO thread
// 2. cannot call loop() repeately
void loop();
void updateChannel(Channel *ch);
void quit() {
quit_ =true;
}
};
#endif /* EVENTLOOP_H */
/* EventLoop.cc */
#include "EventLoop.h"
#include "Channel.h"
#include "Poller.h"
#include <muduo/base/Thread.h>
#include <muduo/base/Logging.h>
__thread EventLoop * loopOfCurrentThread_ = 0;
EventLoop::EventLoop():
quit_(false),
isLoopping_(false),
threadId_(muduo::CurrentThread::tid()),
maxWaitTimeM(10000),
pollerPtr_(new Poller (this))
{
if(loopOfCurrentThread_){
LOG_FATAL << "Already has an EventLoop!";
} else {
loopOfCurrentThread_ = this;
}
}
EventLoop::~EventLoop(){
LOG_INFO << "EventLoop " << this << "deconstructed in thread "
<< muduo::CurrentThread::tid();
loopOfCurrentThread_ = 0;
}
void EventLoop::loop(){
assert(!isLoopping_);
assertInLoopThread();
isLoopping_ = true;
quit_ = false;
while(!quit_){
activeChannels_.clear();
pollerPtr_->poll(maxWaitTimeM, &activeChannels_);
for(auto &ch: activeChannels_){
ch->handleEvents();
}
}
isLoopping_ = false;
LOG_INFO << "EventLoop::loop() stopped";
}
void EventLoop::assertInLoopThread() {
assert(threadId_ == muduo::CurrentThread::tid());
}
void EventLoop::updateChannel(Channel *ch){
pollerPtr_->updateChannel(ch);
}
网友评论