博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python多线程与threading模块
阅读量:6247 次
发布时间:2019-06-22

本文共 14499 字,大约阅读时间需要 48 分钟。

 中介绍了线程的基本概念以及_thread模块的简单示例。然而,_thread模块过于简单,使得我们无法用它来准确地控制线程,本文介绍threading模块,它提供了更强大的多线程管理方案。

 

threading模块的对象

Thread  表示一个执行线程的对象 

Lock  锁原语

RLock  可重入锁对象,使单一线程可以再次获得已持有的锁(递归锁)

Condition  条件变量对象,使得一个线程等待另一个线程满足特定条件

Event  条件变量的通用版本,任意数量的线程等待某个事件的发生,该事件发生后所有线程将被激活

Semaphore  为线程间的共享资源提供了一个计数器,如果没有可用资源时会被阻塞

BoundedSemaphone  与Semaphore相似,不过它不允许超过初始值

Timer  与Thread相似,不过运行前要等待一段时间

Barrier  创建一个”障碍“,必须要达到指定数量的线程才能继续

Thread对象

Thread类表示在单独的控制线程中运行的活动。

主要方法:

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

//target是将被run()方法调用的可调用对象。默认为None,表示不调用任何东西。

//name是线程的名字。默认情况下,以“Thread-N”的形式构造一个唯一的名字,N是一个小的十进制整数。

//args是给调用目标的参数元组。默认为()

//kwargs是给调用目标的关键字参数的一个字典。默认为{}

//daemon表示是否为守护线程

 

start()  开始执行线程  

run()  //定义线程功能

join(timeout=None)  //直至启动的线程之前一直挂起,除非给出timeout时间,否则一直阻塞

gerName()  //返回线程名

属性:

name  //线程名

ident  //线程标识符

daemon  //是否为守护线程

 

Threading模块主要函数

threading.active_count()  //
返回当前处于alive状态的Thread对象的个数。返回的数目等于enumerate()返回的列表的长度。
threading.current_thread()  //
返回当前的Thread对象,对应于调用者控制的线程。
threading.get_ident()  //
返回当前线程的'线程标识符'。它是一个非零的整数。它的价值没有直接的意义。
threading.enumerate()  //
返回当前活着的Thread对象的列表。该列表包括守护线程、由current_thread()创建的虚假线程对象和主线程。它不包括已终止的线程和尚未开始的线程。
threading.main_thread()  //
返回主 Thread 对象。在正常情况下,主线程是从 Python 解释器中启动的线程。
守护线程

如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这些线程执行完成。

执行如下赋值语句可以将一个线程设置为守护线程。thread.daemon=True。

一个新线程会继承父线程的守护标记。 

使用锁

锁有两种状态:锁定和未锁定。同时也只支持两种函数,获得锁和释放锁。

lock.acquire()  //获取锁

lock.release()  //释放锁

当多线程竞争时,允许第一个获得锁的线程进入临界区,执行相应代码。所有之后到达的线程将被阻塞,直到第一个线程执行结束,退出临界区,释放锁。此时,等待的线程可以获得锁并进入临界区,哪个正在等待的线程获取锁是随机的

1 #!/usr/bin/env python3 2 # coding:utf-8 3 from atexit import register 4 from random import randrange 5 from threading import Thread, current_thread, Lock 6 from time import sleep, ctime 7  8  9 class CleanOutputSet(set):10     def __str__(self):    //改变输出格式11         return ', '.join(x for x in self)12 13 14 #lock = Lock()15 loops = (randrange(2, 5) for x in range(randrange(3, 7)))16 remaining = CleanOutputSet()17 18 19 def loop(nsec):20     myname = current_thread().name    //获取当前线程名21     remaining.add(myname)    //添加到集合22     print('[{0}] Start {1}'.format(ctime(), myname))23     sleep(nsec)24     remaining.remove(myname)25     print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))26     print('  (remaining: {0})'.format(remaining or 'None'))27 28 29 def main():30     for pause in loops:31         Thread(target=loop, args=(pause,)).start()32 33 34 @register35 def _atexit():36     print('all DONE at:', ctime())37 38 39 if __name__ == '__main__':40     main()
不使用锁的情况
1 [Wed Jan 24 20:41:48 2018] Start Thread-1 2 [Wed Jan 24 20:41:48 2018] Start Thread-2 3 [Wed Jan 24 20:41:48 2018] Start Thread-3 4 [Wed Jan 24 20:41:48 2018] Start Thread-4 5 [Wed Jan 24 20:41:48 2018] Start Thread-5 6 [Wed Jan 24 20:41:48 2018] Start Thread-6 7 [Wed Jan 24 20:41:50 2018] Completed Thread-3 (2 secs) 8   (remaining: Thread-6, Thread-1, Thread-2, Thread-5, Thread-4) 9 [Wed Jan 24 20:41:50 2018] Completed Thread-4 (2 secs)10   (remaining: Thread-6, Thread-1, Thread-2, Thread-5)11 [Wed Jan 24 20:41:51 2018] Completed Thread-5 (3 secs)12   (remaining: Thread-6, Thread-1, Thread-2)13 [Wed Jan 24 20:41:52 2018] Completed Thread-1 (4 secs)14 [Wed Jan 24 20:41:52 2018] Completed Thread-6 (4 secs)15   (remaining: Thread-2)16   (remaining: Thread-2)17 [Wed Jan 24 20:41:52 2018] Completed Thread-2 (4 secs)18   (remaining: None)19 all DONE at: Wed Jan 24 20:41:52 2018
输出结果

输出结果中可以看到,多个线程并行I/O导致结果混乱。I/O和访问相同的数据结构都属于临界区,因此需要用锁来防止多个线程同时进入临界区。

1 #!/usr/bin/env python3 2 # coding:utf-8 3 from atexit import register 4 from random import randrange 5 from threading import Thread, current_thread, Lock 6 from time import sleep, ctime 7  8  9 class CleanOutputSet(set):10     def __str__(self):  # 改变输出格式11         return ', '.join(x for x in self)12 13 14 lock = Lock()15 loops = (randrange(2, 5) for x in range(randrange(3, 7)))16 remaining = CleanOutputSet()17 18 19 def loop(nsec):20     myname = current_thread().name   # 获取当前线程名21     lock.acquire()22     remaining.add(myname)   # 加入集合23     print('[{0}] Start {1}'.format(ctime(), myname))24     lock.release()25     sleep(nsec)26     lock.acquire()27     remaining.remove(myname)28     print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))29     print('  (remaining: {0})'.format(remaining or 'None'))30     lock.release()31 32 33 def main():34     for pause in loops:35         Thread(target=loop, args=(pause,)).start()36 37 38 @register39 def _atexit():40     print('all DONE at:', ctime())41 42 43 if __name__ == '__main__':44     main()
使用锁
1 [Wed Jan 24 20:51:11 2018] Start Thread-1 2 [Wed Jan 24 20:51:11 2018] Start Thread-2 3 [Wed Jan 24 20:51:11 2018] Start Thread-3 4 [Wed Jan 24 20:51:11 2018] Start Thread-4 5 [Wed Jan 24 20:51:11 2018] Start Thread-5 6 [Wed Jan 24 20:51:13 2018] Completed Thread-2 (2 secs) 7   (remaining: Thread-3, Thread-1, Thread-4, Thread-5) 8 [Wed Jan 24 20:51:14 2018] Completed Thread-3 (3 secs) 9   (remaining: Thread-1, Thread-4, Thread-5)10 [Wed Jan 24 20:51:15 2018] Completed Thread-4 (4 secs)11   (remaining: Thread-1, Thread-5)12 [Wed Jan 24 20:51:15 2018] Completed Thread-1 (4 secs)13   (remaining: Thread-5)14 [Wed Jan 24 20:51:15 2018] Completed Thread-5 (4 secs)15   (remaining: None)16 all DONE at: Wed Jan 24 20:51:15 2018
输出结果

使用信号量

锁只能控制临界区的访问,面对一些更复杂的情况就无能为力了。比如,读者与写者共享一段8单位长的缓冲区,读者每次取走1单位,写者每次产生1单位。使用锁显然无法解决,这就用到信号量。

信号量时最古老的原语之一。它是一个计数器,当资源消耗时递减,资源释放时递增。

threading.Semaphore(value=1)  //构造函数(也可使用threading.BoundedSemaphore(value=1),value可自行设置,BoundedSemaphore创建的信号量不可超过初值,超过将抛出ValueError)

semaphore.acquire(block=true,timeout=None)  //资源消耗,block为true时,若无法获得资源,将阻塞timeout时间等待其他线程释放资源。block为false时,无法获得资源将抛出ValueError

semaphore.release()  //资源增加

1 #!/usr/bin/env python3 2 # coding:utf-8 3 from atexit import register 4 from random import randrange 5 from threading import BoundedSemaphore, Lock, Thread 6 from time import sleep, ctime 7  8 lock = Lock() 9 MAX = 510 candytray = BoundedSemaphore(MAX)11 12 13 def refill():14     lock.acquire()15     print('Refilling candy...')16     try:17         candytray.release()18     except ValueError:19         print('full, skipping')20     else:21         print('OK')22     lock.release()23 24 25 def buy():26     lock.acquire()27     print('Buying candy...')28     if candytray.acquire(False):29         print('OK')30     else:31         print('empty, skipping')32     lock.release()33 34 35 def producer(loops):36     for i in range(loops):37         refill()38         sleep(randrange(3))39 40 41 def consumer(loops):42     for i in range(loops):43         buy()44         sleep(randrange(3))45 46 47 def main():48     print('starting at:', ctime())49     nloops = randrange(2, 6)50     print('THE CANDY MACHINE (full with {0} bars)'.format(MAX))51     Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start()52     Thread(target=producer, args=(nloops,)).start()53 54 55 @register56 def _atexit():57     print('all DONE at:', ctime())58 59 60 if __name__ == '__main__':61     main()
使用信号量

以上为糖果机信号量示例。糖果机5个槽,refill()函数添加糖果,buy()函数买走糖果。这两个函数对型号量的作用是相反的。

starting at: Wed Jan 24 21:16:00 2018THE CANDY MACHINE (full with 5 bars)Buying candy...OKRefilling candy...OKRefilling candy...full, skippingBuying candy...OKRefilling candy...OKBuying candy...OKBuying candy...OKBuying candy...OKBuying candy...OKBuying candy...OKBuying candy...empty, skippingBuying candy...empty, skippingall DONE at: Wed Jan 24 21:16:05 2018
输出结果

使用Condition

可以把Condiftion理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):

Condition.wait([timeout])  //wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

Condition.notify()  //唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

Condition.notify_all()

Condition.notifyAll()  //唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

 

1 from threading import Condition, Thread 2 from time import ctime 3  4  5 con = Condition() 6  7  8 def get_odd(): 9     count = 310     while(True):11         con.acquire(timeout=1)12         print(1)13         count -= 114         if count is 0:15             break16         con.notify()17         con.wait()18 19 20 def get_even():21     count = 322     while(True):23         con.acquire(timeout=1)24         print(2)25         count -= 126         if count is 0:27             break28         con.notify()29         con.wait()30 31 32 33 def main():34     print('start at:', ctime())35     Thread(target=get_odd, name='').start()36     Thread(target=get_even, name='').start()37 38 39 if __name__ == '__main__':40     main()
View Code

 

创建线程的三种方法

以生产者消费者模型为例,缓冲区大小为5,开始为空。生产者产生一个数置入缓冲区,消费者取走一个数。生产者运行10次,消费者运行5次 

方法一:创建Thread实例,并传给它一个函数(包括函数参数)

1 #!/usr/bin/env python3 2 # coding:utf-8' 3  4 from atexit import register 5 from time import ctime 6 from random import randint 7 from threading import BoundedSemaphore, Semaphore, Lock, Thread 8  9 lock = Lock()10 buffer = [0 for x in range(5)]11 MAX = len(buffer)12 sem_full = Semaphore(0)13 sem_empty = BoundedSemaphore(MAX)14 15 16 def producer():17     sem_empty.acquire()18     lock.acquire()19     print("produce a number")20     for i in range(len(buffer)):21         if buffer[i] is 0:22             buffer[i] = randint(1, 10)23             break24     print(buffer)25     sem_full.release()26     lock.release()27 28 29 def consumer():30     sem_full.acquire()31     lock.acquire()32     print("consume a number")33     for i in range(len(buffer)):34         if buffer[i] is not 0:35             buffer[i] = 036             break37     print(buffer)38     sem_empty.release()39     lock.release()40 41 42 def main():43     print('starting at:', ctime())44     for i in range(5):45         Thread(target=consumer, args='').start()46         Thread(target=producer, args='').start()47         Thread(target=producer, args='').start()48 49 50 @register51 def _atexit():52     print("all DOne at:", ctime())53 54 55 if __name__ == '__main__':56     main()

方法二:创建Thread实例,传给它一个可调用的类实例(将调用__call__方法)

1 #!/usr/bin/env python3 2 # coding:utf-8 3  4 from atexit import register 5 from time import ctime 6 from random import randint 7 from threading import BoundedSemaphore, Semaphore, Lock, Thread 8  9 lock = Lock()10 buffer = [0 for x in range(5)]11 MAX = len(buffer)12 sem_full = Semaphore(0)13 sem_empty = BoundedSemaphore(MAX)14 15 16 class ThreadFunc(object):17 18     def __init__(self, func, args, name=""):19         self.name = name20         self.func = func21         self.args = args22 23     def __call__(self, *args, **kwargs):24         self.func(*self.args)25 26 27 def producer():28     sem_empty.acquire()29     lock.acquire()30     print("produce a number")31     for i in range(len(buffer)):32         if buffer[i] is 0:33             buffer[i] = randint(1, 10)34             break35     print(buffer)36     sem_full.release()37     lock.release()38 39 40 def consumer():41     sem_full.acquire()42     lock.acquire()43     print("consume a number")44     for i in range(len(buffer)):45         if buffer[i] is not 0:46             buffer[i] = 047             break48     print(buffer)49     sem_empty.release()50     lock.release()51 52 53 threads = []54 55 56 def main():57     print('starting at:', ctime())58     for i in range(5):59         c = Thread(target=ThreadFunc(consumer, '', consumer.__name__))60         threads.append(c)61         p1 = Thread(target=ThreadFunc(producer, '', producer.__name__))62         threads.append(p1)63         p2 = Thread(target=ThreadFunc(producer, '', producer.__name__))64         threads.append(p2)65 66     for i in range(len(threads)):67         threads[i].start()68 69     for i in range(len(threads)):70         threads[i].join()71 72 73 @register74 def _atexit():75     print("all DOne at:", ctime())76 77 78 if __name__ == '__main__':79     main()

方法三:派生Thread子类,创建子类的实例(重写run方法)

1 #!/usr/bin/env python3 2 # coding:utf-8 3  4 from atexit import register 5 from time import ctime 6 from random import randint 7 from threading import BoundedSemaphore, Semaphore, Lock, Thread 8  9 lock = Lock()10 buffer = [0 for x in range(5)]11 MAX = len(buffer)12 sem_full = Semaphore(0)13 sem_empty = BoundedSemaphore(MAX)14 15 16 class MyThread(Thread):17 18     def __init__(self, func, args, name=""):19         Thread.__init__(self)20         self.name = name21         self.func = func22         self.args = args23 24     def run(self):25         self.func(*self.args)26 27 28 def producer():29     sem_empty.acquire()30     lock.acquire()31     print("produce a number")32     for i in range(len(buffer)):33         if buffer[i] is 0:34             buffer[i] = randint(1, 10)35             break36     print(buffer)37     sem_full.release()38     lock.release()39 40 41 def consumer():42     sem_full.acquire()43     lock.acquire()44     print("consume a number")45     for i in range(len(buffer)):46         if buffer[i] is not 0:47             buffer[i] = 048             break49     print(buffer)50     sem_empty.release()51     lock.release()52 53 54 threads = []55 56 57 def main():58     print('starting at:', ctime())59     for i in range(5):60         c = MyThread(consumer, '', consumer.__name__)61         threads.append(c)62         p1 = MyThread(producer, '', producer.__name__)63         threads.append(p1)64         p2 = MyThread(producer, '', producer.__name__)65         threads.append(p2)66 67     for i in range(len(threads)):68         threads[i].start()69 70     for i in range(len(threads)):71         threads[i].join()72 73 74 @register75 def _atexit():76     print("all DOne at:", ctime())77 78 79 if __name__ == '__main__':80     main()

输出结果

starting at: Wed Jan 24 22:25:05 2018produce a number[7, 0, 0, 0, 0]produce a number[7, 9, 0, 0, 0]consume a number[0, 9, 0, 0, 0]produce a number[7, 9, 0, 0, 0]consume a number[0, 9, 0, 0, 0]consume a number[0, 0, 0, 0, 0]produce a number[9, 0, 0, 0, 0]consume a number[0, 0, 0, 0, 0]produce a number[9, 0, 0, 0, 0]produce a number[9, 5, 0, 0, 0]produce a number[9, 5, 6, 0, 0]produce a number[9, 5, 6, 10, 0]consume a number[0, 5, 6, 10, 0]produce a number[7, 5, 6, 10, 0]produce a number[7, 5, 6, 10, 2]all DOne at: Wed Jan 24 22:25:05 2018
View Code

其他未提及的对象及方法可参考python标准库

 

转载于:https://www.cnblogs.com/lht-record/p/8343876.html

你可能感兴趣的文章
[Android Studio 权威教程]断点调试和高级调试
查看>>
阶乘求和之最后一位
查看>>
Eclipse 乱码解决方案(UTF8 -- GBK)
查看>>
网络编程
查看>>
Debian安装Chrome
查看>>
民生银行十五年的数据体系建设,深入解读阿拉丁大数据生态圈、人人BI 是如何养成的?【转】...
查看>>
使用别的电脑连接另一台电脑当中的虚拟机中的kylin项目
查看>>
空间统计笔记之二(分布模式工具集,Analyzing Patterns Toolset)
查看>>
一定要为了成功才去创业吗?
查看>>
4.2 列表生成式、迭代器与生成器
查看>>
Sql Server系列:分区表操作
查看>>
myeclipse maven tomcat插件 创建web工程
查看>>
2.java线程之ThreadLocal
查看>>
Unsafe 的简单使用
查看>>
明确价值体现
查看>>
myeclipse修改内存大小不足tomcat内存不足
查看>>
C++STL学习笔记_(2)deque双端数组知识
查看>>
CodeFoces 489E 01分数规划(二分的dp)
查看>>
浅谈CSRF攻击方式[转]
查看>>
一道淘汰85%面试者的百度开发者面试题参考答案
查看>>