本篇内容主要讲解“machinery的功能有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“machinery的功能有哪些”吧!
上面只是简单举了个例子,任务队列有着广泛的应用场景,比如大批量的计算任务,当有大量数据插入,通过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提高系统鲁棒性,提高系统并发度;或者对数据进行预处理,定期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提高查询请求的响应速度。适用任务队列的场景有很多,这里就不一一列举了。回归本文主题,既然我们要学习machinery
,就要先了解一下他都有哪些特性呢?
任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:
学习一门新东西,我都习惯先写一个demo,先学会了走,再学会跑。所以先来看一个例子,功能很简单,异步计算1到10的和。
先看一下配置文件代码:
broker: redis://localhost:6379default_queue: "asong"result_backend: redis://localhost:6379redis: max_idle: 3 max_active: 3 max_idle_timeout: 240 wait: true read_timeout: 15 write_timeout: 15 connect_timeout: 15 normal_tasks_poll_period: 1000 delayed_tasks_poll_period: 500 delayed_tasks_key: "asong"
这里broker
与result_backend
来实现。
主代码,完整版github获取:
func main() { cnf,err := config.NewFromYaml("./config.yml",false) if err != nil{ log.Println("config failed",err) return } server,err := machinery.NewServer(cnf) if err != nil{ log.Println("start server failed",err) return } // 注册任务 err = server.RegisterTask("sum",Sum) if err != nil{ log.Println("reg task failed",err) return } worker := server.NewWorker("asong", 1) go func() { err = worker.Launch() if err != nil { log.Println("start worker error",err) return } }() //task signature signature := &tasks.Signature{ Name: "sum", Args: []tasks.Arg{ { Type: "[]int64", Value: []int64{1,2,3,4,5,6,7,8,9,10}, }, }, } asyncResult, err := server.SendTask(signature) if err != nil { log.Fatal(err) } res, err := asyncResult.Get(1) if err != nil { log.Fatal(err) } log.Printf("get res is %v\n", tasks.HumanReadableResults(res))}
运行结果:
INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.ymlINFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asongINFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+CDEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 552020/10/31 11:32:16 get res is 55
好啦,现在我们开始讲一讲上面的代码流程,
broker
和
result_backend
,这里我都选择的是
redis
,因为电脑正好有这个环境,就直接用了。Machinery
库必须在使用前实例化。实现方法是创建一个
Server
实例。
Server
是
Machinery
配置和注册任务的基本对象。workders
能消费一个任务前,你需要将它注册到服务器。这是通过给任务分配一个唯一的名称来实现的。Server
实例。每个worker将只使用已注册的任务。对于队列中的每个任务,Worker.Process()方法将在一个goroutine中运行。可以使用
server.NewWorker
的第二参数来限制并发运行的worker.Process()调用的数量(每个worker)。Signature
实例传递给
Server
实例来调用任务。HumanReadableResults
这个方法可以处理反射值,获取到最终的结果。上面的代码只是一个简单machinery
使用示例,其实machiney
也支持延时任务的,可以通过在任务signature
上设置ETA时间戳字段来延迟任务。
eta := time.Now().UTC().Add(time.Second * 20) signature.ETA = &eta
在将任务声明为失败之前,可以设置多次重试尝试。斐波那契序列将用于在一段时间内分隔重试请求。这里可以使用两种方法,第一种直接对tsak signature
中的retryTimeout
和RetryCount
字段进行设置,就可以,重试时间将按照斐波那契数列进行叠加。
//task signature signature := &tasks.Signature{ Name: "sum", Args: []tasks.Arg{ { Type: "[]int64", Value: []int64{1,2,3,4,5,6,7,8,9,10}, }, }, RetryTimeout: 100, RetryCount: 3, }
或者,你可以使用return.tasks.ErrRetryTaskLater
返回任务并指定重试的持续时间。
func Sum(args []int64) (int64, error) { sum := int64(0) for _, arg := range args { sum += arg } return sum, tasks.NewErrRetryTaskLater("我说他错了", 4 * time.Second)}
上面我们讲的都是运行一个异步任务,但是我们往往做项目时,一个需求是需要多个异步任务以编排好的方式执行的,所以我们就可以使用machinery
的工作流来完成。
Group
是一组任务,它们将相互独立地并行执行。还是画个图吧,这样看起来更明了:
一起来看一个简单的例子:
// group group,err :=tasks.NewGroup(signature1,signature2,signature3) if err != nil{ log.Println("add group failed",err) } asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10) if err != nil { log.Println(err) } for _, asyncResult := range asyncResults{ results,err := asyncResult.Get(1) if err != nil{ log.Println(err) continue } log.Printf( "%v %v %v\n", asyncResult.Signature.Args[0].Value, tasks.HumanReadableResults(results), ) }
group
中的任务是并行执行的。
我们在做项目时,往往会有一些回调场景,machiney
也为我们考虑到了这一点,Chord
允许你定一个回调任务在groups
中的所有任务执行结束后被执行。
来看一段代码:
callback := &tasks.Signature{ Name: "call", } group, err := tasks.NewGroup(signature1, signature2, signature3) if err != nil { log.Printf("Error creating group: %s", err.Error()) return } chord, err := tasks.NewChord(group, callback) if err != nil { log.Printf("Error creating chord: %s", err) return } chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0) if err != nil { log.Printf("Could not send chord: %s", err.Error()) return } results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5)) if err != nil { log.Printf("Getting chord result failed with error: %s", err.Error()) return } log.Printf("%v\n", tasks.HumanReadableResults(results))
上面的例子并行执行task1、task2、task3,聚合它们的结果并将它们传递给callback任务。
chain
就是一个接一个执行的任务集,每个成功的任务都会触发chain
中的下一个任务。
看这样一段代码:
//chain chain,err := tasks.NewChain(signature1,signature2,signature3,callback) if err != nil { log.Printf("Error creating group: %s", err.Error()) return } chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain) if err != nil { log.Printf("Could not send chain: %s", err.Error()) return } results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5)) if err != nil { log.Printf("Getting chain result failed with error: %s", err.Error()) } log.Printf(" %v\n", tasks.HumanReadableResults(results))
上面的例子执行task1,然后是task2,然后是task3。当一个任务成功完成时,结果被附加到chain
中下一个任务的参数列表的末尾,最终执行callback
任务。
到此,相信大家对“machinery的功能有哪些”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4113533/blog/4698819