1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use std::sync::mpsc::Receiver;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

// A hand-written thread pool.
//
/// A unit of work handed to the pool: a heap-allocated closure that is called
/// exactly once (`FnOnce`), may be moved to another thread (`Send`), and does
/// not borrow anything shorter-lived than `'static`.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// One pool thread that repeatedly takes a [`Job`] from the shared channel and
/// runs it.
struct Worker {
    /// Index given by the creator; used in log messages.
    id: usize,
    /// Handle of the spawned thread. Stored as an `Option` so the handle can be
    /// moved out (and joined) through a `&mut Worker`.
    join_handle: Option<JoinHandle<()>>,
}

impl Worker {
    /// Spawns the worker thread.
    ///
    /// The thread loops: wait for the next job on `receiver`, run it, repeat.
    /// It leaves the loop when `recv` returns an error, i.e. when the channel
    /// has been disconnected.
    ///
    /// A job that panics is caught and reported on stderr instead of unwinding
    /// out of the loop. One failing job therefore neither removes this worker
    /// from the pool nor makes a later `join` on its handle return `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned (see [`thread::spawn`]).
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Job>>>) -> Self {
        let join_handle = thread::spawn(move || loop {
            // The mutex guard is a temporary of this statement, so the lock is
            // released as soon as `recv` returns and is never held while the
            // job itself runs.
            let message = receiver
                .lock()
                .expect("job receiver mutex poisoned")
                .recv();

            let job = match message {
                Ok(job) => job,
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            };

            println!("Worker {id} executing job");
            // `AssertUnwindSafe` is required because a boxed `dyn FnOnce` is not
            // `UnwindSafe`. The job is consumed by the call, so this loop never
            // looks at anything the panic may have left half-updated.
            if std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)).is_err() {
                eprintln!("Worker {id} job panicked; worker keeps running.");
            }
        });

        Self {
            id,
            join_handle: Some(join_handle),
        }
    }
}

/// A fixed-size pool of worker threads fed from a single shared job channel.
struct ThreadPool {
    /// The worker threads, in creation order.
    workers: Vec<Worker>,
    /// Sending half of the job channel. Kept in an `Option` so that `Drop` can
    /// take it out and drop it before joining the workers.
    sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    /// Creates a pool with `size` workers that all receive from one channel.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    fn new(size: usize) -> Self {
        assert!(size > 0, "ThreadPool size must greater than zero!");

        let (sender, receiver) = mpsc::channel();
        // The single receiver is shared by every worker behind a mutex.
        let receiver = Arc::new(Mutex::new(receiver));

        let workers: Vec<Worker> = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        Self {
            workers,
            sender: Some(sender),
        }
    }

    /// Queues `job` to be run by one of the workers.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent because the receiving side of the
    /// channel is gone, or if the sender has already been taken by `Drop`.
    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        match &self.sender {
            Some(sender) => sender
                .send(Box::new(job))
                .expect("job channel closed: no worker is left to receive jobs"),
            None => panic!("ThreadPool has closed"),
        }
    }
}

impl Drop for ThreadPool {
    /// Closes the job channel and waits for every worker thread to finish.
    ///
    /// A worker whose thread ended with a panic is reported on stderr rather
    /// than re-raised. Panicking here would skip joining the remaining workers,
    /// and a panic raised from `drop` while the thread is already unwinding
    /// aborts the process.
    fn drop(&mut self) {
        // Dropping the only sender disconnects the channel, which is what makes
        // the workers' `recv` fail once the queue has been drained.
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("shutting down worker {}", worker.id);
            if let Some(handle) = worker.join_handle.take() {
                if handle.join().is_err() {
                    eprintln!("worker {} had terminated with a panic", worker.id);
                }
            }
        }
    }
}

/// Demo driver: submits eight sleeping tasks to a four-thread pool.
fn main() {
    // Create a thread pool with 4 threads.
    let pool = ThreadPool::new(4);
    // How long each task pretends to work.
    let pause = std::time::Duration::from_secs(10);

    // Submit 8 tasks.
    (0..8).for_each(|i| {
        println!("Submitting task {i}");
        pool.execute(move || {
            println!("Task {i} started");
            thread::sleep(pause);
            println!("Task {i} finished");
        });
    });

    println!("All tasks submitted");
    // `pool` goes out of scope here; its `Drop` impl shuts the pool down and
    // waits for the submitted tasks to finish.
}