这篇“Rust多线程Web服务器怎么搭建”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Rust多线程Web服务器怎么搭建”文章吧。
在 socket 上监听 TCP 连接
解析少量的 HTTP 请求
创建一个合适的 HTTP 响应
使用线程池改进服务器的吞吐量
优雅的停机和清理
注意:并不是最佳实践
创建项目
~/rust ➜ cargo new hello Created binary (application) `hello` package ~/rust ➜
main.rs 文件
use std::net::TcpListener; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); println!("Connection established!"); } }
修改一:
use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); println!("Request: {}", String::from_utf8_lossy(&buffer[..])); }
修改二:
use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); // 请求 // Method Request-URI HTTP-Version CRLF // headers CRLF // message-body // 响应 // HTTP-Version Status-Code Reason-Phrase CRLF // headers CRLF // message-body let response = "HTTP/1.1 200 OK\r\n\r\n"; stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); println!("Request: {}", String::from_utf8_lossy(&buffer[..])); }
修改三:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); // 请求 // Method Request-URI HTTP-Version CRLF // headers CRLF // message-body // 响应 // HTTP-Version Status-Code Reason-Phrase CRLF // headers CRLF // message-body let contents = fs::read_to_string("hello.html").unwrap(); let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
修改四:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; if buffer.starts_with(get) { let contents = fs::read_to_string("hello.html").unwrap(); let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); } else { let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n"; let contents = fs::read_to_string("404.html").unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); } }
修改五:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
hello.html 文件
<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Hello</title> </head> <body> <h2>Hello</h2> <p>Hi from Rust</p> </body> </html>
404.html 文件
<!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Hello!</title> </head> <body> <h2>Oops!</h2> <p>Sorry, I don't know what you're asking for.</p> </body> </html>
单线程Web服务器
use std::fs; use std::thread; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
开启线程
use std::fs; use std::thread; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::time::Duration; fn main() { let listener = TcpListener::bind("localhost:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
lib.rs 文件
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
lib.rs 修改一
use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }
lib.rs 修改二
use std::thread; use std::sync::mpsc; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
lib.rs
修改三
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
lib.rs
修改四
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); (*job)(); }); Worker { id, thread } } }
lib.rs
修改五
use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {} got a job; executing.", id); // (*job)(); job.call_box(); }); Worker { id, thread } } }
lib.rs
修改六
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread } } }
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap() } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread } } }
修改优化一:
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } }); Worker { id, thread: Some(thread), } } }
最终版 lib.rs 文件
use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; use std::thread; enum Message { NewJob(Job), Terminate, } pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } // struct Job; // type Job = Box<FnOnce() + Send + 'static>; impl ThreadPool { /// Creates a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all workers."); for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!("Shutting down all workers."); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } trait FnBox { fn call_box(self: Box<Self>); } impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<F>) { (*self)() } } type Job = Box<dyn FnBox + Send + 'static>; impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!("Worker {} got a job; executing.", id); job.call_box(); } Message::Terminate => { println!("Worker {} got a job; executing.", id); break; } } }); Worker { id, thread: Some(thread), } } }
最终版 main.rs 文件
use hello::ThreadPool; use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; fn main() { let listener = TcpListener::bind("localhost:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!("{}{}", status_line, contents); stream.write(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
运行
hello on master [?] is ???? 0.1.0 via ???? 1.67.1 ➜ cargo run Compiling hello v0.1.0 (/Users/qiaopengjun/rust/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.43s Running `target/debug/hello` Worker 0 got a job; executing. Shutting down. Sending terminate message to all workers. Shutting down all workers. Shutting down worker 0 Worker 1 got a job; executing. Worker 2 got a job; executing. Worker 3 got a job; executing. Worker 1 got a job; executing. Worker 0 got a job; executing. Shutting down worker 1 Shutting down worker 2 Shutting down worker 3 hello on master [?] is ???? 0.1.0 via ???? 1.67.1 took 21.9s ➜
以上就是关于“Rust多线程Web服务器怎么搭建”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。