Python多线程和多进程(四) 线程同步之信号量

作者 : IT 大叔 本文共2454个字,预计阅读时间需要7分钟 发布时间: 2020-08-31

同步方式3:信号量 semaphore

信号量是用于控制并发线程数量的锁。

还是以爬虫为例子。你可能有这么个需求:列表页爬到很多的详情页,我想对每个详情页开一个线程来爬。
但是如果1秒能够爬到30个详情页url,所以0.5秒内能够获取15个详情页url,假设每个详情页url要花0.5秒爬完,那么进程要维持平均15个线程来爬取详情页,才能保持生产者和消费者的速度一直。

如果1秒能爬100个详情页url,进程就要维持平均50个线程来爬详情页内容。

所以,此时开多少个线程取决于1秒能爬几个详情页url,而不是由开发者决定的。

但是我们知道,线程数量不是越多越好,线程数多了,CPU切换线程损耗的时间就多了。

所以,我们希望能够自己控制线程的数量。

例如,某个线程1秒能爬100个详情页url也好1000个url也好,但我希望能维持10个线程来根据详情页url进行爬取,一个线程爬取一个详情页内容。一个线程在爬取完之后,这个线程就关闭(线程执行完会自己关闭,无需手动关闭),并开启新线程,但始终保持有10个线程在工作。

from threading import Semaphore,Thread
from time import sleep
from random import uniform
class GetDetailContent(Thread):
    def __init__(self,sem,detail_url):
        super(GetDetailContent,self).__init__()
        self.sem = sem
        self.url = detail_url

    def run(self):
        sleep(uniform(0,1))      # 用sleep模拟爬取,为了展现线程是结束一个就生成一个而不是10个10个生成的,这里设定爬取每个页面的时间是随机的
        print("%s 成功爬取页面 %s" % (self.name,self.url))

        # 爬取完成后,释放信号量,没释放1次,计数器就会-1;如果计数器从满的状态-1,就会唤醒acquire()
        self.sem.release()

class GetDetailUrl:
    def __init__(self,thread_num=10):
        self.sem = Semaphore(thread_num)    # 定义一个信号量对象,允许并发的线程数为10个

    def do_task(self):
        for page in range(10):     # 假设有10页列表页
            for id in range(100):   # 每页有100个url
                self.sem.acquire()      # 信号量执行一次acquire就会在self.sem的内部计数器中加1,当计数器达到允许并发的线程数时就会进入等待状态
                url = "http://www.zbpblog.com/blog-%d.html" % id
                t = GetDetailContent(self.sem,url)      # 对每个详情页url创建一个线程来爬取
                t.start()

            sleep(1)    # 1秒爬取1个列表页

if __name__=="__main__":
    crawler = GetDetailUrl()
    crawler.do_task()
    
    

结果是:针对一个url会生成一个线程来爬。线程个数维持在10个不变。

下面贴出 信号量的源码

class Semaphore:

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc


    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()

信号量是用 条件变量+计数器实现的。

__init__()的_value记录了可继续开启线程的个数
每执行一次acquire(),计数器_value会-1。但_value为0时,acquire()会调用条件变量的wait进入休眠

当执行release()的时候,计数器_value会+1,并且notify唤醒wait()使得可以继续开启新线程。

semaphore 信号量不仅可以控制线程数量,还可以控制如mysql连接,网络连接这样的连接数。

免责声明:
1. 本站资源转自互联网,源码资源分享仅供交流学习,下载后切勿用于商业用途,否则开发者追究责任与本站无关!
2. 本站使用「署名 4.0 国际」创作协议,可自由转载、引用,但需署名原版权作者且注明文章出处
3. 未登录无法下载,登录使用金币下载所有资源。
IT小站 » Python多线程和多进程(四) 线程同步之信号量

常见问题FAQ

没有金币/金币不足 怎么办?
本站已开通每日签到送金币,每日签到赠送五枚金币,金币可累积。
所有资源普通会员都能下载吗?
本站所有资源普通会员都可以下载,需要消耗金币下载的白金会员资源,通过每日签到,即可获取免费金币,金币可累积使用。

发表评论