这篇文章主要介绍了如何通过celery_one避免Celery定时任务重复执行的问题,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。
参考:https://blog.csdn.net/qq_41333582/article/details/83899884
有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。
Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
另外如果要手动设置任务的 key,可以指定 keys 参数
@celery.task(base=QueueOnce, once={'keys': ['a']}) def slow_add(a, b): sleep(30) return a + b
Celery One允许你将Celery任务排队,防止多次执行
安装
pip install -U celery_once
要求,需要Celery4.0,老版本可能运行,但不是官方支持的。
使用celery_once,tasks需要继承一个名为QueueOnce的抽象base tasks
Once安装完成后,需要配置一些关于ONCE的选项在Celery配置中
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery('tasks', broker='amqp://guest@localhost//') # 一般之前的配置没有这个,需要添加上 celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } } # 在原本没有参数的里面加上base @celery.task(base=QueueOnce) def slow_task(): sleep(30) return "Done!"
要确定配置,需要取决于使用哪个backend进行锁定,查看Backends
在后端,这将覆盖apply_async和delay。它不影响直接调用任务。
在运行任务时,celery_once检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发AlreadyQueued异常。
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
..
AlreadyQueued()
graceful:如果在任务的选项中设置了once={'graceful': True},或者在运行时设置了apply_async,则任务可以返回None,而不是引发AlreadyQueued异常。
from celery_once import AlreadyQueued # Either catch the exception, try: example.delay(10) except AlreadyQueued: pass # Or, handle it gracefully at run time. result = example.apply(args=(10), once={'graceful': True}) # or by default. @celery.task(base=QueueOnce, once={'graceful': True}) def slow_task(): sleep(30) return "Done!"
感谢你能够认真阅读完这篇文章,希望小编分享的“如何通过celery_one避免Celery定时任务重复执行的问题”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。