温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Python3中如何实现定时任务

发布时间:2021-08-07 15:58:55 来源:亿速云 阅读:304 作者:Leah 栏目:编程语言

这篇文章给大家介绍Python3中如何实现定时任务,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

1>定时更新微信token,需要2小时更新一次;

2>商品定时上线;

3>定时检测后台服务是否存活;

使用Python去实现这三个任务,这里需要使用定时相关知识点;

Python实现定点与定时任务方式比较多,找到下面四中实现方式,每个方式都有自己应用场景;下面来快速介绍Python中常用的定时任务实现方式:

1.循环+sleep;

2.线程模块中Timer类;

3.schedule模块;

4.定时框架:APScheduler

在开始之前先设定一个任务(这样不用依赖外部环境):

1:定时或者定点监测CPU与内存使用率;

2:将时间,CPU,内存使用情况保存到日志文件;

先来实现系统监测功能:

准备工作:安装psutil:pip install psutil

功能实现

#psutil:获取系统信息模块,可以获取CPU,内存,磁盘等的使用情况

import psutil

import time

import datetime

#logfile:监测信息写入文件

def MonitorSystem(logfile = None):

#获取cpu使用情况

cpuper = psutil.cpu_percent()

#获取内存使用情况:系统内存大小,使用内存,有效内存,内存使用率

mem = psutil.virtual_memory()

#内存使用率

memper = mem.percent

#获取当前时间

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

line = f'{ts} cpu:{cpuper}%, mem:{memper}%'

print(line)

if logfile:

logfile.write(line)

代码运行结果:

2019-03-21 14:23:41 cpu:0.6%, mem:77.2%

接下来我们要实现定时监测,比如3s监测一下系统资源使用情况。

最简单使用方式:sleep

这种方式最简单,直接使用while+sleep就可以实现:

def loopMonitor():

while True:

MonitorSystem()

#2s检查一次

time.sleep(3)

loopMonitor()

输出结果:

2019-03-21 14:28:42 cpu:1.5%, mem:77.6%
2019-03-21 14:28:45 cpu:1.6%, mem:77.6%
2019-03-21 14:28:48 cpu:1.4%, mem:77.6%
2019-03-21 14:28:51 cpu:1.4%, mem:77.6%
2019-03-21 14:28:54 cpu:1.3%, mem:77.6%

这种方式存在问题:只能处理单个定时任务。

又来了新任务:需要每秒监测网络收发字节,代码实现如下:

def MonitorNetWork(logfile = None):

#获取网络收信息

netinfo = psutil.net_io_counters()

#获取当前时间

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'

print(line)

if logfile:

logfile.write(line)

MonitorNetWork()

代码执行结果:

2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973

如果我们同时在while循环中监测两个任务会有等待问题,不能每秒监测网络情况。

Timer实现方式

timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

先来看Timer的基本使用:

导入:from threading import Timer

主要方法:

Timer方法说明Timer(interval, function, args=None, kwargs=None)创建定时器cancel()取消定时器start()使用线程方式执行join(self, timeout=None)等待线程执行结束

定时器只能执行一次,如果需要重复执行,需要重新添加任务;

我们先来看基本使用:

from threading import Timer

#记录当前时间

print(datetime.datetime.now())

#3S执行一次

sTimer = Timer(3, MonitorSystem)

#1S执行一次

nTimer = Timer(1, MonitorNetWork)

#使用线程方式执行

sTimer.start()

nTimer.start()

#等待结束

sTimer.join()

nTimer.join()

#记录结束时间

print(datetime.datetime.now())

输出结果:

2019-03-21 15:13:36.739798
2019-03-21 15:13:37 bytessent=171337324, bytesrecv=1109002349
2019-03-21 15:13:39 cpu:1.4%, mem:93.2%
2019-03-21 15:13:39.745187

可以看到,花费时间为3S,但是我们想要做的是每秒监控网络状态;如何处理。

Timer只能执行一次,所以执行完成之后需要再次添加任务,我们对代码进行修改:

from threading import Timer
import psutil
import time
import datetime
def MonitorSystem(logfile = None):
 cpuper = psutil.cpu_percent()
 mem = psutil.virtual_memory()
 memper = mem.percent
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 line = f'{ts} cpu:{cpuper}%, mem:{memper}%'
 print(line)
 if logfile:
 logfile.write(line)
 #启动定时器任务,每三秒执行一次
 Timer(3, MonitorSystem).start()
def MonitorNetWork(logfile = None):
 netinfo = psutil.net_io_counters()
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}'
 print(line)
 if logfile:
 logfile.write(line)
 #启动定时器任务,每秒执行一次
 Timer(1, MonitorNetWork).start()
MonitorSystem()
MonitorNetWork()

执行结果:

2019-03-21 15:18:21 cpu:1.5%, mem:93.2%

2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678

2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294

2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702

2019-03-21 15:18:24 cpu:1.9%, mem:93.2%

2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110

2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600

2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008

从时间中可以看到,这两个任务可以同时进行不存在等待问题。

Timer的实质是使用线程方式去执行任务,每次执行完后会销毁,所以不必担心资源问题。

调度模块:schedule

schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间;

安装方式:

pip install schedule

我们来看一个例子:

import datetime
import schedule
import time
def func():
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 print('do func time :',ts)
def func2():
 now = datetime.datetime.now()
 ts = now.strftime('%Y-%m-%d %H:%M:%S')
 print('do func2 time:',ts)
def tasklist():
 #清空任务
 schedule.clear()
 #创建一个按秒间隔执行任务
 schedule.every(1).seconds.do(func)
 #创建一个按2秒间隔执行任务
 schedule.every(2).seconds.do(func2)
 #执行10S
 for i in range(10):
 schedule.run_pending()
 time.sleep(1)
tasklist()

执行结果:

do func time : 2019-03-22 08:51:38

do func2 time: 2019-03-22 08:51:39

do func time : 2019-03-22 08:51:39

do func time : 2019-03-22 08:51:40

do func2 time: 2019-03-22 08:51:41

do func time : 2019-03-22 08:51:41

do func time : 2019-03-22 08:51:42

do func2 time: 2019-03-22 08:51:43

do func time : 2019-03-22 08:51:43

do func time : 2019-03-22 08:51:44

do func2 time: 2019-03-22 08:51:45

do func time : 2019-03-22 08:51:45

do func time : 2019-03-22 08:51:46

执行过程分析:

>1>因为在jupyter下执行,所以先将schedule任务清空;
>2>按时间间在schedule中隔添加任务;
>3>这里按照秒间隔添加func,按照两秒间隔添加func2;
>4>schedule添加任务后,需要查询任务并执行任务;
>5>为了防止占用资源,每秒查询到点任务,然后顺序执行;

第5个顺序执行怎么理解,我们修改func函数,里面添加time.sleep(2)

然后只执行func工作,输出结果:

do func time : 2019-03-22 09:00:59

do func time : 2019-03-22 09:01:02

do func time : 2019-03-22 09:01:05

可以看到时间间隔为3S,为什么不是1S?

因为这个按照顺序执行,func休眠2S,循环任务查询休眠1S,所以会存在这个问题。

在我们使用这种方式执行任务需要注意这种阻塞现象。

我们看下schedule模块常用使用方法:

#schedule.every(1)创建Job, seconds.do(func)按秒间隔查询并执行
schedule.every(1).seconds.do(func)
#添加任务按分执行
schedule.every(1).minutes.do(func)
#添加任务按天执行
schedule.every(1).days.do(func)
#添加任务按周执行
schedule.every().weeks.do(func)
#添加任务每周1执行,执行时间为下周一这一时刻时间
schedule.every().monday.do(func)
#每周1,1点15开始执行
schedule.every().monday.at("12:00").do(job)

这种方式局限性:如果工作任务回非常耗时就会影响其他任务执行。我们可以考虑使用并发机制配置这个模块使用。

任务框架APScheduler

APScheduler是Python的一个定时任务框架,用于执行周期或者定时任务,

可以基于日期、时间间隔,及类似于Linux上的定时任务crontab类型的定时任务;

该该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,使用起来非常方便。

安装方式:pip install apscheduler

apscheduler组件及简单说明:

1>triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器

2>job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,支持存储到MongoDBRedis数据库中

3> executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行

4>schedulers(调度器):调度器是将其它部分联系在一起,对使用者提供接口,进行任务添加,设置,删除。

来看一个简单例子:

import time

from apscheduler.schedulers.blocking import BlockingScheduler

def func():

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

print('do func time :',ts)

def func2():

#耗时2S

now = datetime.datetime.now()

ts = now.strftime('%Y-%m-%d %H:%M:%S')

print('do func2 time:',ts)

time.sleep(2)

def dojob():

#创建调度器:BlockingScheduler

scheduler = BlockingScheduler()

#添加任务,时间间隔2S

scheduler.add_job(func, 'interval', seconds=2, id='test_job1')

#添加任务,时间间隔5S

scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')

scheduler.start()

dojob()

输出结果:

do func time : 2019-03-22 10:32:20
do func2 time: 2019-03-22 10:32:21
do func time : 2019-03-22 10:32:22
do func time : 2019-03-22 10:32:24
do func2 time: 2019-03-22 10:32:24
do func time : 2019-03-22 10:32:26

输出结果中可以看到:任务就算是有延时,也不会影响其他任务执行。

APScheduler框架提供丰富接口去实现定时任务,可以去参考官方文档去查看使用方式。

最后选择:

简单总结上面四种定时定点任务实现:

1:循环+sleep方式适合简答测试,

2:timer可以实现定时任务,但是对定点任务来说,需要检查当前时间点;

3:schedule可以定点定时执行,但是需要在循环中检测任务,而且存在阻塞;

4:APScheduler框架更加强大,可以直接在里面添加定点与定时任务;

关于Python3中如何实现定时任务就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI