本篇内容主要讲解“python concurrent.futures模块的测试方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“python concurrent.futures模块的测试方法”吧!
concurrent.futures 是 3.2 中引入的新模块,它为异步执行可调用对象提供了高层接口。
可以使用 ThreadPoolExecutor 来进行多线程编程,ProcessPoolExecutor 进行多进程编程,两者实现了同样的接口,这些接口由抽象类 Executor 定义。
这个模块提供了两大类型,一个是执行器类 Executor,另一个是 Future 类。
执行器用来管理工作池,future 用来管理工作计算出来的结果,通常不用直接操作 future 对象,因为有丰富的 API。
Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持.
#! /usr/bin/env python
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: demo3
# Author: yunhgu
# Date: 2021/7/8 15:17
# Description:
# -------------------------------------------------------------------------------
import os
import time
import threading
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
def work(x):
time.sleep(1)
temp = f"父进程{os.getppid()}:子进程{os.getpid()}:线程{threading.get_ident()}:{x}"
return temp
def sub_thread():
temp_list = []
with ThreadPoolExecutor(max_workers=3) as t:
task_list = [t.submit(work, i) for i in range(5)]
for task in as_completed(task_list):
if task.done():
temp_list.append(task.result())
return temp_list
def main():
print(f"主进程:{os.getpid()}")
path_list = []
with ProcessPoolExecutor(max_workers=3) as p:
task_list = [p.submit(sub_thread) for i in range(5)]
for task in as_completed(task_list):
if task.done():
path_list.append(task.result())
for path in path_list:
print(path)
if __name__ == '__main__':
main()
不论你在什么时候开始,重要的是开始之后就不要停止。不论你在什么时候结束,重要的是结束之后就不要悔恨。
到此,相信大家对“python concurrent.futures模块的测试方法”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。