RPC服务器多线程、多进程同步模型

上节编写了一个最简单的 RPC 服务器模型,简单到同时只能处理单个连接。本节为服务器增加多线程、多进程并发处理能力,同时可以处理多个客户端连接。

多线程

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc 多线程模型
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import threading


def fib(conn, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
send_result(conn, 'result', a)


def send_result(conn, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
conn.send(length_prefix)
conn.sendall(response.encode('utf-8'))


def handle_conn(conn, addr, handlers):
print(addr, 'comes')
while True:
length_prefix = conn.recv(4)
if not length_prefix: # 连接关闭了
conn.close()
break

length, = struct.unpack('I', length_prefix)
body = conn.recv(length) # 请求消息体
request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = handlers[in_]
handler(conn, params) # 处理请求


def loop(sock, handlers):
while True:
conn, addr = sock.accept()
t = threading.Thread(target=handle_conn, args=(conn, addr, handlers))
t.start() # 启动新线程处理连接


def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 9987))
sock.listen(256)

handlers = { # 注册请求处理器
'fib': fib
}

loop(sock, handlers)


if __name__ == '__main__':
main()

对于每来一个新连接,服务器则开启一个新的线程单独进行处理。每个线程都是同步读写客户端连接。

多进程

上面完成了一个简单的多线程服务器,可以并发处理多个客户端连接。但是Python里多线程使用的并不常见。

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

所以多数Python服务器推荐使用多进程模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc 多进程模型
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import os
import signal
import errno


def fib(conn, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
send_result(conn, 'result', a)


def send_result(conn, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
conn.send(length_prefix)
conn.sendall(response.encode('utf-8'))


def handle_conn(conn, addr, handlers):
print(addr, 'comes')
while True:
length_prefix = conn.recv(4) # 连接关闭了
if not length_prefix:
conn.close()
break

length, = struct.unpack('I', length_prefix)
body = conn.recv(length) # 请求消息体
request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = handlers[in_]
handler(conn, params) # 处理请求


def sig_child(sig, frame):
while True:
try:
os.waitpid(-1, os.WNOHANG)
break
except OSError as e:
if e.args[0] == errno.ECHILD:
return


def loop(sock, handlers):
while True:
conn, addr = sock.accept()
pid = os.fork()
if pid < 0: # fork error
return
elif pid == 0: # 子进程处理连接
sock.close()
handle_conn(conn, addr, handlers)
break
else:
conn.close()
continue


def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 9987))
sock.listen(256)

handlers = { # 注册请求处理器
'fib': fib
}

signal.signal(signal.SIGCHLD, sig_child)

loop(sock, handlers)


if __name__ == '__main__':
main()

对于每来一个新连接,服务器则开启一个新的进程单独进行处理。每个进程都是同步读写客户端连接。