Python中的原子无锁计数器


本文翻译自Julien Danjou的博客文章

Python中的原子无锁计数器


1. 单线程实现

The Straightforward Implementation

在收集日志或者打点的时候,使用计数器是非常常用的情况。常用的简单实现方式,如下所示。

class SingleThreadCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
In [2]: id = SingleThreadCounter()

In [3]: id.value
Out[3]: 0

In [4]: id.increment()

In [5]: id.value
Out[5]: 1

但是这样会存在一个问题,根据名称我们知道,上述实例是针对单线程而言的。对于单线程的应用程序,这样是可以正常使用的。但是在多线程的时候,则会出现下面这样的情况。由此可知,上述的 Counter 类并不是线程安全的。

Thread-1 reads the value as 23
Thread-1 adds 1 to 23 and get 24
Thread-2 reads the value as 23
Thread-1 stores 24 in value
Thread-2 adds 1 to 23
Thread-2 stores 24 in value

2. 多线程实现

The Thread-Safe Implementation

为了使线程安全,则必须使用锁。每次我们想递增一个数值的时候,都需要判断锁的状态,因此可以保证递增是串行进行的。

import threading

class FastReadCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self.value += 1

此实现是线程安全的,多个线程无法同时递增数值,因此数值不会丢失。但是,该计数器的实现有一个缺点,那就是每次数值需要递增的时候,都需要锁定该计数器。如果有许多线程同时更新计数器的数值,则此时可能会存在锁竞争。虽然优缺点,但是如果用于不经常更新查询的话,那么这个类就是线程安全计数器的绝佳实现。


3. 快速写实现

A Fast Write Implementation

有一种方法可以实现在 Python 中无需在写入时加锁的线程安全的计数器。因为需要使用全局解释器锁,所以此技巧仅在 CPython 上有效。虽然全局解释器锁 GIL 一直被人所诟病,但是这次正是因为 GIL 的存在才能够实现快速写

当我们执行 C 函数且不执行任何 I/O 操作的时候,该函数不能被任何其他线程所中断,会一直持续运行。事实证明,在 Python 中已经实现了一个类似计数器的类,那就是 itertools.count 类。我们可以使用这个 count 类,来避免在增加计数时使用锁。

如果你阅读过 itertools.count 的文档,你可能会注意到无法读取计数器的当前值。这很棘手,这是我们需要使用锁来绕过此限制。

import itertools
import threading

class FastWriteCounter:
    def __init__(self):
        self._number_of_read = 0
        self._counter = itertools.count()
        self._read_lock = threading.Lock()

    def increment(self):
        next(self._counter)

    def value(self):
        with self._read_lock:
            value = next(self._counter) - self._number_of_read
            self._number_of_read += 1
        return value
In [12]: counter = FastWriteCounter()

In [13]: counter.value()
Out[13]: 0

In [14]: counter.increment()

In [15]: counter.value()
Out[15]: 1

在这种情况下,加值操作的代码非常简单,计数器只是递增而没有任何锁定。 并发读取的时候使用 GIL 进行访问限制,因此我们不需要锁定任何内容。

另一方面,Python 没有提供任何方法来读取 itertools.count 对象的值,所以我们需要使用一个小技巧来获取当前值。值方法递增计数器,然后获取值,同时减去已读取计数器的次数。因此,此计数器对于写操作是无锁的,但对于阅读而言则是有锁的。


4. 性能对比

虽然我一直认为 timeit 这样的性能测试没有什么意义!

编写完所有的代码之后,我想确定不同的实现方式会对执行速度有多少影响,所以使用 timeit 模块在我的笔记本电脑上进行测试,以下是对该计数器的读写性能。

OPERATION SINGLETHREADCOUNTER FASTREADCOUNTER FASTWRITECOUNTER
increment 176 ns 390 ns 169 ns
value 26 ns 26 ns 529 ns

Python中的原子无锁计数器

事实证明 SingleThreadCounterFastReadCounter 类具有相同的读取性能。SingleThreadCounterFastWriteCounter 也是一样,它们具有相同的递增性能。这很明显,但是如果你使用的是单线程,则并不需要关心并发访问的问题,你也应该使用简单的递增整数。


5. 完整实现

https://github.com/jd/fastcounter/blob/master/fastcounter/__init__.py

import itertools
import threading


class Counter(object):
    """A counter that is only suitable for application without any concurrency."""

    __slots__ = (
        "value",
        "_step",
    )

    def __init__(self, init=0, step=1):
        self.value = init
        self._step = step

    def increment(self):
        self.value += self._step


class FastReadCounter(Counter):

    __slots__ = (
        "value",
        "_lock",
        "_step",
    )

    def __init__(self, init=0, step=1):
        super().__init__(init, step)
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self.value += self._step


class FastWriteCounter(Counter):

    __slots__ = (
        "_number_of_read",
        "_counter",
        "_lock",
        "_step",
    )

    def __init__(self, init=0, step=1):
        self._number_of_read = 0
        self._step = step
        self._counter = itertools.count(init, step)
        self._lock = threading.Lock()

    def increment(self):
        next(self._counter)

    @property
    def value(self):
        with self._lock:
            value = next(self._counter) - self._number_of_read
            self._number_of_read += self._step
        return value

文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  目录