温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

python解决线程同步方案有哪些

发布时间:2020-05-28 11:19:47 来源:网络 阅读:216 作者:可爱的汁汁 栏目:编程语言
 先提到线程同步是个什么,概念是什么,就是线程通讯中通过使用某种技术访问数据时,而一旦此线程访问到,其他线程也就不能访问了,直到该线程对数据完成操作才结束。
     Event事件是一种实现方式:通过内部的标记看看是不是变化,也就是true or false了,
     将set(),clear(),is_set(),为true,wait(timeout=None)此种设置true的时长,等到返回false,不等到超时返回false,无线等待为None,

     来看一个wait的使用:
from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def A(event:Event, interval:int):
    while not event.wait(interval): # 要么true  or false
        logging.info('hello')

e = Event()
Thread(target=A, args=(e, 3)).start()

e.wait(8)
e.set()
print('end--------------')

使用锁Lock解决数据资源在争抢,从而使资源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默认阻塞,阻塞设置超时时间,非阻塞,timeout禁止使用,成功获取锁,返回True,否则False。
有阻塞就有释放 ,解开锁,release(),从线程释放锁,上锁的锁重置为unloced未上锁调用,抛出RuntimeError异常。

import threading
from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'

logging.basicConfig(format=FORMAT, level=logging.INFO)


cups = []
lock = Lock()

def worker(count=10):
    logging.info("I'm working for U.")
    flag = False
    while True:
    lock.acquire() # 获取锁

    if len(cups) >= count:
        flag = True

    time.sleep(0.0001) 
    if not flag:
        cups.append(1)

    if flag:
        break

logging.info('I finished. cups = {}'.format(len(cups)))

for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

使用锁的过程中,总是不经意加上锁,出现死锁的产生,出现了死锁,如何解决呢?
使用try finally 将锁释放,另一种使用with上下文管理。
锁的使用场景在于应该少用锁,还要就是如若上锁,将锁的使用时间缩短,避免时间太长而出现无法释放锁的结果。

可重入锁Lock,


import threading
import time

lock = threading.RLock()
print(lock.acquire())
print('------------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {}'.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意观察lock对象的信息
lock.release()
#lock.release() #多了一次
print('===============')
print()

print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨线程了,异常
lock.release()
print('~~~~~~~~~~~~~~~~~')
print()

# 测试多线程
print(lock.acquire())

def sub(l):
    print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞
    print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
    print('lock in sub thread {}'.format(lock))
    l.release()
    print('sub 1')
    l.release()
    print('sub 2')
    # l.release() # 多了一次

threading.Timer(2, sub, args=(lock,)).start() # 传入同一个lock对象
print('++++++++++++++++++++++')
print()

print(lock.acquire())

lock.release()
time.sleep(5)
print('释放主线程锁')
lock.release()

使用构造方法Condition(lock=None),默认是Rloc,
具体方法为;
acquire(*args),获取锁
wait(self,timeout=None),等待或超时
notify(n=1),唤醒线程,没有等待就没有任何操作,指线程
notify_all(),唤醒所有等待的线程。
Condition主要用于生产者和消费者模型中,解决匹配的问题。
使用方式:先获取acquire,使用完了要释放release,避免死锁最好使用with上下文;生产者和消费者可以使用notify and notify_all。

如下例子:

from threading import Thread, Event
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是为了演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是为了使用方便,与逻辑无关

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
            self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("recieved {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

这里代码会有缺陷:优化如下:

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是为了演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
                self.event.wait(1) # 模拟产生数据速度
                self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait() # 阻塞等通知
                logging.info("received {}".format(self.data))
            self.event.wait(0.5) # 模拟消费的速度

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
# 增加消费者

for i in range(5):
    c = Thread(target=d.consume, name='consumer-{}'.format(i))
    c.start()
p.start()

Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),构建barrier对象,timeout未指定的默认值;
n_waiting ,当前barrier等待的线程数。;
parties ,需要等待
wait(timeout=None),wait方法设置超时并超时发送,barrie处于broken状态。
而broken的状态方法:
broken,打开状态,返回true;
abort(),barrier在broken状态中,wait等待的线程会抛出BrokenBarrierError异常,直到reset恢复barrier;
reset(),恢复barrier,重新开始拦截。
barrier不做演示:

还有semaphore信号量,每次acquire时,都会减一,到0时的线程再到release后,大于0,恢复阻塞的线程。
方法:
Semaphore(value=1) 构造方法,alue小于0,抛ValueError异常;
acquire(blocking=True, timeout=None) 获取信号量,计数器减1,获取成功返回True;
release() 释放信号量,计数器加1。
使用信号量处理时,需要注意release超界问题,边界问题,其实,在使用中,python有GIL的存在,有的多线程就变成线程安全的,注意一点,但实际上它们并不是线程安全类型。因此我们在使用中要具体场景具体分析具体使用。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI