zoukankan      html  css  js  c++  java
  • Rust 并发编程

    https://www.jianshu.com/p/f4d853c0ef1e

    在并发编程领域,一个非常让程序员兴奋,感到有成就感的事情就是做性能优化,譬如发现某个线程成为了单点瓶颈,然后上多线程。

    提到了上多线程,那自然就会引入 thread pool,也就是我们通常说的线程池,我们会将任务扔给线程池,然后线程池里面自己会负责将任务派发到不同的线程去执行,除开任务自身执行的开销,如何高效的派发也会决定一个线程池是否有足够好的性能。下面,我们就来聊聊几种常见的线程池的实现。

    Mutex + channel

    在 Rust 里面,我们可以通过标准库提供的 channel 进行通讯,但 channel 其实是一个 multi-producer,single-consumer 的结构,也就是我们俗称的 MPSC。但对于线程池来说,我们需要的是一个 MPMC 的 channel,也就是说,我们需要有一个队列,这个队列可以支持多个线程同时添加,同时获取任务。

    虽然单独的 channel 没法支持,但如果我们给 channel 的 Receiver 套上一个 Mutex,在加上 Arc,其实就可以了。通过 Mutex 我们能保证多个线程同时只能有一个线程抢到 lock,然后从队列里面拿到数据。而加上 Arc 主要是能在多个线程共享了,这里就不说明了。

    所以实现也就比较简单了,如下:

    pub struct ThreadPool {
        tx: Option<Sender<Task>>,
        handlers: Option<Vec<thread::JoinHandle<()>>>,
    }
    impl ThreadPool {
        pub fn new(number: usize) -> ThreadPool {
            let (tx, rx) = channel::<Task>();
            let mut handlers = vec![];
    
            let arx = Arc::new(Mutex::new(rx));
            for _ in 0..number {
                let arx = arx.clone();
                let handle = thread::spawn(move || {
                    while let Ok(task) = arx.lock().unwrap().recv() {
                        task.call_box();
                    }
                });
    
                handlers.push(handle);
            }
    
            ThreadPool {
                tx: Some(tx),
                handlers: Some(handlers),
            }
        }
    }
    

    Task 其实就是一个 FnBox,因为只有 nightly 版本支持 FnBox,所以我们自定义了一下

    pub trait FnBox {
        fn call_box(self: Box<Self>);
    }
    
    impl<F: FnOnce()> FnBox for F {
        fn call_box(self: Box<F>) {
            (*self)()
        }
    }
    
    pub type Task = Box<FnBox + Send>;
    

    上面的代码逻辑非常的简单,创建一个 channel,然后使用 Arc + Mutex 包上 Receiver,创建多个线程,每个线程尝试去获取 channel 任务然后执行,如果 channel 里面没任务,recv 哪里就会等着,而其他的线程这时候因为没法拿到 lock 也会等着。

    Condition Variable

    抛开 channel,我们还有一种更通用的做法,可以用在不同的语言,譬如 C 上面,也就是使用 condition variable。关于 condition variable 的使用,大家可以 Google,因为在使用 condition variable 的时候,都会配套有一个 Mutex,所以我们可以通过这个 Mutex 同时控制 condition variable 以及任务队列。

    首先我们定义一个 State,用来处理任务队列

    struct State {
        queue: VecDeque<Task>,
        stopped: bool,
    }
    

    对于不同线程获取任务,我们可以通过

    fn next_task(notifer: &Arc<(Mutex<State>, Condvar)>) -> Option<Task> {
        let &(ref lock, ref cvar) = &**notifer;
        let mut state = lock.lock().unwrap();
        loop {
            if state.stopped {
                return None;
            }
            match state.queue.pop_front() {
                Some(t) => {
                    return Some(t);
                }
                None => {
                    state = cvar.wait(state).unwrap();
                }
            }
        }
    }
    

    首先就是尝试用 Mutex 拿到 State,如果外面没有结束,那么就尝试从队列里面获取任务,如果没有,就调用 Condition Variable 的 wait 进行等待了。

    任务的添加也比较简单

    let &(ref lock, ref cvar) = &*self.notifer;
    {
        let mut state = lock.lock().unwrap();
        state.queue.push_back(task);
        cvar.notify_one();
    }
    

    也是通过 lock 拿到 State,然后放到队列里面,在通知 Condition Variable。对于线程池的创建,也是比较容易的:

    let s = State {
        queue: VecDeque::with_capacity(1024),
        stopped: false,
    };
    let notifer = Arc::new((Mutex::new(s), Condvar::new()));
    for _ in 0..number {
        let notifer = notifer.clone();
        let handle = thread::spawn(move || {
            while let Some(task) = next_task(&notifer) {
                task.call_box();
            }
        });
    
        handlers.push(handle);
    }
    

    Crossbeam

    上面提到的两种做法,虽然都非常的通用,但有一个明显的问题,就在于他是有全局 lock 的,在并发系统里面,lock 如果使用不当,会造成非常严重的性能开销,尤其是在出现 contention 的时候,所以多数时候,我们希望使用的是一个 lock-free 的数据结构。

    幸运的是,在 Rust 里面,已经有一个非常稳定的库来提供相关的支持了,这个就是 crossbeam,关于 crossbeam 的相关知识,后面可以再开一篇文章来详细说明,这里我们直接使用 crossbeam 的 channel,不同于标准库的 channel,crossbeam 的 channel 是一个 MPMC 的实现,所以我们能非常方便的用到线程池上面,简单代码如下:

    let (tx, rx) = channel::unbounded::<Task>();
    let mut handlers = vec![];
    
    for _ in 0..number {
        let rx = rx.clone();
        let handle = thread::spawn(move || {
            while let Some(task) = rx.recv() {
                task.call_box();
            }
        });
    
        handlers.push(handle);
    }
    

    可以看到,crossbeam 的 channel 使用比标准库的更简单,它甚至不需要 Arc 来包一层,而且还是 lock-free 的。

    参考这个 benchmark,分别对不同的 ThreadPool 进行测试,在我的机器上面会发现 crossbeam 的性能会明显好很多,标准库 channel 其次,最后才是 condition variable。

    test thread_pool::benchmark_condvar_thread_pool           ... bench: 128,924,340 ns/iter (+/- 39,853,735)
    test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench:   1,497,272 ns/iter (+/- 355,120)
    test thread_pool::benchmark_std_channel_thread_pool       ... bench:  50,925,087 ns/iter (+/- 6,753,377)
    

    Channel Per-thread

    可以看到,使用 crossbeam 的效果已经非常好了,但这种实现其实还有一个问题,主要在于它有一个全局的队列,当并发严重的时候,多个线程对这个全局队列的争抢,可能成为瓶颈。另外,还有一个问题在于,它的派发机制是任意的,也就是那个线程抢到了任务就执行,在某些时候,我们希望一些任务其实是在某个线程上面执行的,这样对于 CPU 的 cache 来说会更加友好,譬如有一个任务在执行的时候,又会产生一个后续任务,自然,我们希望这个后续任务在同一个线程执行。

    为了解决上面的问题,最直观的做法就是每个线程一个队列,这样我们就能够显示的控制任务派发了。一个非常简单的例子

    let mut handlers = vec![];
    let mut txs = vec![];
    
    for _ in 0..number {
        let (tx, rx) = channel::unbounded::<Task>();
        let handle = thread::spawn(move || {
            while let Some(task) = rx.recv() {
                task.call_box();
            }
        });
    
        txs.push(tx);
        handlers.push(handle);
    }
    

    上面我们为每个线程创建了一个 channel,这样每个线程就不用去争抢全局的 channel 了。

    派发的时候我们也可以手动派发,譬如根据某个 ID hash 到一个对应的 thread 上面,通过 Sender 发送 消息。

    Work Stealing

    虽然每个线程一个 channel 解决了全局争抢问题,也提升了 CPU cache 的使用,但它引入了另一个问题,就是任务的不均衡。直观的来说,就是会导致某些线程一直忙碌,在不断的处理任务,而另一些线程则没有任务处理,一直很闲。为了解决这个问题,就有了 Work Stealing 的线程池。

    Work Stealing 的原理其实很简单,当一个线程执行完自己线程队列里面的所有任务之后,它会尝试去其它线程的队列里面偷一点任务执行。

    因为 Work Stealing 的实现过于复杂,这里就不描述了,Rust 的 tokio 库提供了一个 tokio-threadpool,就是基于 Work Stealing 来做的,不过现在只提供了 Future 的支持。

    小结

    上面简单的列举了一些线程池的实现方式,如果你只是单纯的想用一个比较简单的派发功能,基于 crossbeam 的就可以了,复杂一点的可以使用 Work Stealing 的。当然,这里只是大概列举了一些,如果有更好的实现,麻烦跟我联系讨论,我的邮箱 tl@pingcap.com



    作者:siddontang
    链接:https://www.jianshu.com/p/f4d853c0ef1e
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    索引法则--少用OR,它在连接时会索引失效
    索引法则--LIKE以%开头会导致索引失效进而转向全表扫描(使用覆盖索引解决)
    索引法则--字符串不加单引号会导致索引失效
    索引法则--IS NULL, IS NOT NULL 也无法使用索引
    tomcat管理模块报401 Unauthorized
    MySQL报Too many connections
    JDBC连接MySql例子
    linux安装jdk并设置环境变量(看这一篇文章即可)
    深度解析Java可变参数类型以及与数组的区别
    MySQL真正的UTF-8字符集utf8mb4
  • 原文地址:https://www.cnblogs.com/dhcn/p/14255799.html
Copyright © 2011-2022 走看看