RPC服务器异步模型、进程池异步模型

asyncore是Python内置的异步IO库。asyncore负责socket事件轮询,用户编写代码时只需要提供回调方法即可,asyncore会在相应的事件到来时,调用用户提供的回调方法。比如当serversocketread事件到来时,会自动调用handle_accept方法, 当socket的read事件到来时,调用handle_read方法。

RPC服务器异步模型的实现

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc 异步IO模型
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import asyncore
from io import BytesIO


class RPCHandler(asyncore.dispatcher_with_send):

def __init__(self, sock, addr):
super(RPCHandler, self).__init__(sock=sock)
self.addr = addr
self.handlers = { # 注册请求处理器
'fib': self.fib
}
self.rbuf = BytesIO() # 读缓冲区

def handle_close(self):
"""
连接关闭之前的回调方法
"""
print(self.addr, 'bye')
self.close()

def handle_read(self):
"""
有读事件时的回调方法
"""
while True:
content = self.recv(1024)
if content:
self.rbuf.write(content)
if len(content) < 1024:
break
self.handle_rpc()

def handle_rpc(self):
"""
将读到的消息解包并处理
"""
while True:
self.rbuf.seek(0)
length_prefix = self.rbuf.read(4)
if len(length_prefix) < 4: # 不足一个消息
break
length, = struct.unpack('I', length_prefix)
body = self.rbuf.read(length)
if len(body) < length: # 不足一个消息
break

request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = self.handlers[in_]
handler(params) # 处理请求

left = self.rbuf.getvalue()[length + 4:] # 消息处理完了,截断缓冲区
self.rbuf = BytesIO()
self.rbuf.write(left)
self.rbuf.seek(0, 2) # 将游标挪到文件结尾,读到的数据内容继续添加到缓冲区末尾

def fib(self, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
self.send_result('result', a)

def send_result(self, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
self.send(length_prefix)
self.send(response.encode('utf-8'))


class RPCServer(asyncore.dispatcher):

def __init__(self, host, port):
super(RPCServer, self).__init__()
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(256)

def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
print(addr, 'comes')
RPCHandler(sock, addr)


def main():
RPCServer("localhost", 9987)
asyncore.loop()


if __name__ == '__main__':
main()

上面的代码使用了BytesIO作为读缓冲,用于缓存半包消息和刚刚从套接字那里读取到的字节数据。消息处理完毕之后要对读缓冲进行截断处理,将已经处理的字节数据截掉。BytesIO的读写游标要小心使用,读的时候游标从头开始,写的时候游标从尾部开始追加,seek函数用来移动游标。

RPC服务器进程池异步模型的实现

单个进程的异步模型,最多只能榨干一个CPU。在Python中,因为存在GIL锁,所以多线程并不能使用多核。如果要充分利用现代处理器的多核优势,可以使用多进程。

将进程池和事件轮询异步读写结合起来。进程池中的每个子进程内部都是一个事件循环,一个进程可以榨干一个 CPU 核心,多个进程就可以榨干多个CPU核心。

代码实现和前面的单进程异步模型差别不大,就是多了个创建进程池的调用。进程池的创建在服务器套接字启用监听队列之后进行,这样每个子进程都可以使用服务器套接字来获取新连接进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class RPCServer(asyncore.dispatcher):

def __init__(self, host, port):
super(RPCServer, self).__init__()
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(256)
signal.signal(signal.SIGCHLD, self.sig_child)
self.make_child(10) # 开辟 10 个子进程

def make_child(self, n):
for i in range(n):
pid = os.fork()
if pid < 0:
raise Exception("fork error")
elif pid == 0: # child process
break
else: # parent process
continue

def sig_child(self,sig, frame):
while True:
try:
os.waitpid(-1, os.WNOHANG)
break
except OSError as e:
if e.args[0] == errno.ECHILD:
return