zoukankan      html  css  js  c++  java
  • 基于C++11线程池

    • 1.包装线程对象
    class task : public std::tr1::enable_shared_from_this<task>
    {
    public:
    	task():exit_(false){}
    	task( const task & ) = delete;
    	~task(){}
    	task & operator =( const task &) = delete;
    
    	void start();
    	void stop()
    	{
    		exit_ = true;
    		sync_.notify_one();
    	}
    	void set_job( const std::function<void()> & job, const std::string & file, int line)
    	{//提交任务
    		{
    			std::unique_lock<std::mutex> lock(mutex_);
    			job_ = job;
    			file_ = file;
    			line_ = line;
    		}
    		sync_.notify_one();//通知主线程有任务要运行....
    	}
    	void print_job(){
    		LOG(INFO)<<"sumbit from:"<<file_<<":"<<line_;
    	}
    private:
    
    	bool exit_;
    	std::mutex mutex_;
    	std::condition_variable sync_;
    	std::function< void()> job_;          //线程运行的任务,线程随意时刻,最多仅仅能运行一个任务。
    	std::thread::id       id_;
    	std::string		     file_;
    	int                   line_;
    
    
    };
    void task::start()
    {
    	auto job_proxy = [this] (){
    
    
    		id_ = std::this_thread::get_id();
    
    
    		while( !exit_ )
    		{
    
    
    			std::unique_lock<std::mutex> lock(mutex_);
    			
    			if( job_ )
    			{//有任务了,须要运行任务了
    				try
    				{
    					job_(); //运行任务的代码
    				}catch( std::exception & e)
    				{
    					
    				}catch(...)
    				{
    					
    				}
    				job_ = std::function<void()>(); //释放任务绑定的资源,主要为闭包捕获的资源,特别是shared_ptr对象.
    				tasks->job_completed( shared_from_this() ); //任务运行完毕,通知线程池
    			}else{
    			       //没有任务的时候,等待其它线程提交任务。。
    				sync_.wait(lock);
    
    			}
    		}
    	};
    
    
    	std::thread t(job_proxy); //创建并启动与task管理的线程
    	t.detach();               //分离模式,thread对象销毁了。可是其创建的线程还活着。。。
    }
    

    
    
    • 2.线程池管理对象
    class task_pool
    {
    public:
    	task_pool(unsigned int pool_size = 128):max_size_(pool_size),stop_all_(true)
    	{
    
    	}
    	~task_pool()
    	{
    	}
    	void job_completed( const std::tr1::shared_ptr<task> & t)//回收task对象
    	{
    
    		std::lock_guard<std::mutex> lock(mutex_);
    		bool need_to_notify = idle_tasks_.empty() && (!wait_for_running_jobs_.empty());
    		busying_tasks_.erase(t);
    		idle_tasks_.push_back(t);
    		LOG(INFO)<<"after job_completed, current idle tasks size:"<< idle_tasks_.size()
    			<<" busying tasks size:"<<busying_tasks_.size()
    			<<" wait for running jobs size:"<<wait_for_running_jobs_.size();
    		if( !busying_tasks_.empty() ){
    			(*busying_tasks_.begin())->print_job();
    		}
    		if( need_to_notify )//任务太多了。之前空暇线程使用完了。有任务在等待运行,须要通知
    		{
    			sync_.notify_one();
    		}
    	};
    	//提交任务
    	void submit_job( const std::function<void()> & job, const std::string file, int line)
    	{
    		if( stop_all_ )
    		{
    			return;
    		}
    		std::lock_guard<std::mutex> lock(mutex_);
    		bool need_notify = wait_for_running_jobs_.empty();
    		wait_for_running_jobs_.push(std::make_tuple(job,file,line));
    
    		if( need_notify )//等待运行的任务为空时。须要通知,其它情况不须要通知.
    		{
    			sync_.notify_one();
    		}
    		
    	}
    	void execute_job()
    	{
    
    
    		while(true)
    		{
    			std::unique_lock<std::mutex> lock(mutex_);
    			while(!stop_all_ && wait_for_running_jobs_.empty() )
    			{
    				//等待其它线程提交任务
    				sync_.wait(lock);
    			}
    
    			if( stop_all_ )
    			{
    				return;
    			}
    			while(!stop_all_ && idle_tasks_.empty())
    			{
    				//有任务要运行,可是没有空暇线程,等待其它任务运行完毕。

    sync_.wait(lock); } if( stop_all_ ) { return; } //有任务。也有空暇线程了 auto t = get_task(); auto job =wait_for_running_jobs_.front(); wait_for_running_jobs_.pop(); //分发任务到task 线程. t->set_job(std::get<0>(job), std::get<1>(job), std::get<2>(job)); } } void stop_all() { std::lock_guard<std::mutex> lock(mutex_); stop_all_ = true; for( auto t : idle_tasks_ ) { t->stop(); } idle_tasks_.clear(); for( auto t : busying_tasks_ ) { t->stop(); } while(!wait_for_running_jobs_.empty()){ wait_for_running_jobs_.pop(); } sync_.notify_one(); } void start() {// 初始化启动线程池主线程 try { std::thread t( [this]{ execute_job();}); t.detach(); stop_all_ = false; allocate_tasks(); }catch( std::exception & e ) { LOG(FATAL) << "start tasks pool ... error"<<e.what(); } } protected: std::tr1::shared_ptr<task> get_task() { //获取task对象 if( ! idle_tasks_.empty() ) { auto t = *idle_tasks_.begin(); idle_tasks_.pop_front(); //从空暇队列移除 busying_tasks_.insert(t); //增加忙队列 return t; } return std::tr1::shared_ptr<task>(); } void allocate_tasks() //初始化线程池 { for( int i = 0 ; i < max_size_; i ++ ) { std::tr1::shared_ptr<task> t( new task()); try{ t->start(); idle_tasks_.push_back(t); }catch( std::exception & e) { //超过进程最大线程数限制时,会跑出异常。

    。。 break; } } } private : unsigned int max_size_; std::list < std::tr1::shared_ptr<task> > idle_tasks_; //空暇任务队列 std::set < std::tr1::shared_ptr<task> > busying_tasks_;//正在运行任务的队列 std::queue< std::tuple< std::function<void()> , std::string, int > > wait_for_running_jobs_; //等待运行的任务 std::mutex mutex_; std::condition_variable sync_; bool stop_all_; };


    • usage
    static task_pool *  tasks = nullptr;
    static std::once_flag init_flag;
    static std::once_flag finit_flag;
    
    void run_job(const std::function<void()> & job , const std::string &  file, int line )
    {
    	if( tasks != nullptr)
    		tasks->submit_job(job, file,line);
    
    }
    void task_pool_init( unsigned max_task_size)
    {
    	std::call_once(init_flag,[max_task_size]{
    		tasks = new task_pool(max_task_size);
    		tasks->start();
    	});
    }
    void task_pool_finit()
    {
       std::call_once(finit_flag,[]{ tasks->stop_all();});
    }
    


  • 相关阅读:
    MSSQL ADO.NET
    MSSQL 详解SQL Server连接(内连接、外连接、交叉连接)
    MSSQL DBOtherSQL
    java8时间转换成字符串
    泛型
    给定一个排序数组,你需要在 原地 删除重复出现的元素,使得每个元素只出现一次,返回移除后数组的新长度。
    利用栈判断有效的括号
    回文数
    service层对 @NotBlank注解起作用
    集合的使用
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/4599991.html
Copyright © 2011-2022 走看看