Python网络编程进阶


纸上得来终觉浅,绝知此事要躬行。

Python网络编程进阶


使用socket进行TCPUDP编写的程序,其实只能说是一种玩具。因为服务端一次只能接收一个客户端的连接,再多的客户端连接会被阻塞掉。深层原因是,代码中调用了acceptrecvsend等方法,都会发生阻塞,导致无法响应多个客户端连接。而现实环境中大都是多客户端甚至多服务端,那怎么让服务端同时响应多个客户端的请求呢?

1. 编程基础知识

真正的程序 = I/O 多路复用 + setblocking 非阻塞模式

能够实现的基础原因

  • 操作系统提供了一个功能,当你的某个socket可读或者可写的时候,它可以给你一个通知。这样当配合非阻塞的socket使用时,只有当系统通知我哪个描述符可读了,才去执行read操作,可以保证每次read都能读到有效数据而不做纯返回-1之类无用功。写操作也类似。
  • 操作系统的这个功能通过支持I/O多路复用的系统调用来使用,这些系统调用的函数都可以同时监视多个文件描述符的读写就绪状况,这样,多个文件描述符的I/O操作都能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的复用指的是复用同一个线程。

I/O 多路复用适用场景

  • [1] 当客户需要处理多个描述符时,一般是交互式输入和网络套接字,那就必须使用I/O多路复用技术。
  • [2] 当一个客户同时处理多个套接字时,而这种情况是可能的,一般就要使用I/O多路复用技术,虽然很少出现。
  • [3] 如果一个 TCP 服务器既要处理监听套接字,又要处理已连接套接字,一般就要使用I/O多路复用技术。
  • [4] 如果一个服务器即要处理 TCP,又要处理 UDP,一般就要使用I/O多路复用技术。
  • [5] 如果一个服务器要处理多个服务或多个协议,一般就要使用I/O多路复用技术。

I/O 多路复用核心优势

  • I/O多路复用和多进程/多线程技术相比,其最大的优势就是系统开销比较小。因为系统不必创建进程和线程,也不必维护这些进程和线程,通过复用机制将一个进程或线程多次重复使用,从而大大减小了额外的开销。

支持 I/O 多路复用的系统调用

  • select
    • select支持的文件描述符数量太小了,默认为1024,但可以自行调整,超过1024可能导致性能下降。
    • 每次调用select都需要把fd(文件句柄)集合从用户态拷贝到内核态,同时每次调用select都需要在内核遍历传递进来的所有fd,这个过程在fd很多的时候性能消耗很大。
  • poll
    • 只是解决了文件描述符数量的限制
  • epoll
    • Linux系统特有的支持I/O多路复用的系统调用
    • 支持一个进程打开大数量的 socket 描述符
    • I/O效率不随fd数量的增长而线性下降
    • 使用mmap加速内核与用户空间的消息传递
    • 支持边缘触发和水平触发
  • kqueue
    • FreeBSD系统特有的支持I/O多路复用的系统调用

2. 实现 I/O 多路复用

select + poll + epoll

[0] 多客户端

  • 为了模仿多客户端的连接方式,通过socks创建了两个套接字表示通过两个套接字去连接服务器。然后从消息列表messages中获取消息,根据消息的位置从不同的客户端发送消息给服务器。
  • 默认的后面的几种实现 I/O 多路复用的客户端程序,都是使用相同的代码,且输出结果也是一致的。
import socket

HOST = '127.0.0.1'
PORT = 8001

messages = [
    'This is ',
    'the message. ',
    'It will be sent ',
    'in parts.',
]

socks = [
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]

print(f'connecting to {HOST} port {PORT}')

for s in socks:
    s.connect((HOST, PORT))

for index, message in enumerate(messages):
    _, is_odd = divmod(index, 2)
    outgoing_data = message.encode()

    for index, s in enumerate(socks):
        if divmod(index, 2)[1] != is_odd:
            continue
        print(f'{s.getsockname()}: sending {outgoing_data}')
        s.send(outgoing_data)

    for index, s in enumerate(socks):
        if divmod(index, 2)[1] != is_odd:
            continue
        data = s.recv(1024)
        print(f'{s.getsockname()}: received {data}')
        if not data:
            s.close()
# 启动服务端
$ python select_server.py
Server startat: 127.0.0.1:8001
Connected by ('127.0.0.1', 50442)
Connected by ('127.0.0.1', 50443)
received "b'This is '"from ('127.0.0.1', 50442)
received "b'the message. '"from ('127.0.0.1', 50443)
received "b'It will be sent '"from ('127.0.0.1', 50442)
received "b'in parts.'"from ('127.0.0.1', 50443)
# 启动客户端
$ python client.py
connecting to 127.0.0.1 port 8001
('127.0.0.1', 50442): sending b'This is '
('127.0.0.1', 50442): received b"Server received b'This is '"
('127.0.0.1', 50443): sending b'the message. '
('127.0.0.1', 50443): received b"Server received b'the message. '"
('127.0.0.1', 50442): sending b'It will be sent '
('127.0.0.1', 50442): received b"Server received b'It will be sent '"
('127.0.0.1', 50443): sending b'in parts.'
('127.0.0.1', 50443): received b"Server received b'in parts.'"

[1] select

  • 代码逻辑
    • 这个示例代码比我们之前学习的程序要复杂很多,其中使用队列是为了存储不用连接下对应的消息。在可写之后,能够将对应的消息发送到对应的客户端。
    • select方法接收的前三个参数的值都是列表,会从第一个参数列表里面的对象读取数据,向第二个参数列表里面的对象发出数据,而第三个参数列表中包含了可能有错误的对象。
    • select方法也返回三个参数,包含了对应满足条件的对象。分别是可读对象,也就是可以接收内容;可写对象,也就是可以发送消息给客户端;第三个是异常对象。
    • 第一个循环是遍历可读对象列表,可分为两个部分。第一部分是,记录一个新的连接并将其放到inputs列表中,然后else里面接收数据存放到对应连接的队列中。第二部分是,如果可读的套接字data里面没有可读取的数据,说明客户端已经断开了。则会执行else里面的代码,将inputs和队列message_queues里面移除,关闭对应套接字。
    • 第二个循环是遍历可写对象列表,如果有准备好了可写入的套接字,就从对应连接的队列message_queues里面读取收到的消息,然后发送会给客户端。
    • 第三个循环是遍历异常对象列表,既然对应的套接字有问题,那么就不客气的将其从将inputs和队列message_queues里面移除,并关闭对应套接字。
  • 模式缺点
    • 单个进程所打开的文件描述符数量是有一定限制的,通常默认值是1024,这对于高并发的网络服务来说太小了。
    • socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。看代码可知,套接字数量多时浪费很多CPU时间。
    • 需要维护一个用来存放大量的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
import socket
import select
from queue import Queue, Empty

HOST = '127.0.0.1'
PORT = 8001

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建套接字
server.setblocking(0) # 设置为非阻塞
server.bind((HOST, PORT))  # 绑定套接字到本地IP与端口
server.listen(5)  # 监听连接

inputs = [server]
outputs = []
message_queues = {}

print(f'Server start at: {HOST}:{PORT}')

while inputs:
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

    for s in readable:
        if s is server:
            conn, addr = s.accept()  # 接受客户端连接
            print(f'Connected by {addr}')
            conn.setblocking(0)  # 设置连接为非阻塞
            inputs.append(conn)
            message_queues[conn] = Queue()
        else:
            data = s.recv(1024)  # 接收1024字节的内容
            if data:
                print(f'received "{data}" from {s.getpeername()}')
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                del message_queues[s]
                s.close()

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Empty:
            outputs.remove(s)
        else:
            s.send(bytes(f'Server received {next_msg}', 'utf-8'))

    for s in exceptional:
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]

[2] poll

  • 代码逻辑
    • Python中,用一个类来实现poll,由这个类管理监视的注册套接字。套接字通过register方法来进行注册,同时利用标志位READ_ONLY指示套接字关注那些事件。
    • poll的标志位有五种种:POLLIN,有数据读取;POLLPRI,有优先级数据读取;POLLOUT,能够输出POLLHUP,挂起;POLLERR,错误;POLLNVAL,套接字未打开。
    • 代码循环中,遍历每个事件,看每个标志位flag。第一个if表示标志位可读,里面的逻辑和select的很像,唯一的区别是新注册的套接字会注册下。如果套接字关闭了,那就取消注册直接关闭。如果标志位可写了,就会把消息取出,然后修改消息状态为不可写状态并发送数据。
    • 需要注意的是,把数据存储起来之后,会通过poller.modify将标志位改为读写,也就是关注POLLOUT事件。
  • 模式优点
    • poll的实现了select的非常的相似,只是描述集合的方式不同,poll是基于链表存储文件描述符的。它只解决了文件描述符数量的限制,并没有解决后面两个缺点,所以也不适用于大并发的场景。
    • poll还有一个特点就是水平触发,也就是报告了文件描述符之后一直没有处理,就会一直通知。如果用户一直不处理,就会导致每次都会有这个事件从内核到用户空间的拷贝,耗费性能。但是,水平触发的优点在于事件永远不会丢失。
import socket
import select
from queue import Queue, Empty

HOST = '127.0.0.1'
PORT = 8001

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
server.bind((HOST, PORT))
server.listen(5)

message_queues = {}
TIMEOUT = 500  # 超时时间0.5秒
fd_to_socket = {  # 一个文件描述符到套接字的映射
    server.fileno(): server,
}
READ_ONLY = (
    select.POLLIN |
    select.POLLPRI |
    select.POLLHUP |
    select.POLLERR
) # 4种事件的并集
READ_WRITE = READ_ONLY | select.POLLOUT

poller = select.poll()
poller.register(server, READ_ONLY)  # 给server套接字注册,它会关注READ_ONLY列出的4种事件

print(f'Server start at: {HOST}:{PORT}')

while 1:
    events = poller.poll(TIMEOUT)
    for fd, flag in events:
        s = fd_to_socket[fd]
        if flag & (select.POLLIN | select.POLLPRI):  # 输入准备就绪了,也就是可读了
            if s is server:
                conn, addr = s.accept()
                print(f'Connected by {addr}')
                conn.setblocking(0)
                fd_to_socket[conn.fileno()] = conn
                poller.register(conn, READ_ONLY)  # 新注册的套接字都关注READ_ONLY事件
                message_queues[conn] = Queue()
            else:
                data = s.recv(1024)
                if data:
                    print(f'received "{data}" from {s.getpeername()}')
                    message_queues[s].put(data)
                    poller.modify(s, READ_WRITE)  # 从缓冲区获取内容后,也关注POLLOUT事件了
                else:
                    poller.unregister(s)  # 没有可用数据的套接字说明客户端关闭了,取消注册
                    s.close()
        elif flag & select.POLLHUP:  # 套接字关闭了
            poller.unregister(s)
            s.close()
        elif flag & select.POLLOUT:  # 能够输出了,也就是可写了
            try:
                next_msg = message_queues[s].get_nowait()
            except Empty:
                poller.modify(s, READ_ONLY)  # 修改套接字关注的时间类型,因为它已经恢复不可写状态了
            else:
                s.send(bytes(f'Server received {next_msg}', 'utf-8'))
        elif flag & select.POLLERR:  # 错误的套接字
            poller.unregister(s)
            s.close()
            del message_queues[s]

[3] epoll

  • 代码逻辑
    • epoll的方法和用法和poll非常的像,为了便于了解这种I/O复用的写法,这里使用的是一个简化的示例。在这个示例中,关注的事件都是单一的,只是让事件在epoll.inepoll.out之间进行转换。所以编写I/O复用的代码时,就选择epoll就可以了。
  • 模式优点
    • epoll支持的上限是最大可以打开文件的数目,这个数目是很大的,完全可以满足现在对应高并发的数量需求。同时,epoll支持水平触发和边缘触发。
    • epoll的解决方法不像selectpoll每次对所有进行遍历轮询所有集合,而是在注册新的事件时,为每个指定一个回调函数,当设备就绪的时候,调用这个回调函数,这个回调函数就会把就绪的加入一个就绪表中,所以epoll实际只需要遍历就绪表。
    • epoll的解决方法是每次注册新的事件到epoll中,会把所有的拷贝进内核,而不是在等待的时候重复拷贝,保证了每个在整个过程中只会拷贝1次。
import socket
import select
from queue import Queue, Empty

HOST = '127.0.0.1'
PORT = 8001

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
server.bind((HOST, PORT))
server.listen(5)

message_queues = {}
TIMEOUT = 500
fd_to_socket = {
    server.fileno(): server,
}

epoller = select.epoll()
epoller.register(server, select.EPOLLIN)

print(f'Server start at: {HOST}:{PORT}')

while 1:
    events = epoller.poll(TIMEOUT)
    for fd, flag in events:
        s = fd_to_socket[fd]
        if s is server:
            conn, addr = s.accept()
            print(f'Connected by {addr}')
            conn.setblocking(0)
            epoller.register(conn, select.EPOLLIN)
            fd_to_socket[conn.fileno()] = conn
            message_queues[conn] = Queue()
        elif flag & (select.POLLIN | select.POLLPRI):
            data = s.recv(1024)
            if data:
                print(f'received "{data}" from {s.getpeername()}')
                message_queues[s].put(data)
                epoller.modify(s, select.EPOLLOUT)
            else:
                epoller.unregister(s)
                s.close()
        elif flag & select.POLLHUP:
            epoller.unregister(s)
            s.close()
        elif flag & select.POLLOUT:
            try:
                next_msg = message_queues[s].get_nowait()
            except Empty:
                epoller.modify(s, select.EPOLLIN)
            else:
                s.send(bytes(f'Server received {next_msg}', 'utf-8'))
        elif flag & select.POLLERR:
            poller.unregister(s)
            s.close()
            del message_queues[s]

3. 使用 selectors 模块

selectorsPython3.4新加入的一个模块,它封装了select模块,并暴露出了同一个调用接口,方便使用。这个模块定义了下述的五个子类,对应前面解释的几种I/O复用方案。还定义了一个DefaultSelector类,其为五个子类中的一个别名而已,它会自动选择当前环境中最有效的select方案,所以平时使用该类就可以了。

子类对应方案

  • SelectSelector - Select
  • PollSelector - Poll
  • EpollSelector - Epoll
  • DevpollSelector - 不常用
  • KqueueSelector - Kqueue

对应的标志位

  • EVENT_READ - 可读
  • EVENT_WRITE - 可写

示例说明和演示

  • 这是一个和之前效果相同的服务端写法,可以看到这个方案较epoll的那个有优化的地方,但是并不是很明显。
import socket
import selectors
from queue import Queue, Empty

HOST = '127.0.0.1'
PORT = 8001

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
sock.bind((HOST, PORT))
sock.listen(5)

sel = selectors.DefaultSelector()
message_queues = {}

print(f'Server start at: {HOST}:{PORT}')

sel.register(
    sock,
    selectors.EVENT_READ | selectors.EVENT_WRITE,
)

while 1:
    for key, mask in sel.select(timeout=0.5):
        conn = key.fileobj
        if conn is sock:
            conn, addr = sock.accept()
            print(f'Connected by {addr}')
            conn.setblocking(0)
            message_queues[conn] = Queue()
            sel.register(
                conn, selectors.EVENT_READ | selectors.EVENT_WRITE)
        elif mask & selectors.EVENT_READ:
            data = conn.recv(1024)
            if data:
                print(f'received "{data}" from {conn.getpeername()}')
                message_queues[conn].put(data)
        elif mask & selectors.EVENT_WRITE:
            try:
                next_msg = message_queues[conn].get_nowait()
            except Empty:
                pass
            else:
                conn.send(bytes(f'Server received {next_msg}', 'utf-8'))
                sel.modify(sock, selectors.EVENT_READ) # 从可写切换到可读状态
  • 而在Python3中,终极写法是这样带有回调函数的写法。通过示例可以看出,这种写法可以将读取和写入的部分抽象了出来,其不需要队列。如果之后,需要写这种I/O复用的程序,就推荐使用这种终极解决方案。
import socket
import selectors

HOST = '127.0.0.1'
PORT = 8001

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
sock.bind((HOST, PORT))
sock.listen(5)

sel = selectors.DefaultSelector()

print(f'Server start at: {HOST}:{PORT}')

def read(conn, mask):
    data = conn.recv(1024)
    if data:
        print(f'received "{data}" from {conn.getpeername()}')
        conn.send(bytes(f'Server received {data}', 'utf-8'))
    else:
        sel.unregister(conn)
        conn.close()


def accept(sock, mask):
    conn, addr = sock.accept()
    print(f'Connected by {addr}')
    conn.setblocking(0)
    sel.register(conn, selectors.EVENT_READ, read)


sel.register(sock, selectors.EVENT_READ, accept)

while 1:
    events = sel.select(0.5)
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

文章作者: Escape
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  目录