在介绍多线程之前,首先需要熟悉并发和并行,同步和异步的概念。如果阅读完本文章后仍对并发和并行、同步和异步的概念不太理解,没有关系,可以通过代码来慢慢了解这些概念的意义。

1.0基础概念解释

1.1并发和并行

并发(concurrency)和并行(parallelism)是两个相似的概念。引用一个比较容易理解的说法,并发是指在一个时间段内发生若干时间的情况,并行是指在同一时刻发生若干事件的情况。

这个概念用单核CPU和多核CPU比较容易说明。在使用单核CPU是,多个工作任务是以并发的方式运行的,因为只有一个CPU,所以各个任务会分别占用CPU的一段时间依次执行。如果在自己分得的时间段没有完成任务,就会切换到另一个任务,然后在下一次得到CPU使用权的时候再继续执行,以此类推,直到完成任务。在这种情况下,因为各个任务的时间段很短、经常切换,所以给我们的感觉是“同时”进行。在使用多核CPU时,在各个核的任务能够同时运行,也就是并行。

1.2同步和异步

同步和异步也是两个值得比较的概念。下面在并发和并行框架的基础上理解同步和异步,同步就是并发或并行的各个任务不是独自运行的,任务之间有一定的交替顺序,可能在运行完成一个任务得到结果后,另一个任务才会开始运行。就像接力赛跑一样、要拿到交接棒之后下一个选手才可以开始跑。

异步则是并发或并行的各个任务可以独立运行,一个任务的运行不受另一个任务影响,任务之间就像比赛的各个选手在不同的赛道比赛一样,跑步的速度不受其他赛道选手的影响。

2.0多线程

_thread模块官方文档说明

threading官方文档说明

如果要使用多线程,那么需要先了解Python中使用多线程的两种方法。

  • 函数式:调用_thread模块中的**start_new_thread()**函数产生新线程。
  • 类包装式:调用Threading库创建线程,从threading.Thread继承。

2.1_thread模块

首先介绍函数式,在Python3中不能继续使用thread模块。为了兼容性考虑,Python3将thread重命名为_thread

下面用实例感受一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import _thread
import time

#为线程定义一个函数
def print_time(threadName, delay):
count = 0
while count < 3:
time.sleep(delay)
count += 1
print(threadName,time.ctime(time.time()))
#创建两个线程
try:
_thread.start_new(print_time, ('Thread-1',1))
_thread.start_new(print_time, ('Thread-2',2))
except:
print('Error:无法启动线程')

#如果主线程结束的话,其他线程也会随之结束
while 1:
print('Main Finished')
time.sleep(15)
break

运行上述代码得到的结果是:


1
2
3
4
5
6
7
8
9
10
11
12
13
Main Finished 

Thread-1 Thu Aug 20 03:15:31 2020

Thread-1 Thu Aug 20 03:15:32 2020

Thread-2 Thu Aug 20 03:15:32 2020

Thread-1 Thu Aug 20 03:15:33 2020

Thread-2 Thu Aug 20 03:15:34 2020

Thread-2 Thu Aug 20 03:15:36 2020

_thread中使用**start_new_thread()**函数来产生新线程,语法如下:

_thread.start_new_thread(function, args[, kwargs])

其中,function表示线程函数,在上例中为print_timeargs为传递给线程函数的参数,它必须是tuple类型,在上例中为*(‘Thread-1’, 1)*;可选的 kwargs 参数指定一个关键字参数字典。

2.2threading模块

_thread提供了低级别、原始的线程,它相比于threading模块,功能还是比较有限的。threading模块提供了Thread类来处理线程,threading 模块除了包含 _thread 模块中的所有方法外,包括以下方法。

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:

  • run(): 用以表示线程活动的方法。

  • **start():**启动线程活动。

  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。

  • isAlive(): 返回线程是否活动的。

  • getName(): 返回线程名。

  • setName(): 设置线程名。

  • setDaemon(bool)

    一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start()前设置好,不然会抛出 RuntimeError 。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。当没有存活的非守护线程时,整个Python程序才会退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开始线程:" + self.name)
print_time(self.name, self.counter, 5)
print ("退出线程:" + self.name)

def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1

# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 开启新线程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("退出主线程")

运行上述代码得到的结果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
开始线程:Thread-1 

开始线程:Thread-2

Thread-1: Thu Aug 20 03:55:24 2020

Thread-1: Thu Aug 20 03:55:25 2020

Thread-2: Thu Aug 20 03:55:25 2020

Thread-1: Thu Aug 20 03:55:26 2020

Thread-1: Thu Aug 20 03:55:27 2020

Thread-2: Thu Aug 20 03:55:27 2020

Thread-1: Thu Aug 20 03:55:28 2020

退出线程:Thread-1

Thread-2: Thu Aug 20 03:55:29 2020

Thread-2: Thu Aug 20 03:55:31 2020

Thread-2: Thu Aug 20 03:55:33 2020

退出线程:Thread-2

退出主线程

2.3线程同步

2.3.1线程同步说明

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。如下:

多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。

考虑这样一种情况:一个列表里所有元素都是0,线程”set”从后向前把所有元素改成1,而线程”print”负责从前往后读取列表并打印。

那么,可能线程”set”开始改的时候,线程”print”便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。

锁有两种状态——锁定和未锁定。每当一个线程比如”set”要访问共享数据时,必须先获得锁定;如果已经有别的线程比如”print”获得锁定了,那么就让线程”set”暂停,也就是同步阻塞;等到线程”print”访问完毕,释放锁以后,再让线程”set”继续。

经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。

2.3.2示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import time

class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开启线程: " + self.name)
# 获取锁,用于线程同步
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 释放锁,开启下一个线程
threadLock.release()

def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1

threadLock = threading.Lock()
threads = []

# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 开启新线程
thread1.start()
thread2.start()

# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)

# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")

运行上述代码得到的结果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
开启线程: Thread-1 

开启线程: Thread-2

Thread-1: Thu Aug 20 04:06:06 2020

Thread-1: Thu Aug 20 04:06:07 2020

Thread-1: Thu Aug 20 04:06:08 2020

Thread-2: Thu Aug 20 04:06:10 2020

Thread-2: Thu Aug 20 04:06:12 2020

Thread-2: Thu Aug 20 04:06:14 2020

退出主线程

2.4线程优先级队列(Queue)

2.4.1Queue模块基本说明

Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。

这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。

Queue 模块中的常用方法:

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.fullmaxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作

2.4.1示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("开启线程:" + self.name)
process_data(self.name, self.q)
print ("退出线程:" + self.name)

def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print ("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# 创建新线程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1

# 填充队列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()

# 等待队列清空
while not workQueue.empty():
pass

# 通知线程是时候退出
exitFlag = 1

# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")

由于Queue是线程安全的队列,故不用Lock对象来进行线程同步也可以,下面代码和上述代码得到的结果一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("开启线程:" + self.name)
process_data(self.name, self.q)
print ("退出线程:" + self.name)

def process_data(threadName, q):
while not exitFlag:
if not workQueue.empty():
data = q.get()
print ("%s processing %s" % (threadName, data))
time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
workQueue = queue.Queue(10)
threads = []
threadID = 1

# 创建新线程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1

# 填充队列
for word in nameList:
workQueue.put(word)

# 等待队列清空
while not workQueue.empty():
pass

# 通知线程是时候退出
exitFlag = 1

# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")

运行上述代码得到的结果是:

1
2
3
4
5
6
7
8
9
10
11
12
开启线程:Thread-1
开启线程:Thread-2
开启线程:Thread-3
Thread-2 processing One
Thread-3 processing Two
Thread-1 processing Three
Thread-2 processing Four
Thread-3 processing Five
退出线程:Thread-1
退出线程:Thread-2
退出线程:Thread-3
退出主线程

3.0多进程