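A previous post introduced the basic concepts of threads along with a simple example using the _thread module. However, the _thread module is too bare-bones to control threads precisely. This post covers the threading module, which offers a more capable way to manage multiple threads.
Objects in the threading module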
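Thread: an object representing a single thread of execution
Lock: a lock primitive
RLock: a reentrant lock object that lets a single thread acquire again a lock it already holds (recursive lock)
Condition: a condition variable object that makes one thread wait until another thread satisfies a particular condition
Event: a generalized version of a condition variable; any number of threads wait for an event to occur, and all of them are woken once it does
Semaphore: provides a counter for resources shared between threads; acquiring blocks when no resource is available
BoundedSemaphore: similar to Semaphore, except that its counter is never allowed to exceed the initial value
Timer: similar to Thread, except that it waits for a period of time before running
Barrier: creates a "barrier" that a specified number of threads must reach before any of them may continue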
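Event is not demonstrated later in this post, so here is a minimal sketch of the "all waiting threads are woken" behavior described in the list. The names worker, run_workers, ready and woken are made up for illustration.

```python
from threading import Event, Thread

ready = Event()   # starts out unset
woken = []        # names of workers that got past the event


def worker(name):
    ready.wait()          # block until some thread calls ready.set()
    woken.append(name)    # a single append is safe under CPython's GIL


def run_workers(n=3):
    threads = [Thread(target=worker, args=('worker-{0}'.format(i),))
               for i in range(n)]
    for t in threads:
        t.start()
    ready.set()           # wakes every thread blocked in ready.wait()
    for t in threads:
        t.join()
    return sorted(woken)


print(run_workers())
```

Once set() has been called, later calls to wait() return immediately until the event is reset with clear().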
The Thread object
The Thread class represents an activity that runs in a separate thread of control.
Main methods:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
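//target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
//name is the thread name. By default a unique name of the form "Thread-N" is constructed, where N is a small decimal number.
//args is the argument tuple for the target invocation. Defaults to ().
//kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
//daemon says whether the thread is a daemon thread. With the default None, the flag is inherited from the creating thread.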
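start() //start executing the thread
run() //defines what the thread does
join(timeout=None) //blocks the caller until the started thread terminates; if timeout is given, blocks for at most that many seconds
getName() //returns the thread name (deprecated since Python 3.10; use the name attribute instead)
Attributes:
name //thread name
ident //thread identifier (None until the thread has been started)
daemon //whether the thread is a daemon thread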
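The constructor arguments, methods and attributes listed above can be tried out in a few lines. This is only a sketch: work, results and the thread name demo-worker are invented for the example, and is_alive() is a standard Thread method not listed above.

```python
from threading import Thread
from time import sleep

results = {}


def work(label, delay=0.1):
    sleep(delay)
    results[label] = 'done'


# args supplies positional arguments, kwargs supplies keyword arguments.
t = Thread(target=work, name='demo-worker',
           args=('job-1',), kwargs={'delay': 0.05})
print(t.name, t.ident, t.daemon)   # ident is None until the thread is started

t.start()     # run() is invoked in the new thread and calls work(...)
t.join()      # wait here until work() has returned
print(t.is_alive(), t.ident is not None, results)
```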
Main functions of the threading module
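threading.active_count() //returns the number of Thread objects currently alive. The count equals the length of the list returned by enumerate().
threading.current_thread() //returns the current Thread object, corresponding to the caller's thread of control.
threading.get_ident() //returns the "thread identifier" of the current thread, a nonzero integer. Its value has no direct meaning.
threading.enumerate() //returns a list of all Thread objects currently alive. The list includes daemon threads, dummy thread objects created by current_thread(), and the main thread. It excludes terminated threads and threads that have not yet been started.
threading.main_thread() //returns the main Thread object. Under normal conditions, the main thread is the thread from which the Python interpreter was started.
Daemon threads
Marking a thread as a daemon means that the thread is not important: when the process exits, it does not wait for daemon threads to finish.
The assignment thread.daemon = True marks a thread as a daemon. It must be made before start() is called.
A new thread inherits the daemon flag from the thread that created it.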
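The module-level functions and the daemon flag can be seen together in a short sketch. The names background, spawn_child and inherited are hypothetical, and the 5-second sleep only stands in for long-running background work.

```python
import threading
from time import sleep


def background():
    sleep(5)   # long enough to still be running when the main thread finishes


def spawn_child(out):
    # A thread created from inside a daemon thread inherits daemon=True.
    child = threading.Thread(target=background)
    out.append(child.daemon)


d = threading.Thread(target=background, name='bg')
d.daemon = True    # must be set before start()
d.start()

inherited = []
parent = threading.Thread(target=spawn_child, args=(inherited,), daemon=True)
parent.start()
parent.join()

print(threading.current_thread().name, threading.main_thread().name)
print(threading.active_count(), [t.name for t in threading.enumerate()])
print(inherited)
```

Run as a script, the program exits as soon as the last print returns, without waiting for the bg thread to finish sleeping.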
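Using locks
A lock has two states, locked and unlocked, and supports only two operations: acquiring the lock and releasing it.
lock.acquire() //acquire the lock
lock.release() //release the lock
When several threads compete, the first thread to acquire the lock is allowed into the critical section and runs the code there. Every thread that arrives afterwards blocks until the first thread finishes, leaves the critical section and releases the lock. At that point one of the waiting threads acquires the lock and enters the critical section. Which waiting thread gets it is not defined.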
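As a minimal sketch of a critical section, the example below protects a shared counter. The names counter and add are made up for illustration. It uses the with statement, which calls acquire() on entry and release() on exit, even if the body raises.

```python
from threading import Lock, Thread

counter = 0
lock = Lock()


def add(n):
    global counter
    for _ in range(n):
        with lock:          # only one thread at a time gets past this line
            counter += 1    # read-modify-write on shared state


threads = [Thread(target=add, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)   # 40000
```

Without the lock, the increments from different threads could interleave and some updates could be lost.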
```python
#!/usr/bin/env python3
# coding:utf-8
from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime


class CleanOutputSet(set):
    def __str__(self):  # change the output format
        return ', '.join(x for x in self)


# lock = Lock()  # deliberately left unused in this first version
loops = (randrange(2, 5) for x in range(randrange(3, 7)))
remaining = CleanOutputSet()


def loop(nsec):
    myname = current_thread().name  # get the current thread's name
    remaining.add(myname)  # add it to the set
    print('[{0}] Start {1}'.format(ctime(), myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))
    print(' (remaining: {0})'.format(remaining or 'None'))


def main():
    for pause in loops:
        Thread(target=loop, args=(pause,)).start()


@register
def _atexit():
    print('all DONE at:', ctime())


if __name__ == '__main__':
    main()
```
```
[Wed Jan 24 20:41:48 2018] Start Thread-1
[Wed Jan 24 20:41:48 2018] Start Thread-2
[Wed Jan 24 20:41:48 2018] Start Thread-3
[Wed Jan 24 20:41:48 2018] Start Thread-4
[Wed Jan 24 20:41:48 2018] Start Thread-5
[Wed Jan 24 20:41:48 2018] Start Thread-6
[Wed Jan 24 20:41:50 2018] Completed Thread-3 (2 secs)
 (remaining: Thread-6, Thread-1, Thread-2, Thread-5, Thread-4)
[Wed Jan 24 20:41:50 2018] Completed Thread-4 (2 secs)
 (remaining: Thread-6, Thread-1, Thread-2, Thread-5)
[Wed Jan 24 20:41:51 2018] Completed Thread-5 (3 secs)
 (remaining: Thread-6, Thread-1, Thread-2)
[Wed Jan 24 20:41:52 2018] Completed Thread-1 (4 secs)
[Wed Jan 24 20:41:52 2018] Completed Thread-6 (4 secs)
 (remaining: Thread-2)
 (remaining: Thread-2)
[Wed Jan 24 20:41:52 2018] Completed Thread-2 (4 secs)
 (remaining: None)
all DONE at: Wed Jan 24 20:41:52 2018
```
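The output shows that concurrent I/O from several threads scrambles the result: two "Completed" lines are printed back to back, followed by two identical "remaining" lines. Both the I/O and the access to the shared data structure are critical sections, so a lock is needed to keep multiple threads from entering them at the same time.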
```python
#!/usr/bin/env python3
# coding:utf-8
from atexit import register
from random import randrange
from threading import Thread, current_thread, Lock
from time import sleep, ctime


class CleanOutputSet(set):
    def __str__(self):  # change the output format
        return ', '.join(x for x in self)


lock = Lock()
loops = (randrange(2, 5) for x in range(randrange(3, 7)))
remaining = CleanOutputSet()


def loop(nsec):
    myname = current_thread().name  # get the current thread's name
    with lock:  # same as lock.acquire() ... lock.release()
        remaining.add(myname)  # add it to the set
        print('[{0}] Start {1}'.format(ctime(), myname))
    sleep(nsec)  # sleep outside the lock so the threads still overlap
    with lock:
        remaining.remove(myname)
        print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))
        print(' (remaining: {0})'.format(remaining or 'None'))


def main():
    for pause in loops:
        Thread(target=loop, args=(pause,)).start()


@register
def _atexit():
    print('all DONE at:', ctime())


if __name__ == '__main__':
    main()
```
```
[Wed Jan 24 20:51:11 2018] Start Thread-1
[Wed Jan 24 20:51:11 2018] Start Thread-2
[Wed Jan 24 20:51:11 2018] Start Thread-3
[Wed Jan 24 20:51:11 2018] Start Thread-4
[Wed Jan 24 20:51:11 2018] Start Thread-5
[Wed Jan 24 20:51:13 2018] Completed Thread-2 (2 secs)
 (remaining: Thread-3, Thread-1, Thread-4, Thread-5)
[Wed Jan 24 20:51:14 2018] Completed Thread-3 (3 secs)
 (remaining: Thread-1, Thread-4, Thread-5)
[Wed Jan 24 20:51:15 2018] Completed Thread-4 (4 secs)
 (remaining: Thread-1, Thread-5)
[Wed Jan 24 20:51:15 2018] Completed Thread-1 (4 secs)
 (remaining: Thread-5)
[Wed Jan 24 20:51:15 2018] Completed Thread-5 (4 secs)
 (remaining: None)
all DONE at: Wed Jan 24 20:51:15 2018
```
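Using semaphores
A lock can only control access to a critical section and is not enough for more complex situations. For example, a reader and a writer share a buffer 8 units long; the reader takes away 1 unit at a time and the writer produces 1 unit at a time. A lock alone cannot track how many units are available, and this is where a semaphore comes in.
The semaphore is one of the oldest synchronization primitives. It is a counter that is decremented when a resource is consumed and incremented when a resource is released.
threading.Semaphore(value=1) //constructor. threading.BoundedSemaphore(value=1) can be used instead, and value can be set as needed. The counter of a BoundedSemaphore may not exceed its initial value; releasing beyond it raises ValueError.
semaphore.acquire(blocking=True, timeout=None) //consume a resource. With blocking=True, if no resource is available the call blocks until another thread releases one, or for at most timeout seconds if timeout is given. It returns True on success and False if the timeout expires. With blocking=False the call never blocks and returns False immediately when no resource is available.
semaphore.release() //give a resource back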
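The return values and the ValueError described above can be checked directly in a single thread. This is a small sketch; slots, bounded and over_released are names chosen for the example.

```python
from threading import BoundedSemaphore, Semaphore

slots = Semaphore(2)
print(slots.acquire())                 # True, counter goes 2 -> 1
print(slots.acquire())                 # True, counter goes 1 -> 0
print(slots.acquire(blocking=False))   # False, returns at once, no exception
print(slots.acquire(timeout=0.1))      # False, gives up after about 0.1 s
slots.release()                        # counter goes 0 -> 1

bounded = BoundedSemaphore(1)
bounded.acquire()
bounded.release()                      # back at the initial value
try:
    bounded.release()                  # one release too many
    over_released = False
except ValueError:
    over_released = True
print(over_released)                   # True
```

A plain Semaphore would accept the extra release() silently, which is why BoundedSemaphore is useful for catching bookkeeping mistakes.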
```python
#!/usr/bin/env python3
# coding:utf-8
from atexit import register
from random import randrange
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, ctime

lock = Lock()
MAX = 5
candytray = BoundedSemaphore(MAX)


def refill():
    with lock:
        print('Refilling candy...')
        try:
            candytray.release()  # raises ValueError if the tray is already full
        except ValueError:
            print('full, skipping')
        else:
            print('OK')


def buy():
    with lock:
        print('Buying candy...')
        if candytray.acquire(False):  # non-blocking: returns False if empty
            print('OK')
        else:
            print('empty, skipping')


def producer(loops):
    for i in range(loops):
        refill()
        sleep(randrange(3))


def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))


def main():
    print('starting at:', ctime())
    nloops = randrange(2, 6)
    print('THE CANDY MACHINE (full with {0} bars)'.format(MAX))
    Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()
    Thread(target=producer, args=(nloops,)).start()


@register
def _atexit():
    print('all DONE at:', ctime())


if __name__ == '__main__':
    main()
```
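The code above is a candy-machine semaphore example. The machine has 5 slots; refill() adds a candy bar and buy() takes one away. The two functions have opposite effects on the semaphore.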
```
starting at: Wed Jan 24 21:16:00 2018
THE CANDY MACHINE (full with 5 bars)
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full, skipping
Buying candy...
OK
Refilling candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
empty, skipping
Buying candy...
empty, skipping
all DONE at: Wed Jan 24 21:16:05 2018
```
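Using Condition
A Condition can be thought of as an advanced lock. It offers more than Lock and RLock do and lets us handle complex thread synchronization problems. threading.Condition maintains a lock object internally (an RLock by default), and a lock object can be passed in as an argument when the Condition is created. Condition also provides acquire and release methods with the same meaning as those of a lock; they simply call the corresponding methods of the internal lock. Condition provides the following methods as well (important: they may only be called after the lock has been acquired, otherwise a RuntimeError is raised):
Condition.wait([timeout]) //releases the internal lock and suspends the thread until it is woken by a notification or the timeout expires (if a timeout was given). Execution continues only once the thread has been woken and has reacquired the lock.
Condition.notify() //wakes up one suspended thread (if there is one). Note: notify() does not release the held lock.
Condition.notify_all() //wakes up all suspended threads (if there are any). Note: this method does not release the held lock either. notifyAll() is an older alias for it, deprecated since Python 3.10.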
```python
from threading import Condition, Thread
from time import ctime


con = Condition()


def get_odd():
    count = 3
    with con:  # hold the lock for the whole exchange; wait() releases it
        while True:
            print(1)
            count -= 1
            con.notify()  # wake the other thread before waiting or leaving
            if count == 0:
                break
            con.wait()


def get_even():
    count = 3
    with con:
        while True:
            print(2)
            count -= 1
            con.notify()
            if count == 0:
                break
            con.wait()


def main():
    print('start at:', ctime())
    Thread(target=get_odd).start()
    Thread(target=get_even).start()


if __name__ == '__main__':
    main()
```
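A common way to use wait() is to re-check the awaited condition in a loop, because by the time the woken thread has reacquired the lock the condition may no longer hold. The sketch below applies that pattern to a small queue. The names items, consumed, producer and consumer are hypothetical, and collections.deque is just a convenient container.

```python
from collections import deque
from threading import Condition, Thread

items = deque()
cond = Condition()
consumed = []


def consumer(n):
    for _ in range(n):
        with cond:
            while not items:    # re-check the predicate after every wake-up
                cond.wait()     # releases the lock while blocked
            consumed.append(items.popleft())


def producer(n):
    for i in range(n):
        with cond:
            items.append(i)
            cond.notify()       # the consumer resumes once the lock is free


c = Thread(target=consumer, args=(5,))
p = Thread(target=producer, args=(5,))
c.start()
p.start()
c.join()
p.join()

print(consumed)   # [0, 1, 2, 3, 4]
```

Because the consumer tests items before waiting, it does not matter whether the producer or the consumer happens to run first.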
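Three ways to create a thread
The producer-consumer model serves as the example. The buffer has size 5 and starts out empty. A producer generates a number and puts it into the buffer; a consumer takes a number out. The producer runs 10 times and the consumer runs 5 times.
Method 1: create a Thread instance and pass it a function (along with the function's arguments)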
```python
#!/usr/bin/env python3
# coding:utf-8

from atexit import register
from time import ctime
from random import randint
from threading import BoundedSemaphore, Semaphore, Lock, Thread

lock = Lock()
buffer = [0 for x in range(5)]
MAX = len(buffer)
sem_full = Semaphore(0)            # number of filled slots
sem_empty = BoundedSemaphore(MAX)  # number of empty slots


def producer():
    sem_empty.acquire()  # wait for an empty slot
    with lock:
        print("produce a number")
        for i in range(len(buffer)):
            if buffer[i] == 0:  # compare by value; "is 0" is unreliable
                buffer[i] = randint(1, 10)
                break
        print(buffer)
        sem_full.release()  # one more filled slot


def consumer():
    sem_full.acquire()  # wait for a filled slot
    with lock:
        print("consume a number")
        for i in range(len(buffer)):
            if buffer[i] != 0:
                buffer[i] = 0
                break
        print(buffer)
        sem_empty.release()  # one more empty slot


def main():
    print('starting at:', ctime())
    for i in range(5):
        Thread(target=consumer).start()
        Thread(target=producer).start()
        Thread(target=producer).start()


@register
def _atexit():
    print("all DOne at:", ctime())


if __name__ == '__main__':
    main()
```
Method 2: create a Thread instance and pass it a callable class instance (its __call__ method will be invoked)
```python
#!/usr/bin/env python3
# coding:utf-8

from atexit import register
from time import ctime
from random import randint
from threading import BoundedSemaphore, Semaphore, Lock, Thread

lock = Lock()
buffer = [0 for x in range(5)]
MAX = len(buffer)
sem_full = Semaphore(0)            # number of filled slots
sem_empty = BoundedSemaphore(MAX)  # number of empty slots


class ThreadFunc(object):

    def __init__(self, func, args, name=""):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self, *args, **kwargs):
        # Thread.run() calls the instance, which forwards to the stored function.
        self.func(*self.args)


def producer():
    sem_empty.acquire()
    with lock:
        print("produce a number")
        for i in range(len(buffer)):
            if buffer[i] == 0:
                buffer[i] = randint(1, 10)
                break
        print(buffer)
        sem_full.release()


def consumer():
    sem_full.acquire()
    with lock:
        print("consume a number")
        for i in range(len(buffer)):
            if buffer[i] != 0:
                buffer[i] = 0
                break
        print(buffer)
        sem_empty.release()


threads = []


def main():
    print('starting at:', ctime())
    for i in range(5):
        c = Thread(target=ThreadFunc(consumer, (), consumer.__name__))
        threads.append(c)
        p1 = Thread(target=ThreadFunc(producer, (), producer.__name__))
        threads.append(p1)
        p2 = Thread(target=ThreadFunc(producer, (), producer.__name__))
        threads.append(p2)

    for t in threads:
        t.start()

    for t in threads:
        t.join()


@register
def _atexit():
    print("all DOne at:", ctime())


if __name__ == '__main__':
    main()
```
Method 3: derive a subclass of Thread and create an instance of the subclass (overriding the run method)
```python
#!/usr/bin/env python3
# coding:utf-8

from atexit import register
from time import ctime
from random import randint
from threading import BoundedSemaphore, Semaphore, Lock, Thread

lock = Lock()
buffer = [0 for x in range(5)]
MAX = len(buffer)
sem_full = Semaphore(0)            # number of filled slots
sem_empty = BoundedSemaphore(MAX)  # number of empty slots


class MyThread(Thread):

    def __init__(self, func, args, name=""):
        Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        # start() arranges for run() to execute in the new thread.
        self.func(*self.args)


def producer():
    sem_empty.acquire()
    with lock:
        print("produce a number")
        for i in range(len(buffer)):
            if buffer[i] == 0:
                buffer[i] = randint(1, 10)
                break
        print(buffer)
        sem_full.release()


def consumer():
    sem_full.acquire()
    with lock:
        print("consume a number")
        for i in range(len(buffer)):
            if buffer[i] != 0:
                buffer[i] = 0
                break
        print(buffer)
        sem_empty.release()


threads = []


def main():
    print('starting at:', ctime())
    for i in range(5):
        c = MyThread(consumer, (), consumer.__name__)
        threads.append(c)
        p1 = MyThread(producer, (), producer.__name__)
        threads.append(p1)
        p2 = MyThread(producer, (), producer.__name__)
        threads.append(p2)

    for t in threads:
        t.start()

    for t in threads:
        t.join()


@register
def _atexit():
    print("all DOne at:", ctime())


if __name__ == '__main__':
    main()
```
Output
```
starting at: Wed Jan 24 22:25:05 2018
produce a number
[7, 0, 0, 0, 0]
produce a number
[7, 9, 0, 0, 0]
consume a number
[0, 9, 0, 0, 0]
produce a number
[7, 9, 0, 0, 0]
consume a number
[0, 9, 0, 0, 0]
consume a number
[0, 0, 0, 0, 0]
produce a number
[9, 0, 0, 0, 0]
consume a number
[0, 0, 0, 0, 0]
produce a number
[9, 0, 0, 0, 0]
produce a number
[9, 5, 0, 0, 0]
produce a number
[9, 5, 6, 0, 0]
produce a number
[9, 5, 6, 10, 0]
consume a number
[0, 5, 6, 10, 0]
produce a number
[7, 5, 6, 10, 0]
produce a number
[7, 5, 6, 10, 2]
all DOne at: Wed Jan 24 22:25:05 2018
```
For objects and methods not covered here, see the Python standard library documentation.