RPC服务器进程池、线程池同步模型

进程要比线程更加吃资源,如果来一个连接就开一个进程,当连接比较多时,进程数量也会跟着多起来,操作系统的调度压力也就会比较大。所以要对服务器开辟的进程数量进行限制,避免系统负载过重。

进程池

采用进程池可以对子进程的数量进行了限制。进程池是通过预先产生多个子进程,共同对服务器套接字进行竞争性的accept,当一个连接到来时,每个子进程都有机会拿到这个连接,但是最终只会有一个进程能accept成功返回拿到连接。子进程拿到连接后,进程内部可以继续使用单线程或者多线程同步的形式对连接进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc 进程池模型
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import os
import signal
import errno


def fib(conn, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
send_result(conn, 'result', a)


def send_result(conn, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
conn.send(length_prefix)
conn.sendall(response.encode('utf-8'))


def handle_conn(conn, addr, handlers):
print(addr, 'comes')
while True:
length_prefix = conn.recv(4)
if not length_prefix: # 连接关闭了
conn.close()
break

length, = struct.unpack('I', length_prefix)
body = conn.recv(length) # 请求消息体
request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = handlers[in_]
handler(conn, params) # 处理请求


def loop(sock, handlers):
while True:
conn, addr = sock.accept()
handle_conn(conn, addr, handlers)


def sig_child(sig, frame):
while True:
try:
os.waitpid(-1, os.WNOHANG)
break
except OSError as e:
if e.args[0] == errno.ECHILD:
return


def make_child(n):
for i in range(n):
pid = os.fork()
if pid < 0:
raise Exception("fork error")
elif pid == 0: # child process
break
else: # parent process
continue


def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 9987))
sock.listen(256)

signal.signal(signal.SIGCHLD, sig_child)

make_child(10)

handlers = { # 注册请求处理器
'fib': fib
}

loop(sock, handlers)


if __name__ == '__main__':
main()

如果并行的连接数超过进程池的数量,那么后来的客户端请求将会阻塞。上面的所有的进程都阻塞在accept上,这可能会发生惊群现象。

线程池

与进程池相对对应的,也可以采用线程池模型来实现。每个线程在调用accept时,需要获得锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc 进程池模型
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import threading


__lock = threading.Lock()


def fib(conn, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
send_result(conn, 'result', a)


def send_result(conn, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
conn.send(length_prefix)
conn.sendall(response.encode('utf-8'))


def handle_conn(conn, addr, handlers):
print(addr, 'comes')
while True:
length_prefix = conn.recv(4)
if not length_prefix: # 连接关闭了
conn.close()
break

length, = struct.unpack('I', length_prefix)
body = conn.recv(length) # 请求消息体
request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = handlers[in_]
handler(conn, params) # 处理请求


def loop(sock, handlers):
while True:
__lock.acquire()
try:
conn, addr = sock.accept()
finally:
__lock.release()
continue
handle_conn(conn, addr, handlers)


def make_thread(n, sock, handlers):
for i in range(n):
t = threading.Thread(target=loop, args=(sock, handlers))
t.start()


def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 9987))
sock.listen(256)

handlers = { # 注册请求处理器
'fib': fib
}

make_thread(10, sock, handlers)

loop(sock, handlers)


if __name__ == '__main__':
main()

与进程池类似,如果并行的连接数超过线程池的数量,那么后来的客户端请求将会阻塞。