Python多线程编程


纸上得来终觉浅,绝知此事要躬行。

Python多线程编程


1. 基础知识

学习多线程、多进程编程之前,必须要先补充或者复习一下相关的基础知识,这样到后面应用的时候,就能够比较顺利了。其实对我来说,基础知识最好使用实体书籍,因为这才是心血的结晶,而并不是像培训班老师讲的那些,他们大多数都在瞎扯淡。

1.1 进程和线程

Python多线程编程

什么是进程

  • Linux 是一个多用户、多任务的操作系统,这就意味着一次可以运行一个以上的程序,每个占用一定时间运行的程序就叫一个进程。你运行的每一个命令会至少启动一个新进程,还有很多一直运行着的系统守护进程,用以维持系统的正常运作。
  • 但是个人电脑甚至服务器的 CPU 内核是有限的,而进程数会要远大于 CPU 个数,那怎么执行多任务呢?答案就是操作系统轮流让各个任务进程交替执行,任务轮流切换到前台,执行 0.01 秒后让出并切换到其他任务上,就这样反复执行下去。本质上每个任务都是交替执行的,但是由于 CPU 的执行速度实在是太快了,使用者的感觉好像所有的任务都在同时执行一样。

什么是线程

  • 有些进程还不止同时干一件事,比如打开浏览器虽然是一个进程,它可以同时访问多个网页,能输入网址,填写表单,鼠标点击翻页等等。在一个进程内部,要同时干多件事,就需要同时运行多个子任务,我们把进程内的这些子任务就称为线程

提高执行效率

  • 多进程模式
    • 启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务
  • 多线程模式
    • 启动一个进程,进程内启动多个线程,这样多个线程也可以一块执行多个任务
  • 多进程+多线程模式
    • 启动多个进程,每个进程再启动多个线程,这样就能够更快、更多的执行多个任务

并发和并行

  • 并发 - Concurrency
    • 当有多个线程在执行的时候,但操作系统只有一个 CPU 的话,则它根本不可能真正同时执行一个以上的线程。它只能把 CPU 运行时间划分成若干个时间段,再将时间段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起的状态。
  • 并行 - Parallelism
    • 当操作系统有一个以上 CPU 时,则线程的操作有可能非并发。当一个 CPU 执行一个线程时,另一个 CPU 可以执行另一个线程,两个线程互不抢占 CPU 资源。可以同时进行并发的关键是操作系统有处理多个任务的能力,但不一定非要同时处理。
  • 区别和联系
    • 并发和并行的关键就在于操作系统是否有同时处理多个任务的能力。可以说并行是并发的一个子集。也就是说,你可以编写一个拥有多个线程或者进程的并发程序,但如果没有多核处理器来执行这个程序,那么就不能以并行方式来运行代码。

Python多线程编程


1.2 全局解释锁

全局解释锁 - Global Interpreter Lock

全局解释锁的含义

  • 全局解释锁
    • GIL 是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻一个进程中仅有一个线程在执行。CPython 是用 C 语言实现的 Python 解释器,作为官方实现,它使用程度最为广泛。在 CPython 里就用到了 GILGIL 也是经常被其他语言开发者吐槽 Python 语言的一个槽点。
  • 线程的分类
    • 我们知道在计算机里面,线程通常分为内核级线程和用户级线程。内核级线程的调度是由系统完成的,而用户级线程的调度是由用户来控制的。Python 官方版本提供的线程是内核级的,而 geventeventlet 则提供的是用户级的线程。这类用户级的线程,我们叫协程

线程安全

  • 线程安全的含义
    • 安全的含义:当多个线程同时运行时,能够保证运行结果我们的符合预期,这就是线程安全。由于多线程执行时,存在线程的切换,而 Python 线程的切换时机是不确定的。既有 CM 调度机制,也有 PM 调度机制。
    • CM 调度机制:当一个线程开始 sleep 等待或者进行 I/O 操作时,另一个线程就有机会拿到 GIL 锁,开始执行它的代码,从而保证最大程度上的利用 CPU 的执行效率。
    • PM 调度机制:在使用 Python2 编写的代码中,当一个线程无中断地运行了 1000 个字节码,或者在 Python3 中,运行了 5 毫秒,那么它就会放弃 GIL 锁,另一个线程就可能开始运行。
    • 安全的实现:Python 中的线程安全,就是通过加锁,来实现原子操作(不可中断),来避免不确定的线程切换导致逻辑错误。
  • 线程安全的实现
    • 天生线程安全:所谓天生线程安全,就是线程代码中只对全局对象进行读操作,而不存在写操作。这种情况下,不论线程在何处中断,都不会影响各个线程本来的执行逻辑。这时,不需要做任何额外的事情,线程本身就是安全的。
    • 实现原子操作:有时,在一个线程中需要保证某一行或者某一段代码的逻辑是不可中断的,也就是说要保证这段代码执行的原子性。实现其实很简单,就是在执行代码的前后加互斥锁,放互斥锁就可以了。标准库里面为我们提供的互斥锁有两种,一种是Lock锁,一种是RLock可重入锁。
    • 实现线程同步:线程同步是在锁的基础来实现的。通过锁来对各个线程的执行顺序进行控制。虽然在一定意义上,实现原子操作也是一种线程同步,但它更多是保证单个线程中的操作不被中断。而我理解的线程同步,是一个线程需要等待其它线程完成特定任务之后,才能执行。多个线程之间有依赖关系。

Python多线程编程

引入 GIL 的原因

  • 因为 CPython 的内存管理不是线程安全的,因此需要 GIL 来保证多个原生线程不会并发执行 Python 的字节码(bytecode)。所谓存在即合理,它在单线程的情况下是更快的,并且在与 C 库结合时更为方便,而且不用考虑线程安全问题,这也是早期 Python 最常见的应用场景和优势。
  • Python3.2 之后就开始使用了新的 GIL 机制,新的 GIL 实现中有设置了一个固定的超时时间,来指示当前线程放弃安全锁。在当前线程保持这个锁且其他的线程请求这个锁的时候,当前的线程就会在 5ms 之后被强制释放这个锁。

Python多线程编程

使用时的注意事项

  • GIL 只会影响那些严重依赖 CPU 的程序,即计算型任务。如果你的程序大部分时间都是用于 I/O 操作、网络交互等超时操作上的话,那么使用多线程就是非常合适的,因为它们大多数时间都是在等待。而对于一个计算型任务的话,采用多线程这种模可能会让这个过程变得更加慢,因为频繁的切换以及资源的复制和移动都会导致大量的内存和性能消耗。
  • 另外,在使用多线程编程的时候,可以放心的创建几千个线程。因为对于现在的操作系统来说,运行这些线程完全是没有压力的,不需要担心这个量级带来的影响。
  • 如果在使用多线程编程中如果遇到瓶颈问题的话,首先不要马上怪罪于 GIL,因为这不一定是它造成的。我们需要先排除是否是因为代码逻辑设计的问题,或者网络请求出现阻塞等问题导致的,之后再考虑 GIL 的原因。

Python多线程编程


2. 多线程编程

Python标准库中提供了thread这个线程模块,但是这是一个很底层的模块,通常会使用封装了thread的多线程模块threading

2.1 多线程的使用方式

  • 不带参数的示例
import threading


def worker():
    print('Worker')

threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
$ python threading.py
Worker
Worker
Worker
Worker
Worker
  • 带了参数的示例
import threading


def worker(num):
    print(f'Worker {num}')

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
$ python threading.py
Worker 0
Worker 1
Worker 2
Worker 3
Worker 4

2.2 守护和非守护线程

  • 可以将一个线程设置为守护线程,表示这个线程是不重要的。这种线程可以一直运行,不阻塞主程序。在线程退出的时候,不需要等待这个线程完成就可以退出了。
  • 非守护线程就是需要等待线程全部执行完成之后,程序才能退出,否则一直等待着。如上述的这两个例子默认就是非守护线程。
  • 我们发现程序并没有等待daemon运行完毕,而是等待了non_daemon执行完毕之后才推出的。
import time
import threading


def daemon():
    print('Daemon starting ...')
    time.sleep(1.0)
    print('Daemon exiting...')

def non_daemon():
    print('NonDaemon starting ...')
    time.sleep(0.5)
    print('NonDaemon exiting...')


d = threading.Thread(name='daemon', target=daemon, daemon=True)
n = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
n.start()
$ python threading.py
Daemon starting ...
NonDaemon starting ...
NonDaemon exiting...
  • 如果即希望这种守护线程不阻塞主程序,也希望主程序等待守护线程执行完毕再一起退出,可以使用join方法。
import time
import threading


def daemon():
    print('Daemon starting ...')
    time.sleep(1.0)
    print('Daemon exiting...')

def non_daemon():
    print('NonDaemon starting ...')
    time.sleep(0.5)
    print('NonDaemon exiting...')

# 设置守护线程有两种方式
d = threading.Thread(name='daemon', target=daemon, daemon=True)
n = threading.Thread(name='non-daemon', target=non_daemon)
n.setDaemon(False)

d.start()
n.start()
d.join()
n.join()
$ python threading.py
Daemon starting ...
NonDaemon starting ...
NonDaemon exiting...
Daemon exiting...

3. 同步机制

在多线程编程中,为了防止多个线程同时对同一个公共的资源进行修改,如一个全局的变量,需要对同时访问的资源数量进行限制,且同时访问的限制数量通常设为 1。在 Python 线程中,包含了多种同步机制(=^_^=)。

3.1 信号量 - Semaphore

信号量同步基于内部的计数器,每调用一次acquire计数器就会减1,表示获取了一个锁。每调用一次release计数器就会加1,表示释放了这个锁。当计数器为0的时候,acquire的调用就会被阻塞。

  • 可以使用Semaphore方法来限制同时访问资源的数量,我们这里设置的个数为3个。
  • 可以使用with关键字来实现acquirerelease功能,在每次进入的时候执行acquire,而在执行完成之后自动执行release。这样使用,可读性强且更为直观。
import time
from random import random
from threading import Thread, Semaphore


sema = Semaphore(value=3)

def foo(tid):
    with sema:
        print('{} acquire sema.'.format(tid))
        sleep_time = random() * 2
        time.sleep(sleep_time)
    print('{} release sema.'.format(tid))


threads = []
for i in range(5):
    t = Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
$ python threading.py
0 acquire sema.
1 acquire sema.
2 acquire sema.
2 release sema.
3 acquire sema.
1 release sema.
4 acquire sema.
4 release sema.
3 release sema.
0 release sema.

3.2 锁 - Lock

互斥锁,即相对于信号量值为1Semaphore,表示同一时刻只能有一个线程来访问这个资源。但是使用了锁会损失一定的性能,因为需要其他线程等待锁的释放。

不加锁的示例

  • 理论上getlock会执行100次,value应该是100,但是结果却是10。这是因为同一时刻多个线程同时执行,如0+1的操作执行了10次,所有最后执行到9+1的时候,所有的线程都执行完毕,输出最后的结果为10
import time
from threading import Thread


value = 0

def getlock():
    global value
    new = value + 1
    # 使用sleep是为了让线程有机会进行切换
    time.sleep(0.001)
    value = new


threads = []
for i in range(100):
    t = Thread(target=getlock)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
print(value)
$ python threading.py
10

加了锁的示例

  • 如果对value的自增长加了锁,则即使预期的结果,因为在同一个时间value只会被获取了锁的线程修改。
import time
from threading import Thread, Lock


value = 0
lock = Lock()

def getlock():
    global value
    with lock:
        new = value + 1
        time.sleep(0.001)
        value = new


threads = []
for i in range(100):
    t = Thread(target=getlock)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
print(value)
$ python threading.py
100

3.3 可重入锁 - RLock

可重入锁就是acquire方法能够不被阻塞的被一个线程重复执行多次,但是需要注意的是release需要调用和acquire相同的次数才能够释放锁。

  • lock.acquire后面跟的参数0,表示不等待看是否能请求锁成功。我们可以看到的是使用了互斥锁第二次没有请求成功。
import threading

lock = threading.Lock()
print('First try: ', lock.acquire())
print('Second try: ', lock.acquire(0))
$ python threading.py
First try: True
Second try: False
  • 而使用了可重入锁,则第二次请求是成功的。
import threading

lock = threading.RLock()
print('First try: ', lock.acquire())
print('Second try: ', lock.acquire(0))
$ python threading.py
First try: True
Second try: True

3.4 条件 - Condition

接收条件,一个线程等待某种特定的条件,而另一个线程会发出满足这个特定条件的信号。这个同步机制最好的示例说明就是「生产者-消费者」模型。

  • 我们可以看到消费者是优先启动的,但是一直处于挂起状态,直到生产者启动之后告知消费者,最后所有的消费者才开始工作。
from time import sleep
from threading import Thread, Condition, currentThread


cond = Condition()

def consumer(cond):
    t = currentThread()
    with cond:
        # 挂起等待满足的条件
        cond.wait()
        print('{t.name}: consumer is start...'.format(t=t))

def producer(cond):
    t = currentThread()
    with cond:
        print('{t.name}: producer is start...'.format(t=t))
        # 用于唤醒所有的消费者
        cond.notifyAll()



c1 = Thread(name='consumer1', target=consumer, args=(cond,))
c2 = Thread(name='consumer2', target=consumer, args=(cond,))
pa = Thread(name='producer', target=producer, args=(cond,))

c1.start()
c2.start()
sleep(1)
pa.start()
$ python threading.py
producer: producer is start...
consumer1: consumer is start...
consumer2: consumer is start...

3.5 事件 - Event

事件模型,一个线程等待某种特定的条件,而另一个线程会发出满足这个特定条件的信号,最好的示例说明也是「生产者-消费者」模型。

  • 事件和条件是不同,在Condition条件中一个条件发出之后,所有接受这个条件的子线程都会处理,但是在Event事件中则是谁接收到谁来处理。
from time import sleep
from random import randint
from threading import Thread, Event, currentThread


event = Event()

def consumer(event, l):
    t = currentThread()
    while True:
        # 设置挂起等待的时间为2秒
        event_is_set = event.wait(2)
        if event_is_set:
            try:
                integer = l.pop()
                print(f'{integer} popped from list by {t.name}')
                # 重置时间的状态
                event.clear()
            except IndexError:
                pass

def producer(event, l):
    t = currentThread()
    while True:
        integer = randint(10, 100)
        l.append(integer)
        print(f'{integer} appended to list by {t.name}')
        # 设置事件通知消费者来处理
        event.set()
        sleep(1)


L = []
threads = []

for name in ('consumer1', 'consumer2'):
    c = Thread(name=name, target=consumer, args=(event, L))
    print(f'{name} is starting...')
    c.start()
    threads.append(c)

for name in ('producer',):
    p = Thread(name=name, target=producer, args=(event, L))
    print(f'{name} is starting...')
    p.start()
    threads.append(p)

for t in threads:
    t.join()
$ python threading.py
consumer1 is starting...
consumer2 is starting...
producer is starting...
58 appended to list by producer
58 popped from list by consumer1
16 appended to list by producer
16 popped from list by consumer1
77 appended to list by producer
77 popped from list by consumer2
99 appended to list by producer
99 popped from list by consumer1
76 appended to list by producer
76 popped from list by consumer2
10 appended to list by producer
10 popped from list by consumer1
......

4. 线程池

线程池就是把使用过的线程不销毁而直接利用,仿佛就像是把它们放到池子里面一样随意使用。一方面可以限制线程的个数,另一方面也可以避免资源的大量消耗。

4.1 队列 - Queue

队列在并发编程中是非常常用的,需要熟悉和理解。

  • 队列的方法
队列基本方法 解释说明
put 向队列中添加一个项
get 从队列中删除并返回一个项
task_done 在完成一项工作之后,该函数向任务已经完成的队列发送一个信号
join 阻塞直到所有的项目都被处理完,实际上意味着等到队列为空,再执行别的操作
  • 普通的队列
    • 我们这里借助「生产者-消费者」模型来理解队列的用法,生产者向队列中添加值,消费从队列中取值来执行。
from time import sleep
from random import random
from queue import Queue
from threading import Thread, currentThread


def consumer():
    curr = currentThread()
    while True:
        task, arg = q.get()
        print('[{}]> {}  {}'.format(curr.name, arg, task(arg)))
        q.task_done()

def producer():
    curr = currentThread()
    while True:
        rd = random()
        sleep(rd)
        q.put((lambda n: n * 2, rd))
        print('[{} ]> put value to queue.'.format(curr.name))


threads = []
q = Queue(maxsize=100)
for name in ('consumer1', 'consumer2'):
    print('{} is staring...'.format(name))
    c = Thread(name=name, target=consumer)
    c.start()
    threads.append(c)

for name in ('producer',):
    print('{} is staring...'.format(name))
    p = Thread(name=name, target=producer)
    p.start()
    threads.append(p)

for t in threads:
    t.join()
$ python threading.py
consumer1 is staring...
consumer2 is staring...
producer is staring...
[producer ]> put value to queue.
[consumer1]> 0.05485812015293734  0.10971624030587468
[producer ]> put value to queue.
[consumer2]> 0.6326654970685319  1.2653309941370638
[producer ]> put value to queue.
[consumer1]> 0.21880394859426866  0.4376078971885373
[producer ]> put value to queue.
[consumer2]> 0.41693638127336596  0.8338727625467319
[producer ]> put value to queue.
[consumer1]> 0.7139930847331822  1.4279861694663645
......
  • 优先级队列
    • 优先级队列要求我们put的消息至少有两项,其中第一项是优先级,第二项为向队列中添加的值。对于优先级而已,数字越小对应的优先级越高。
from time import sleep
from random import randint
from queue import PriorityQueue
from threading import Thread, currentThread


def double(n):
    return n * 2

def consumer():
    curr = currentThread()
    while True:
        if q.empty():
            break
        pri, task, arg = q.get()
        print(f'[{curr.name}] [PRI:{pri}] {arg} * 2 = {task(arg)}')
        q.task_done()
        sleep(0.1)

def producer():
    count = 0
    while True:
        if count > 5:
            break
        pri = randint(1, 100)
        print(f'[PUT:{pri}] to queue.')
        q.put((pri, double, pri))
        count += 1


threads = []
q = PriorityQueue(maxsize=100)
for name in ('producer',):
    print('{} is staring...'.format(name))
    p = Thread(name=name, target=producer)
    p.start()
    sleep(1)
    threads.append(p)

for name in ('consumer1', 'consumer2'):
    print('{} is staring...'.format(name))
    c = Thread(name=name, target=consumer)
    c.start()
    threads.append(c)

for t in threads:
    t.join()
$ python threading.py
producer is staring...
[PUT:39] to queue.
[PUT:29] to queue.
[PUT:17] to queue.
[PUT:92] to queue.
[PUT:16] to queue.
[PUT:89] to queue.
consumer1 is staring...
[consumer1] [PRI:16] 16 * 2 = 32
consumer2 is staring...
[consumer2] [PRI:17] 17 * 2 = 34
[consumer1] [PRI:29] 29 * 2 = 58
[consumer2] [PRI:39] 39 * 2 = 78
[consumer1] [PRI:89] 89 * 2 = 178
[consumer2] [PRI:92] 92 * 2 = 184

4.2 线程池 - ThreadPool

在面向对象编程中,创建和销毁对象都是很费时间的,因为会消耗内存等资源。如果无节制的创建和销毁线程是一个极大地浪费,所以就需要使用线程池了。

  • 线程池在官方文档中是有体现的,但是基本没有提及,查看之后我们会发现在多进程中却有线程池的用法。
In [1]: from multiprocessing.pool import ThreadPool

In [2]: pool = ThreadPool(5)

In [3]: pool.map(lambda x: x**2, range(5))
Out[3]: [0, 1, 4, 9, 16]
  • 当然我们也可以实现一个基于队列的线程池。
from time import sleep
from random import random
from queue import Queue
from threading import Thread


def double(n):
    return n * 2


class Worker(Thread):
    def __init__(self, queue):
        super().__init__()
        self._q = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            fn, args, kwargs = self._q.get()
            try:
                print(f'>>> Use: {self.name}')
                print(fn(*args, **kwargs))
            except Exception as e:
                print(e)
            self._q.task_done()


class ThreadPool:
    def __init__(self, num_t=5):
        self._q = Queue(num_t)
        for _ in range(num_t):
            Worker(self._q)

    def add_task(self, fn, *args, **kwargs):
        self._q.put((fn, args, kwargs))

    def wait_complete(self):
        self._q.join()


pool = ThreadPool(num_t=4)
for _ in range(8):
    rd = random()
    pool.add_task(double, rd)
    sleep(rd)
pool.wait_complete()
$ python threading.py
>>> Use: Thread-1
1.539779042713962
>>> Use: Thread-2
1.872652705800131
>>> Use: Thread-3
1.9955554680033332
>>> Use: Thread-4
1.7931675109375225
>>> Use: Thread-1
0.8793273251165161
>>> Use: Thread-2
1.5254734232405138
>>> Use: Thread-3
0.5469670245064027
>>> Use: Thread-4
0.42296784387070985

文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  目录