温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

celery (芹菜) 异步任务

发布时间:2020-07-16 22:57:49 阅读:515 作者:大牙啊 栏目:编程语言
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

第1章 celery

1.1什么是celery

Celery是一个简单灵活,且可靠,处理大量消息的分布式系统

python写的,用来执行定时任务和异步任务的框架

1.2celery的组成

消息中间àbroker

任务执行单元àworker

任务执行结果存储àstore

1.3使用场景

异步任务:将耗时操作任务交给celery去异步执行,比如发送短信/邮件,消息推送,音视频处理等

定时任务:定时执行某件事情,比如每天的数据统计

第2章 celery的安装配置

pip install celery

消息中间件:rabbitmq/redis,这里使用redis

docker run -p 6379:6379 --name=redis -d redis:latest redis-server

第3章 celery执行异步任务

3.1基本使用

定义任务:

from celery import Celery
import time

任务产生存储
broker = 'redis://10.211.55.8:16379/1'
任务执行结果存储
backend = 'redis://10.211.55.8:16379/2'
第一个参数是别名,随便写即可
app = Celery('first_celery'backend=backendbroker=broker)

@
app.task
def test_celery(xy):
    time.sleep(
2)
    
return x + y

添加任务:

import task

if __name__ == '__main__':
    
# result不是函数的执行结果,是一个对象
    
result = tasks.add.delay(2,3)
    
print(result.id)

使用celery命令开启work进程

celery –A tasks worker –l info

3.2多任务结构:

当有多个任务需要执行时,会有多个任务函数,位了方便解藕,我们可以使用这种方式,方便管理

celery.py

from celeryimport Celery

broker = 
'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'
# include包含两个任务的函数,去相应文件中找对应的任务函数
cel = Celery('test'broker=brokerbackend=backendinclude=['celery_task.task''celery_task.task2'])

时区,有时候我们想在固定的时间来执行相关任务,就需要设置时区相关配置
cel.conf.timezone = 'Asia/Shanghai'
是否UTC
cel.conf.enable_utc = False

task.py

from .celeryimport app

@
app.task
def sum(xy):
    
returnx+y

task2.py

from .celeryimport app

@
app.task
def less(xy):
    
returnx-y

celery_task目录下创建添加任务的py文件

from celery_taskimport task
from celery_task import task2

task.sum.delay(
12)
task2.less.delay(
31)

第4章 celery执行定时任务

4.1设定时间让celery执行一个任务

from celery_taskimport task
from celery_task import task2
from datetime import datetimetimedelta

# # 方式一:
# #
获取时间对象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# # 
将时间转换成UTC时间
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# # 
apply_asyncargs是函数参数,eta是指定时间
# result = task.add.apply_async(args=[1, 2], eta=utc_time)

方式二:
ctime = datetime.now()
把当前时间转换成UTC时间
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
当前时间延迟十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(
args=[23]eta=task_time)


# task.sum.delay(1, 2)
# task2.less.delay(3, 1)

4.2类似crontab的定时任务

 多任务结构中celery.py文件如下

from celeryimport Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 
'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'

# include包含两个任务的函数,去相应文件中找对应的任务函数
app = Celery('test'broker=brokerbackend=backendinclude=['celery_task.task''celery_task.task2'])
时区
app.conf.timezone = 'Asia/Shanghai'
是否UTC
app.conf.enable_utc = False

app.conf.beat_schedules = {
    
#别名,可随意编写
    
'add_crontab': {
        
#执行task下的add函数
        
'task''celery_task.task.add',
        
#每隔两秒执行
        
'schedules': timedelta(seconds=2),

        
#固定时间执行,每年411号,842分执行
        #'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),

        #
传递参数
        
'args': [12],
    
}
}

第5章 Django中使用celery

在项目的目录下创建celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    
'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
超时时间
CELERYD_TASK_TIME_LIMIT=12*30

app目录下创建tasks.py

from celeryimport task

@
task
def add(xy):
    
returnx+y

试图函数views中使用

from django.shortcutsimport render,HttpResponse
from app01.tasks import add
from datetime import datetimetimedelta
def test(request):
    
# result=add.delay(2,3)
    
ctime = datetime.now()
    
默认用utc时间
    
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay= timedelta(
seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(
args=[43]eta=task_time)
    
print(result.id)
    
returnHttpResponse('ok')

settings中注册

INSTALLED_APPS= [
    
'django.contrib.admin',
    
'django.contrib.auth',
    
'django.contrib.contenttypes',
    
'django.contrib.sessions',
    
'django.contrib.messages',
    
'django.contrib.staticfiles',
    
'djcelery',
    
'app01',
]

from djceleryimport celeryconfig

默认使用rabbitmqredis的话需要指定下
BROKER_BACKEND=
'redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI

开发者交流群×