这篇文章主要讲解了“python多进程和多线程的实际用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“python多进程和多线程的实际用法”吧!
写在前面
总所周知,unix/linux 为多任务操作系统,,即可以支持远大于CPU数量的任务同时运行
理解多任务就需要知道操作系统的CPU上下文:
首先,我们都知道cpu一个时间段其实只能运行单个任务,只不过在很短的时间内,CPU快速切换到不同的任务进行执行,造成一种多任务同时执行的错觉
而在CPU切换到其他任务执行之前,为了确保在切换任务之后还能够继续切换回原来的任务继续执行,并且看起来是一种连续的状态,就必须将任务的状态保持起来,以便恢复原始任务时能够继续之前的状态执行,状态保存的位置位于CPU的寄存器和程序计数器(,PC)
简单来说寄存器是CPU内置的容量小、但速度极快的内存,用来保存程序的堆栈信息即数据段信息。程序计数器保存程序的下一条指令的位置即代码段信息。
所以,CPU上下文就是指CPU寄存器和程序计数器中保存的任务状态信息;CPU上下文切换就是把前一个任务的CPU上下文保存起来,然后加载下一个任务的上下文到这些寄存器和程序计数器,再跳转到程序计数器所指示的位置运行程序。
python程序默认都是执行单任务的进程,也就是只有一个线程。如果我们要同时执行多个任务怎么办?
有两种解决方案:
一种是启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务。
还有一种方法是启动一个进程,在一个进程内启动多个线程,这样,多个线程也可以一块执行多个任务。
当然还有第三种方法,就是启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,当然这种模型更复杂,实际很少采用。
Python中的多进程
在Unix/Linux系统中,提供了一个fork()函数调用,相较于普通函数调用一次,返回一次的机制,fork()调用一次,返回两次,具体表现为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。
子进程永远返回0,而父进程返回子进程的ID,这样一个父进程可以轻松fork出很多子进程。且父进程会记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
python的os模块封装了fork调用方法以实现在python程序中创建子进程,下面具体看两个例子:
[root@test-yw-01 opt]# cat test.py
import os
print('Process ({}) start...'.format(os.getpid()))
pid = os.fork()
print(pid)
[root@test-yw-01 opt]# python3 test.py
Process (26620) start...
26621
0
[root@test-yunwei-01 opt]# cat process.py
import os
print('Process ({}) start...'.format(os.getpid()))
pid = os.fork()
if pid == 0:
print('The child process is {} and parent process is{}'.format(os.getpid(),os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
[root@test-yunwei-01 opt]# pyhton3 process.py
Process (25863) start...
I (25863) just created a child process (25864)
The child process is 25864 and parent process is 25863
通过fork调用这种方法,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,例如nginx就是由父进程(master process)监听端口,再fork出子进程(work process)来处理新的http请求。
注意:
Windows没有fork调用,所以在window pycharm上运行以上代码无法实现以上效果。
multiprocessing模块
虽然Windows没有fork调用,但是可以凭借multiprocessing该多进程模块所提供的Process类来实现。
下面看一例子:
首先模拟一个使用单进程的下载任务,并打印出进程号
1)单进程执行:
import os
from random import randint
import time
def download(filename):
print("进程号是:%s"%os.getpid())
downloadtime = 3
print('现在开始下载:{}'.format(filename))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
download('水浒传')
download('西游记')
stop_time = time.time()
print('下载耗时:{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
得出结果
接着通过调用Process模拟开启两个子进程:
import time
from os import getpid
from multiprocessing import Process
def download(filename):
print("进程号是:%s"%getpid())
downloadtime = 3
print('现在开始下载:{}'.format(filename))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
task1 = Process(target=download,args=('西游记',))
task1.start() #调用start()开始执行
task2 = Process(target=download,args=('水浒传',))
task2.start()
task1.join() # join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步
task2.join()
stop_time = time.time()
print('下载耗时:{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
连接池Pool
可以用进程池Pool批量创建子进程的方式来创建大量工作子进程
import os
from random import randint
from multiprocessing import Process,Pool
import time
def download(taskname):
print("进程号是:%s"%os.getpid())
downloadtime = randint(1,3)
print('现在开始下载:{}'.format(taskname))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
pool = Pool(4) #定义进程连接池可用连接数量
for task in range(5):
pool.apply_async(download,args=(task,))
pool.close()
pool.join()
stop_time = time.time()
print('完成下载,下载耗时:{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
需要注意的点:
对pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的进程
pool的默认大小是主机CPU的核数,所以这里设置成4个进程,这样就避免了cpu关于进程间切换带来的额外资源消耗,提高了任务的执行效率
进程间通信
from multiprocessing.Queue import Queue
相较于普通Queue普通的队列的先进先出模式,get方法会阻塞请求,直到有数据get出来为止。这个是多进程并发的Queue队列,用于解决多进程间的通信问题。
from multiprocessing import Process, Queue
import os, time, random
datas = []
def write_data(args):
print('Process to write: %s' % os.getpid())
for v in "helloword":
datas.append(v)
print("write {} to queue".format(v))
args.put(v)
time.sleep(random.random())
print(datas)
def read_data(args):
print('Process to read: %s' % os.getpid())
while True:
value = args.get(True)
print("read {} from queue".format(value))
if __name__ == '__main__':
queue = Queue()
write = Process(target=write_data,args=(queue,))
read = Process(target=read_data,args=(queue,))
write.start()
read.start()
write.join()
read.terminate()
进程池中使用队列
由于队列对象不能在父进程与子进程间通信,所以需要使用Manager().Queue()才能实现队列中各子进程间进行通信
from multiprocessing import Manager
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
queue = manager.Queue()
pool = Pool()
write = Process(target=write_data,args=(queue,))
read = Process(target=read_data,args=(queue,))
write.start()
read.start()
write.join()
read.terminate()
如果是用进程池,就需要使用Manager().Queue()队列才能实现在各子进程间进行通信
参考文档: https://blog.csdn.net/qq_32446743/article/details/79785684
https://blog.csdn.net/u013713010/article/details/53325438
Python中的多线程
相较于资源分配的基本单位进程,线程是任务运行调度的基本单位,且由于每一个进程拥有自己独立的内存空间,而线程共享所属进程的内存空间,所以在涉及多任务执行时,线程的上下文切换比进程少了操作系统内核将虚拟内存资源即寄存器中的内容切换出这一步骤,也就大大提升了多任务执行的效率。
首先需要明确几个概念:
1.当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,当设置多线程时,主线程会创建多个子线程,在python中,默认情况下(其实就是setDaemon(False)),主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束
python的提供了关于多线程的threading模块,和多进程的启动类似,就是把函数传入并创建Thread实例,然后调用start()开始执行:
import os
import random
from threading import Thread
import threading
import time
def mysql_dump():
print('开始执行线程{}'.format(threading.current_thread().name)) #返回当前线程实例名称
dumptime = random.randint(1,3)
time.sleep(dumptime) #利用time.sleep()方法模拟备份数据库所花费时间
class Mutil_thread(Thread):
def runtask(slef):
thread_list = []
print('当前线程的名字是: ', threading.current_thread().name)
start_time = time.time()
for t in range(5):
task = Mutil_thread(target=slef.tasks)
thread_list.append(task)
for i in thread_list:
# i.setDaemon(False)
i.start()
i.join()#join()所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程在终止
stop_time = time.time()
print('主线程结束!', threading.current_thread().name)
print('下载耗时:{}'.format(stop_time - start_time))
if __name__ == '__main__':
run = Mutil_thread()
run.tasks = mysql_dump
run.runtask()
执行结果:
进程默认就会启动一个线程,我们把该线程称为主线程,实例的名为MainThread,主线程又可以启动新的线程,线程命名依次为Thread-1,Thread-2…
LOCK
多线程不同于进程,在多进程中,例如针对同一个变量,各自有一份拷贝存在于每个进程中,资源相互隔离,互不影响
但在进程中,线程间可以共享进程像系统申请的内存空间,虽然实现多个线程间的通信相对简单,但是当同一个资源(临界资源)被多个线程竞争使用时,例如线程共享进程的变量,其就有可能被任何一个线程修改,所以对这种临界资源的访问需要加上保护,否则资源会处于“混乱”的状态。
import time
from threading import Thread,Lock
class Account(object): # 假定这是一个银行账户
def __init__(self):
self.balance = 0 #初始余额为0元
def count(self,money):
new_balance = self.balance + money
time.sleep(0.01) # 模拟每次存款需要花费的时间
self.balance = new_balance #存完之后更新账户余额
@property
def get_count(self):
return(self.balance)
class Addmoney(Thread): #模拟存款业务,直接继承Thread
def __init__(self,action,money):
super().__init__() #在继承Thread类的基础上,再新增action及money属性,便于main()的直接调用
self.action = action
self.money = money
def run(self):
self.action.count(self.money)
def main():
action = Account()
threads = []
for i in range(1000): #开启1000个线程同时向账户存款
t = Addmoney(action,1) #每次只存入一元
threads.append(t)
t.start()
for task in threads:
task.join()
print('账户余额为: ¥%s元'%action.get_count)
main()
查看执行结果: 郑州哪个妇科医院好 http://www.sptdfk.com/
运行上面的程序,1000线程分别向账户中转入1元钱,结果小于100元。之所以出现这种情况是因为我们没有对balance余额该线程共享的变量加以保护,当多个线程同时向账户中存钱时,会一起执行到new_balance = self.balance + money这行代码,多个线程得到的账户余额都是初始状态下的0,所以都是0上面做了+1的操作,因此得到了错误的结果。
如果我们要确保balance计算正确,就要给Account().count()上一把锁,当某个线程开始执行Account().count()时,该线程因为获得了锁,因此其他线程不能同时执行,只能等待锁被释放后,获得该锁以后才能更改改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现:
import time
from threading import Thread,Lock
import time
from threading import Thread,Lock
class Account(object): # 假定这是一个银行账户
def __init__(self):
self.balance = 0 #初始余额为0元
self.lock = Lock()
def count(self,money):
self.lock.acquire()
try:
new_balance = self.balance + money
time.sleep(0.01) # 模拟每次存款需要花费的时间
self.balance = new_balance #存完之后更新账户余额
finally: #在finally中执行释放锁的操作保证正常异常锁都能释放
self.lock.release()
def get_count(self):
return(self.balance)
class Addmoney(Thread): #模拟存款业务
def __init__(self,action,money):
super().__init__()
self.action = action
self.money = money
self.lock = Lock()
def run(self):
self.action.count(self.money)
def main():
action = Account()
threads = []
for i in range(1000): #开启100000个线程同时向账户存款
t = Addmoney(action,1) #每次只存入一元
threads.append(t)
t.start()
for task in threads:
task.join()
print('账户余额为: ¥%s元'%action.get_count())
main()
执行结果:
感谢各位的阅读,以上就是“python多进程和多线程的实际用法”的内容了,经过本文的学习后,相信大家对python多进程和多线程的实际用法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。