线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个进程至少有一个线程,一个进程必定有一个主线程。
创建线程的两个模块:
(1)thread(在python3中改名为_thread)
(2)threding
_thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。thread和threading模块都可以用来创建和管理线程,而thread模块提供了基本的线程和锁支持。threading提供的是更高级的完全的线程管理。低级别的thread模块是推荐给高手用,一般应用程序推荐使用更高级的threading模块:
1.它更先进,有完善的线程管理支持,此外,在thread模块的一些属性会和threading模块的这些属性冲突。
2.thread模块有很少的(实际上是一个)同步原语,而threading却有很多。
3.thread模块没有很好的控制,特别当你的进程退出时,比如:当主线程执行完退出时,其他的线程都会无警告,无保存的死亡,而threading会允许默认,重要的子线程完成后再退出,它可以特别指定daemon类型的线程。
_thread模块创建进程
import _thread
def job(name):
print("%s正在做工作........" %name)
print("%s工作完成..........." %name)
if __name__ == "__main__":
try:
#_thread模块 创建2个线程,再加上主线程,这个程序运行就一共有三个线程
_thread.start_new_thread(job,('ddd',))
_thread.start_new_thread(job,('eee',))
except Exception as e:
print("创建线程失败。",e)
else:
print("创建线程成功。")
print('主线程结束')
每次运行程序可以看到不同的结果:
(1)
创建线程成功。eee正在做工作........
eee工作完成...........
ddd正在做工作........
ddd工作完成...........
(2)
创建线程成功。ddd正在做工作........eee正在做工作........
eee工作完成...........
ddd工作完成...........
(3)
创建线程成功。
ddd正在做工作........
(4)
创建线程成功。eee正在做工作........
这些结果不同,是因为线程并发执行,三个线程来回切换在cpu工作,且当主线程结束后,不管其它线程是否完成工作都被迫结束。
通过threading模块创建线程
def job(name):
print("%s正在做第一部分工作........" %name)
print("%s正在做第二部分工作........" %name)
print("%s正在做第三部分工作........" %name)
print("%s工作完成..........." %name)
if __name__ == "__main__":
try:
#threading模块 创建新的线程 返回一个线程对象
#target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
# start方法使线程开始执行
t1.start()
t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
t2.start()
except Exception as e:
print("创建线程失败\n",e)
print('主线程结束.....')
每次运行程序的结果:
(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
bbb正在做第一部分工作........
主线程结束.....
aaa正在做第三部分工作........
aaa工作完成...........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........主线程结束.....
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
(3)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
aaa正在做第三部分工作........bbb正在做第一部分工作........
aaa工作完成...........bbb正在做第二部分工作........
主线程结束.....
bbb正在做第三部分工作........
bbb工作完成...........
可以看到,不同的多个线程是相互交叉着在cpu执行的,和_thread不同的是它创建了一个线程类对象,也不会因为主线程的结束而结束所有的线程。
使用join方法
在A线程中调用了B线程的join法时,表示只有当B线程执行完毕时,A线程才能继续执行。多个线程使用了join方法,剩下的其它线程只有在这些线程执行完后才能继续执行。
这里调用的join方法是没有传参的,join方法其实也可以传递一个参数给它的。
join方法中如果传入参数,则表示这样的意思:如果A线程中掉用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行。
需要注意的是,jdk规定,join(0)的意思不是A线程等待B线程0秒,而是A线程等待B线程无限时间,直到B线程执行完毕,即join(0)等价于join()。
def job(name):
print("%s正在做第一部分工作........" %name)
print("%s正在做第二部分工作........" %name)
print("%s正在做第三部分工作........" %name)
print("%s工作完成..........." %name)
if __name__ == "__main__":
try:
#threading模块 创建新的线程 返回一个线程对象
#target 为线程需要做的任务,args为任务传递所需要参数(参数用元组组织起来),name为创建的线程命名(可以不取名)
t1 = threading.Thread(target=job,args=('aaa',),name='job1_name')
# start方法使线程开始执行
t1.start()
t2 = threading.Thread(target=job,args=('bbb',),name='job2_name')
t2.start()
t1.join()
t2.join()
except Exception as e:
print("创建线程失败\n",e)
print('主线程结束.....')
每次运行程序的结果:
(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........bbb正在做第一部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
主线程结束.....
(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
主线程结束.....
(3)
aaa正在做第一部分工作........bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........aaa正在做第二部分工作........
aaa正在做第三部分工作........
bbb工作完成...........
aaa工作完成...........
主线程结束.....
当通过继承Thread类来创建线程时,需要传入参数,可以在构造方法增加相应的属性,以此来传入所需要的参数。
Thread类有一个run方法,当创建一个线程后,使用start方法时,实际上就是在调用类里面的run方法,因此可以在继承Thread类的时候,重写run方法来完成自己的任务。
import threading
class Jobthread(threading.Thread):
def __init__(self,name):
super(Jobthread, self).__init__()
self.name = name
#重写Thread类的run方法
def run(self):
print('%s线程待完成第一部分工作' %self.name)
print("%s正在做第二部分工作........" %self.name)
print("%s正在做第二部分工作........" %self.name)
if __name__ == "__main__":
#实例化类创建第一个线程对象
t1 = Jobthread('aaa')
t1.start()
#实例化类创建第二个线程对象
t2 = Jobthread('bbb')
t2.start()
t1.join()
t2.join()
print('主线程结束.....')
每次运行程序的结果:
(1)
wwww正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第一部分工作
eeee正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
主线程结束.....
(2)
wwww正在做第一部分工作
eeee正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主线程结束.....
(3)
wwww正在做第一部分工作eeee正在做第一部分工作
eeee正在做第二部分工作........
wwww正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主线程结束.....
可以看到,通过继承线程类,然后重写run方法,实例化这个类,这样也可以新创建线程,在某些情况下,这样还更加方便。
线程的Daemon属性:当主线程执行结束, 让没有执行完成的线程强制结束的一个属性:daemon
setDaemon方法是改变线程类的一个属性:daemon,也可以在创建线程的时候指定这个属性的值,他的值默认为None
import threading
import time
# 任务1:
def music(name):
for i in range(2):
print("正在听音乐%s" %(name))
time.sleep(2)
print('听音乐结束')
# 任务2:
def code(name):
for i in range(2):
print("正在编写代码%s" %(name))
time.sleep(2)
print('写代码结束')
if __name__ == '__main__':
t1 = threading.Thread(target=music, args=("中国梦",))
t2 = threading.Thread(target=code, args=("爬虫", ))
# 将t1线程声明为守护线程, 如果设置为True, 子线程启动, 当主线程执行结束, 子线程也结束
# 设置setDaemon必须在启动线程之前进行设置;
t1.setDaemon(True)
t2.setDaemon(True)
t1.start()
t2.start()
print('完成任务......')
运行结果:
(1)
正在听音乐中国梦正在编写代码爬虫
完成任务......
(2)
正在听音乐中国梦
正在编写代码爬虫完成任务......
当设置daemon属性为True,就和_thread模块的线程一样主线程结束,其它线程也被迫结束
什么是全局解释器锁(GIL)
Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
即全局解释器锁,使得在同一时间内,python解释器只能运行一个线程的代码,这大大影响了python多线程的性能。
需要明确的一点是GIL并不是Python的特性
GIL是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
python GIL 会影响多线程等性能的原因:
因为在多线程的情况下,只有当线程获得了一个全局锁的时候,那么该线程的代码才能运行,而全局锁只有一个,所以使用python多线程,在同一时刻也只有一个线程在运行,因此在即使在多核的情况下也只能发挥出单核的性能。
经过GIL这一道关卡处理,会增加执行的开销。这意味着,如果你想提高代码的运行速度,使用threading包并不是一个很好的方法。
在多线程环境中,Python 虚拟机按以下方式执行:
这里就可以将操作分两种:
i/o密集型
cpu密集型(计算密集型)
对于前者我们尽可能的采用多线程方式,后者尽可能采用多进程方式
为什么会需要线程锁?
多个线程对同一个数据进行修改时, 会出现不可预料的情况。
例如:
def add():
global money
for i in range(1000000):
money += 1
def reduce():
global money
for i in range(1000000):
money -= 1
if __name__ =="__main__":
money = 0
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=reduce)
t1.start()
t2.start()
t1.join()
t2.join()
print(money)
因为没有对变量money做访问限制,在某一个线程对其进行操作时,另一个线程仍可以对它进行访问、操作,致使最终结果出错,且不可预料,不是期待值。
(1)
55651
(2)
-133447
(3)
-236364
当我们使用线程锁的时候:
import threading
def add(lock):
global money
lock.acquire()
for i in range(100000):
money += 1
lock.release()
def reduce(lock):
global money
lock.acquire()
for i in range(1000000):
money -= 1
lock.release()
if __name__ =="__main__":
money = 0
lock = threading.Lock()
t1 = threading.Thread(target=add,args=(lock,))
t2 = threading.Thread(target=reduce,args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
print(money)
运行结果正确,始终为0
使用多线程来查ip的地理位置
import json
from urllib.request import urlopen
class Job(threading.Thread):
def __init__(self,ip):
super(Job,self).__init__()
self.ip = ip
def check_ip(self):
url = 'http://ip.taobao.com/service/getIpInfo.php?ip=%s' % self.ip
text = urlopen(url).read().decode('utf-8')
d = json.loads(text)['data']
country = d['country']
city = d['city']
print(self.ip+':\t'+country+'\t'+city)
def run(self):
self.check_ip()
if __name__ == "__main__":
tt = []
ips = ['172.25.254.23', '111.213.215.66', '152.158.32.54', '164.52.196.89','214.63.145.189']
for ip in ips:
t = Job(ip)
t.start()
tt.append(t)
[i.join() for i in tt]
结果:
172.25.254.23: XX 内网IP
214.63.145.189: 美国 XX
164.52.196.89: 印度 XX
111.213.215.66: 中国 上海
152.158.32.54: 欧洲 XX
1). 理论上多线程执行任务, 会产生一些数据, 为其他程序执行作铺垫;
2). 多线程是不能返回任务执行结果的, 因此需要一个容器来存储多线程产生的数据
3). 这个容器如何选择? list(栈, 队列), tuple(x), set(x), dict(x), 此处选择队列来实现
队列与多线程
import threading
from queue import Queue
def job(l,queue):
# 将任务的结果存储到队列中
queue.put(sum(l))
def use_thread():
# 实例化一个队列, 用来存储每个线程执行的结果
q = Queue()
li = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]
threads = []
for i in li:
t = threading.Thread(target=job,args=(i,q))
threads.append(t)
t.start()
# join方法等待所有子线程执行结束
[i.join() for i in threads]
# 从队列里面拿出所有的运行结果
result = [q.get() for i in li]
print(result)
if __name__ == "__main__":
use_thread()
运行结果:
[21, 27, 33, 39]
在软件开发的过程中,经常碰到这样的场景:
某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。
为了容易理解,我们举一个寄信的例子。假设你要寄一封信,大致过程如下:
1、你把信写好——相当于生产者生产数据
2、你把信放入邮箱——相当于生产者把数据放入缓冲区
3、邮递员把信从邮箱取出,做相应处理——相当于消费者把数据取出缓冲区,处理数据
生产者消费者模式的优点
1.解耦
假设生产者和消费者分别是两个线程。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果未来消费者的代码发生变化,可能会影响到生产者的代码。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子:我们去邮局投递信件,如果不使用邮箱(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他。这就产生了你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员 换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮箱相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2.并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区通信的,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
继续上面的例子:如果我们不使用邮箱,就得在邮局等邮递员,直到他回来,把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞)。或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3.支持忙闲不均
当生产者制造数据快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,慢慢处理掉。而不至于因为消费者的性能造成数据丢失或影响生产者生产。
我们再拿寄信的例子:假设邮递员一次只能带走1000封信,万一碰上情人节(或是圣诞节)送贺卡,需要寄出去的信超过了1000封,这时候邮箱这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮箱中,等下次过来时再拿走。
实例:
1.文件ipfile.txt中有大量的ip地址,要求将ip地址取出来再与端口号组合,放入队列中
2.从队列中取出地址,依次访问并返回访问结果
import threading
from queue import Queue
from urllib.request import urlopen
import time
class Procuder(threading.Thread):
def __init__(self, q):
super(Procuder, self).__init__()
self.q = q
def run(self):
portlist = [80, 443, 7001, 8000, 8080]
with open('ipfile.txt') as f:
for ip in f:
for port in portlist:
url = 'http://%s:%s' % (ip.strip(), port)
self.q.put(url)
class Consumer(threading.Thread):
def __init__(self, q):
super(Consumer, self).__init__()
self.q = q
def run(self):
# 阻塞0.001妙使生产者先运行再队列中放入数据
time.sleep(0.001)
#只要队列不为空就一直“消费”数据
while not self.q.empty():
try:
url = self.q.get()
urlObj = urlopen(url)
except Exception as e:
print("%s unknown url" %(url))
else:
print("%s is ok" %(url))
if __name__ == '__main__':
q = Queue(20)
p1 = Procuder(q)
p1.start()
c = Consumer(q)
c.start()
# 阻塞调用线程,直到队列中的所有任务被处理掉,再继续向下执行。
q.join()
运行结果就不截图了。
传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。
使用线程池:
由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。
#导入模块 注意: python3.2版本以后才可以使用;
from concurrent.futures import ThreadPoolExecutor
import time
#需要完成的任务
def job(n):
sum = 0
for i in range(1,n+1):
sum += i
return sum
if __name__ =="__main__":
#实例化线程池对象,设置线程池有10个线程
pool = ThreadPoolExecutor(max_workers=10)
#向线程池提交任务submit方法返回一个_base.Future对象,这个对象含有许多方法。便于我们对线程操作
f1 = pool.submit(job,20)
f2 = pool.submit(job,16)
#查看线程是否完成任务(线程是否被销毁,完成任务的线程会被释放)
#这里休眠1妙是因为线程在完成工作后会被释放,如果立即查看线程状态,可能线程正在释放中,会返False,这里等待1妙让线程完成释放之后在查看线程状态。
time.sleep(1)
print(f1.done())
print(f2.done())
#直接获取任务执行结果
print(f1.result())
print(f2.result())
运行结果:
True
True
210
136
concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:
(1)map可以保证输出的顺序, submit输出的顺序是乱的
(2)如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
(3)submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.request import urlopen
urls = ['http://a.cn','http://1688.cn','http://jd.cn','http://qq.cn','http://qq.com','http://111.231.215.66']*10
def get_page(url):
try:
content = urlopen(url).read()
except:
return {'url:'+url+' page_len:'+str(0)}
else:
return {'url:'+url+' page_len:'+str(len(content))}
# 1.通过for循环打印结果
print('第一种方法:')
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in resultlist:
print(i.result())
# 2.通过方法as_completed
print('第二种方法:')
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in as_completed(resultlist):
print(i.result())
# 3.通过map方法
print('第三种方法:')
pool = ThreadPoolExecutor(max_workers=10)
for res in pool.map(get_page,urls):
print(res)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。