zoukankan      html  css  js  c++  java
  • 线程管理

    创建线程

    每个程序至少有一个执行 main 函数的线程,其他线程也有自己的入口函数,两者会同时运行。

    #include <thread>
    #include <iostream>
    
    void f() {
    	std::cout << "hello world"<<std::endl;
    }
    
    int main()
    {
    	std::thread t(f);
    	t.join();
    }
    
    • 将函数添加为 std::thread 的参数即可启动线程。

    std::thread 的参数可以是任何 callable 类型:

    lambda 表达式作为线程入口函数

    int main()
    {
    	std::thread t([] {std::cout << "hello world" << std::endl; });
    	t.join();
    }
    

    函数对象作为线程入口函数

    struct A {
    	//@ 重载 operator
    	void operator()() const { std::cout << "hello world" << std::endl; }
    };
    
    int main()
    {
    	A a;
    	std::thread t(a);
    	t.join();
    }
    

    注意,这里应该避免 most vexing parse 的问题:

    std::thread t(A());	//@ A() 会被视为函数
    	
    std::thread t{A()};		//OK,统一初始化方法
    std::thread t((A()));	//OK,多组括号
    

    join 和 detach

    启动线程后在线程销毁前要对其调用 joindetach,否则 std::thread 的析构函数会调用 std::terminate 终止程序。

    detach

    • detach 是让目标线程成为守护线程(daemon threads)。
    • 一旦 detach ,目标线程将独立执行,即便其对应的 thread 对象销毁也不影响线程的执行。
    • 一旦 detach ,主调线程无法再取得该被调线程的控制权。这个子线程将被 C++ 运行时库接管,当该线程执行结束的时候,由 C++ 运行时库负责回收该线程的资源。
    struct A {
    	int& i;
    	A(int& x) :i(x) {}
    	void operator()()const {
    		for (int i = 0; i < 100000; i++) {
    			//@ dosomething(i);
    		}
    	}
    };
    
    void f()
    {
    	int x = 0;
    	A a(x);
    	std::thread t(a);
    	t.detach();	//@ 不等待t结束
    	//@ 函数运行结束后线程t可能还在执行,i是x的引用,但是此时x被销毁了
    }
    
    int main()
    {	
    	std::thread t(f);
    	t.join();
    }
    

    join

    • join 之后,当前线程会一直阻塞,直到目标线程执行完成。
    void f()
    {
    	int x = 0;
    	A a(x);
    	std::thread t(a);
    	t.join();	//@ 等待t结束
    }
    
    • join 之后,当子线程执行结束,主调线程将回收子调线程资源,并继续运行。
    • 如果目标线程的任务非常耗时,就要考虑好是否需要在主线程上等待它了,因此这很可能会导致主线程卡住。
    • 如果线程运行过程中发生异常,之后调用的 join 会被忽略,为此需要捕获异常并在处理异常时调用 join
    void f()
    {
        int x = 0;
        A a(x);
        std::thread t(a);
        try
        {
            doSomethingHere();
        }
        catch(...)
        {
            t.join();
            throw;
        }
        t.join();
    }
    

    joinable

    • joinable 可以用来判断这个线程当前是否可以被 join
    • join 之后不能再被重复 join,反复 join 将出错。
    • detach 之后不能再进行 join,因为此时线程已经分离出去了,如果 detach 之后再 join 将出错。

    thread_guard 类

    针对线程运行过程中可能抛出异常的情况,更简洁的方法是使用 RAII 类来管理 std::thread :

    class thread_guard {
    	std::thread& t;
    public:
    	explicit thread_guard(std::thread& x) :t(x) {}
    	~thread_guard() { if (t.joinable()) t.join(); }
    	thread_guard(const std::thread&) = delete;
    	thread_guard& operator=(const std::thread&) = delete;
    };
    
    struct A {
    	int& i;
    	A(int& x) :i(x) {}
    	void operator()()const {
    		for (int i = 0; i < 100000; i++) {
    			//@ dosomething(i);
    		}
    	}
    };
    
    void f()
    {
    	int x = 0;
    	A a(x);
    	std::thread t(a);
    	thread_guard g(t);
    	//@ doSomethingHere();
    	//@ 局部对象逆序销毁,优先销毁thread_guard对象,从而调用t.join()
    }
    

    为线程传递函数

    void f(int i)
    {
    	std::cout << i << std::endl;
    }
    
    int main()
    {
    	std::thread t(f, 42);
    	t.join();
    }
    

    为线程传递引用

    std::thread 会无视参数的引用类型,因此传入引用类型时需要使用 std::ref

    void f(int& i)
    {
    	++i;
    }
    
    int main()
    {
    	int i = 0;
    	std::thread t(f, std::ref(i));
    	t.join();
    
    	std::cout << i << std::endl;
    }
    

    为线程传递成员函数

    class A {
    public:
    	void f(int i) { std::cout << i << std::endl; }
    };
    
    
    int main()
    {
    	A a;
    	std::thread t(&A::f,&a,42); //@ 第一个参数为成员函数地址,第二个参数为实例地址
    	t.join();
    }
    

    为线程传递 move-only 对象

    如果参数是 move-only 对象则需要使用 std::move

    void f(std::unique_ptr<int> p)
    {
    	std::cout << *p << std::endl;
    }
    
    int main()
    {
    	std::unique_ptr<int> p(new int(42));
    	std::thread t(f,std::move(p)); 
    	t.join();
    }
    

    转移线程所有权

    void f();
    void g();
    
    std::thread t1(f);
    std::thread t2 = std::move(t1);	//@ t1所有权给t2,t2关联执行f的线程
    t1 = std::thread(g);	//@ t1 重新关联一个执行 g  的线程
    
    std::thread t3;
    t3 = std::move(t2);	//@ t3 关联 t2 的线程,t2 无关联
    t1 = std::move(t3); //@ t1 已有关联的 g 线程,调用 std::terminate 终止程序
    

    线程所有权可以转移到函数外

    void f(int i) { std::cout << i << std::endl; }
    std::thread g()
    {
    	return std::thread(f, 42);
    }
    
    int main()
    {
    	std::thread t{g()};
    	t.join();
    }
    

    std::thread 作为参数

    void f(std::thread t);
    
    void g()
    {
    	f(std::thread(someFunction));
    	std::thread t(someFunction);
    	f(std::move(t));
    }
    

    添加了析构行为的 joining_thread

    class scoped_thread
    {
    	std::thread t;
    public:
    	explicit scoped_thread(std::thread t_) :
    		t(std::move(t_))
    	{
    		if (!t.joinable()) {
    			throw std::logic_error("No threds");
    		}
    	}
    
    	~scoped_thread()
    	{
    		t.join();
    	}
    
    	scoped_thread(scoped_thread const&) = delete;
    	scoped_thread& operator=(scoped_thread const&) = delete;
    };
    
    struct func
    {
    	int& i;
    	func(int& i_) : i(i_) {}
    	void operator() ()
    	{
    		for (unsigned j = 0; j<1000000; ++j)
    		{
    			//@ do_something(i);  
    		}
    	}
    };
    
    int main()
    {
    	int some_local_state;
    	scoped_thread t(std::thread(func(some_local_state)));
    	//@ do_someing_in_current_thread();
    }
    

    std::thread 对象的容器,如果这个容器是移动敏感的 (比如,标准中的 std::vector<>) ,那么移动操作同样适用于这些容器。量产线程,等待它们结束:

    void do_work(unsigned id) { std::cout << id << std::endl; }
    
    void f()
    {
    	std::vector<std::thread> threads;
    	for (unsigned i = 0; i < 20; ++i){
    		threads.push_back(std::thread(do_work, i)); //@ 产生线程
    	}
    	std::for_each(threads.begin(), threads.end(),
    		std::mem_fn(&std::thread::join)); //@ 对每个线程调用join()
    }
    

    运行时决定线程数量

    std::thread::hardware_concurrency 在新版 C++ 标准库中是一个很有用的函数。这个函数将返回能同时并发在一个程序中的线程数量。例如,多核系统中,返回值可以是 CPU 核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。

    template<typename Iterator,typename T>
    struct accumulate_block
    {
    	void operator()(Iterator first, Iteratorlast, T& result) {
    		result = std::accumulate(first,last,result);
    	}
    };
    
    template<typename Iterator,typename T>
    T parallel_accumulate(Iterator first, Iterator last,T init)
    {
    	unsigned long const lengh = std::distance(first,last);
    
    	if (!length) return init;
    
    	unsigned long const min_per_threads = 25;
    	unsigned long const max_threads = 
    		(lengh+min_per_thread-1)/min_per_thread;
    
       //@ 计算量的最大值和硬件支持线程数中,较小的值为启动线程的数量
       //@ 因为上下文频繁的切换会降低线程的性能,所以你肯定不想启动的线程数多于硬件支持的线程数量。
    	unsigned long const hardware_threds =
    		std::thread::hardware_concurrency();
    
    	unsigned long const num_threds =
    		std::min(hardware_threds != 0 ? hardware_threds : 2,max_threads); 
    
    	//@ 每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得到的
    	unsigned long const block_size = lengh / num_threds;
    
    	std::vector<T> results(num_threds);
    	//@ 启动的线程数必须比num_threads少1个,因为在启动之前已经有了一个线程(主线程)。
    	std::vector<std::thread> threads(num_threds - 1);
    
    	Iterator block_start = first;
    	for (unsigned long i = 0; i < num_threds; ++i)
    	{
    		Iterator block_end = block_start;
    		std::advance(block_end, block_size);
    		thread[i] = std::thread(
    			accumulate_block<Iterator, T>(),
    			block_start, block_end, std::ref(results[i]));
    		block_start = block_end;
    	}
    
    	accumulate_block<Iterator, T>()(
    		block_start,last,results[num_threds-1]);
    	std::for_each(threads.begin(), threads.end(),
    		std::mem_fn(&std::thread::join));
    
    	return std::accumulate(results.begin(), results.end(),init);
    }
    

    识别线程

    线程标识类型是 std::thread::id,可以通过两种方式进行检索。

    std::thread::id 对象可以自由的拷贝和对比,因为标识符就可以复用。

    • 如果两个对象的 std::thread::id 相等,那它们就是同一个线程,或者都“没有线程”。
    • 如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有。
    std::thread::id master_thread;
    void some_core_part_of_algorithm()
    {
      if(std::this_thread::get_id()==master_thread)
      {
        do_master_thread_work();
      }
      do_common_work();
    }
    

    暂停线程执行

    • yield:让出处理器,重新调度各执行线程。通常用在自己的主要任务已经完成的时候,此时希望让出处理器给其他任务使用。
    • sleep_for:使当前线程的执行停止指定的时间段。
    • sleep_until:使当前线程的执行停止直到指定的时间点。
    #include <iostream>
    #include <thread>		
    #include <chrono>
    #include <sstream>
    #include <ctime> 
    #include <iomanip>
    
    void print_time()
    {
    	using std::chrono::system_clock;
    	auto in_time_t = system_clock::to_time_t(system_clock::now());
    
    	std::stringstream ss;
    	ss << std::put_time(localtime(&in_time_t), "%Y-%m-%d %X");
    	std::cout << "now is: " << ss.str() << std::endl;
    }
    
    void sleep_thread()
    {
    	std::this_thread::sleep_for(std::chrono::seconds(3)); //@ 暂停执行3s
    	std::cout << "[this thread: " << std::this_thread::get_id() <<
    		" ] is waking up:";
    	print_time();
    	std::cout << std::endl;
    }
    
    void wait_until_thread()
    {
    	using std::chrono::system_clock;
    	time_t time = system_clock::to_time_t(system_clock::now());
    	struct std::tm *ptm = std::localtime(&time);
    	ptm->tm_sec += 10;	//@ 休眠到当前时间点后的 10 s
    	std::this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));
    	std::cout << "[this thread: " << std::this_thread::get_id() <<
    		" ] waiting until: ";
    	print_time();
    	std::cout << std::endl;
    }
    
    void loop_thread()
    {
    	for (int i = 0; i < 10; i++) {
    		std::cout << "[this thread: " << std::this_thread::get_id() <<
    			" ] print: " << i << std::endl;
    	}
    }
    
    int main()
    {
    	print_time();
    
    	std::thread t1(sleep_thread);
    	std::thread t2(loop_thread);
    	std::thread t3(wait_until_thread);
    
    	t1.join();
    	t2.join();
    	t3.join();
    
    	return 0;
    }
    
  • 相关阅读:
    交易盈利核心
    tbquant 两个画线函数的说明
    胜率40% 盈亏2:1 交易策略源码
    Apache是如何运作的
    JSON_UNESCAPED_UNICODE的作用与理解
    Python的装饰器是什么?
    Python中的浅拷贝、深拷贝和赋值之间有什么区别?
    Python面试题——基础篇
    GD32F30x_ADC电压采集(规则并行+DMA方式)
    GD32F30x_定时器输出比较模式输出方波(DMA方式)
  • 原文地址:https://www.cnblogs.com/xiaojianliu/p/12632306.html
Copyright © 2011-2022 走看看