本篇内容主要讲解“Rust中多线程的使用方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Rust中多线程的使用方法”吧!
Rust对并发编程提供了非常丰富的支持,有传统的多线程方式,也提供流行的异步原语async, await,本篇文章主要介绍多线程方面的基本用法。以下将分为5部分进行讲解
线程的创建
锁
原子变量
管道 , 条件变量
生产者消费者的实现
线程的创建非常的简单
let thread = std::thread::spawn(||{ println!("hello world"); }); thread.join(); //等待线程结束
Rust语言和其他语言不一样的地方是,如果线程里使用了外部变量,则会报错
let data = String::from("hello world"); let thread = std::thread::spawn(||{ println!("{}", data); }); thread.join(); 原因如下: error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function --> src\main.rs:36:37 | 36 | let thread = std::thread::spawn(||{ | ^^ may outlive borrowed value `data` 37 | println!("{}", data); | ---- `data` is borrowed here
线程中使用了其他线程的变量是不合法的,必须使用move表明线程拥有data的所有权
let data = String::from("hello world"); let thread = std::thread::spawn(move ||{ //使用move 把data的所有权转到线程内 println!("{}", data); }); thread.join();
如果想要在多线程间读写数据,通常需要加锁,如java中的 synchornized。与之对应,在Rust中需要使用Mutex,由于Mutex是跨线程使用,线程会转移Mutex的所有权,所以必须配合Arc使用。
fn main() { let counter = Arc::new(Mutex::new(0)); let counter2 = counter.clone(); let thread = std::thread::spawn(move ||{ let mut i = counter2.lock().unwrap();//获取锁,不需要手动释放,rust的锁和变量的生命周期一样,离开作用域时,锁会自动释放 *i = *i + 1; }); thread.join(); let counter = counter.lock().unwrap(); assert_eq!(1, *counter); }
上面的例子中,我们使用锁来实现对数据的安全访问,锁作用的范围是调用lock到锁对象的scope结束,在这段范围内的代码同一时间只能被一个线程访问,从这点来看,使用锁来实现对单一数据的安全访问就有点重了(当然从锁和原子变量的实现机制来说,锁也远比原子变量重),这时候使用原子变量效率就会高很多。
fn main() { let counter = Arc::new(AtomicU32::new(0)); let counter2 = counter.clone(); let thread = std::thread::spawn(move ||{ counter2.fetch_add( 1, Ordering::SeqCst); }); counter.fetch_add( 1, Ordering::SeqCst); thread.join(); counter.load(Ordering::SeqCst); assert_eq!(2, counter.load(Ordering::SeqCst)); }
线程间的通信、协作,需要有一定的机制来支持,管道和条件变量就是这样的机制。
管道(Channel) Rust的channle包含了2个概念,发送者和接收者。发送者可以将消息放入管道,接收者则从管道中读取消息
fn main() { use std::sync::mpsc::channel; use std::thread; let (sender, receiver) = channel(); let sender2 = sender.clone(); thread::spawn(move|| { sender2.send(123).unwrap(); //线程1 发送消息 }); thread::spawn(move|| { sender.send(456).unwrap(); //线程2 发送消息 }); while let Ok(res) = receiver.recv() { //主线程 接收消息 println!("{:?}", res); } }
值得注意的是接收者(receiver), 是唯一的,不像发送者(sender)那样可以有多个
条件变量 条件变量Condvar,不能单独使用,需要和监视器MutexGuard配合使用。 线程之间通过调用 condvar.wait, condvar.notify_all, condvar.notify_one来实现线程间通信。
fn println(msg: &str){ use chrono::Local; let date = Local::now(); println!("{} {}", date.format("%Y-%m-%d %H:%M:%S"), msg) } fn main() { use std::sync::{Arc, Mutex, Condvar}; use std::thread; let mutex_condva = Arc::new((Mutex::new(false), Condvar::new())); let m_c = mutex_condva.clone(); thread::spawn(move || { println("sub thread start.."); let (lock, cvar) = &*m_c; let mut started = lock.lock().unwrap(); *started = true; //将业务参数设置为true std::thread::sleep(Duration::from_secs(5)); cvar.notify_all(); // 唤醒条件变量等待者 println("sub thread finished.."); }); println("main thread start.."); let (lock, cvar) = &*mutex_condva; let mut started = lock.lock().unwrap(); println("main thread begin wait.."); while !*started { //等待条件变量被唤醒,且等待关注的业务参数为真。这里需要注意,要在循环中判断started,因为条件变量被唤醒时,有可能业务条件并未为true started = cvar.wait(started).unwrap(); } println("main thread finished.."); }
下面的例子使用条件变量Condar来实现多生产者 ,多消费者(使用管道比较容易实现,且只能由一个消费者,这里就不介绍了)
struct Queue<T>{ inner:Vec<T>, capacity: usize } impl<T> Queue<T> { fn new(cap:usize) -> Queue<T> { Queue{ inner: Vec::new(), capacity: cap } } fn push(&mut self, data:T) -> bool { if !self.is_full() { self.inner.push(data); true } else { false } } fn pop(&mut self) -> Option<T> { self.inner.pop() } fn is_empty(&self) -> bool { self.inner.is_empty() } fn is_full(&self) -> bool { if self.inner.len() == self.capacity {true} else {false} } } struct Producer<T>{ inner:Arc<(Mutex<Queue<T>>, Condvar)> } impl<T:Display> Producer<T> { fn new(inner:Arc<(Mutex<Queue<T>>, Condvar)>) -> Producer<T> { Producer{inner} } fn produce(&self, data:T) { let mut queue = self.inner.0.lock().unwrap(); while (*queue).is_full() { println("[Producer] Queue is full, waiting queue to not full"); queue = self.inner.1.wait(queue).unwrap(); println("[Producer22] Queue is full, waiting queue to not full"); } println("[Producer] Queue is not full, push data to queue"); queue.push(data); self.inner.1.notify_all(); } } struct Consumer<T>{ inner: Arc<(Mutex<Queue<T>>, Condvar)> } impl<T:Display> Consumer<T> { fn new(inner:Arc<(Mutex<Queue<T>>, Condvar)>) -> Consumer<T> { Consumer{inner} } fn consume(&self) -> Option<T> { let mut queue = self.inner.0.lock().unwrap(); while (*queue).is_empty() { println("[Consumer] Queue is empty, waiting queue to have data"); queue = self.inner.1.wait(queue).unwrap(); } println("[Consumer] Queue has data, pop data"); let data = queue.pop(); self.inner.1.notify_all(); data } } fn println(msg: &str){ use chrono::Local; let date = Local::now(); println!("{:?} {} {}", std::thread::current().id(), date.format("%Y-%m-%d %H:%M:%S"), msg) } fn main() { let mc = Arc::new((Mutex::new(Queue::<usize>::new(3)), Condvar::new())); produce(&mc); consume(&mc); std::thread::sleep(Duration::from_secs(1000));//主线程等待,不然程序会提早退出 } fn produce(mc: &Arc<(Mutex<Queue<usize>>, Condvar)>){ for i in 0 .. 10 { let mc_clone = mc.clone(); std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(random::<u64>() % 10)); let producer = Producer::new(mc_clone); producer.produce(i); }); } } fn consume(mc: &Arc<(Mutex<Queue<usize>>, Condvar)>){ for i in 0 .. 2 { let mc_clone = mc.clone(); std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(random::<u64>() % 2)); let consumer = Consumer::new(mc_clone); loop { consumer.consume(); } }); } }
到此,相信大家对“Rust中多线程的使用方法”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。