美文网首页
使用C++实现简单的Reactor模式

使用C++实现简单的Reactor模式

作者: 老杜振熙 | 来源:发表于2020-11-19 19:54 被阅读0次

注:本文是阅读muduo网络库之后的理解以及自己的代码实现

IO多路复用是Reactor的核心,简单来说,我们将多个文件描述符存放于一个统一的框架之中进行管理,每当某一个文件描述符中发生了某个事件,系统就去执行对应的响应函数,这也被称为回调。如图1所示是Reactor的流程描述,fd_2变为红色代表着该文件描述符当前存在激活事件,系统将其推入Event Callback Trigger的卡槽中,代表着fd_2的开关开启,系统开始执行对应的回调函数。

图1. Reactor的多路复用流程示意.png

实现一个Reactor模式需要将其中的几个核心部件进行拆分。

首先,我们知道,对于对象,它们都拥有自己的回调函数,而对象本身,在Linux中可以用文件描述符进行表示,这包括但不限于各类文件资源、标准输入输出、SOCKET通信甚至是定时器。综上,对于这一块,我们使用一个class Channel对一个文件描述符和它对应的回调函数进行封装,这在图1中对应了各个橙色的小卡片。

其次,整个系统应该有一个最上层的抽象描述。所谓抽象描述,是我们不去关心具体的实现,而是需要描述出这整个系统的核心特点。在这里,虽然是IO多路复用,但更抽象的来说,这个系统其实是管理所有文件描述符的所有事件(包括可读可写,等等),并在适当时机执行指定的回调函数。所以,这实际上是一个事件循环,系统无非是对所有的事件进行轮询,探测哪些事件在当前是激活了的。所以,class EventLoop就是我们的抽象表示。class EventLoop的核心函数,即EventLoop::loop(),总结一下就是:轮询所有的事件,得到激活了的事件对应的class Channel,在这些Channel执行各自的回调函数。

最后,便是如何进行轮询。Linux中的IO多路复用无非就是select()poll()epoll()三个函数,这里,为了简单而又不至于使系统性能太差,我们使用poll()。直接使用Linux底层的系统调用肯定是有悖于C++的风格的,因此我们使用class Poller对其进行封装。那么,除了poll(),还需要封装哪些东西呢?回忆一下poll(),该函数需要一个struct pollfd组成的数组,每个struct pollfd对应了一个文件描述符,那么在我们的系统中,它也就对应着一个Channel

是不是有点绕晕了?没关系,这里画一张图来说明整个class EventLoop的执行流程,如图2所示:

  • class EventLoop开始执行EventLoop::loop(),表示事件循环开始;
  • ② 系统将控制权交给class Poller,该类中封装有真正的IO多路复用底层调用;
  • ③ 系统执行poll()函数,在vector<struct pollfd>得到更新,随后再得到对应的Active Channels,两者之间通过文件描述符予以关联,之后再将控制权交给class EventLoop
  • ④ 所有的Active Channels依次执行各自的事件回调。
图2. Reactor系统流程图

最后再总结一下实现过程中一些需要注意的要点:

  • 唯一性:class EventLoop作为上层抽象,在一个IO线程中只能有一个实例;
  • 注册:每生成一个class Channel的实例,且需要对其进行监听时,则自动在EventLoop中进行注册;
  • 一对一:class Channelstruct pollfd是一对一的关系,class Poller每执行一次class Poller::poll()之后,只能得到激活的struct pollfd,但却需要返回给class EventLoop激活的class Channel,因此在class Poller中需要一个std::map<int, Channel *>的查找树,int指代的是文件描述符。
  • 前向声明:在编写类的头文件的时候,如果两个类互相需要依赖另一个类,则为了避免依赖限制,可以使用前向声明,但这样的话,就需要注意,构造函数和析构函数的定义最好都写在.cc文件中,因为头文件中的类型尚不完整

具体实现

/*  Channel.h */
#ifndef CHANNEL_H
#define CHANNEL_H

#include <functional>

// forward declaration, avoid dependency
class EventLoop;

// Dispatch of event
class Channel
{
  using eventCallback = std::function<void()>;
private:
  EventLoop *ownerLoop_; // ownerLoop_ owns this Channel
  int fd_; // just as fd in struct pollfd
  int events_;
  int revents_;

  int index_; // necessary in Poller, -1 as inactivated

  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  eventCallback errorCallback_;
  eventCallback readCallback_;
  eventCallback writeCallback_;

public:
  Channel(EventLoop *loop, int fd);
  ~Channel();

  int fd() const {
    return fd_;
  }
  int events() const {
    return events_;
  }
  int revents() const {
    return revents_;
  }
  int index() const {
    return index_;
  }

  bool isNoneEvent() const {
    return events_ == kNoneEvent;
  }

  void set_index(int idx){
    index_ = idx;
  }
  void enableRead(){
    events_ = events_ | kReadEvent;
    update();
  }
  void enableWrite(){
    events_ = events_ | kWriteEvent;
    update();
  }
  void set_revent(int revents){
    revents_ = revents;
  }

  void setErrorCallback(const eventCallback cb){
    errorCallback_ = cb;
  }
  void setReadCallback(const eventCallback cb){
    readCallback_ = cb;
  }
  void setWriteCallback(const eventCallback cb){
    writeCallback_ = cb;
  }

  void handleEvents();
  void update(); // Channel::update() ==> EventLoop::updateChannel()
  // ==> Poller::updateChannel()
};

#endif /* CHANNEL_H */

/* Channel.cc */
#include "Channel.h"
#include "EventLoop.h"
#include <poll.h>

// static variables should have a initialization
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop *loop, int fd):
  ownerLoop_(loop),
  fd_(fd),
  events_(0),
  revents_(0),
  index_(-1)
{

}

Channel::~Channel() {

}

void Channel::handleEvents() {
  if(revents_ & (POLLNVAL|POLLERR)){
    printf("error event comes\n");
    if(errorCallback_) errorCallback_();
  }
  if(revents_ & (POLLIN | POLLPRI | POLLRDHUP)){
    printf("read event comes\n");
    if(readCallback_) readCallback_();
  }
  if(revents_ & POLLOUT){
    printf("write event comes\n");
    if(writeCallback_) writeCallback_();
  }
}

void Channel::update(){
  ownerLoop_->updateChannel(this);
}

/* Poller.h */
#ifndef POLLER_H
#define POLLER_H

#include <vector>
#include <map>
#include <muduo/base/noncopyable.h>

class Channel;
class EventLoop;
struct pollfd;

class Poller : muduo::noncopyable
{
  using ChannelVec = std::vector<Channel *>;
  using ChannelMap = std::map<int, Channel *>; // fd => Channel *
private:
  std::vector<struct pollfd> pollfds_;
  ChannelMap channels_;
  EventLoop *ownerLoop_;

public:
  Poller(EventLoop *loop);
  ~Poller();

  void updateChannel(Channel *);
  void poll(int maxWaitTimeM, ChannelVec *activeChannels);
  void fillActiveChannels(int activeNum, ChannelVec *activeChannels);
};

#endif /* POLLER_H */

/* Poller.cc */
#include <assert.h>
#include <poll.h>
#include "Poller.h"
#include "Channel.h"
#include <muduo/base/Logging.h>

Poller::Poller(EventLoop *loop): ownerLoop_(loop) {

}

Poller::~Poller() {

}

void Poller::updateChannel(Channel *ch){
  if(ch->index() < 0){ // this channel is a new one
    assert(channels_.find(ch->fd()) == channels_.end());
    struct pollfd tmp;
    tmp.events = ch->events();
    tmp.revents = ch->revents();
    tmp.fd = ch->fd();
    pollfds_.push_back(tmp);
    channels_[tmp.fd] = ch;
    ch->set_index(pollfds_.size()-1);
  } else { // it alreay stores in pollfds_, we just change the value
    assert(channels_.find(ch->fd()) != channels_.end());
    struct pollfd &tmp = pollfds_[ch->index()];
    tmp.events = ch->events();
    tmp.revents = ch->revents();
    if(ch->isNoneEvent()){
      tmp.fd = -1; // no event under watched, so set -1
    }
  }
}

void Poller::poll(int maxWaitTimeM, ChannelVec *activeChannels){
  int activeNum = ::poll(pollfds_.data(), pollfds_.size(), maxWaitTimeM);
  if(activeNum>0){
    fillActiveChannels(activeNum, activeChannels);
  } else if(!activeNum){
    LOG_INFO << "No active event in " << maxWaitTimeM << " m seconds";
  } else {
    LOG_FATAL << "ERROR occurs when ::poll()";
  }
}

void Poller::fillActiveChannels(int activeNum, ChannelVec *activeChannels){
  for(const auto &tmp: pollfds_){
    if(activeNum<=0)  break;
    if(tmp.revents>0){
      assert(channels_.find(tmp.fd) != channels_.end());
      --activeNum;
      channels_[tmp.fd]->set_revent(tmp.revents); // revent of channel should be updated
      activeChannels->push_back(channels_[tmp.fd]);
    }
  }
}

/* EventLoop.h */
#ifndef EVENTLOOP_H
#define EVENTLOOP_H

#include <vector>
#include <memory>

class Channel;
class Poller;

// core of Reactor
class EventLoop
{
  using ChannelVec = std::vector<Channel *>;
private:
  ChannelVec activeChannels_;
  bool quit_; // should be atomatic
  bool isLoopping_;
  const pid_t threadId_; // notes of IO thread
  int maxWaitTimeM;
  std::unique_ptr<Poller> pollerPtr_;

  void assertInLoopThread();

public:
  EventLoop();
  ~EventLoop();

  // precondition:
  // 1. should be IO thread
  // 2. cannot call loop() repeately
  void loop();

  void updateChannel(Channel *ch);
  void quit() {
    quit_ =true;
  }
};

#endif /* EVENTLOOP_H */

/* EventLoop.cc */
#include "EventLoop.h"
#include "Channel.h"
#include "Poller.h"
#include <muduo/base/Thread.h>
#include <muduo/base/Logging.h>

__thread EventLoop * loopOfCurrentThread_ = 0;

EventLoop::EventLoop():
  quit_(false),
  isLoopping_(false),
  threadId_(muduo::CurrentThread::tid()),
  maxWaitTimeM(10000),
  pollerPtr_(new Poller (this))
{
  if(loopOfCurrentThread_){
    LOG_FATAL << "Already has an EventLoop!";
  } else {
    loopOfCurrentThread_ = this;
  }
}

EventLoop::~EventLoop(){
  LOG_INFO << "EventLoop " << this << "deconstructed in thread " 
    << muduo::CurrentThread::tid();
  loopOfCurrentThread_ = 0;
}

void EventLoop::loop(){
  assert(!isLoopping_);
  assertInLoopThread();
  isLoopping_ = true;
  quit_ = false;
  while(!quit_){
    activeChannels_.clear();
    pollerPtr_->poll(maxWaitTimeM, &activeChannels_);
    for(auto &ch: activeChannels_){
      ch->handleEvents();
    }
  }
  isLoopping_ = false;
  LOG_INFO << "EventLoop::loop() stopped";
}

void EventLoop::assertInLoopThread() {
  assert(threadId_ == muduo::CurrentThread::tid());
}

void EventLoop::updateChannel(Channel *ch){
  pollerPtr_->updateChannel(ch);
}


相关文章

网友评论

      本文标题:使用C++实现简单的Reactor模式

      本文链接:https://www.haomeiwen.com/subject/gashbktx.html