  • [Easy-to-Learn Series | Rust Language | Zero Basics | Quick Start | (19) | Multithreading]

    Practical Knowledge

    Multithreading

    Today, let's talk about multithreading in Rust.

    Let's jump straight into the code:

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        thread::spawn(|| {
            for i in 1..10 {
                println!("hi number {} from the spawned thread!", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        for i in 1..5 {
            println!("hi number {} from the main thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    }
    

    The output looks like this:

    hi number 1 from the main thread!
    hi number 1 from the spawned thread!
    hi number 2 from the main thread!
    hi number 2 from the spawned thread!
    hi number 3 from the spawned thread!
    hi number 3 from the main thread!
    hi number 4 from the main thread!
    hi number 4 from the spawned thread!
    

    First, let's look at how a thread is created:

    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    

    在Rust中,创建新线程,用thread::spawn函数(这个函数来看std),这个函数传递一个闭包,闭包里包含线程运行代码:

    {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    }
    

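    One thing worth pointing out: threads in Rust's standard library map 1:1 onto operating-system threads, so every call to thread::spawn creates one OS thread.

    Now let's look at the output again.

    The main thread printed its messages from 1 to 4, the complete range of its loop.

    The spawned thread, however, also only got to 4 (on your machine it might reach 5); it did not print every number in its range. Note that 1..10 is a half-open range, so a complete run would print 1 through 9.

    Results vary from run to run.

    But they all show one thing: once the main thread finishes, the spawned thread is shut down immediately too, because returning from main ends the whole process.

    So how can we make the spawned thread print all of its numbers, from 1 to 9?

    With a JoinHandle.

    JoinHandle is the return type of thread::spawn. It has a join method that blocks the calling thread (here, the main thread) until the spawned thread has finished all of its work.

    Take a look at the code: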

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let handle = thread::spawn(|| {
            for i in 1..10 {
                println!("hi number {} from the spawned thread!", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        for i in 1..5 {
            println!("hi number {} from the main thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    
        handle.join().unwrap();
    }
    

    This time, the output is:

    hi number 1 from the main thread!
    hi number 2 from the main thread!
    hi number 1 from the spawned thread!
    hi number 3 from the main thread!
    hi number 2 from the spawned thread!
    hi number 4 from the main thread!
    hi number 3 from the spawned thread!
    hi number 4 from the spawned thread!
    hi number 5 from the spawned thread!
    hi number 6 from the spawned thread!
    hi number 7 from the spawned thread!
    hi number 8 from the spawned thread!
    hi number 9 from the spawned thread!
    

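    The order may differ on your machine, but now the spawned thread completes all of its loop iterations before the main thread exits.

    What happens if we move the line handle.join().unwrap(); to a different spot, like this: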

    use std::thread;
    use std::time::Duration;
    
    fn main() {
        let handle = thread::spawn(|| {
            for i in 1..10 {
                println!("hi number {} from the spawned thread!", i);
                thread::sleep(Duration::from_millis(1));
            }
        });
    
        handle.join().unwrap(); // moved here
    
        for i in 1..5 {
            println!("hi number {} from the main thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    }
    

    Now the output becomes:

    hi number 1 from the spawned thread!
    hi number 2 from the spawned thread!
    hi number 3 from the spawned thread!
    hi number 4 from the spawned thread!
    hi number 5 from the spawned thread!
    hi number 6 from the spawned thread!
    hi number 7 from the spawned thread!
    hi number 8 from the spawned thread!
    hi number 9 from the spawned thread!
    hi number 1 from the main thread!
    hi number 2 from the main thread!
    hi number 3 from the main thread!
    hi number 4 from the main thread!
    

    We can see that the main thread now waits for the spawned thread to finish before running the rest of its own code.
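    join() does more than wait: it also hands back whatever the thread's closure returns, wrapped in a Result that is Err only if the spawned thread panicked. The sketch below is an illustration rather than part of the original walkthrough; the helper name sum_in_thread is hypothetical.

```rust
use std::thread;

// Hypothetical helper: sums 1..10 (that is, 1 through 9) in a spawned thread.
fn sum_in_thread() -> i32 {
    let handle = thread::spawn(|| {
        // The closure's return value becomes the thread's result.
        (1..10).sum::<i32>()
    });
    // join() blocks until the thread finishes and returns Result<i32, _>;
    // it is Err only if the spawned thread panicked.
    handle.join().unwrap()
}

fn main() {
    println!("sum = {}", sum_in_thread());
}
```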

    Now that we know the basics of threads, let's write a simple program:

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        let handle = thread::spawn(|| {
            println!("Here's a vector: {:?}", v);
        });
    
        handle.join().unwrap(); // the main thread waits here until the spawned thread has finished
    }
    

    Compile it, and our strict queen, the compiler, throws an error at you:

    error[E0373]: closure may outlive the current function, but it borrows `v`,
    which is owned by the current function
     --> src/main.rs:6:32
      |
    6 |     let handle = thread::spawn(|| {
      |                                ^^ may outlive borrowed value `v`
    7 |         println!("Here's a vector: {:?}", v);
      |                                           - `v` is borrowed here
      |
    help: to force the closure to take ownership of `v` (and any other referenced
    variables), use the `move` keyword
      |
    6 |     let handle = thread::spawn(move || {
      |  
    

    The error message tells us that this line in the spawned thread:

    println!("Here's a vector: {:?}", v);
    

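    borrows a reference to v from main. But v is owned by the main thread, and Rust cannot tell how long the spawned thread will keep running, so main might drop v while the spawned thread still holds that reference. The spawned thread, running in the background, would then find that v no longer exists, with disastrous results.

    The Rust compiler does not allow this to happen.

    So how do we fix it?

    With move.

    The code: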

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        let handle = thread::spawn(move || {
            println!("Here's a vector: {:?}", v);
        });
    
        handle.join().unwrap();
    }
    

    Run it again, and we get the correct result:

    Here's a vector: [1, 2, 3]
    

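    The move keyword tells the compiler that the closure running in the spawned thread takes ownership of the data in v, so the disaster described above can no longer happen.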
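    As a side note beyond the Rust versions this article references, Rust 1.63 added scoped threads (std::thread::scope). Every thread spawned inside a scope is joined before scope returns, so the compiler can prove that the borrow of v cannot outlive v, and no move is needed. A minimal sketch, assuming Rust 1.63 or newer; the helper name sum_with_scoped_thread is hypothetical:

```rust
use std::thread;

// Hypothetical helper: borrows `v` inside a scoped thread instead of moving it.
fn sum_with_scoped_thread() -> i32 {
    let v = vec![1, 2, 3];

    // All threads spawned on `s` are joined before scope() returns,
    // so borrowing `v` (without `move`) is accepted by the compiler.
    let total = thread::scope(|s| {
        let handle = s.spawn(|| {
            println!("Here's a vector: {:?}", v);
            v.iter().sum::<i32>()
        });
        handle.join().unwrap()
    });

    // `v` was only borrowed, so main still owns it and can use it here.
    println!("v is still usable here: {:?}", v);
    total
}

fn main() {
    println!("total = {}", sum_with_scoped_thread());
}
```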

    Now let's run another experiment and add an interesting line:

    use std::thread;
    
    fn main() {
        let v = vec![1, 2, 3];
    
        let handle = thread::spawn(move || {
            println!("Here's a vector: {:?}", v);
        });
    
        println!("Now  vector: {:?}", v); // new line: access v again
        handle.join().unwrap();
    }
    
    

    Compile it, and the compiler reports another error:

    error[E0382]: borrow of moved value: `v`
      --> src/main.rs:29:35
       |
    23 |     let v = vec![1, 2, 3];
       |         - move occurs because `v` has type `std::vec::Vec<i32>`, which does not implement the `Copy` trait
    24 |
    25 |     let handle = thread::spawn(move || {
       |                                ------- value moved into closure here
    26 |         println!("Here's a vector: {:?}", v);
       |                                           - variable moved due to use in closure
    ...
    29 |     println!("Now  vector: {:?}", v);
       |                                   ^ value borrowed here after move
    

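    The error message is quite detailed. Its main point is that ownership of the data bound to v has been moved into the spawned thread's closure, so borrowing v afterwards is invalid: v no longer owns the data.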
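    If main still needs the data after spawning the thread, two common options are to move a clone into the thread, or to let the thread hand the data back through join(). A minimal sketch of both; the helper name clone_then_move is hypothetical:

```rust
use std::thread;

// Hypothetical helper: moves a clone into the thread so main keeps the original,
// and gets the thread's (modified) vector back through join().
fn clone_then_move() -> (Vec<i32>, Vec<i32>) {
    let v = vec![1, 2, 3];
    let v_for_thread = v.clone(); // an independent copy for the thread

    let handle = thread::spawn(move || {
        let mut data = v_for_thread; // the closure owns this copy
        data.push(4);
        data // ownership goes back to the caller via join()
    });

    println!("Now vector: {:?}", v); // fine: main still owns the original
    let from_thread = handle.join().unwrap();
    (v, from_thread)
}

fn main() {
    let (original, from_thread) = clone_then_move();
    println!("original: {:?}, from thread: {:?}", original, from_thread);
}
```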

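    Next, let's look at how multiple threads communicate.

    There is a well-known saying from the Go (golang) community:

    Do not communicate by sharing memory; instead, share memory by communicating.

    What does that mean?

    It means threads should not communicate by sharing memory; instead, they should share data by communicating.

    In short, threads coordinate and share data by passing messages to one another, a style close in spirit to the actor model.

    For this, Rust's standard library provides channels (std::sync::mpsc::channel), which serve as pipes for communication between threads. Note that channel is a library function, not a keyword.

    You can picture a channel as a water pipe, with the data as the water.

    The pipe has an upstream end and a downstream end.

    The upstream end (the transmitter) is used to send data.

    The downstream end (the receiver) is used to receive data.

    Let's start with a simple piece of code: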

    use std::sync::mpsc;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    }
    

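    In this code, mpsc::channel() returns a tuple, which we destructure to bind both ends of the pipe at once: the upstream end tx (transmitter) and the downstream end rx (receiver).

    What does mpsc stand for?

    multiple producer, single consumer

    That is, many producers and a single consumer.

    Anyone familiar with concurrency models will recognize this pattern.

    You can picture the multiple-producer, single-consumer model as a tee-shaped pipe: several inlets merge into a single outlet.

    Multiple producer threads send data into the pipe.

    Only one consumer thread receives the data.

    Here's the complete code: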

    use std::thread;
    use std::sync::mpsc;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let val = String::from("hi");
            tx.send(val).unwrap();
        });
    
        let received = rx.recv().unwrap();
        println!("Got: {}", received);
    }
    

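    The child thread created by thread::spawn sends data into the pipe, and the main thread receives it; recv() blocks the main thread until a value arrives.

    The output is: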

    Got: hi
    

    The code above contains this line:

    tx.send(val).unwrap();
    

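    It calls a method: unwrap().

    What does this method mean?

    send returns a Result, and in Rust calling unwrap() on a Result means: if everything is fine, give me the value inside; if there is an error, panic, which stops the current thread (and, when that thread is main, the whole program). In this case, send fails only if the receiving end has already been dropped.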
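    To see when send actually fails, the sketch below drops the receiver first and handles the Result with match instead of unwrap(); calling unwrap() here would panic. It relies on SendError handing the unsent value back in its public field 0. The helper name send_after_receiver_dropped is hypothetical:

```rust
use std::sync::mpsc;

// Hypothetical helper: sends after the receiver is gone and recovers the value.
fn send_after_receiver_dropped() -> Option<String> {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx); // nobody is listening any more

    match tx.send(String::from("hi")) {
        Ok(()) => None,
        // SendError wraps the value that could not be delivered.
        Err(err) => Some(err.0),
    }
}

fn main() {
    match send_after_receiver_dropped() {
        Some(v) => println!("send failed, got back {:?}", v),
        None => println!("send succeeded"),
    }
}
```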

    Now let's add another interesting line:

    use std::thread;
    use std::sync::mpsc;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let val = String::from("hi");
            tx.send(val).unwrap();
            println!("val is {}", val); // new line: print val
        });
    
        let received = rx.recv().unwrap();
        println!("Got: {}", received);
    }
    

    What happens?

    An error:

    error[E0382]: use of moved value: `val`
      --> src/main.rs:10:31
       |
    9  |         tx.send(val).unwrap();
       |                 --- value moved here
    10 |         println!("val is {}", val);
       |                               ^^^ value used here after move
       |
       = note: move occurs because `val` has type `std::string::String`, which does
    not implement the `Copy` trait
    

    The error message clearly tells you that ownership of val has been moved: val no longer owns the data, so printing it is an error.

    So how do we handle this? Like this:

    use std::thread;
    use std::sync::mpsc;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let val = String::from("hi");
            println!("val is {}", val); // print val before it is sent
            tx.send(val).unwrap();
        });
    
        let received = rx.recv().unwrap();
        println!("Got: {}", received);
    }
    

    We simply use val before calling tx.send; at that point, ownership has not moved yet.

    Easy enough.
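    If the spawned thread really does need val after sending it, another option is to send a clone, at the cost of an extra allocation for the copy. A sketch; the helper name send_a_clone is hypothetical:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends a clone so `val` stays usable in the sending thread.
fn send_a_clone() -> String {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val.clone()).unwrap(); // the receiver gets its own copy
        println!("val is {}", val); // still valid: only the clone was moved
    });

    rx.recv().unwrap()
}

fn main() {
    println!("Got: {}", send_a_clone());
}
```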

    Now let's look at code where the upstream end of the pipe keeps sending water:

    use std::thread;
    use std::sync::mpsc;
    use std::time::Duration;
    
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
    
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1)); // pause the thread for 1 second
            }
        });
    
        for received in rx {
            println!("Got: {}", received);
        }
    }
    

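    Notice that we can iterate over the downstream end rx directly with a for loop: each iteration waits for the next value, and the loop ends once the channel is closed (when the sender has been dropped).

    The output is (roughly one line per second):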

    Got: hi
    Got: from
    Got: the
    Got: thread
    
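    The for loop over rx ends because iterating a Receiver stops once every Sender has been dropped and the queued values are drained; here the only Sender is dropped when the spawned thread finishes. The sketch below shows the same behavior with rx.iter().collect(); the helper name collect_all is hypothetical:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: collects everything a single producer sends.
fn collect_all() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
        // tx is dropped when this closure ends, which closes the channel
    });

    // rx.iter() blocks for each value and stops once the channel is closed and empty.
    rx.iter().collect()
}

fn main() {
    println!("{:?}", collect_all());
}
```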

    Now let's look at the case with multiple producer threads. Each producer needs its own Sender, so we clone tx first:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    fn main() {
        let (tx, rx) = mpsc::channel();
    
        let tx1 = mpsc::Sender::clone(&tx);
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
    
            for val in vals {
                tx1.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        thread::spawn(move || {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];
    
            for val in vals {
                tx.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    
        for received in rx {
            println!("Got: {}", received);
        }
    }
    
    

    The output might be:

    Got: hi
    Got: more
    Got: from
    Got: messages
    Got: for
    Got: the
    Got: thread
    Got: you
    

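    I say "might" because the order depends on how the operating system schedules the threads, so it can differ between machines and even between runs.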
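    The same pattern extends to any number of producers by cloning the Sender in a loop. One detail matters: the original tx must also be dropped, otherwise the receiving loop keeps waiting for it forever. A sketch; the helper name many_producers is hypothetical, and the results are sorted because the arrival order is not deterministic:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: `n` producer threads, one consumer (the caller).
fn many_producers(n: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..n {
        let tx = tx.clone(); // each producer gets its own Sender
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }
    drop(tx); // without this, rx would wait forever for the original Sender

    let mut got: Vec<u32> = rx.iter().collect();
    got.sort(); // arrival order depends on scheduling, so sort for a stable result
    got
}

fn main() {
    println!("{:?}", many_producers(4));
}
```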

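    Now let's look at sharing data between threads with a mutual-exclusion lock (Mutex).

    What is a mutex? Here's a familiar example: when our company holds a meeting, there is usually one microphone.

    Only the person holding the microphone can speak. The microphone is passed around among us, and only one person may hold it at a time.

    That microphone is the mutex: Mutex.

    Let's start with a simple example: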

    use std::sync::Mutex;
    
    fn main() {
        let m = Mutex::new(5);
    
        {
            let mut num = m.lock().unwrap();
            *num = 6;
        }
    
        println!("m = {:?}", m);
    }
    

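    Here we use Mutex directly from the standard library's std::sync module.

    Mutex<T> works through a smart pointer. More precisely, m.lock() returns a smart pointer called MutexGuard (wrapped in a LockResult, hence the unwrap()). The guard dereferences to the inner data, and when it goes out of scope it releases the lock automatically, which is why the example puts it inside an inner block.

    What is a smart pointer?

    In Rust, simply put, a smart pointer holds not only a memory address, like an ordinary pointer, but also extra metadata or capabilities.

    And what is an ordinary pointer?

    In Rust, an ordinary pointer, such as a reference &T, is simply a value that holds a memory address.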
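    To make the microphone analogy concrete, the sketch below checks that the lock stays taken while a MutexGuard is alive and becomes free once the guard is dropped. It uses try_lock, which returns an error instead of blocking when the mutex is already locked. The helper name guard_demo is hypothetical:

```rust
use std::sync::Mutex;

// Hypothetical helper: returns (locked while guard alive, free after drop, final value).
fn guard_demo() -> (bool, bool, i32) {
    let m = Mutex::new(5);

    let mut guard = m.lock().unwrap();
    *guard = 6; // the guard dereferences to the inner i32
    let busy_while_held = m.try_lock().is_err(); // the "microphone" is taken

    drop(guard); // dropping the guard releases the lock
    let free_after_drop = m.try_lock().is_ok();

    let value = *m.lock().unwrap();
    (busy_while_held, free_after_drop, value)
}

fn main() {
    println!("{:?}", guard_demo());
}
```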

    OK, back to multithreading.

    Now let's use Mutex to share data between two threads. Here's the code:

    use std::sync::Mutex;
    use std::thread;
    
    fn main() {
        let counter = Mutex::new(0);
        let mut handles = vec![];
    
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
    
            *num += 1;
        });
        handles.push(handle);
    
        let handle2 = thread::spawn(move || {
            let mut num2 = counter.lock().unwrap();
    
            *num2 += 1;
        });
        handles.push(handle2);
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        println!("Result: {}", *counter.lock().unwrap());
    }
    

    Compile it directly, and we get errors:

    error[E0382]: use of moved value: `counter`
      --> main.rs:98:33
       |
    88 |     let counter = Mutex::new(0);
       |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
    ...
    91 |     let handle = thread::spawn(move || {
       |                                ------- value moved into closure here
    92 |         let mut num = counter.lock().unwrap();
       |                       ------- variable moved due to use in closure
    ...
    98 |     let handle2 = thread::spawn(move || {
       |                                 ^^^^^^^ value used here after move
    99 |         let mut num2 = counter.lock().unwrap();
       |                        ------- use occurs due to use in closure
    
    error[E0382]: borrow of moved value: `counter`
       --> main.rs:109:29
        |
    88  |     let counter = Mutex::new(0);
        |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
    ...
    98  |     let handle2 = thread::spawn(move || {
        |                                 ------- value moved into closure here
    99  |         let mut num2 = counter.lock().unwrap();
        |                        ------- variable moved due to use in closure
    ...
    109 |     println!("Result: {}", *counter.lock().unwrap());
        |                             ^^^^^^^ value borrowed here after move
    
    

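    Why?

    Because the first thread's move closure has already taken ownership of counter.

    So when the second thread tries to use it as well, compilation fails, and the same goes for the final println! in main.

    So what do we do?

    Use Arc. Look at the following code: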

    use std::sync::{Arc, Mutex};
    use std::thread;
    
    fn main() {
        let ctr = Arc::new(Mutex::new(0));
        let mut handles = vec![];
        let counter = Arc::clone(&ctr);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
    
            *num += 1;
        });
        handles.push(handle);
        let counter = Arc::clone(&ctr);
        let handle2 = thread::spawn(move || {
            let mut num2 = counter.lock().unwrap();
    
            *num2 += 1;
        });
        handles.push(handle2);
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        println!("Result: {}", *ctr.lock().unwrap());
    }
    
    

    Run the code above, and it prints:

    Result: 2
    

    The result is as expected.
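    The two hand-written threads generalize to a loop: clone the Arc once per thread and join all the handles before reading the result. A sketch; the helper name count_with_threads and the thread count are illustrative assumptions:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: `n` threads each add 1 to a shared counter.
fn count_with_threads(n: usize) -> i32 {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = Vec::with_capacity(n);

    for _ in 0..n {
        let counter = Arc::clone(&counter); // one shared owner per thread
        handles.push(thread::spawn(move || {
            *counter.lock().unwrap() += 1; // lock, increment, unlock at end of statement
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    let result = *counter.lock().unwrap();
    result
}

fn main() {
    println!("Result: {}", count_with_threads(10));
}
```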

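    So what exactly is Arc, and how does it work this magic?

    Arc stands for atomically reference counted type.

    In other words, an atomic reference-counted type. Sounds a bit complicated, doesn't it?

    Don't worry. For now, just remember that Arc is a reference-counted pointer that can be shared safely between threads: every Arc::clone atomically increments the count without copying the data, and the shared data is freed when the last Arc is dropped.

    Arc gives several threads shared ownership of the Mutex, and the Mutex ensures that only one thread at a time can modify the value inside; together, Arc<Mutex<T>> lets threads share mutable data safely.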
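    Arc::strong_count lets you watch the reference count described above: it goes up on Arc::clone and back down when a clone is dropped, here when the spawned thread's closure finishes. A sketch; the helper name arc_counts is hypothetical:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: returns (count while shared, count after the thread is joined).
fn arc_counts() -> (usize, usize) {
    let data = Arc::new(vec![1, 2, 3]);
    let for_thread = Arc::clone(&data); // count goes from 1 to 2; the Vec is not copied
    let while_shared = Arc::strong_count(&data);

    let handle = thread::spawn(move || {
        let sum: i32 = for_thread.iter().sum();
        sum
        // for_thread is dropped when the closure finishes, so the count drops back
    });
    let sum = handle.join().unwrap();
    println!("sum computed in the thread: {}", sum);

    let after_join = Arc::strong_count(&data);
    (while_shared, after_join)
}

fn main() {
    println!("{:?}", arc_counts());
}
```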

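    That's all. I hope it helps.

    If you run into any problems, you're welcome to join the Rust beginners' group, where I can offer some simple help. Add me on WeChat: 360369487, with the note: 博客园+rust (cnblogs + rust).

    References: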

    https://doc.rust-lang.org/1.4.0/book/dining-philosophers.html

    https://doc.rust-lang.org/1.30.0/book/second-edition/ch16-01-threads.html

    https://doc.rust-lang.org/1.30.0/book/second-edition/ch16-02-message-passing.html

  • Original post: https://www.cnblogs.com/gyc567/p/12028052.html