温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

为什么python ThreadPoolExecutor 出现线程池异常捕获的问题

发布时间:2021-03-10 15:36:46 来源:亿速云 阅读:353 作者:TREX 栏目:开发技术

本篇内容介绍了“为什么python ThreadPoolExecutor 出现线程池异常捕获的问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

问题

最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回。

先说重点

这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeout=None) 方法主动获取 worker 的异常。

问题重现及解决

引子

问题主要由这样一段代码引起的:

def thread_executor():
 logger.info("I am slave. I am working. I am going to sleep 3s")
 sleep(3)
 logger.info("Exit thread executor")


def main():
 thread_obj = threading.Thread(target=thread_executor)
 while True:
  logger.info("Master starts thread worker")

  try:
   # 工作线程由于某种异常而结束并退出了,想重启工作线程的工作,但又不想重复创建线程
   thread_obj.start() # 这一行会报错,同一线程不能重复启动
  except Exception as e:
   logger.error("Master start thread error", exc_info=True)
   raise e

  logger.info("Master is going to sleep 5s")
  sleep(5)

上面这段代码的功能如注释中解释的,主要要实现类似生产者消费者的功能,工作线程一直去生产资源,主线程去消费工作线程生产的资源。但是工作线程由于异常推出了,想重新启动生产工作。显然,这个代码会报错。

运行结果:

thread: MainThread [INFO] Master starts thread worker
thread: Thread-1 [INFO] I am slave. I am working. I am going to sleep 3s
thread: MainThread [INFO] Master is going to sleep 5s
thread: Thread-1 [INFO] Exit thread executor because of some exception
thread: MainThread [INFO] Master starts thread worker
thread: MainThread [ERROR] Master start thread error
Traceback (most recent call last):
File "xxx.py", line 47, in main
 thread_obj.start()
File "E:\anaconda\lib\threading.py", line 843, in start
 raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
Traceback (most recent call last):
File "xxx.py", line 56, in <module>
 main()
File "xxx.py", line 50, in main
 raise e
File "xxx.py", line 47, in main
 thread_obj.start()
File "E:\anaconda\lib\threading.py", line 843, in start
 raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once

切入正题

然而脚本还有其他业务代码要运行,所以需要把上面的资源生产和消费的代码放到一个线程里完成,所以引入线程池来执行这段代码:

def thread_executor():
 while True:
  logger.info("I am slave. I am working. I am going to sleep 3s")
  sleep(3)
  logger.info("Exit thread executor because of some exception")
  break


def main():
 thread_obj = threading.Thread(target=thread_executor)
 while True:
  logger.info("Master starts thread worker")

  # 工作线程由于某种异常而结束并退出了,想重启工作线程的工作,但又不想重复创建线程
  # 没有想到这里会有异常
  thread_obj.start() # 这一行会报错,同一线程不能重复启动

  logger.info("Master is going to sleep 5s")
  sleep(5)


def thread_pool_main():
 thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
 logger.info("Master ThreadPool Executor starts thread worker")
 thread_obj.submit(main)

 while True:
  logger.info("Master ThreadPool Executor is going to sleep 5s")
  sleep(5)

if __name__ == '__main__':
 thread_pool_main()

代码运行结果如下:

INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s

... ...

显然,由上面的结果,在线程池 worker 执行到 INFO [thread: WorkExecutor_0] Master starts thread worker 的时候,是会有异常产生的,但是整个代码并没有抛弃任何异常。

解决方法

发现上面的 bug 后,想在线程池 worker 出错的时候,把异常记录到日志。查阅资料,要获取线程池的异常信息,需要调用 concurrent.futures.Future.exception(timeout=None) 方法,为了记录日志,这里加了线程池执行结束的回调函数。同时,日志中记录异常信息,用了 logging.exception() 方法。

def thread_executor():
 while True:
  logger.info("I am slave. I am working. I am going to sleep 3s")
  sleep(3)
  logger.info("Exit thread executor because of some exception")
  break


def main():
 thread_obj = threading.Thread(target=thread_executor)
 while True:
  logger.info("Master starts thread worker")

  # 工作线程由于某种异常而结束并退出了,想重启工作线程的工作,但又不想重复创建线程
  # 没有想到这里会有异常
  thread_obj.start() # 这一行会报错,同一线程不能重复启动

  logger.info("Master is going to sleep 5s")
  sleep(5)


def thread_pool_callback(worker):
 logger.info("called thread pool executor callback function")
 worker_exception = worker.exception()
 if worker_exception:
  logger.exception("Worker return exception: {}".format(worker_exception))


def thread_pool_main():
 thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
 logger.info("Master ThreadPool Executor starts thread worker")
 thread_pool_exc = thread_obj.submit(main)
 thread_pool_exc.add_done_callback(thread_pool_callback)
 # logger.info("thread pool exception: {}".format(thread_pool_exc.exception()))

 while True:
  logger.info("Master ThreadPool Executor is going to sleep 5s")
  sleep(5)


if __name__ == '__main__':
 thread_pool_main()

代码运行结果:

INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: WorkExecutor_0] called thread pool executor callback function
ERROR [thread: WorkExecutor_0] Worker return exception: threads can only be started once
Traceback (most recent call last):
File "E:\anaconda\lib\concurrent\futures\thread.py", line 57, in run
 result = self.fn(*self.args, **self.kwargs)
File "xxxx.py", line 46, in main
 thread_obj.start() # 这一行会报错,同一线程不能重复启动
File "E:\anaconda\lib\threading.py", line 843, in start
 raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...

最终的写法

其实,上面写法中,想重复利用一个线程去实现生产者线程的实现方法是有问题的,在此处,一般情况下,线程执行结束后,线程资源会被会被操作系统,所以线程不能被重复调用 start() 。

为什么python ThreadPoolExecutor 出现线程池异常捕获的问题

为什么python ThreadPoolExecutor 出现线程池异常捕获的问题

一种可行的实现方式就是,用线程池替代。当然,这样做得注意上面提到的线程池执行体的异常捕获问题。

def thread_executor():
 while True:
  logger.info("I am slave. I am working. I am going to sleep 3s")
  sleep(3)
  logger.info("Exit thread executor because of some exception")
  break

def executor_callback(worker):
 logger.info("called worker callback function")
 worker_exception = worker.exception()
 if worker_exception:
  logger.exception("Worker return exception: {}".format(worker_exception))
  # raise worker_exception


def main():
 slave_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor")
 restart_flag = False
 while True:
  logger.info("Master starts thread worker")

  if not restart_flag:
   restart_flag = not restart_flag
   logger.info("Restart Slave work")
  slave_thread_pool.submit(thread_executor).add_done_callback(executor_callback)

  logger.info("Master is going to sleep 5s")
  sleep(5)

总结

这个问题主要还是因为对 Python 的 concurrent.futuresthread.ThreadPoolExecutor 不够了解导致的,接触这个包是在书本上,但是书本没完全介绍包的全部 API 及用法,所以代码产生异常情况后,DEBUG 了许久在真正找到问题所在。查阅 python docs 后才对其完整用法有所认识,所以,以后学习新的 python 包的时候还是可以查一查官方文档的。

参考资料

英文版: docs of python concurrent.futures

中文版: python docs concurrent.futures — 启动并行任务

exception(timeout=None)

返回由调用引发的异常。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,concurrent.futures.TimeoutError 将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为 None,那么等待时间就没有限制。

如果 futrue 在完成前被取消则 CancelledError 将被触发。

如果调用正常完成那么返回 None。

add_done_callback(fn)

附加可调用 fn 到期程。当期程被取消或完成运行时,将会调用 fn,而这个期程将作为它唯一的参数。

加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 Exception 子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 BaseException 子类,这个行为没有定义。

如果期程已经完成或已取消,fn 会被立即调用。


“为什么python ThreadPoolExecutor 出现线程池异常捕获的问题”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI