目录
multiprocessing模块:... 1
多进程举例:... 2
multiprocessing.Pool,进程池:... 3
concurrent包:... 3
多进程:
由于py的GIL,多线程未必是cpu密集型程序好的选择;
多进程,可在完全独立的进程环境中运行程序,可充分利用多处理器;
但进程本身的隔离带来的数据不共享是个问题;
线程比进程轻量级;
Process类:
遵循了Thread类的API,减少了学习难度;
多进程,一定要在__main__()中,否则抛错;
p=multiprocessing.Process()
p.pid,进程ID;
p.exitcode,进程的退出状态码;
p.terminate(),终止指定的进程;
进程间同步:
提供了和线程同步一样的类,使用的方法一样,使用的效果也类似;
不过,进程间代价要高于线程,而且底层实现不同,只不过py屏蔽了这些,让用户能简单使用(py提供的库抹平了它们之间的差别,为方便使用);
multiprocessing还提供了共享内存、服务器进程来共享数据;还提供了Queue、Pipe(上一个进程的标准输出到下一个进程的标准输入)用来进程间通信;
Queue.queue是线程级别的锁;
multiprocessing.Queue可跨进程用,用的少;此时应用第三方工具MQ,单机进程间通信用的也少,一般应用都是跨节点的进程间通信,即RPC;RPC框架很多,如swfit、dubbo;
通信方式不同:
多进程就是启动多个解释器进程,进程间通信必须序列化、反序列化;
数据的线程安全性问题,由于每个进程中没有实现多线程,GIL可以说没用了;
进程池:
只允许使用计算机这么多资源(不能抢占计算机其它资源);
很多进程要反复创建的情形下,用进程池;
多进程、多线程的选择:
CPU密集型,cpython中使用GIL,多线程时锁相互竞争,多核优势不能发挥,用py多进程(多个解释器进程)效率更高;
IO密集型,适合用多线程,减少IO序列化开销,且在IO等待时,切换到其它线程继续执行,效率不错;
应用场景:
请求-应答模型;
web应用中常见的处理模型,用多进程+多线程;
如,nginx工作模式:
master启动多个worker工作进程,一般和cpu数目相同;
worker中启动多线程,提高并发处理能力,worker处理用户的请求,往往需要等待数据;
nginx应就地本地编译(本地指令集,甚至用指定cpu(如intel)编译器编译,这样可优化指令,性能更高);
例:
def calc(i):
sum = 0
for _ in range(100000000):
sum += 1
if __name__ == '__main__': #使用多进程,必须要在main中执行,否则报错
start = datetime.datetime.now()
lst = []
for i in range(5):
p = multiprocessing.Process(target=calc, args=(i,), name='p-{}'.format(i))
p.start()
lst.append(p)
for p in lst:
p.join()
delta = (datetime.datetime.now()-start).total_seconds()
print(delta) #win上查看,有5个py进程
输出:
20.037146
例,进程池:
def calc(i):
sum = 0
for _ in range(100000000):
sum += 1
if __name__ == '__main__':
start = datetime.datetime.now()
pool = multiprocessing.Pool(5)
for i in range(5):
pool.apply_async(calc,args=(i,))
pool.close() #重要,要有此步
pool.join()
delta = (datetime.datetime.now()-start).total_seconds()
print(delta)
目前仅有一个concurrent.futures,3.2引入;
异步并行任务编程模块,提供一个高级的异步可执行的便利接口;
提供了2个池执行器:
ThreadPoolExecutor,异步调用的线程池的Executor;
ProcessPoolExecutor,异步调用的进程池的Executor;
from conncurrent import futures
executor=futures.ThreadPoolExecutor(max_workers=1),池中至多创建max_workers个线程来同时异步执行,默认1个,返回Executor实例;
f=executor.submit(fn,*args,**kwargs),提交执行的函数及其参数,返回Futures实例;
executor.shutdown(wait=True),清理池;
Future类(submit()的实例):
f=executor.submit(work,2)
f.result(),可查看调用的返回的结果,函数的return结果;
f.done(),如果调用被成功的取消或执行完成,返回True,是标志,注意不是函数的返回值;
f.cancelled(),如果调用被成功的取消,返回True;
f.running(),如果正在运行且不能被取消,返回True;
f.cancel(),尝试取消调用,如果已经执行且不能取消,返回False,否则返回True;
f.result(timeout=None),取返回的结果,超时为None,一直等待返回;
f.exception(timeout=None),取返回的异常,超时为None,一直等待返回;
支持上下文管理:
futures.ThreadPoolExecutor继承自_base.Executor,父类有__enter__()和__exit__()方法;
例:
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 2, 3)
print(future.result())
输出:
8
总结:
统一了线程池、进程池调用,简化了编程;
是py简单的思想哲学的体现;
唯一的缺点,无法设置线程名称;
例,多线程:
def work(n):
logging.info('working-{}'.format(n))
time.sleep(5)
logging.info('end-work-{}'.format(n))
executor = futures.ThreadPoolExecutor(3)
fs = []
for i in range(3):
f = executor.submit(work, i)
fs.append(f)
for i in range(3):
f = executor.submit(work, i)
fs.append(f)
for i in range(3):
f = executor.submit(work, i)
fs.append(f)
while True:
time.sleep(2)
logging.info(threading.enumerate())
flag = True
for f in fs:
flag = flag and f.done()
if flag:
executor.shutdown()
logging.info(threading.enumerate())
break
例,多进程:
def work(n):
logging.info('working-{}'.format(n))
time.sleep(5)
logging.info('end-work-{}'.format(n))
if __name__ == '__main__':
executor = futures.ProcessPoolExecutor(3)
fs = []
for i in range(3):
f = executor.submit(work, i)
fs.append(f)
for i in range(3):
f = executor.submit(work, i)
fs.append(f)
for i in range(3):
f = executor.submit(work, i)
fs.append(f)
while True:
time.sleep(2)
logging.info(threading.enumerate())
flag = True
for f in fs:
flag = flag and f.done()
if flag:
executor.shutdown()
# logging.info(threading.enumerate()) #多进程用不上
break
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。