41线程3_RLock_Condition_Barrier

 

10年积累的成都做网站、网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有织金免费网站建设让你可以放心的选择与我们合作。

目录

threading.RLock类:...1

threading.Condition类:...2

threading.Barrier类:...4

 

 

 

threading.RLock类:

可重入锁,是线程相关的锁;

线程A获得可重复锁,并可多次成功获取,不会阻塞,最后要在线程A中做和acquire次数相同的release(拿多少次锁,还多少回来);

 

注,线程相关:

threading.local类;

 

例:

lock = threading.RLock()

ret = lock.acquire()

print(ret)

ret = lock.acquire(timeout=5)

print(ret)

ret = lock.acquire(False)

print(ret)

ret = lock.acquire(False)   #全能拿到锁

print(ret)

 

lock.release()

lock.release()

lock.release()

lock.release()

 

# lock.release()   #前面没有对应的acquire,抛RuntimeError: cannot release un-acquired lock

 

def sub(lock:threading.RLock):

    lock.release()   #主线程中加的,不能在子线程中释放,理解线程级别

   

threading.Thread(target=sub, args=(lock,)).start()

输出:

True

True

True

True

Exception in thread Thread-1:

Traceback (most recent call last):

  File "D:\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner

    self.run()

  File "D:\Python\Python35\lib\threading.py", line 862, in run

    self._target(*self._args, **self._kwargs)

  File "E:/git_practice/cmdb/example_threading2.py", line 249, in sub

    lock.release()

RuntimeError: cannot release un-acquired lock

 

 

 

threading.Condition类:

Condition(lock=None),构造方法,可传入一个lock或RLock对象,默认是RLock;

 

cond = threading.Condition()

cond.acquire(*args),获取锁;

cond.release()

cond.wait(timeout=None),等待或超时;

cond.notify(n=1),唤醒至多指定数目个数的等待线程,默认1个,没有等待的线程就没有任何操作,源码中waiter;

cond.notify_all(),唤醒所有等待的线程,源码中waiters;

 

总结:

Condition用于生产者-消费者模型,解决生产者-消费者速度匹配问题;

采用了通知机制,非常有效率;

使用Condition,必须先acquire,用完要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文;

消费者wait等待通知,生产者生产好消息,对消费者发通知,可使用notify或notify_all;

 

可把Condition理解为一把高级的锁,它提供了Lock、RLock更高级的功能,允许我们能够控制复杂的线程同步问题;

threading.Condition内部维护了一个锁对象(默认是RLock),可在创建Condition对象时把锁对象作为参数传入;

threading.Condition也提供了acquire和release方法,含义与锁的一致,其实它只是简单调用内部锁对象的对应的方法而已;

threading.Condition还提供了wait、notify、notify_all方法:

wait([timeout]),释放内部所占用的锁,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供timeout),当线程被唤醒并重新占用锁时,程序才会继续执行下去;

notify(),唤醒一个挂起的线程(如果存在挂起的线程),notify()不会释放所占用的锁;

notify_all(),唤醒所有挂起的线程(如果存在挂起的线程),不会释放所占用的锁;

 

Lock与RLock:

RLock允许在同一线程中被多次acquire,而Lock不允许这种情况;

如果使用RLock,那么 acquire和release必须成对出现,即调用了n次acquire,必须调用n次release才能真正释放所占用的锁;

 

例:

class Dispatcher:

    def __init__(self):

        self.data = 0

        self.event = threading.Event()

 

    def produce(self):

        for i in range(100):

            data = random.randint(1,100)

            self.data = data

            self.event.wait(1)

 

    def custom(self):

        while True:   #消费者浪费了大量cpu时间,主动来查看有没有数据

            logging.info(self.data)   #有重复消费问题

            self.event.wait(1)   #隔1秒生成1个

 

d = Dispatcher()

p = threading.Thread(target=d.produce)

c = threading.Thread(target=d.custom)

c.start()   #消费者先启动

p.start()

输出:

……

2018-08-06-15:54:25       Thread info: 13052 Thread-1 13

2018-08-06-15:54:25       Thread info: 12052 Thread-2 13

2018-08-06-15:54:26       Thread info: 12052 Thread-2 13

……

 

例:

class Dispatcher:

    def __init__(self):

        self.data = 0

        self.event = threading.Event()

        self.cond = threading.Condition()

 

    def produce(self):

        for i in range(100):

            data = random.randint(1,100)

            # logging.info(data)

            with self.cond:

                self.data = data

                self.cond.notify(2)   #通知机制,有数据,通知消费者来消费;交给2个人做,一般是1(生产者)对多(消费者)

                self.cond.notify_all()   #通知所有消费者,1对多

            self.event.wait(1)

 

    def custom(self):

        # while True:

        while not self.event.is_set():

            # logging.info(self.data)

            with self.cond:   #消费者被迫匹配生产者

                self.cond.wait()

                logging.info(self.data)

            # self.event.wait(1)

 

d = Dispatcher()

p = threading.Thread(target=d.produce)

# c = threading.Thread(target=d.custom)

# c1 = threading.Thread(target=d.custom)   #开启2个消费线程

# c.start()

# c1.start()

for i in range(5):   #开启5个消费线程;如果produce中self.conf.notify(2),生产者通知2个线程处理,5个消费者中谁抢在前谁处理

    threading.Thread(target=d.custom, name='c-{}'.format(i)).start()

p.start()   #如果生产者先启动,已经生成的数据不会被消费者消费,除非在队列中

 

注:

以上有线程安全问题,解决:中间加MQ;

上例不是线程安全的,程序逻辑有很多瑕疵,但可很好的理解Condition的使用和生产者消费者模型;

一对多,其实就是广播模式;

 

 

 

threading.Barrier类:

屏障、栅栏,可以想象成路障、道闸,3.2引入;

Barrier(paties,action=None,timeout=None),构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值;

n_waiting,当前在屏障中等待的线程数;

paties,参与方数目,需要多少个等待;

wait(timeout=None),等待通过屏障,返回0到线程数count-1的整数,count为等待的线程总数,每个线程返回不同;如果wait方法设置了超时,并超时发送,屏障将处于broken状态;wait方法超时发生,屏障处于broken状态,直至reset;

broken,如果屏障处于打破的状态,返回True;

abort(),将屏障置于broken状态,等待中的线程或调用等待方法的线程中都会抛BrokenBarrierError异常,直至reset方法来恢复屏障;

reset(),恢复屏障,重新开始拦截;

 

应用场景:

1、并发初始化;如,centos7中systemd,能并行启动就并行;

所有线程都必须初始化完成后,才能继续工作,如运行前加载数据、检查,如果这些工作没完成就开始运行,将不能正常工作;

10个线程做10种工作准备,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程;

如,启动一个程序,先加载磁盘文件、缓存预热、初始化连接池等,这些工作齐头并进,不过只有等满足了,程序才能继续后向执行,假设数据库连接失败,则初始化工作失效,就要abort,屏障broken,所有线程收到异常退出;

2、工作量,有10个计算任务,完成6个就算工作完成,如求样本数、求平均数;

 

例:

def worker(barrier:threading.Barrier):

    logging.info('n_waiting={}'.format(barrier.n_waiting))

    try:

        bid = barrier.wait()

        logging.info('after barrier {}'.format(bid))

    except threading.BrokenBarrierError:

        logging.info('broken barrier is {}'.format(threading.current_thread()))

 

barrier = threading.Barrier(3)   #3个一拨3个一拨

 

for _ in range(3):   #依次3,4,5,6

    threading.Thread(target=worker,args=(barrier,)).start()

输出:

2018-08-07-08:27:53       Thread info: 11496 Thread-1 n_waiting=0

2018-08-07-08:27:53       Thread info: 12540 Thread-2 n_waiting=1

2018-08-07-08:27:53       Thread info: 4612 Thread-3 n_waiting=2

2018-08-07-08:27:53       Thread info: 4612 Thread-3 after barrier 2

2018-08-07-08:27:53       Thread info: 11496 Thread-1 after barrier 0

2018-08-07-08:27:53       Thread info: 12540 Thread-2 after barrier 1

 

例:

for i in range(6):

    if i == 2:   #屏障中等待2个,屏障被broken,wait的线程抛异常,新wait的线程也抛异常,直至屏障恢复,才继续按达到参与方的数目继续拦截

        barrier.abort()

    elif i == 3:

        barrier.reset()

    threading.Event().wait(1)

    threading.Thread(target=worker,args=(barrier,)).start()

输出:

2018-08-07-09:21:49       Thread info: 12668 Thread-1 n_waiting=0

2018-08-07-09:21:50       Thread info: 12424 Thread-2 n_waiting=1

2018-08-07-09:21:50       Thread info: 12424 Thread-2 broken barrier is

2018-08-07-09:21:50       Thread info: 12668 Thread-1 broken barrier is

2018-08-07-09:21:51       Thread info: 11468 Thread-3 n_waiting=0

2018-08-07-09:21:51       Thread info: 11468 Thread-3 broken barrier is

2018-08-07-09:21:52       Thread info: 9788 Thread-4 n_waiting=0

2018-08-07-09:21:53       Thread info: 12680 Thread-5 n_waiting=1

2018-08-07-09:21:54       Thread info: 10948 Thread-6 n_waiting=2

2018-08-07-09:21:54       Thread info: 10948 Thread-6 after barrier 2

2018-08-07-09:21:54       Thread info: 9788 Thread-4 after barrier 0

2018-08-07-09:21:54       Thread info: 12680 Thread-5 after barrier 1

 

例:

wait方法超时发生,屏障处于broken状态,直至reset;

 

def worker(barrier:threading.Barrier, i:int):

    logging.info('waiting for {} threads'.format(barrier.n_waiting))

    try:

        logging.info(barrier.broken)

        if i < 3:

            barrier_id = barrier.wait(1)

        else:

            if i == 6:

                barrier.reset()

            barrier_id = barrier.wait()

        logging.info('after barrier {}'.format(barrier_id))

    except threading.BrokenBarrierError:

        logging.info('broken barrier. run.')

 

barrier = threading.Barrier(3)

 

for x in range(9):

    threading.Event().wait(2)

    threading.Thread(target=worker, args=(barrier,x), name='worker-{}'.format(x)).start()

输出:

2018-08-07-09:33:24       Thread info: 10556 worker-0 waiting for 0 threads

2018-08-07-09:33:24       Thread info: 10556 worker-0 False

2018-08-07-09:33:25       Thread info: 10556 worker-0 broken barrier. run.

2018-08-07-09:33:26       Thread info: 12752 worker-1 waiting for 0 threads

2018-08-07-09:33:26       Thread info: 12752 worker-1 True

2018-08-07-09:33:26       Thread info: 12752 worker-1 broken barrier. run.

2018-08-07-09:33:28       Thread info: 5324 worker-2 waiting for 0 threads

2018-08-07-09:33:28       Thread info: 5324 worker-2 True

2018-08-07-09:33:28       Thread info: 5324 worker-2 broken barrier. run.

2018-08-07-09:33:30       Thread info: 6716 worker-3 waiting for 0 threads

2018-08-07-09:33:30       Thread info: 6716 worker-3 True

2018-08-07-09:33:30       Thread info: 6716 worker-3 broken barrier. run.

2018-08-07-09:33:32       Thread info: 9180 worker-4 waiting for 0 threads

2018-08-07-09:33:32       Thread info: 9180 worker-4 True

2018-08-07-09:33:32       Thread info: 9180 worker-4 broken barrier. run.

2018-08-07-09:33:34       Thread info: 6788 worker-5 waiting for 0 threads

2018-08-07-09:33:34       Thread info: 6788 worker-5 True

2018-08-07-09:33:34       Thread info: 6788 worker-5 broken barrier. run.

2018-08-07-09:33:36       Thread info: 12044 worker-6 waiting for 0 threads

2018-08-07-09:33:36       Thread info: 12044 worker-6 True

2018-08-07-09:33:38       Thread info: 5020 worker-7 waiting for 1 threads

2018-08-07-09:33:38       Thread info: 5020 worker-7 False

2018-08-07-09:33:40       Thread info: 13052 worker-8 waiting for 2 threads

2018-08-07-09:33:40       Thread info: 13052 worker-8 False

2018-08-07-09:33:40       Thread info: 13052 worker-8 after barrier 2

2018-08-07-09:33:40       Thread info: 5020 worker-7 after barrier 1

2018-08-07-09:33:40       Thread info: 12044 worker-6 after barrier 0

 

 

 


网页名称:41线程3_RLock_Condition_Barrier
文章链接:http://csdahua.cn/article/joeihd.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流