在介绍多线程之前,首先需要熟悉并发和并行,同步和异步的概念。如果阅读完本文章后仍对并发和并行、同步和异步的概念不太理解,没有关系,可以通过代码来慢慢了解这些概念的意义。
1.0基础概念解释 1.1并发和并行 并发 (concurrency)和并行 (parallelism)是两个相似的概念。引用一个比较容易理解的说法,并发是指在一个时间段内发生若干时间的情况,并行是指在同一时刻发生若干事件的情况。
这个概念用单核CPU和多核CPU比较容易说明。在使用单核CPU是,多个工作任务是以并发的方式运行的,因为只有一个CPU,所以各个任务会分别占用CPU的一段时间依次执行。如果在自己分得的时间段没有完成任务,就会切换到另一个任务,然后在下一次得到CPU使用权的时候再继续执行,以此类推,直到完成任务。在这种情况下,因为各个任务的时间段很短、经常切换,所以给我们的感觉是“同时”进行。在使用多核CPU时,在各个核的任务能够同时运行,也就是并行。
1.2同步和异步 同步和异步也是两个值得比较的概念。下面在并发和并行框架的基础上理解同步和异步,同步就是并发或并行的各个任务不是独自运行的,任务之间有一定的交替顺序,可能在运行完成一个任务得到结果后,另一个任务才会开始运行。就像接力赛跑一样、要拿到交接棒之后下一个选手才可以开始跑。
异步则是并发或并行的各个任务可以独立运行,一个任务的运行不受另一个任务影响,任务之间就像比赛的各个选手在不同的赛道比赛一样,跑步的速度不受其他赛道选手的影响。
2.0多线程 _thread模块 官方文档说明
threading 官方文档说明
如果要使用多线程,那么需要先了解Python中使用多线程的两种方法。
函数式:调用_thread 模块中的**start_new_thread()**函数产生新线程。
类包装式:调用Threading 库创建线程,从threading.Thread 继承。
2.1_thread模块 首先介绍函数式,在Python3中不能继续使用thread模块。为了兼容性考虑,Python3将thread 重命名为_thread 。
下面用实例感受一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import _threadimport timedef print_time (threadName, delay ): count = 0 while count < 3 : time.sleep(delay) count += 1 print(threadName,time.ctime(time.time())) try : _thread.start_new(print_time, ('Thread-1' ,1 )) _thread.start_new(print_time, ('Thread-2' ,2 )) except : print('Error:无法启动线程' ) while 1 : print('Main Finished' ) time.sleep(15 ) break
运行上述代码得到的结果是:
1 2 3 4 5 6 7 8 9 10 11 12 13 Main Finished Thread-1 Thu Aug 20 03:15:31 2020 Thread-1 Thu Aug 20 03:15:32 2020 Thread-2 Thu Aug 20 03:15:32 2020 Thread-1 Thu Aug 20 03:15:33 2020 Thread-2 Thu Aug 20 03:15:34 2020 Thread-2 Thu Aug 20 03:15:36 2020
_thread 中使用**start_new_thread()**函数来产生新线程,语法如下:
_thread.start_new_thread(function , args [, kwargs ])
其中,function表示线程函数,在上例中为print_time ;args 为传递给线程函数的参数,它必须是tuple 类型,在上例中为*(‘Thread-1’, 1)*;可选的 kwargs 参数指定一个关键字参数字典。
2.2threading模块 _thread提供了低级别、原始的线程,它相比于threading模块,功能还是比较有限的。threading模块提供了Thread 类来处理线程,threading 模块除了包含 _thread 模块中的所有方法外,包括以下方法。
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
run(): 用以表示线程活动的方法。
**start():**启动线程活动。
join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
setDaemon(bool)
一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start()前设置好,不然会抛出 RuntimeError 。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。当没有存活的非守护线程时,整个Python程序才会退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import threadingimport timeexitFlag = 0 class myThread (threading.Thread ): def __init__ (self, threadID, name, counter ): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run (self ): print ("开始线程:" + self.name) print_time(self.name, self.counter, 5 ) print ("退出线程:" + self.name) def print_time (threadName, delay, counter ): while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 thread1 = myThread(1 , "Thread-1" , 1 ) thread2 = myThread(2 , "Thread-2" , 2 ) thread1.start() thread2.start() thread1.join() thread2.join() print ("退出主线程" )
运行上述代码得到的结果是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 开始线程:Thread-1 开始线程:Thread-2 Thread-1: Thu Aug 20 03:55:24 2020 Thread-1: Thu Aug 20 03:55:25 2020 Thread-2: Thu Aug 20 03:55:25 2020 Thread-1: Thu Aug 20 03:55:26 2020 Thread-1: Thu Aug 20 03:55:27 2020 Thread-2: Thu Aug 20 03:55:27 2020 Thread-1: Thu Aug 20 03:55:28 2020 退出线程:Thread-1 Thread-2: Thu Aug 20 03:55:29 2020 Thread-2: Thu Aug 20 03:55:31 2020 Thread-2: Thu Aug 20 03:55:33 2020 退出线程:Thread-2 退出主线程
2.3线程同步 2.3.1线程同步说明 如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。如下:
多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况:一个列表里所有元素都是0,线程”set”从后向前把所有元素改成1,而线程”print”负责从前往后读取列表并打印。
那么,可能线程”set”开始改的时候,线程”print”便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。
锁有两种状态——锁定和未锁定。每当一个线程比如”set”要访问共享数据时,必须先获得锁定;如果已经有别的线程比如”print”获得锁定了,那么就让线程”set”暂停,也就是同步阻塞;等到线程”print”访问完毕,释放锁以后,再让线程”set”继续。
经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。
2.3.2示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import threadingimport timeclass myThread (threading.Thread ): def __init__ (self, threadID, name, counter ): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter def run (self ): print ("开启线程: " + self.name) threadLock.acquire() print_time(self.name, self.counter, 3 ) threadLock.release() def print_time (threadName, delay, counter ): while counter: time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 threadLock = threading.Lock() threads = [] thread1 = myThread(1 , "Thread-1" , 1 ) thread2 = myThread(2 , "Thread-2" , 2 ) thread1.start() thread2.start() threads.append(thread1) threads.append(thread2) for t in threads: t.join() print ("退出主线程" )
运行上述代码得到的结果是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 开启线程: Thread-1 开启线程: Thread-2 Thread-1: Thu Aug 20 04:06:06 2020 Thread-1: Thu Aug 20 04:06:07 2020 Thread-1: Thu Aug 20 04:06:08 2020 Thread-2: Thu Aug 20 04:06:10 2020 Thread-2: Thu Aug 20 04:06:12 2020 Thread-2: Thu Aug 20 04:06:14 2020 退出主线程
2.4线程优先级队列(Queue) 2.4.1Queue模块基本说明 Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。
这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。
Queue 模块中的常用方法:
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
2.4.1示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import queueimport threadingimport timeexitFlag = 0 class myThread (threading.Thread ): def __init__ (self, threadID, name, q ): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run (self ): print ("开启线程:" + self.name) process_data(self.name, self.q) print ("退出线程:" + self.name) def process_data (threadName, q ): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print ("%s processing %s" % (threadName, data)) else : queueLock.release() time.sleep(1 ) threadList = ["Thread-1" , "Thread-2" , "Thread-3" ] nameList = ["One" , "Two" , "Three" , "Four" , "Five" ] queueLock = threading.Lock() workQueue = queue.Queue(10 ) threads = [] threadID = 1 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() while not workQueue.empty(): pass exitFlag = 1 for t in threads: t.join() print ("退出主线程" )
由于Queue是线程安全的队列,故不用Lock对象来进行线程同步也可以,下面代码和上述代码得到的结果一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import queueimport threadingimport timeexitFlag = 0 class myThread (threading.Thread ): def __init__ (self, threadID, name, q ): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run (self ): print ("开启线程:" + self.name) process_data(self.name, self.q) print ("退出线程:" + self.name) def process_data (threadName, q ): while not exitFlag: if not workQueue.empty(): data = q.get() print ("%s processing %s" % (threadName, data)) time.sleep(1 ) threadList = ["Thread-1" , "Thread-2" , "Thread-3" ] nameList = ["One" , "Two" , "Three" , "Four" , "Five" ] workQueue = queue.Queue(10 ) threads = [] threadID = 1 for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 for word in nameList: workQueue.put(word) while not workQueue.empty(): pass exitFlag = 1 for t in threads: t.join() print ("退出主线程" )
运行上述代码得到的结果是:
1 2 3 4 5 6 7 8 9 10 11 12 开启线程:Thread-1 开启线程:Thread-2 开启线程:Thread-3 Thread-2 processing One Thread-3 processing Two Thread-1 processing Three Thread-2 processing Four Thread-3 processing Five 退出线程:Thread-1 退出线程:Thread-2 退出线程:Thread-3 退出主线程
3.0多进程