zoukankan      html  css  js  c++  java
  • 一个基于线程池和epoll的IO事件管理器

    前面几篇博客介绍了Epoll, ThreadPool,

    其中 Epoll 封装了epoll的各类api, 可在epoll句柄中添加/修改/删除 fd 的 各类事件(EPOLLIN | EPOLLOUT), 可以返回一个 active的事件集合,并提供了一个回调函数指针,统一处理这些事件。

    ThreadPool 则封装了一个任务队列,一个线程vector, 可以网线程池添加任务,并新建线程从任务队列取出任务执行。

    这一篇将结合前面的Epoll和ThreadPool, 实现一个事件管理器,使用Epoll 添加和监听 fd 的 可读或可写事件,注册统一的事件处理接口来处理active的事件集合,将该回调函数放进任务队列,通知并等待线程池分配线程处理,统一的事件处理接口里则通过查询一个fd, callback的map, 找到对应fd的callback, 并加进任务队列,通知并等待线程池新建或者分配线程来处理。事件处理器每次添加或者修改fd的事件时,都会更新相应的fd, callback 的map, 以备后续回调时查询使用。

    #include "Epoll.h"
    #include "ThreadPool.h"
    #include<iostream>
    
    class EventManager{
    public:
      EventManager( int thread_pool_size );
      EventManager( const EventManager& ) = delete;
      EventManager& operator=( const EventManager& ) = delete;
      ~EventManager();
    
      void SetThreadPoolSize( size_t size ) ;
      size_t Size();
    
      void AddTask( Base::Closure* task );
      int AddTaskWaitingReadable( int fd, Base::Closure* task );
      int AddTaskWaitingWritable( int fd, Base::Closure* task );
      int RemoveAwaitingTask( int fd );
      int ModifyTaskWaitingStatus( int fd, int status, Base::Closure* task );
    
      void Start();
      void AwaitTermination();
      void Stop();
    
      void EpollAwakeHandler( const Epoll::ActiveEvents* );
    
    private:
      FixedThreadPool thread_pool_;
      Epoll epoll_;
      std::map<int, Base::Closure*> inactive_tasks_map_;
      std::mutex mutex_;
    };
    
    EventManager::EventManager(int thread_pool_size) {
        // Create at least 3 working threads. One of them is epolling thread.
        if (thread_pool_size < 3) {
            thread_pool_size = 3;
        }
        thread_pool_.SetPoolSize(thread_pool_size);
    }
    
    EventManager::~EventManager() {
        std::cout << "deleting event manager" << std::endl;
    }
    
    void EventManager::SetThreadPoolSize(size_t size) {
        std::unique_lock<std::mutex> lock(mutex_);
        thread_pool_.SetPoolSize(size);
    }
    
    size_t EventManager::Size() {
        std::unique_lock<std::mutex> lock(mutex_);
        return thread_pool_.Size();
    }
    
    
    // 将StartPolling (epoll_wait)作为一个任务,单独一个线程执行
    // 先监听得到待处理的fd和事件的集合,再调用事件统一处理接口EpollAwakeHandler来对监听到的fd及事件进行处理,其处理过程是通过查map,将fd对应的回调函数加入任务队列,等待线程池分配线程
    // 进行处理。
    void EventManager::Start() {
        // Set awake callback for epoll.
        epoll_.SetAwakeCallBack(new Epoll::EpollAwakeCallBack(
                    std::bind(&EventManager::EpollAwakeHandler, this, std::placeholders::_1))); // 用EventManager::EpollAwakeHandle函数来初始化Epoll里的回调函数,在执行Epoll::StartPolli                                                                                            // ng的时候,先epoll_wait监听得到fd和事件集合,再调用该回调函数处理发生的事件。
        // First add epolling thread to work.
        thread_pool_.AddTask(Base::NewCallBack(&Epoll::StartPolling, &epoll_)); //新建一个线程来执行任务, 把 StartPolling 放入任务队列,notify_one 一个线程来执行StartPolling 任务
    
        // Start the internal thread poll.
        thread_pool_.Start(); // 开启线程池
    }
    
    // 将监听到的事件fd集合作为参数传入EventManager::EpollAwakeHandler, 遍历事件集合,在inactive_tasks_map_里根据fd关键字查找对应的回调函数,
    //如果能找到,就将该回到函数加入到任务队列,之后会有某个线程来执行
    // EventManager::EpollAwakeHandler 相当于是一个事件处理的统一接口,封装了基于线程池对不同事件的不同处理过程
    void EventManager::EpollAwakeHandler(const Epoll::ActiveEvents* active_events) {
        std::unique_lock<std::mutex> lock(mutex_);
        for (auto i = 0; i < active_events->num(); i++) {
            int fd = active_events->events()[i].data.fd;
            if (inactive_tasks_map_.find(fd) != inactive_tasks_map_.end()) {
                //std::cout << "find task on fd " << fd << std::endl;
                thread_pool_.AddTask(inactive_tasks_map_[fd]);
            }
        }
    }
    
    void EventManager::AddTask(Base::Closure* task) {
        thread_pool_.AddTask(task);
    }
    
    // 在epoll句柄中添加了一个需要监听可读事件的描述符,并更新了inactive_tasks_map_中的该fd对应的回调函数
    int EventManager::AddTaskWaitingReadable(int fd, Base::Closure* task) {
        std::unique_lock<std::mutex> lock(mutex_);
        int ret = epoll_.AddMonitorReadableEvent(fd); // 监听该fd上是否有可读事件发生,且只监听一次,当这次监听完成后, 如果还需要继续监听这个fd, 需要再次加入
        if (ret) {    // 成功返回0, 失败返回-1
            return ret;
        }
        if (inactive_tasks_map_.find(fd) != inactive_tasks_map_.end()) {  // 如果能在inactive_tasks_map_里找到该fd对应的回调函数,就先删除掉,然后再为该fd添加新的回调函数
            delete inactive_tasks_map_[fd];
        }
        inactive_tasks_map_[fd] = task;
        return 0;
    }
    // 在epoll句柄中添加了一个需要监听可写事件的描述符,并更新了inactive_tasks_map_中的该fd对应的回调函数
    int EventManager::AddTaskWaitingWritable(int fd, Base::Closure* task) {
        std::unique_lock<std::mutex> lock(mutex_);
        int ret = epoll_.AddMonitorWritableEvent(fd);
        if (ret) {
            return ret;
        }
        if (inactive_tasks_map_.find(fd) != inactive_tasks_map_.end()) {
            delete inactive_tasks_map_[fd];
        }
        inactive_tasks_map_[fd] = task;
        return 0;
    }
    
    int EventManager::RemoveAwaitingTask(int fd) {
        std::unique_lock<std::mutex> lock(mutex_);
        int ret = epoll_.DeleteMonitoringEvent(fd); // 在epoll句柄中删除了该fd以及该fd要监听的可读事件
        if (ret) {
            return ret;
        }
        if (inactive_tasks_map_.find(fd) != inactive_tasks_map_.end()) { // 并在inactive_tasks_map_中删除掉该项key-value对
            delete inactive_tasks_map_[fd];
            inactive_tasks_map_.erase(inactive_tasks_map_.find(fd));
        }
        return 0;
    }
    // 修改该fd需要监听的事件,并更新inactive_tasks_map_中该fd对应的回调函数
    int EventManager::ModifyTaskWaitingStatus(int fd, int status, Base::Closure* task) {
        std::unique_lock<std::mutex> lock(mutex_);
        int ret = epoll_.ModifyMonitorEvent(fd, status);
        if (ret) {
            return ret;
        }
        if (inactive_tasks_map_.find(fd) != inactive_tasks_map_.end()) {
            delete inactive_tasks_map_[fd];
        }
        inactive_tasks_map_[fd] = task;
        return 0;
    }
    
    void EventManager::AwaitTermination() {
        thread_pool_.AwaitTermination();
    }
    
    void EventManager::Stop() {
        thread_pool_.Stop();
    }
    
    int main(){
        EventManager em(4);
    
        return 0;
    
    }
  • 相关阅读:
    分布式监控系统开发【day38】:报警自动升级代码解析及测试(八)
    分布式监控系统开发【day38】:报警阈值程序逻辑解析(四)
    分布式监控系统开发【day38】:监控trigger表结构设计(一)
    ubuntu 14.04 gitlab 的搭建
    u-boot2011.09 u-boot.img 的流程跟踪
    am335x u-boot2011.09 SPL 流程跟踪
    ubuntu apt-get 安装指定版本软件
    am335x Lan8710a 双网口配置
    Linux 使用tty0 显示10分钟自动关闭功能
    am335x uboot, kernel 编译
  • 原文地址:https://www.cnblogs.com/zengtx/p/11942433.html
Copyright © 2011-2022 走看看