RPC服务器单线程同步模型

RPC (Remote Procedure Call)即远程过程调用,是分布式系统常见的一种通信方法。

RPC的作用

随着企业 IT 服务的不断发展,单台服务器逐渐无法承受用户日益增长的请求压力时,就需要多台服务器联合起来构成服务集群共同对外提供服务。同时业务服务会随着产品需求的增多越来越肿,架构上必须进行服务拆分,一个完整的大型服务会被打散成很多很多独立的小服务,每个小服务会由独立的进程去管理来对外提供服务,这就是微服务。

当用户的请求到来时,需要将用户的请求分散到多个服务去各自处理,然后又需要将这些子服务的结果汇总起来呈现给用户。那么服务之间该使用何种方式进行交互就是需要解决的核心问题。RPC就是为解决服务之间信息交互而发明和存在的。

什么是RPC ?

简单的来说:当有两台不同服务器A和B,当服务器A需要调用服务器B中的某个函数时,服务器A需要把调用服务器B的函数名和参数封装成一个消息,然后发送给服务器B。当服务器B收到请求后,根据消息中的函数名和参数调用相应的函数,然后把结果返回给服务器A。

HTTP与RPC

HTTP 与 RPC 的关系就好比普通话与方言的关系。要进行跨企业服务调用时,往往都是通过 HTTP API,也就是普通话,虽然效率不高,但是通用,没有太多沟通的学习成本。但是在企业内部还是 RPC 更加高效,同一个企业公用一套方言进行高效率的交流,要比通用的 HTTP 协议来交流更加节省资源。整个中国有非常多的方言,正如有很多的企业内部服务各有自己的一套交互协议一样。虽然国家一直在提倡使用普通话交流,但是这么多年过去了,你回一趟家乡探个亲什么的就会发现身边的人还是流行说方言。

流行的RPC框架

  1. gRPC是Google最近公布的开源软件,基于最新的HTTP2.0协议,并支持常见的众多编程语言。

  2. Thrift是Facebook的一个开源项目,主要是一个跨语言的服务开发框架。

  3. Dubbo是阿里集团开源的一个极为出名的RPC框架,在很多互联网公司和企业应用中广泛使用。

RPC服务器单线程同步模型的实现

下面使用python实现一个简单的rpc通信,客户端通过调用服务器的fib函数,进行计算斐波那契级数,计算完后把结果返回给客户端。

客户端实现

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc customer
"""

__author__ = 'heqingliang'


import json
import struct
import socket


def rpc(sock, in_, params):
request = json.dumps({'in': in_, 'params': params}) # 请求的消息体
length_prefix = struct.pack('I', len(request)) # 请求的消息长度
sock.send(length_prefix)
sock.sendall(request.encode('utf-8'))

length_prefix = sock.recv(4) # 响应的长度
length, = struct.unpack('I', length_prefix)
body = sock.recv(length) # 响应的消息体
response = json.loads(body.decode('utf-8'))

return response['out'], response['result']


def main():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('localhost', 9987))
for i in range(1, 11):
out, result = rpc(s, 'fib', i)
print(out, result)
s.close()


if __name__ == '__main__':
main()

客户端使用长度前缀法来确定消息边界,通过Python内置的struct包将消息体的长度整数转成4个字节的长度前缀字符串。消息体使用json序列化。

每个消息都有相应的名称,请求的名称使用in字段表示,请求的参数使用params字段表示,响应的名称是out字段表示,响应的结果用result字段表示:

// 输入
{
    in: "fib",
    params: 1
}

// 输出
{
    out: "result",
    result: 1
}
服务端实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc server
"""

__author__ = 'heqingliang'


import json
import struct
import socket


def fib(conn, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
send_result(conn, 'result', a)


def send_result(conn, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
conn.send(length_prefix)
conn.sendall(response.encode('utf-8'))


def handle_conn(conn, addr, handlers):
print(addr, 'comes')
while True:
length_prefix = conn.recv(4)
if not length_prefix: # 连接关闭了
conn.close()
break

length, = struct.unpack('I', length_prefix)
body = conn.recv(length) # 请求消息体
request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = handlers[in_]
handler(conn, params) # 处理请求


def loop(sock, handlers):
while True:
conn, addr = sock.accept()
handle_conn(conn, addr, handlers) # 处理连接


def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 9987))
sock.listen(256)

handlers = { # 注册请求处理器
'fib': fib
}

loop(sock, handlers)


if __name__ == '__main__':
main()

服务器根据RPC请求的in字段来查找相应的RPC Handler进行处理。如果想支持多种消息,可以在代码中增加更多的处理器函数,并将处理器函数注册到全局的handlers字典中。

运行效果

服务器运行效果:

('127.0.0.1', 50310) comes
fib 1
fib 2
fib 3
fib 4
fib 5
fib 6
fib 7
fib 8
fib 9
fib 10

客户端运行效果:

result 1
result 1
result 2
result 3
result 5
result 8
result 13
result 21
result 34
result 55