zoukankan      html  css  js  c++  java
  • Rust 并发编程

    https://www.jianshu.com/p/f4d853c0ef1e

    在并发编程领域,一个非常让程序员兴奋,感到有成就感的事情就是做性能优化,譬如发现某个线程成为了单点瓶颈,然后上多线程。

    提到了上多线程,那自然就会引入 thread pool,也就是我们通常说的线程池,我们会将任务扔给线程池,然后线程池里面自己会负责将任务派发到不同的线程去执行,除开任务自身执行的开销,如何高效的派发也会决定一个线程池是否有足够好的性能。下面,我们就来聊聊几种常见的线程池的实现。

    Mutex + channel

    在 Rust 里面,我们可以通过标准库提供的 channel 进行通讯,但 channel 其实是一个 multi-producer,single-consumer 的结构,也就是我们俗称的 MPSC。但对于线程池来说,我们需要的是一个 MPMC 的 channel,也就是说,我们需要有一个队列,这个队列可以支持多个线程同时添加,同时获取任务。

    虽然单独的 channel 没法支持,但如果我们给 channel 的 Receiver 套上一个 Mutex,在加上 Arc,其实就可以了。通过 Mutex 我们能保证多个线程同时只能有一个线程抢到 lock,然后从队列里面拿到数据。而加上 Arc 主要是能在多个线程共享了,这里就不说明了。

    所以实现也就比较简单了,如下:

    pub struct ThreadPool {
        tx: Option<Sender<Task>>,
        handlers: Option<Vec<thread::JoinHandle<()>>>,
    }
    impl ThreadPool {
        pub fn new(number: usize) -> ThreadPool {
            let (tx, rx) = channel::<Task>();
            let mut handlers = vec![];
    
            let arx = Arc::new(Mutex::new(rx));
            for _ in 0..number {
                let arx = arx.clone();
                let handle = thread::spawn(move || {
                    while let Ok(task) = arx.lock().unwrap().recv() {
                        task.call_box();
                    }
                });
    
                handlers.push(handle);
            }
    
            ThreadPool {
                tx: Some(tx),
                handlers: Some(handlers),
            }
        }
    }
    

    Task 其实就是一个 FnBox,因为只有 nightly 版本支持 FnBox,所以我们自定义了一下

    pub trait FnBox {
        fn call_box(self: Box<Self>);
    }
    
    impl<F: FnOnce()> FnBox for F {
        fn call_box(self: Box<F>) {
            (*self)()
        }
    }
    
    pub type Task = Box<FnBox + Send>;
    

    上面的代码逻辑非常的简单,创建一个 channel,然后使用 Arc + Mutex 包上 Receiver,创建多个线程,每个线程尝试去获取 channel 任务然后执行,如果 channel 里面没任务,recv 哪里就会等着,而其他的线程这时候因为没法拿到 lock 也会等着。

    Condition Variable

    抛开 channel,我们还有一种更通用的做法,可以用在不同的语言,譬如 C 上面,也就是使用 condition variable。关于 condition variable 的使用,大家可以 Google,因为在使用 condition variable 的时候,都会配套有一个 Mutex,所以我们可以通过这个 Mutex 同时控制 condition variable 以及任务队列。

    首先我们定义一个 State,用来处理任务队列

    struct State {
        queue: VecDeque<Task>,
        stopped: bool,
    }
    

    对于不同线程获取任务,我们可以通过

    fn next_task(notifer: &Arc<(Mutex<State>, Condvar)>) -> Option<Task> {
        let &(ref lock, ref cvar) = &**notifer;
        let mut state = lock.lock().unwrap();
        loop {
            if state.stopped {
                return None;
            }
            match state.queue.pop_front() {
                Some(t) => {
                    return Some(t);
                }
                None => {
                    state = cvar.wait(state).unwrap();
                }
            }
        }
    }
    

    首先就是尝试用 Mutex 拿到 State,如果外面没有结束,那么就尝试从队列里面获取任务,如果没有,就调用 Condition Variable 的 wait 进行等待了。

    任务的添加也比较简单

    let &(ref lock, ref cvar) = &*self.notifer;
    {
        let mut state = lock.lock().unwrap();
        state.queue.push_back(task);
        cvar.notify_one();
    }
    

    也是通过 lock 拿到 State,然后放到队列里面,在通知 Condition Variable。对于线程池的创建,也是比较容易的:

    let s = State {
        queue: VecDeque::with_capacity(1024),
        stopped: false,
    };
    let notifer = Arc::new((Mutex::new(s), Condvar::new()));
    for _ in 0..number {
        let notifer = notifer.clone();
        let handle = thread::spawn(move || {
            while let Some(task) = next_task(&notifer) {
                task.call_box();
            }
        });
    
        handlers.push(handle);
    }
    

    Crossbeam

    上面提到的两种做法,虽然都非常的通用,但有一个明显的问题,就在于他是有全局 lock 的,在并发系统里面,lock 如果使用不当,会造成非常严重的性能开销,尤其是在出现 contention 的时候,所以多数时候,我们希望使用的是一个 lock-free 的数据结构。

    幸运的是,在 Rust 里面,已经有一个非常稳定的库来提供相关的支持了,这个就是 crossbeam,关于 crossbeam 的相关知识,后面可以再开一篇文章来详细说明,这里我们直接使用 crossbeam 的 channel,不同于标准库的 channel,crossbeam 的 channel 是一个 MPMC 的实现,所以我们能非常方便的用到线程池上面,简单代码如下:

    let (tx, rx) = channel::unbounded::<Task>();
    let mut handlers = vec![];
    
    for _ in 0..number {
        let rx = rx.clone();
        let handle = thread::spawn(move || {
            while let Some(task) = rx.recv() {
                task.call_box();
            }
        });
    
        handlers.push(handle);
    }
    

    可以看到,crossbeam 的 channel 使用比标准库的更简单,它甚至不需要 Arc 来包一层,而且还是 lock-free 的。

    参考这个 benchmark,分别对不同的 ThreadPool 进行测试,在我的机器上面会发现 crossbeam 的性能会明显好很多,标准库 channel 其次,最后才是 condition variable。

    test thread_pool::benchmark_condvar_thread_pool           ... bench: 128,924,340 ns/iter (+/- 39,853,735)
    test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench:   1,497,272 ns/iter (+/- 355,120)
    test thread_pool::benchmark_std_channel_thread_pool       ... bench:  50,925,087 ns/iter (+/- 6,753,377)
    

    Channel Per-thread

    可以看到,使用 crossbeam 的效果已经非常好了,但这种实现其实还有一个问题,主要在于它有一个全局的队列,当并发严重的时候,多个线程对这个全局队列的争抢,可能成为瓶颈。另外,还有一个问题在于,它的派发机制是任意的,也就是那个线程抢到了任务就执行,在某些时候,我们希望一些任务其实是在某个线程上面执行的,这样对于 CPU 的 cache 来说会更加友好,譬如有一个任务在执行的时候,又会产生一个后续任务,自然,我们希望这个后续任务在同一个线程执行。

    为了解决上面的问题,最直观的做法就是每个线程一个队列,这样我们就能够显示的控制任务派发了。一个非常简单的例子

    let mut handlers = vec![];
    let mut txs = vec![];
    
    for _ in 0..number {
        let (tx, rx) = channel::unbounded::<Task>();
        let handle = thread::spawn(move || {
            while let Some(task) = rx.recv() {
                task.call_box();
            }
        });
    
        txs.push(tx);
        handlers.push(handle);
    }
    

    上面我们为每个线程创建了一个 channel,这样每个线程就不用去争抢全局的 channel 了。

    派发的时候我们也可以手动派发,譬如根据某个 ID hash 到一个对应的 thread 上面,通过 Sender 发送 消息。

    Work Stealing

    虽然每个线程一个 channel 解决了全局争抢问题,也提升了 CPU cache 的使用,但它引入了另一个问题,就是任务的不均衡。直观的来说,就是会导致某些线程一直忙碌,在不断的处理任务,而另一些线程则没有任务处理,一直很闲。为了解决这个问题,就有了 Work Stealing 的线程池。

    Work Stealing 的原理其实很简单,当一个线程执行完自己线程队列里面的所有任务之后,它会尝试去其它线程的队列里面偷一点任务执行。

    因为 Work Stealing 的实现过于复杂,这里就不描述了,Rust 的 tokio 库提供了一个 tokio-threadpool,就是基于 Work Stealing 来做的,不过现在只提供了 Future 的支持。

    小结

    上面简单的列举了一些线程池的实现方式,如果你只是单纯的想用一个比较简单的派发功能,基于 crossbeam 的就可以了,复杂一点的可以使用 Work Stealing 的。当然,这里只是大概列举了一些,如果有更好的实现,麻烦跟我联系讨论,我的邮箱 tl@pingcap.com



    作者:siddontang
    链接:https://www.jianshu.com/p/f4d853c0ef1e
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    hadoop中namenode发生故障的处理方法
    开启虚拟机所报的错误:VMware Workstation cannot connect to the virtual machine. Make sure you have rights to run the program, access all directories the program uses, and access all directories for temporary fil
    Hbase的安装与部署(集群版)
    分别用反射、编程接口的方式创建DataFrame
    用Mapreduce求共同好友
    SparkSteaming中直连与receiver两种方式的区别
    privot函数使用
    Ajax无刷新显示
    使用ScriptManager服务器控件前后台数据交互
    数据库知识
  • 原文地址:https://www.cnblogs.com/dhcn/p/14255799.html
Copyright © 2011-2022 走看看