Golang中的协程池可以通过使用goroutine
和channel
实现。下面是一个简单的示例,展示了如何使用协程池来处理任务:
package main
import (
"fmt"
"sync"
)
type Worker struct {
ID int
Task chan int
Done chan bool
WorkerPool chan chan int
}
func NewWorker(id int, workerPool chan chan int) *Worker {
return &Worker{
ID: id,
Task: make(chan int),
Done: make(chan bool),
WorkerPool: workerPool,
}
}
func (w *Worker) Start() {
go func() {
for {
// 把自己的任务通道注册到工作池
w.WorkerPool <- w.Task
select {
case task := <-w.Task:
// 处理任务
fmt.Printf("Worker %d processing task %d\n", w.ID, task)
case <-w.Done:
// 任务完成
fmt.Printf("Worker %d stopping\n", w.ID)
return
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.Done <- true
}()
}
type Pool struct {
WorkerPool chan chan int
Tasks chan int
MaxWorkers int
WaitGroup sync.WaitGroup
}
func NewPool(maxWorkers, maxTasks int) *Pool {
return &Pool{
WorkerPool: make(chan chan int, maxWorkers),
Tasks: make(chan int, maxTasks),
MaxWorkers: maxWorkers,
}
}
func (p *Pool) Start() {
// 启动协程池中的工作协程
for i := 0; i < p.MaxWorkers; i++ {
worker := NewWorker(i, p.WorkerPool)
worker.Start()
}
go p.dispatch()
}
func (p *Pool) dispatch() {
for {
select {
case task := <-p.Tasks:
workerTask := <-p.WorkerPool
// 分发任务给空闲的工作协程
workerTask <- task
}
}
}
func main() {
pool := NewPool(3, 10)
pool.Start()
// 添加任务到任务队列
for i := 0; i < 10; i++ {
pool.Tasks <- i
}
pool.WaitGroup.Wait()
}
在上面的示例中,我们定义了一个Worker
结构体,其中包含了一个任务通道Task
和一个完成通道Done
。当Worker启动时,它会把自己的任务通道注册到工作池中,并等待任务的到来。在任务到来时,它会从任务通道中接收任务并处理。当任务完成时,它会通过完成通道通知主线程。
Pool
结构体包含了一个工作协程池和一个任务通道。在Start
方法中,我们创建了maxWorkers
个工作协程,并启动它们。同时,我们还启动了一个dispatch
协程,该协程用于从任务通道中接收任务,并将其分发给空闲的工作协程。
在main
函数中,我们创建了一个协程池,并向任务通道中添加了10个任务。最后,我们使用WaitGroup
等待所有任务完成。
这就是一个简单的Golang协程池的实现和应用。你可以根据自己的需求进行扩展和修改。