Golang中怎么实现百万级高并发,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
package main import ( "fmt" "runtime" "time" ) func main(){ //这里我们假设数据是int类型,缓存格式设为100 dataChan:=make(chan int,100) go func(){ for{ select{ case data:=<-dataChan: fmt.Println("data:",data) time.Sleep(1 * time.Second)//这里延迟是模拟处理数据的耗时 } } }() //填充数据 for i:=0;i<100;i++{ dataChan<-i } //这里循环打印查看协程个数 for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
这里打印出来的协程个数时2,为什么? 因为main方法独占一个主协程,我们又起了一个协程,所以是两个。
首先我们要抽象出几个概念:
Job: type Job interface { Do() } // 一个数据接口,所有的数据都要实现该接口,才能被传递进来 //实现Job接口的一个数据实例,需要实现一个Do()方法,对数据的处理就在这个Do()方法中。 Job通道: 这里有两个Job通道: 1、WorkerPool的Job channel,用于调用者把具体的数据写入到这里,WorkerPool读取。 2、Worker的Job channel,当WorkerPool读取到Job,并拿到可用的Worker的时候,会将Job实例写入该Worker的Job channel,用来直接执行Do()方法。 Worker: type Worker struct { JobQueue chan Job //Worker的Job通道 } //每一个被初始化的worker都会在后期单独占用一个协程 //初始化的时候会先把自己的JobQueue传递到Worker通道中, //然后阻塞读取自己的JobQueue,读到一个Job就执行Job对象的Do()方法。 工作池(WorkerPool): type WorkerPool struct { workerlen int //WorkerPool中同时 存在Worker的个数 JobQueue chan Job // WorkerPool的Job通道 WorkerQueue chan chan Job } //初始化时会按照传入的num,启动num个后台协程,然后循环读取Job通道里面的数据, //读到一个数据时,再获取一个可用的Worker,并将Job对象传递到该Worker的chan通道
整个过程中 每个Worker都会被运行在一个协程中,在整个WorkerPool中就会有num可空闲的Worker,当来一条数据的时候,就会在工作池中去一个空闲的Worker去执行该Job,当工作池中没有可用的worker时,就会阻塞等待一个空闲的worker。
这是一个粗糙最简单的版本,只是为了演示效果,具体使用需要根据实际情况加一些特殊的处理。
当数据无限多的时候func (wp *WorkerPool) Run() 会无限创建协程,这里需要做一些处理,这里是为了让所有的请求不等待,并且体现一下最大峰值时的协程数。具体因项目而异。
代码地址:https://github.com/wangzhen0625/gonote/tree/master/7goroutune
main.go
package main import ( "fmt" "runtime" "time" ) type Score struct { Num int } func (s *Score) Do() { fmt.Println("num:", s.Num) time.Sleep(1 * 1 * time.Second) } func main() { num := 100 * 100 * 20 // debug.SetMaxThreads(num + 1000) //设置最大线程数 // 注册工作池,传入任务 // 参数1 worker并发个数 p := NewWorkerPool(num) p.Run() datanum := 100 * 100 * 100 * 100 go func() { for i := 1; i <= datanum; i++ { sc := &Score{Num: i} p.JobQueue <- sc } }() for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
job.go
package main type Job interface { Do() }
worker.go
package main type Worker struct { JobQueue chan Job } func NewWorker() Worker { return Worker{JobQueue: make(chan Job)} } func (w Worker) Run(wq chan chan Job) { go func() { for { wq <- w.JobQueue select { case job := <-w.JobQueue: job.Do() } } }() }
workerpool.go
package main import "fmt" type WorkerPool struct { workerlen int JobQueue chan Job WorkerQueue chan chan Job } func NewWorkerPool(workerlen int) *WorkerPool { return &WorkerPool{ workerlen: workerlen, JobQueue: make(chan Job), WorkerQueue: make(chan chan Job, workerlen), } } func (wp *WorkerPool) Run() { fmt.Println("初始化worker") //初始化worker for i := 0; i < wp.workerlen; i++ { worker := NewWorker() worker.Run(wp.WorkerQueue) } // 循环获取可用的worker,往worker中写job go func() { for { select { case job := <-wp.JobQueue: worker := <-wp.WorkerQueue worker <- job } } }() }
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。