Using concurrent.futures in Python


Knowledge gained from books always feels shallow; true understanding only comes from doing.



In multithreaded or multiprocess programming you inevitably end up juggling start, join and friends, and anything non-trivial usually needs one or two queues on top of that. Without a good design, the code only gets messier as it grows. Is there something that abstracts these steps away, so we can stop worrying about the plumbing and travel light?

1. How It Works

Core idea: concurrent.futures (through its ProcessPoolExecutor) runs several Python interpreters in parallel as multiprocessing child processes, which lets a Python program use multiple CPU cores to speed up execution. Each child process has its own interpreter, separate from the main process, and its own independent global interpreter lock (GIL), so every child can fully occupy one CPU core and achieve true parallelism.
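
As a quick illustration (a sketch of my own, not from the original post): the hypothetical which_process task below simply reports the PID of the process it runs in. With a ProcessPoolExecutor you should see several distinct worker PIDs, each belonging to a separate interpreter with its own GIL.

import os
from concurrent.futures import ProcessPoolExecutor


def which_process(_):
    # runs inside a worker: a separate interpreter with its own GIL
    return os.getpid()


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        worker_pids = set(executor.map(which_process, range(20)))
    print(f'main process: {os.getpid()}, worker processes: {worker_pids}')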

Usage notes

  • Since Python 3.2, concurrent.futures has been part of the standard library, so nothing needs to be installed. On Python 2 you have to install and import the third-party futures backport yourself.
# required on Python 2 only
$ pip install futures

Under the hood

  • The comments in the module's source code are full of useful, clearly written material. The data-flow diagram found there is essential for understanding how the module works, and it is worth paying close attention to what a future is and what it is used for.
  • In traditional concurrent programming a function call is synchronous: the caller can do nothing else until the call returns. In the future model the call becomes asynchronous, and the time that used to be spent waiting for the return value is available to the caller for other work.
  • The two most important classes in concurrent.futures are Executor and Future: an Executor accepts an asynchronous task (a callable together with its arguments) and returns a Future that represents the execution of that task.
  • The module ships two concrete executors, ThreadPoolExecutor for threads and ProcessPoolExecutor for processes. They are high-level abstractions over threading and multiprocessing that expose one unified interface, which makes them easy to use; as the short sketch after this list shows, switching between them usually means changing a single line.
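
A minimal sketch (my own, not from the original post) of that unified interface: the helper run below is hypothetical, and the exact same code works with either executor class.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def square(n):
    return n * n


def run(executor_cls):
    # identical code for both pools; only the executor class differs
    with executor_cls(max_workers=4) as executor:
        return list(executor.map(square, range(10)))


if __name__ == '__main__':
    print(run(ThreadPoolExecutor))   # runs in threads
    print(run(ProcessPoolExecutor))  # runs in separate processes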

Let's walk through it with the source code and the data-flow diagram below.

Diagram walkthrough

  • [Step 1]: executor.map or executor.submit creates a number of _WorkItem objects and the matching task ids (Work Ids); a freshly created Future is stored inside each of them.

  • [Step 2]: Every _WorkItem is then put into a dict called 「Work Items」, keyed by its 「Work Id」.

  • [Step 3]: A thread called the 「Local worker thread」 is created to manage the 「Work Ids」 queue. It does two things.

    • [Task 1]: It takes a Work Id from the 「Work Ids」 queue and looks up the corresponding _WorkItem in 「Work Items」. If the item has been cancelled it is removed from 「Work Items」; otherwise it is repackaged as a _CallItem and put on the 「Call Q」 queue. The executor's worker processes pull _CallItems off that queue, execute them, wrap each result as a _ResultItem and put it on the 「Result Q」 queue.
    • [Task 2]: It takes _ResultItems from the 「Result Q」 queue, updates the corresponding Future in 「Work Items」 and removes the entry.
  • [Summary]: Altogether this is a classic producer/consumer model. Note, however, that the worker processes never talk to the two queues (tasks in, results out) directly; all of that goes through the intermediate 「Local worker thread」.

  • [Summary]: Imagine a piece of code that sends a request and expects an answer, but the service handling the request may be slow. In a traditional single-threaded setting the call is synchronous, so the caller must wait for the service to return before it can do anything else. In the Future model the call becomes asynchronous, and the time that would have been spent waiting is available to the calling function for other work; the short sketch after the diagram illustrates this.

# ProcessPoolExecutor data-flow through the system

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
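
A tiny sketch of the asynchronous pattern summarized above (slow_square is a made-up stand-in for a slow service call): submit returns a Future immediately, the caller keeps working, and result() blocks only when the value is actually needed.

import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    time.sleep(1)                              # stand-in for a slow service call
    return n * n


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)   # returns a Future at once
    print('doing other work while the task runs...')
    print(f'done yet? {future.done()}')        # almost certainly False here
    print(f'result: {future.result()}')        # blocks until the task finishes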

2. Performance

Rule of thumb: use threads for I/O-bound tasks and processes for CPU-bound tasks.
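
To see why the I/O-bound half of that rule holds, here is a small sketch of my own: fake_io simulates an I/O wait with time.sleep, which releases the GIL, so the waits overlap when run in a thread pool. The CPU-bound half of the rule is exactly what the fib examples below measure.

import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(n):
    time.sleep(0.5)                  # stand-in for a network or disk wait
    return n


start = time.time()
list(map(fake_io, range(6)))
print(f'sequential: {time.time() - start:.2f}s')   # roughly 3.0s

start = time.time()
with ThreadPoolExecutor(max_workers=6) as executor:
    list(executor.map(fake_io, range(6)))
print(f'threaded:   {time.time() - start:.2f}s')   # roughly 0.5s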

Plain sequential computation

  • This version uses the interpreter's built-in map, which is a little faster than an ordinary loop and reads more elegantly.
from time import time

NUMBERS = range(30, 36)


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


time_start = time()
print(list(map(fib, NUMBERS)))
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
[832040, 1346269, 2178309, 3524578, 5702887, 9227465]
COST: 6.766396999359131

Computing with multiple threads

  • fib is a CPU-bound function, so because of the GIL multiple threads cannot speed it up. On top of that, starting threads and communicating with the thread pool both cost something, so the multithreaded version actually ends up slower.
from time import time
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(30, 36)


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)


time_start = time()
executor = ThreadPoolExecutor(max_workers=2)
print(list(executor.map(fib, NUMBERS)))
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
[832040, 1346269, 2178309, 3524578, 5702887, 9227465]
COST: 6.824882984161377

Computing with multiple processes

  • On a multi-core machine the multiprocess version is faster than the other two, and noticeably so. That is because ProcessPoolExecutor is built on the machinery provided by the multiprocessing module.
  • Keep in mind that everything exchanged between the main process and the child processes has to be serialized and deserialized, and memory consumption becomes significant when the data is large, so multiprocessing carries a comparatively high overhead (the small sketch after the example output below shows the copy semantics this implies).

What happens, step by step

  • [1] Each item of the NUMBERS list is passed to the map method of the ProcessPoolExecutor instance executor
  • [2] The data is serialized into binary form with the pickle module
  • [3] The serialized data is sent over a local socket from the process of the main interpreter to the processes of the child interpreters
  • [4] In the child process, pickle deserializes the bytes back into Python objects
  • [5] The Python module that contains the fib function is imported
  • [6] Each child process runs the computation on its own input data, in parallel
  • [7] Each child process serializes its result back into bytes
  • [8] The child processes copy those bytes back to the main process through the socket
  • [9] The main process deserializes the bytes back into Python objects
  • [10] Finally the results computed by all child processes are merged into a single list and returned to the caller
from time import time
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


time_start = time()
with ProcessPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
        print(f'fib({num}) = {result}')
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
fib(30) = 832040
fib(31) = 1346269
fib(32) = 2178309
fib(33) = 3524578
fib(34) = 5702887
fib(35) = 9227465
COST: 5.294888973236084
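
Because every argument and result makes the pickle round trip described in steps 2-4 and 7-9, the worker only ever sees a copy of what the caller passed in. A small sketch (my own, with a made-up append_one task) makes that visible:

from concurrent.futures import ProcessPoolExecutor


def append_one(items):
    items.append(1)        # mutates the copy that was unpickled in the worker
    return items


if __name__ == '__main__':
    data = []
    with ProcessPoolExecutor(max_workers=1) as executor:
        result = executor.submit(append_one, data).result()
    print(data)     # []  -- the parent's list is untouched
    print(result)   # [1] -- the worker's copy came back through pickle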

3. The API

Although concurrent contains nothing but the futures module, that one module is quite powerful.

Interface overview

  • The two core classes, ThreadPoolExecutor and ProcessPoolExecutor, both inherit from Executor and are used to create thread pools and process pools respectively.
  • Worth noting: Executor implements __enter__ and __exit__, so executors can be used with the with statement as context managers, which is very convenient.

Executor is an abstract class that provides methods for executing calls asynchronously.

Executor Objects

  • map(func, *iterables, timeout=None, chunksize=1)
    • Returns the results of the concurrent calls, in the same order as the input *iterables.
    • Used inside a with block, shutdown is called automatically once the work is done, so no explicit clean-up code is needed.
  • submit(fn, *args, **kwargs)
    • Submits a single callable to be run concurrently and returns a Future instance.
    • The Future represents the thread/process that runs asynchronously and will complete at some point in the future.
  • shutdown(wait=True)
    • Roughly equivalent to calling close and join on a thread/process pool.
  • __enter__()/__exit__(exc_type, exc_val, exc_tb)
    • Allow the executor to be managed as a context with the with statement.
# 【1】map

from time import time
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(fib, NUMBERS))
print(f'results: {results}')
time_end = time()
print(f'COST: {time_end - time_start}')
# 【2】submit

from time import time
from concurrent.futures import ProcessPoolExecutor

futures = []
NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=2) as executor:
    for num in NUMBERS:
        future = executor.submit(fib, num)
        futures.append(future)
print(f'Results: {[future.result() for future in futures]}')
time_end = time()
print(f'COST: {time_end - time_start}')

The Future class encapsulates an asynchronous operation; Executor.submit returns a Future object.

Future Objects

  • cancel()
    • Attempt to cancel the task; returns False if it is already running or finished, True otherwise (see the small cancel sketch after the callback example below)
  • cancelled()
    • Return True if the task was successfully cancelled
  • running()
    • Return True if the task is currently running
  • done()
    • Return True if the task was cancelled or has finished running
  • result(timeout=None)
    • Return the result of the call, waiting up to timeout seconds for it
  • exception(timeout=None)
    • Return the exception raised by the call (or None if it completed without raising), waiting up to timeout seconds; raises TimeoutError if the task is not done in time
  • add_done_callback(fn)
    • Attach a callable that is passed the future as its only argument when the future completes, so it can be inspected inside the callback
#【1】exception
from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


executor = ProcessPoolExecutor(max_workers=3)
for num in NUMBERS:
    # exception() blocks for up to 10 seconds and returns None here, since fib never raises
    print(executor.submit(fib, num).exception(timeout=10))
executor.shutdown(wait=True)
#【2】add_done_callback

from concurrent.futures import ProcessPoolExecutor

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def get_result(future):
    print(f'>>> {future.result()}')

executor = ProcessPoolExecutor(max_workers=3)
for num in NUMBERS:
    executor.submit(fib, num).add_done_callback(get_result)
executor.shutdown(wait=True)
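
As promised above, a short sketch of cancel() (my own example, with a made-up slow task): with a single worker, only the task that is already running refuses to be cancelled, while the ones still waiting in the queue can be.

import time
from concurrent.futures import ThreadPoolExecutor


def slow(n):
    time.sleep(1)
    return n


with ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(slow, n) for n in range(3)]
    time.sleep(0.1)                          # give the first task time to start
    for i, future in enumerate(futures):
        print(f'task {i} cancelled: {future.cancel()}')
    # expected: task 0 cannot be cancelled (it is running), tasks 1 and 2 can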

Commonly used module-level functions in concurrent.futures.

Module functions

  • ThreadPoolExecutor(max_workers)
    • thread pool
  • ProcessPoolExecutor(max_workers)
    • process pool
  • wait(fs, timeout=None, return_when=ALL_COMPLETED)
    • wait returns a named 2-tuple of sets: the first contains the futures that are done (finished or cancelled), the second those that are not done yet.
    • wait also offers more control through return_when, which accepts FIRST_COMPLETED (return as soon as any future finishes or is cancelled), FIRST_EXCEPTION (return as soon as any future raises an exception; if none does, behave like ALL_COMPLETED) and ALL_COMPLETED (return only when every future has finished or been cancelled). The default is ALL_COMPLETED.
  • as_completed(fs, timeout=None)
    • Takes an iterable of futures plus a timeout.
    • With the default timeout=None it blocks until tasks complete and yields each future as it finishes; the iterator is implemented as a generator using yield.
    • With a timeout greater than 0 it waits at most timeout seconds; if some futures are still unfinished when the timeout expires it stops and raises TimeoutError.
#【1】as_completed

from time import time
from concurrent.futures import ProcessPoolExecutor, as_completed

NUMBERS = range(30, 36)

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

time_start = time()
with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fib, num) for num in NUMBERS]
    for future in futures:
        print(f'Running: {future.running()}, Done: {future.done()}')
    print(f'#### Divider ####')
    for future in as_completed(futures, timeout=2):
        print(f'Running: {future.running()}, Done: {future.done()}')
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
#### Divider ####
Running: False, Done: True
Running: False, Done: True
Running: False, Done: True
----------------------------------------------------------------
TimeoutError                   Traceback (most recent call last)
~/Escape/MorePractise/test/func.py in <module>()
     16         print(f'Running: {future.running()}, Done: {future.done()}')
     17     print(f'#### Divider ####')
---> 18     for future in as_completed(futures, timeout=2):
     19         print(f'Running: {future.running()}, Done: {future.done()}')
     20 time_end = time()

~/.pyenv/versions/3.6.4/lib/python3.6/concurrent/futures/_base.py in as_completed(fs, timeout)
    236                     raise TimeoutError(
    237                             '%d (of %d) futures unfinished' % (
--> 238                             len(pending), total_futures))
    239
    240             waiter.event.wait(wait_timeout)

TimeoutError: 3 (of 6) futures unfinished
#【2】wait

from time import time
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED

NUMBERS = range(30, 36)


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


time_start = time()
with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fib, num) for num in NUMBERS]
    print('>>> Start process...')
    for future in futures:
        print(f'Running: {future.running()}, Done: {future.done()}')
    print('###### Divider ######')

    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    print('>>> done process...')
    for task in done:
        print(f'Running: {task.running()}, Done: {task.done()}')
    print('>>> unfinished process...')
    for task in unfinished:
        print(f'Running: {task.running()}, Done: {task.done()}')
time_end = time()
print(f'COST: {time_end - time_start}')
$ python futures.py
>>> Start process...
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
Running: False, Done: False
###### Divider ######
>>> done process...
Running: False, Done: True
Running: False, Done: True
Running: False, Done: True
>>> unfinished process...
Running: True, Done: False
Running: True, Done: False
Running: True, Done: False
COST: 4.277174949645996

Common exception classes

  • These are the exceptions you may run into when using the module (and that you can also raise deliberately in your own code); a short sketch that triggers one of them follows the excerpt below.
exception concurrent.futures.CancelledError
    Raised when a future is cancelled.

exception concurrent.futures.TimeoutError
    Raised when a future operation exceeds the given timeout.

exception concurrent.futures.BrokenExecutor
    Derived from RuntimeError, this exception class is raised when an executor is broken for some reason, and cannot be used to submit or execute new tasks.
    New in version 3.7.

exception concurrent.futures.thread.BrokenThreadPool
    Derived from BrokenExecutor, this exception class is raised when one of the workers of a ThreadPoolExecutor has failed initializing.
    New in version 3.7.

exception concurrent.futures.process.BrokenProcessPool
    Derived from BrokenExecutor (formerly RuntimeError), this exception class is raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion (for example, if it was killed from the outside).
    New in version 3.3.
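
These exceptions can of course be caught in application code. Here is a sketch of my own that provokes BrokenProcessPool by making a worker die abruptly (os._exit skips all cleanup, which is roughly what being killed from the outside looks like):

import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def crash():
    os._exit(1)        # terminate the worker process without any cleanup


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(crash)
        try:
            future.result()
        except BrokenProcessPool as e:
            print(f'pool is broken: {e}')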

4. Practical Usage

When you need to run a task concurrently, should you reach for map or for submit?

[1] Choosing between map and submit

  • If every task calls the same function, map is the natural choice. If the tasks call different functions, or the calls may raise exceptions along the way, submit is the better tool.
  • With map, as soon as one call raises, the exception propagates while you iterate over the results and the remaining results are never reached. With submit each task is handled separately: using as_completed we can check whether each call succeeded, read its value if it did, and handle the exception (including one we raised ourselves) if it did not.
  • In the example below, future_to_num is a dict that maps each submitted future to its input value. Iterating with as_completed walks over the futures as they finish; whenever one of them raised, we print the error message instead of a result.
from concurrent.futures import ThreadPoolExecutor, as_completed

NUMBERS = range(10, 20)


def fib(n):
    if n == 17:
        raise Exception("Don't do this")
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_num = {executor.submit(fib, num): num for num in NUMBERS}
    for future in as_completed(future_to_num):
        num = future_to_num[future]
        try:
            result = future.result()
        except Exception as e:
            print(f'raise an exception: {e}')
        else:
            print(f'fib({num}) = {result}')


with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
        print(f'fib({num}) = {result}')
$ python futures.py
fib(10) = 55
fib(11) = 89
fib(12) = 144
fib(13) = 233
fib(14) = 377
fib(15) = 610
fib(16) = 987
raise an exception: Don't do this
raise an exception: Don't do this
raise an exception: Don't do this

fib(10) = 55
fib(11) = 89
fib(12) = 144
fib(13) = 233
fib(14) = 377
fib(15) = 610
fib(16) = 987
Traceback (most recent call last):
  ...
Exception: Don't do this

Should you use Pool from multiprocessing or the PoolExecutors from concurrent.futures?

[2] Choosing between the pools

  • Future is a very common concurrency design pattern, and you will find it in other languages as well. A Future object stands for a result that is not ready yet and will become available at some point "in the future". In the example above we want to run fib concurrently with different arguments and collect all of the results. The traditional approach is to block on something like queue.get, which is synchronous; in the Future model the call is asynchronous, and thanks to the 「Local worker thread」 the time that would have been spent waiting can be used for other work.

  • The map shown above looks very much like what a process or thread pool gives you, and the multiprocess run was indeed faster. That is no coincidence: under the hood concurrent.futures still drives the threading and multiprocessing modules; it merely wraps them, hides the internal details and presents developers with a friendlier interface.

  • Which one to pick comes down to the concrete requirements and to personal habit; I prefer concurrent.futures. Because the PoolExecutors are built on the future pattern, each answer is emitted as soon as it is ready, line by line, whereas multiprocessing.Pool.map computes everything and hands back all results in one go. The first behaviour feels much nicer to use; the sketch after the Pool example below shows the difference.

  • The architecture of concurrent.futures is clearly more complex, but it lends itself better to writing efficient, asynchronous, non-blocking parallel code. ThreadPool/Pool is more of a black box: you just use it; the details are hidden and there is little room for customisation.

  • The concurrent.futures interface is also simpler. The ThreadPool/Pool API exposes processes, initializer, initargs, maxtasksperchild, context and so on, which newcomers can find confusing, while concurrent.futures essentially only asks for max_workers.

import time
from multiprocessing.pool import Pool

NUMBERS = range(10, 20)


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


start = time.time()

pool = Pool(3)

for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)):
    print(f'fib({num}) = {result}')
print(f'COST: {time.time() - start}')
$ python futures.py
fib(10) = 55
fib(11) = 89
fib(12) = 144
fib(13) = 233
fib(14) = 377
fib(15) = 610
fib(16) = 987
fib(17) = 1597
fib(18) = 2584
fib(19) = 4181
COST: 0.020013211613326435
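
As mentioned in the list above, the two pools also differ in how results are delivered. A sketch of my own (slow_double is made up; each task sleeps for a different time): executor.map yields each result as soon as it is ready, in input order, while Pool.map only returns once everything has finished.

import time
from multiprocessing.pool import Pool
from concurrent.futures import ProcessPoolExecutor


def slow_double(n):
    time.sleep(n)              # tasks finish at different times
    return n * 2


if __name__ == '__main__':
    start = time.time()
    with ProcessPoolExecutor(max_workers=3) as executor:
        for result in executor.map(slow_double, [1, 2, 3]):
            print(f'executor.map -> {result} after {time.time() - start:.1f}s')

    start = time.time()
    with Pool(3) as pool:
        for result in pool.map(slow_double, [1, 2, 3]):
            print(f'Pool.map     -> {result} after {time.time() - start:.1f}s')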

[3] Caveats

  • When the Python version is below 3.5 and there is a large number of tasks to process, concurrent.futures is not the right choice. The module's author explains on his blog that multiprocessing.Pool submits tasks to the workers in batches, which saves IPC (inter-process communication) overhead, whereas PoolExecutor used to submit only one task at a time, hence the big performance gap.
  • This was fixed in Python 3.5: map gained a chunksize parameter. So when a large number of tasks has to be processed, simply pass a suitably large chunksize value.
multiprocessing.Pool.map outperforms ProcessPoolExecutor.map. Note that the
performance difference is very small per work item, so you'll probably only
notice a large performance difference if you're using map on a very large
iterable. The reason for the performance difference is that multiprocessing.Pool
will batch the iterable passed to map into chunks, and then pass the chunks
to the worker processes, which reduces the overhead of IPC between the parent
and children. ProcessPoolExecutor always passes one item from the iterable
at a time to the children, which can lead to much slower performance with
large iterables, due to the increased IPC overhead. The good news is this
issue will be fixed in Python 3.5, as a chunksize keyword argument has been
added to ProcessPoolExecutor.map, which can be used to specify a larger chunk
size if you know you're dealing with large iterables. See this bug for more info.
import time
from multiprocessing.pool import Pool
from concurrent.futures import ProcessPoolExecutor

K = 50
NUMBERS = range(1, 100000)


def f(x):
    r = 0
    for k in range(1, K+2):
        r += x ** (1 / k**1.5)
    return r


print('multiprocessing.pool.Pool:\n')
start = time.time()
l = []
pool = Pool(3)
for num, result in zip(NUMBERS, pool.map(f, NUMBERS)):
    l.append(result)
print(len(l))
print('COST: {}'.format(time.time() - start))


print('ProcessPoolExecutor without chunksize:\n')
start = time.time()
l = []
with ProcessPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(f, NUMBERS)):
        l.append(result)

print(len(l))
print('COST: {}'.format(time.time() - start))


print('ProcessPoolExecutor with chunksize:\n')
start = time.time()
l = []
with ProcessPoolExecutor(max_workers=3) as executor:
    # match multiprocessing.Pool's default chunksize heuristic
    # (_max_workers is a private attribute of the executor)
    chunksize, extra = divmod(len(NUMBERS), executor._max_workers * 4)
    if extra:
        chunksize += 1
    for num, result in zip(NUMBERS, executor.map(f, NUMBERS, chunksize=chunksize)):
        l.append(result)

print(len(l))
print('COST: {}'.format(time.time() - start))

5. Use Cases

  • Parallel computation
from concurrent import futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
  • Web crawling
from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % ( url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % ( url, e))

if __name__ == '__main__':
    main()

6. Source Code

The two most important classes in concurrent.futures are Executor and Future: an Executor accepts an asynchronous task (a callable together with its arguments) and returns a Future that represents the execution of that task.

  • Executor
    • Executor is an abstract class that provides methods for executing calls asynchronously
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.
        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.
        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).
        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.
        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.
        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.
        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.
        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
  • Future
    • The Future class encapsulates the asynchronous execution of a callable; a Future instance is returned by Executor.submit
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.
        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True of the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.
        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.
        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.
        Returns:
            The result of the call that the future represents.
        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.
        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.
        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.
        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.
        Should only be used by Executor implementations and unit tests.
        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.
        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.
        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.
        Returns:
            False if the Future was cancelled, True otherwise.
        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.
        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.
        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

Article author: Escape
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit Escape when reposting!