Python多线程详解 - Go语言中文社区

Python多线程详解


转载:Py西游攻关之多线程 原文地址:http://www.cnblogs.com/yuanchenqi/articles/5733873.html
在原博客的基础上加入了自己的一些翻译和看法。

  1. 线程与进程

    1. 什么是线程
      线程是操作系统能够进行运算调度的最小单元。
      它被包含在进程中,是进程中的实际运作单位。
      一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
       

      A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.
      线程是执行上下文,它是cpu执行指令流所需的所有信息。

      Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.
      假如你正在阅读一本书,这时候你想休息一下,但是你想要能够回到你停下来的地方继续阅读。实现的一种方法是记住页码、行号和单词编号。因此,你的阅读一本书的执行上下文就是这三个编号。

      If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.
      如果你有一个室友,并且她也使用同样的方法,她可以在你不看这本书的时候,从她上次停下来的地方回复阅读。当你拿回这本书的时候你可以从你上次停下来的地方继续阅读。

      Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
      线程工作的方式相同。一个CPU能够给你一种能够同时进行多个计算的错觉。它通过在每个计算上花费一些时间来做到这一点。它能够这样做是因为它的每一个计算都有不同的执行上下文。就像你能够和你的朋友分享一本书一样。多任务可以共享一个CPU。

      On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
      在技术层面上,执行上下文(一个线程)是由CPU寄存器的值所组成。

      Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.
      最后:线程与进程不同。一个线程是一个执行的上下文,一个进程是一捆与计算相关联的资源。一个进程可以有一个或多个线程。

      Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
      说明:一个进程的相关资源包括:内存页(在一个进程内的所有线程有相同的内存视图)、文件描述符(例如:开放sockets)和安全凭证(例如:开启进程的用户ID)。

    2. 什么是进程
      An executing instance of a program is called a process.
      程序中正在执行的实例被称作进程。
      Each process provides the resources needed to execute a program.A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. 
      每一个进程提供了程序运行所需的资源。进程有一个虚拟地址空间、执行代码、系统对象的打开句柄、安全上下文、唯一进程标识符、环境变量、优先级类、最小和最大工作集大小和至少一个执行进程。
      Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
      每一个进程都是用一个线程(通常称为主线程)启动的,但是可以从它的任意线程创建额外的线程。

    3. 进程与线程的区别

      1. Threads share the address space of the process that created it; processes have their own address space.
        线程共享创建它的进程的地址空间;
        进程有自己的地址空间。
      2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
        线程可以直接访问所属进程的地址段;
        进程拥有父进程的数据段拷贝。
      3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
        线程可以直接与其进程下的其它线程通信;
        进程必须使用进程间通讯才能和其兄弟进程通信。
      4. New threads are easily created; new processes require duplication of the parent process.
        新线程很容易被创建;
        新进程需要复制其父进程。
      5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
        线程可以对同一进程下的线程进行相当大的控制;
        进程只能控制子进程。
      6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
        改变主线程(取消、优先级改变等)可能会影响进程下其它线程的行为;
        改变父进程不会影响子进程。
    4. Python GIL(Global Interpreter Lock)全局解释器锁
      CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
      Cpython执行详情:
      在Cpython中,由于全局解释器所的存在,在同一时刻只能有一个线程执行python代码(即使某些面向性能的库可以克服这个限制)。如果你想你的程序能够更好的使用多核cpu电脑的计算资源,建议你使用multiprocessing。然而,如果你想运行密集型I/O任务,线程任然是一个合适的模型。
      由于GIL的限制,python中的多线程都是假的多线程。

  2. threading模块

    1. 线程的2种调用方式

      1. 直接调用:
         

        import threading
        import time
         
        def sayhi(num): #定义每个线程要运行的函数
         
            print("running on number:%s" %num)
         
            time.sleep(3)
         
        if __name__ == '__main__':
         
            t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
            t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
         
            t1.start() #启动线程
            t2.start() #启动另一个线程
         
            print(t1.getName()) #获取线程名
            print(t2.getName())

         

      2. 继承式类:
         

        import threading
        import time
         
         
        class MyThread(threading.Thread):
            def __init__(self,num):
                threading.Thread.__init__(self)
                self.num = num
         
            def run(self):#定义每个线程要运行的函数
         
                print("running on number:%s" %self.num)
         
                time.sleep(3)
         
        if __name__ == '__main__':
         
            t1 = MyThread(1)
            t2 = MyThread(2)
            t1.start()
            t2.start()

         

    2. Join 和 Daemon
       

      import threading
      from time import ctime,sleep
      import time
      
      def music(func):
          for i in range(2):
              print ("Begin listening to %s. %s" %(func,ctime()))
              sleep(4)
              print("end listening %s"%ctime())
      
      def move(func):
          for i in range(2):
              print ("Begin watching at the %s! %s" %(func,ctime()))
              sleep(5)
              print('end watching %s'%ctime())
      
      threads = []
      t1 = threading.Thread(target=music,args=('七里香',))
      threads.append(t1)
      t2 = threading.Thread(target=move,args=('阿甘正传',))
      threads.append(t2)
      
      if __name__ == '__main__':
      
          for t in threads:
              # t.setDaemon(True)
              t.start()
              # t.join()
          # t1.join()
          t2.join()########考虑这三种join位置下的结果?
          print ("all over %s" %ctime())
      1. setDaemon(True)
        将线程声明为守护线程,必须在start()方法调用前设置。如果不设置为守护线程,程序会被无限挂起。这个方法基本和join是相反的。
        当我们程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就兵分两路,分别运行。当主线程完成想要退出时,会检测子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后退出。但有时候我们需要的是只要主线程完成,不管子线程是否完成,都要和主线程一起退出,这时,就需要使用setDaemon方法。

      2. join()
        在子线程完成之前,这个子线程的父线程会一直被阻塞。

      3. 其它方法

        1. threading.currentThread()
          返回当前线程的线程变量。

        2. threading.enumerate()
          返回一个包含正在运行的线程的列表。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

        3. threading.activeCount()
          返回正在运行的线程的数量,与len(threading.enumerate())有相同的效果。

        4. run()
          用以表示线程活动的方法。

        5. start()
          启动线程。

        6. join([time])
          等待至线程终止。阻塞调用线程直至线程的join()方法被调用终止,正常退出或者抛出未处理异常或者是可选的超时发生。

        7. isAlive()
          返回线程是否是活动的。

        8. getName()
          返回线程名。

        9. setName()
          设置线程名。

    3. 同步锁(Lock)
      下面的例子开启了100个线程,让每个线程对全局变量num-1操作。
      我们想要得到的结果是最终num为0。

      import time
      import threading
      
      def addNum():
          global num #在每个线程中都获取这个全局变量
          temp=num
          print('--get num:',num )
          time.sleep(0.1)
          num =temp-1 # 对此公共变量进行-1操作
      
      
      num = 100  #设定一个共享变量
      thread_list = []
      # 开启100个线程
      for i in range(100):
          t = threading.Thread(target=addNum)
          t.start()
          thread_list.append(t)
      
      for t in thread_list: #等待所有线程执行完毕
          t.join()
      
      print('final num:', num )

      执行结果:

      最终结果事与愿违没有等于0,却得到了98。

      注意:

      1:  why num-=1没问题呢?这是因为动作太快(完成这个动作在切换的时间内)

      2: if sleep(1),现象会更明显,100个线程每一个一定都没有执行完就进行了切换,我们说过sleep就等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99.
       

      多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?

      有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。

      我们可以通过同步锁来解决这种问题。
       

      def addNum():
          global num # 在每个线程中都获取这个全局变量
          lock.acquire() # 上锁
          temp=num
          print('--get num:',num )
          time.sleep(0.1)
          num =temp-1 # 对此公共变量进行-1操作
          lock.release() # 解锁
      
      num = 100  # 设定一个共享变量
      thread_list = []
      lock = threading.Lock() # 获取线程锁
      
      for i in range(100):
          t = threading.Thread(target=addNum)
          t.start()
          thread_list.append(t)
      
      for t in thread_list: # 等待所有线程执行完毕
          t.join()
      
      print('final num:', num )
      执行结果:

      从执行结果可以看出问题已经解决。
      同步锁与GIL的关系?
      Python的线程在GIL的控制之下,线程之间,对真个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制--用户级互斥。
      内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。
      GIL的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以,每时每刻只有一条bytecode在被一个thead执行。GIL保证了bytecode这层面上是线程安全的。
      但是如果有个操作,比如: x+=1,这个操作需要多个bytecodes操作,在自行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。
      我的同步锁也是保证同一个时刻只有一个线程被执行,是不是没有GIL也可以?是的。
      要GIL有什么鸟用?你没治。
       
    4. 线程死锁和递归锁
      在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:
       

      import threading,time
      
      class myThread(threading.Thread):
          def doA(self):
              lockA.acquire()
              print(self.name,"gotlockA",time.ctime())
              time.sleep(3)
              lockB.acquire()
              print(self.name,"gotlockB",time.ctime())
              lockB.release()
              lockA.release()
      
          def doB(self):
              lockB.acquire()
              print(self.name,"gotlockB",time.ctime())
              time.sleep(2)
              lockA.acquire()
              print(self.name,"gotlockA",time.ctime())
              lockA.release()
              lockB.release()
          def run(self):
              self.doA()
              self.doB()
      if __name__=="__main__":
      
          lockA=threading.Lock()
          lockB=threading.Lock()
          threads=[]
          for i in range(5):
              threads.append(myThread())
          for t in threads:
              t.start()
          for t in threads:
              t.join()#等待线程结。

      解决办法:使用递归锁
      为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
       

      import threading
      
      
      class Account:
          def __init__(self, _id, balance):
              self.id = _id
              self.balance = balance
              self.lock = threading.RLock()  # 获取递归锁
      
          def withdraw(self, amount):
              with self.lock:  # 上锁
                  self.balance -= amount
      
          def deposit(self, amount):
              with self.lock:  # 上锁
                  self.balance += amount
      
          def drawcash(self, amount):  # lock.acquire中嵌套lock.acquire的场景
              with self.lock:  # 上锁
                  interest = 0.05
                  count = amount+amount*interest
                  self.withdraw(count)
      
      
      def transfer(_from, to, amount):
          # 锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的
          _from.withdraw(amount)
          to.deposit(amount)
      
      
      alex = Account('alex', 1000)  # 帐户1000元
      yuan = Account('yuan', 1000)
      
      # 线程1:alex向yuan转账100元
      t1 = threading.Thread(target=transfer, args=(alex, yuan, 100))
      t1.start()
      # 线程2:yuan向alex转账200元
      t2 = threading.Thread(target=transfer, args=(yuan, alex, 200))
      t2.start()
      
      t1.join()
      t2.join()
      
      print('alex >>>', alex.balance)
      print('yuan >>>', yuan.balance)

      执行结果:

    5. 条件变量同步(Condition)
      有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。
      lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。
       

      1. 常用方法:

        1. wait()
          条件不满足时调用,线程会释放锁并进入等待阻塞。

        2. notify()
          条件创造后调用,通知等待池激活一个线程。

        3. notifyAll()
          条件创造后调用,通知等待池激活所有线程。

      2. 例子:
        5个生产者(包包子)服务1个消费者(吃包子)。当笼屉(lists)中没有(包子)的时候,消费者等待,生产者造出包子,将包子放入笼屉,然后通知消费者。当消费者获得通知后,就开始吃包子。

        import threading,time
        from random import randint
        
        
        class Producer(threading.Thread):
            def run(self):
                global lists
                while True:
                    val = random.randint(0, 100)
                    print('生产者', self.name, 'append:', val, lists)
                    if lock.acquire():
                        lists.append(val)
                        lock.notify()  # 通知
                        lock.release()
                    time.sleep(3)
        
        
        class Consumer(threading.Thread):
            def run(self):
                global lists
                while True:
                    lock.acquire()
                    # lists为0的时候进入等待, 接收到通知后继续执行
                    if len(lists) == 0:
                        lock.wait()
                    print('消费者', self.name, 'delete:' + str(lists[0]), lists)
                    del lists[0]
                    lock.release()
                    time.sleep(0.25)
        
        
        if __name__ == '__main__':
            lists = []  # 全局lists
            lock = threading.Condition()  # 获取条件锁
            threads = []
            # 开启5个生产者进程
            for i in range(5):
                threads.append(Producer())
            # 开启1个消费者进程
            threads.append(Consumer())
            # 开启进程
            for t in threads:
                t.start()
            # join进程
            for t in threads:
                t.join()

        执行结果:

    6. 同步条件(Event)
      条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;
      Event不需要通知,是根据状态值来进行控制。

      1. 方法:

        1. isSet()
          返回event的状态值。

        2. wait()
          如果event.isSet() == False 将阻塞线程

        3. set()
          设置event的装添置为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度。

        4. clear()
          恢复event的状态值为False

      2. 例子:
         

        import threading, time
        
        
        class Boss(threading.Thread):
            def run(self):
                print("BOSS:今晚大家都要加班到22:00。")
                event.isSet() or event.set() # 设置event状态为True
                time.sleep(5)
                print("BOSS:<22:00>可以下班了。")
                event.isSet() or event.set()
        
        
        class Worker(threading.Thread):
            def run(self):
                # 等待
                event.wait()
                print("Worker:哎……命苦啊!")
                time.sleep(0.25)
                # 清除状态
                event.clear()
                # 继续等待
                event.wait()
                print("Worker:OhYeah!")
        
        
        if __name__ == "__main__":
            event = threading.Event()
            threads = []
            # 5个屌丝进程
            for i in range(5):
                threads.append(Worker())
            # 1个老板进程
            threads.append(Boss())
        
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        

        执行结果:
        交通灯例子:
         

        import random
        import threading
        import time
        
        
        def light():
            """
            event状态为True的话表示绿灯
            :return:
            """
            if not event.isSet():
                event.set()  # wait就不阻塞 #绿灯状态
            count = 0
            while True:
                if count < 10:
                    print('33[42;1m--green light on---33[0m')
                elif count < 13:
                    print('33[43;1m--yellow light on---33[0m')
                elif count < 20:
                    if event.isSet():
                        event.clear() # 将状态设置为false
                    print('33[41;1m--red light on---33[0m')
                else:
                    count = 0
                    event.set()  # 打开绿灯
                time.sleep(1)
                count += 1
        
        
        def car(n):
            while 1:
                time.sleep(random.randrange(10))
                if event.isSet():  # event状态为True = 绿灯
                    print("car [%s] is running.." % n)
                else:  # event状态为False = 红灯
                    print("car [%s] is waiting for the red light.." % n)
        
        
        if __name__ == '__main__':
            event = threading.Event()
            # 1个交通灯
            Light = threading.Thread(target=light)
            Light.start()
            # 3辆汽车
            for i in range(3):
                t = threading.Thread(target=car, args=(i,))
                t.start()

        执行结果:

    7. 信号量(Semaphore)
      信号量是用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1。
      计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)。
      BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
       

      import threading
      import time
      
      
      class MyThread(threading.Thread):
      
          def run(self):
              if semaphore.acquire():
                  print(self.name)
                  time.sleep(3)
                  semaphore.release()
      
      
      if __name__ == '__main__':
          semaphore = threading.BoundedSemaphore(5)
          threads = []
          for i in range(100):
              threads.append(MyThread())
          for t in threads:
              t.start()

       

    8. 队列(queue)
      queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
      在线程编程的时候,当信息必须被安全的与多线程进行交换时,使用队列是非常有用的。

      1. 方法:

        1.  

           

          创建一个“队列”对象
          import Queue
          q = Queue.Queue(maxsize = 10)
          Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

           

          将一个值放入队列中
          q.put(10)
          调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
          1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

          将一个值从队列中取出
          q.get()
          调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

          Python Queue模块有三种队列及构造函数:
          1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize)
          2、LIFO类似于堆,即先进后出。             class queue.LifoQueue(maxsize)
          3、还有一种是优先级队列级别越低越先出来。   class queue.PriorityQueue(maxsize)

          此包中的常用方法(q = Queue.Queue()):
          q.qsize() 返回队列的大小
          q.empty() 如果队列为空,返回True,反之False
          q.full() 如果队列满了,返回True,反之False
          q.full 与 maxsize 大小对应
          q.get([block[, timeout]]) 获取队列,timeout等待时间
          q.get_nowait() 相当q.get(False)
          非阻塞 q.put(item) 写入队列,timeout等待时间
          q.put_nowait(item) 相当q.put(item, False)
          q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
          q.join() 实际上意味着等到队列为空,再执行别的操作

      2. 例子:
        打炮的例子:一个大炮需要3名装弹手来为大炮弹仓装填炮弹,需要1名炮手来瞄准发射。弹仓中必须有炮弹炮手才能打炮。这里,弹仓就是队列。
         

        import queue
        import threading
        import time
        
        
        class Loader(threading.Thread):
            """
            装弹员
            """
        
            def run(self):
                global bullet_id
                while True:
                    lock.acquire()
                    q.put(bullet_id)
                    print('装弹员装填{0}号炮弹,弹仓装有炮弹{1}发'.format(bullet_id, q.qsize()))
                    bullet_id += 1
                    lock.release()
                    time.sleep(3)
        
        
        class Shooter(threading.Thread):
            """
            射手
            """
        
            def run(self):
                while True:
                    if not q.empty():
                        print('----弹仓有炮弹{0}发, 发射{1}号炮弹'.format(q.qsize(), q.get()))
                        time.sleep(0.3)
        
        
        if __name__ == '__main__':
            bullet_id = 1
            q = queue.Queue()
            print('==== 初始化弹仓数量:{0}'.format(q.qsize()))
            threads = []
            lock = threading.Lock()
            for i in range(3):
                threads.append(Loader())
            threads.append(Shooter())
            for t in threads:
                t.start()
        


        执行结果:

    9. 上下文管理器(contextlib模块)

      1. 如何使用上下文管理器
        如何打开一个文件,并写入"hello world"
         

        filename = "my.txt"
        mode = "w"
        f = open(filename,mode)
        f.write("hello world")
        f.close()

        当发生异常时(如磁盘写满),就没有机会执行第5行。当然,我们可以采用try-finally语句块进行包装:
         

        writer = open(filename,mode)
        try:
            writer.write("hello world")
        finally:
            writer.close()

        当我们进行复杂的操作时,try-finally语句就会变得丑陋,采用with语句重写:
         

        with open(filename,mode) as writer:
            writer.write("hello world")

        as指代了从open()函数返回的内容,并把它赋给了新值。with完成了try-finally的任务。

      2. 自定义上下文管理器
        with语句的作用类似于try-finally,提供一种上下文机制。要应用with语句的类,其内部必须提供两个内置函数__enter__和__exit__。前者在主体代码执行前执行,后者在主体代码执行后执行。as后面的变量,是在__enter__函数中返回的。
         

        class echo():
            def output(self):
                print "hello world"
            def __enter__(self):
                print "enter"
                return self  #可以返回任何希望返回的东西
            def __exit__(self, exception_type, value, trackback):
                print "exit"
                if exception_type == ValueError:
                    return True
                else:
                    return Flase
          
        >>>with echo as e:
            e.output()
             
        输出:
        enter
        hello world
        exit

        完整的__exit__函数如下:
         

        def __exit__(self,exc_type,exc_value,exc_tb)
        
        其中,exc_type:异常类型;
        exc_value:异常值;
        exc_tb:异常追踪信息;
        当__exit__返回True时,异常不传播

         

      3. contextlib模块
         

        contextlib模块的作用是提供更易用的上下文管理器,它是通过Generator实现的。contextlib中的contextmanager作为装饰器来提供一种针对函数级别的上下文管理机制,常用框架如下:
         

        from contextlib import contextmanager
        
        @contextmanager
        def make_context():
            print 'enter'
            try:
                yield "ok"
            except RuntimeError,err:
                print 'error',err
            finally:
                print 'exit'
                 
        >>>with make_context() as value:
            print value
             
        输出为:
            enter
            ok
            exit

        其中,yield写入try-finally中是为了保证异常安全(能处理异常)as后的变量的值是由yield返回。yield前面的语句可看作代码块执行前操作,yield之后的操作可以看作在__exit__函数中的操作。
         

        # 以线程锁为例
        
        @contextlib.contextmanager
        def loudLock():
            print 'Locking'
            lock.acquire()
            yield
            print 'Releasing'
            lock.release()
         
        with loudLock():
            print 'Lock is locked: %s' % lock.locked()
            print 'Doing something that needs locking'
         
        # Output:
        # Locking
        # Lock is locked: True
        # Doing something that needs locking
        # Releasing

         

      4. contextlib.nested减少嵌套
        对于:

        with open(filename, mode) as reader:
            with open(filename1, mode1) as writer:
                writer.write(reader.read())

        可以通过contextlib.nested进行简化:
         

        with contextlib.nested(open(filename, mode), open(filename1, mode1)) as (reader, writer):
            writer.write(reader.read())

        在python 2.7及以后,被一种新的语法取代:
         

        with open(filename, mode) as reader, open(filename1, mode1) as writer:
            writer.write(reader.read())

         

      5. contextlib.closing()
        file类直接支持上下文管理API,但有些表示打开句柄的对象并不支持,如urllib.urlopen()返回的对象。还有些遗留类,使用close()方法而不支持上下文管理器API。为了确保关闭句柄,需要使用closing()为它创建一个上下文管理器(调用类的close方法)。
         

        import contextlib
        
        
        class myclass():
            def __init__(self):
                print '__init__'
            def close(self):
                print 'close()'
             
        with contextlib.closing(myclass()):
            print 'ok'
            
        输出:
        __init__
        ok
        close()

         

    10. 自定义线程池

      1. 简单版本:
         

        import queue
        import threading
        import time
        
        
        class ThreadPool(object):
        
            def __init__(self, max_num=20):
                self.queue = queue.Queue(max_num)
                for i in range(max_num):
                    self.queue.put(threading.Thread)
        
            def get_thread(self):
                return self.queue.get()
        
            def add_thread(self):
                self.queue.put(threading.Thread)
        
        
        '''
        pool = ThreadPool(10)
        
        def func(arg, p):
            print(arg)
            time.sleep(1)
            p.add_thread()
        
        
        for i in range(30):
            Pool = pool.get_thread()
            t = Pool(target=func, args=(i, pool))
            t.start()
        '''

         

      2. 复杂版本:
         

        #!/usr/bin/env python
        # -*- coding:utf-8 -*-
        
        import queue
        import threading
        import contextlib
        import time
        
        
        StopEvent = object()
        
        class ThreadPool(object):
        
            def __init__(self, max_num, max_task_num = None):
                if max_task_num:
                    self.q = queue.Queue(max_task_num)
                else:
                    self.q = queue.Queue()
                self.max_num = max_num
                self.cancel = False
                self.terminal = False
                self.generate_list = []
                self.free_list = []
        
            def run(self, func, args, callback=None):
                """
                线程池执行一个任务
                :param func: 任务函数
                :param args: 任务函数所需参数
                :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
                :return: 如果线程池已经终止,则返回True否则None
                """
                if self.cancel:
                    return
                if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                    self.generate_thread()
                w = (func, args, callback,)#主线程
                self.q.put(w)#主线程
        
            def generate_thread(self):
                """
                创建一个线程
                """
                t = threading.Thread(target=self.call)
                t.start()
        
            def call(self):
                """
                循环去获取任务函数并执行任务函数
                """
                current_thread = threading.currentThread()
                self.generate_list.append(current_thread)
        
                event = self.q.get()#if q为空,则阻塞住,一直等到有任务进来并把它取出来
                while event != StopEvent:
        
                    func, arguments, callback = event
                    try:
                        result = func(*arguments)
                        success = True
                    except Exception as e:
                        success = False
                        result = None
        
                    if callback is not None:
                        try:
                            callback(success, result)
                        except Exception as e:
                            pass
        
                    with self.worker_state(self.free_list, current_thread):
                        if self.terminal:
                            event = StopEvent
                        else:
                            event = self.q.get()#key:该线程在这里继续等待新的任务,任务来了,继续执行
                                                #暂时将该线程对象放到free_list中。
                else:
        
                    self.generate_list.remove(current_thread)
        
            def close(self):
                """
                执行完所有的任务后,所有线程停止
                """
                self.cancel = True
                full_size = len(self.generate_list)
                while full_size:
                    self.q.put(StopEvent)
                    full_size -= 1
        
            def terminate(self):
                """
                无论是否还有任务,终止线程
                """
                self.terminal = True
        
                while self.generate_list:
                    self.q.put(StopEvent)
        
                self.q.queue.clear()
        
            @contextlib.contextmanager
            def worker_state(self, free_list, worker_thread):
                """
                用于记录线程中正在等待的线程数
                """
                free_list.append(worker_thread)#新的任务来的时候判断
                                         # if len(self.free_list) == 0 and len(self.generate_list) < self.max_num
                                         # 任务得创建新的线程来处理;如果len(self.free_list) != 0:由阻塞着的存在free_list中的线程处理(event = self.q.get())
                try:
                    yield
                finally:
                    free_list.remove(worker_thread)
        
        # How to use
        
        
        pool = ThreadPool(5)
        
        def callback(status, result):
            # status, execute action status
            # result, execute action return value
            pass
        
        
        def action(i):
            time.sleep(1)
            print(i)
        
        for i in range(30):
            ret = pool.run(action, (i,), callback)
        
        time.sleep(2)
        print(len(pool.generate_list), len(pool.free_list))
        print(len(pool.generate_list), len(pool.free_list))
        
        # pool.close()
        # pool.terminate()

         

      3. 延伸:

        import contextlib
        import socket
        
        @contextlib.contextmanager
        def context_socket(host,port):
            sk=socket.socket()
            sk.bind((host,port))
            sk.listen(5)
            try:
                yield sk
            finally:sk.close()
        
        with context_socket('127.0.0.1',8888) as socket:
            print(socket)


         

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/vevoly/article/details/86496499
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-06-14 16:36:08
  • 阅读 ( 1253 )
  • 分类:Linux

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢