这篇文章主要介绍“go语言任务队列machinery的用法”,在日常操作中,相信很多人在go语言任务队列machinery的用法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”go语言任务队列machinery的用法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
go实现的基于消息中间件的异步任务队列, 下面是学习笔记
步骤1: 创建server,配置参数、注册task。(此处server只是个配置作用, 并不是单独的server进程)
步骤2: 启动worker
步骤3: 发送task
与celery的用法是完全一致的
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
Broker: "amqp://guest:guest@localhost:5672/",
DefaultQueue: "machinery_tasks",
ResultBackend: "amqp://guest:guest@localhost:5672/",
ResultsExpireIn: 3600, //任务有效期
AMQP: &config.AMQPConfig{
Exchange: "machinery_exchange",
ExchangeType: "direct",
BindingKey: "machinery_task",
PrefetchCount: 3, //限定消费能力
},
}
// Create server instance
broker := amqpbroker.New(cnf)
backend := amqpbackend.New(cnf)
lock := eagerlock.New() //任务锁
server := machinery.NewServer(cnf, broker, backend, lock)
// Register tasks
tasks := map[string]interface{}{
"add": exampletasks.Add,
"multiply": exampletasks.Multiply,
"sum_ints": exampletasks.SumInts,
"sum_floats": exampletasks.SumFloats,
"concat": exampletasks.Concat,
"split": exampletasks.Split,
"panic_task": exampletasks.PanicTask,
"long_running_task": exampletasks.LongRunningTask,
}
return server, server.RegisterTasks(tasks)
}
创建worker, 之后就可以启动了
func worker() error {
//消费者的标记
consumerTag := "machinery_worker"
server, err := startServer()
if err != nil {
return err
}
//第二个参数并发数, 0表示不限制
worker := server.NewWorker(consumerTag, 0)
//钩子函数
errorhandler := func(err error) {}
pretaskhandler := func(signature *tasks.Signature) {}
posttaskhandler := func(signature *tasks.Signature) {}
worker.SetPostTaskHandler(posttaskhandler)
worker.SetErrorHandler(errorhandler)
worker.SetPreTaskHandler(pretaskhandler)
return worker.Launch()
}
启动结果
INFO: 2021/05/01 08:28:27 worker.go:58 Launching a worker with the following settings:
INFO: 2021/05/01 08:28:27 worker.go:59 - Broker: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:61 - DefaultQueue: machinery_tasks
INFO: 2021/05/01 08:28:27 worker.go:65 - ResultBackend: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:67 - AMQP: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:68 - Exchange: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:69 - ExchangeType: direct
INFO: 2021/05/01 08:28:27 worker.go:70 - BindingKey: machinery_task
INFO: 2021/05/01 08:28:27 worker.go:71 - PrefetchCount: 0
INFO: 2021/05/01 08:28:27 amqp.go:96 [*] Waiting for messages. To exit press CTRL+C
server, _ := startServer()
signature := &tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 1,
},
},
}
asyncResult, _ := server.SendTask(signature)
fmt.Println(asyncResult.Get(time.Millisecond * 5)) //等待间隔,理论上是越小越好
//asyncResult.GetWithTimeout(time.Second*120, time.Millisecond * 5) //第一个参数才是timeout
以上就是machinery的基本用法,与celery基本一样, 更详细内容参考官方文档
到此,关于“go语言任务队列machinery的用法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/yongqing/blog/5037728