threading模块
局域网IP扫描实例
# 单线程:
import subprocess,time,threading
a = time.clock()
with open("check_ping.txt",'w') as f:
for i in range(1,20):
my_ip = ".".join(["192.163.37",str(i)])
try:
subprocess.check_call('ping -n 1 -w 1 %s'%(my_ip),shell=True)
except subprocess.CalledProcessError:
pass
else:
f.write("%s 可以ping通\n"%my_ip)
b = time.clock()
print(b)
# 总共花费8s多
# 多线程(一)创建 Thread 的实例,传给它一个函数
a = time.clock()
def check_ping(IP,obj):
try:
subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
except subprocess.CalledProcessError:
pass
else:
obj.write("%s 可以ping通\n" % IP)
def main():
threads = []
with open("check_ping_1.txt", 'w') as f:
for i in range(1, 20):
my_ip = ".".join(["192.163.37", str(i)])
t = threading.Thread(target=check_ping,args=(my_ip,f)) #Thread方法:实例化一个线程对象,把函数(target)和参数(args)传进去,然后返回Thread实例,这里并没有执行。
threads.append(t)
num = range(len(threads))
for i in num:
threads[i].start() #执行线程的start方法,线程开始执行
for i in num:
threads[i].join() #这行线程的join方法,等待线程结束,如果主进程不需要等待线程结束,可以不需要调用join方法。
if __name__ == '__main__':
main()
b = time.clock()
print(b)
# 总共花费0.9s
# 多线程(二)创建 Thread 的实例,传给它一个可调用的类实例
a = time.clock()
class Thread_func:
def __init__(self,func,args):
self.func = func
self.args = args
def __call__(self): #__call__方法:将类模拟成函数,实例化后的类再次实例化相当于执行了__call__方法。
self.func(*self.args)
def check_ping(IP,obj):
try:
subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
except subprocess.CalledProcessError:
pass
else:
obj.write("%s 可以ping通\n" % IP)
def main():
threads = []
with open("check_ping_2.txt", 'w') as f:
for i in range(1, 20):
my_ip = ".".join(["192.163.37", str(i)])
t = threading.Thread(target=Thread_func(check_ping,(my_ip,f))) #Thread_func实例化时已经传入了参数,所以Thread方法中就不用args来传参了。
threads.append(t)
num = range(len(threads))
for i in num:
threads[i].start()
for i in num:
threads[i].join()
if __name__ == '__main__':
main()
b = time.clock()
print(b)
# 总共花费1
# 多线程(三)派生 Thread 的子类,并创建子类的实例
a = time.clock()
class Thread_func(threading.Thread): #继承Thread,调用更灵活。
def __init__(self,func,args):
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self): #这里必须使用run方法,当线程开启后执行。
self.func(*self.args)
def check_ping(IP,obj):
try:
subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
except subprocess.CalledProcessError:
pass
else:
obj.write("%s 可以ping通\n" % IP)
def main():
threads = []
with open("check_ping_3.txt", 'w') as f:
for i in range(1, 20):
my_ip = ".".join(["192.163.37", str(i)])
t = Thread_func(check_ping,(my_ip,f))
threads.append(t)
num = range(len(threads))
for i in num:
threads[i].start()
for i in num:
threads[i].join()
if __name__ == '__main__':
main()
b = time.clock()
print(b)
# 花费0.7
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。