当使用多个线程操作任务的时候,如果线程间有需要通信的地方,那么不可避免的要实现到线程间的通信,来互相通知消息,同步任务的执行。
1.线程threading共享内存地址,进程与进程Peocess之间相互独立,互不影响(相当于深拷贝);
2.在线程间通信的时候可以使用Queue模块完成,进程间通信也可以通过Queue完成,但是此Queue并非线程的Queue,进程间通信Queue是将数据 pickle 后传给另一个进程的 Queue,用于父进程与子进程之间的通信或同一父进程的子进程之间通信;
queue
python中的queue模块其实是对数据结构中栈和队列这种数据结构的封装,把抽象的数据结构封装成类的属性和方法
1 2 3 4 5 | #导入线程相关模块 import threading import queue
q = queue.Queue() |
1 2 3 4 5 | # 导入进程相关模块 from multiprocessing import Process from multiprocessing import Queue
q = Queue() |
1 2 3 4 5 | # 导入进程相关模块 from multiprocessing import Process from multiprocessing import Pipe
pipe = Pipe() |
python提供了多种进程通信的方式,主要Queue和Pipe这两种方式,Queue用于多个进程间实现通信,Pipe用于两个进程的通信;
put():以插入数据到队列中,他还有两个可选参数:blocked和timeout。详情自行百度
get():从队列读取并且删除一个元素。同样,他还有两个可选参数:blocked和timeout。详情自行百度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | # !usr/bin/env python # -*- coding:utf-8 _*- """ @Author:何以解忧 @Blog(个人博客地址): shuopython.com @WeChat Official Account(微信公众号):猿说python @Github:www.github.com
@File:python_process_queue.py @Time:2019/12/21 21:25
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累! """
from multiprocessing import Process from multiprocessing import Queue import os,time,random
#写数据进程执行的代码 def proc_write(q,urls): print ('Process is write....') for url in urls: q.put(url) print ('put %s to queue... ' %url) time.sleep(random.random())
#读数据进程的代码 def proc_read(q): print('Process is reading...') while True: url = q.get(True) print('Get %s from queue' %url)
if __name__ == '__main__': #父进程创建Queue,并传给各个子进程 q = Queue() proc_write1 = Process(target=proc_write,args=(q,['url_1','url_2','url_3'])) proc_write2 = Process(target=proc_write,args=(q,['url_4','url_5','url_6'])) proc_reader = Process(target=proc_read,args=(q,)) #启动子进程,写入 proc_write1.start() proc_write2.start()
proc_reader.start() #等待proc_write1结束 proc_write1.join() proc_write2.join() #proc_raader进程是死循环,强制结束 proc_reader.terminate() print("mian") |
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | Process is write.... put url_1 to queue... Process is write.... put url_4 to queue... Process is reading... Get url_1 from queue Get url_4 from queue put url_5 to queue... Get url_5 from queue put url_2 to queue... Get url_2 from queue put url_3 to queue... Get url_3 from queue put url_6 to queue... Get url_6 from queue mian |
Pipe常用于两个进程,两个进程分别位于管道的两端 * Pipe方法返回(conn1,conn2)代表一个管道的两个端,Pipe方法有duplex参数,默认为True,即全双工模式,若为FALSE,conn1只负责接收信息,conn2负责发送,Pipe同样也包含两个方法:
send() : 发送信息;
recv() : 接收信息;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | from multiprocessing import Process from multiprocessing import Pipe import os,time,random #写数据进程执行的代码 def proc_send(pipe,urls): #print 'Process is write....' for url in urls:
print ('Process is send :%s' %url) pipe.send(url) time.sleep(random.random())
#读数据进程的代码 def proc_recv(pipe): while True: print('Process rev:%s' %pipe.recv()) time.sleep(random.random())
if __name__ == '__main__': #父进程创建pipe,并传给各个子进程 pipe = Pipe() p1 = Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10) ])) p2 = Process(target=proc_recv,args=(pipe[1],)) #启动子进程,写入 p1.start() p2.start()
p1.join() p2.terminate() print("mian") |
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | Process is send :url_0 Process rev:url_0 Process is send :url_1 Process rev:url_1 Process is send :url_2 Process rev:url_2 Process is send :url_3 Process rev:url_3 Process is send :url_4 Process rev:url_4 Process is send :url_5 Process is send :url_6 Process is send :url_7 Process rev:url_5 Process is send :url_8 Process is send :url_9 Process rev:url_6 mian |
当然我们也可以尝试使用线程threading的Queue是否能完成线程间通信,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | from multiprocessing import Process # from multiprocessing import Queue # 进程间通信Queue,两者不要混淆 import queue # 线程间通信queue.Queue,两者不要混淆 import time
def p_put(q,*args): q.put(args) print('Has put %s' % args)
def p_get(q,*args): print('%s wait to get...' % args)
print(q.get()) print('%s got it' % args)
if __name__ == "__main__": q = queue.Queue() p1 = Process(target=p_put, args=(q,'p1', )) p2 = Process(target=p_get, args=(q,'p2', )) p1.start() p2.start() |
直接异常报错:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | Traceback (most recent call last): File "E:/Project/python_project/untitled10/123.py", line 38, in <module> p1.start() File "G:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen return Popen(process_obj) File "G:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__ reduction.dump(process_obj, to_child) File "G:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) TypeError: can't pickle _thread.lock objects |
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。