分布式RPC的实现

前面几节讲的都是单机RPC服务的模式,无论是多线程也好多进程也好,它们都只能算是单点的设计。如果节点故障,则不能再提供服务。如果要使得服务可以容忍个别节点故障仍能继续对外提供服务,则要实现分布式。

服务发现

通过服务发现技术,当RPC服务节点增加或减少时,客户端可以动态快速收到服务列表的变更信息,从而可以实时调整连接配置,这样无需重启就可以完成服务的扩容和缩容。

服务发现技术依赖于服务之间的特殊中间节点。这个节点的作用就是接受服务的注册,提供服务的查找,以及服务列表变更的实时通知功能。这可以通过Zookeeper来实现。

rpc_node.png

  • 服务注册——服务节点在启动时将自己的服务地址注册到中间节点
  • 服务查找——客户端启动时去中间节点查询服务地址列表
  • 服务变更通知——客户端在中间节点上订阅依赖服务列表的变更事件。当依赖的服务列表变更时,中间节点负责将变更信息实时通知给客户端。

服务端的实现

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
分布式 rpc
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import asyncore
import sys
from io import BytesIO
from kazoo.client import KazooClient


class RPCHandler(asyncore.dispatcher_with_send):

def __init__(self, sock, addr):
super(RPCHandler, self).__init__(sock=sock)
self.addr = addr
self.handlers = { # 注册请求处理器
'fib': self.fib
}
self.rbuf = BytesIO() # 读缓冲区

def handle_close(self):
"""
连接关闭之前的回调方法
"""
print(self.addr, 'bye')
self.close()

def handle_read(self):
"""
有读事件时的回调方法
"""
while True:
content = self.recv(1024)
if content:
self.rbuf.write(content)
if len(content) < 1024:
break
self.handle_rpc()

def handle_rpc(self):
"""
将读到的消息解包并处理
"""
while True:
self.rbuf.seek(0)
length_prefix = self.rbuf.read(4)
if len(length_prefix) < 4: # 不足一个消息
break
length, = struct.unpack('I', length_prefix)
body = self.rbuf.read(length)
if len(body) < length: # 不足一个消息
break

request = json.loads(body.decode('utf-8'))
in_ = request['in']
params = request['params']
print(in_, params)
handler = self.handlers[in_]
handler(params) # 处理请求

left = self.rbuf.getvalue()[length + 4:] # 消息处理完了,截断缓冲区
self.rbuf = BytesIO()
self.rbuf.write(left)
self.rbuf.seek(0, 2) # 将游标挪到文件结尾,读到的数据内容继续添加到缓冲区末尾

def fib(self, params):
a, b = 0, 1
for i in range(params):
a, b = b, a + b
self.send_result('result', a)

def send_result(self, out, result):
response = json.dumps({'out': out, 'result': result}) # 响应的消息体
length_prefix = struct.pack('I', len(response)) # 响应的长度
self.send(length_prefix)
self.send(response.encode('utf-8'))


class RPCServer(asyncore.dispatcher):

zk_root = '/demo'
zk_rpc = zk_root + '/rpc'

def __init__(self, host, port):
super(RPCServer, self).__init__()
self.host = host
self.port = port
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(256)
self.register_zk()

def register_zk(self):
"""
注册zookeeper服务
"""
self.zk = KazooClient(hosts='127.0.0.1:2181')
self.zk.start()
self.zk.ensure_path(self.zk_root) # 创建根节点
value = json.dumps({'host': self.host, 'port': self.port})

# 创建子节点
self.zk.create(self.zk_rpc, value.encode('utf-8'), ephemeral=True, sequence=True)

def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
print(addr, 'comes')
RPCHandler(sock, addr)


def main():
host = sys.argv[1]
port = int(sys.argv[2])
RPCServer(host, port)
asyncore.loop()


if __name__ == '__main__':
main()

服务端使用的异步IO模型,只是多了一个Zookeeper服务的注册过程。register_zk创建一个根节点/demo,每个rpc服务都会在根节点下创建一个临时顺序节点,节点中的数据为:本机的ip和端口。当有某个rpc服务出现故障时,则对应的临时顺序节点也会被删除。

客户端的实现

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
rpc customer
"""

__author__ = 'heqingliang'


import json
import struct
import socket
import random
from kazoo.client import KazooClient


__zk_root = '/demo'

__remote_servers = {'servers': None}


class RemoteServer(object):

def __init__(self, addr):
self.addr = addr
self._socket = None

@property
def socket(self):
if not self._socket:
self.connect()
return self._socket

def connect(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host, port = self.addr.split(':')
sock.connect((host, int(port)))
self._socket = sock

def close(self):
if self._socket:
self._socket.close()
self._socket = None

def fib(self, n):
return self.rpc('fib', n)

def rpc(self, in_, params):
sock = self.socket
request = json.dumps({'in': in_, 'params': params}) # 请求的消息体
length_prefix = struct.pack('I', len(request)) # 请求的消息长度
sock.send(length_prefix)
sock.sendall(request.encode('utf-8'))

length_prefix = sock.recv(4) # 响应的长度
length, = struct.unpack('I', length_prefix)
body = sock.recv(length) # 响应的消息体
response = json.loads(body.decode('utf-8'))

return response['out'], response['result']


def get_servers():
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
current_addrs = set() # 当前活跃地址列表

def watch_servers(*args):
"""
zookeeper的'/demo'节点发生变化时的回调函数
"""
new_addrs = set()

# 获取节点列表
for child in zk.get_children(__zk_root, watch=watch_servers):
node = zk.get(__zk_root + '/' + child)
addr = json.loads(node[0].decode('utf-8'))
new_addrs.add('%s:%d' % (addr['host'], addr['port']))

print('current_addrs: %s', current_addrs)
print('new_addrs: %s', new_addrs)

# 新添加的server地址
add_addrs = new_addrs - current_addrs

# 删除的server地址
del_addrs = current_addrs - new_addrs
del_servers = []

# 先找出所有的待删除server对象
for addr in del_addrs:
for s in __remote_servers['servers']:
if s.addr == addr:
del_servers.append(s)
break

# 依次删除每个server
for server in del_servers:
__remote_servers['servers'].remove(server)
current_addrs.remove(server.addr)

# 新增server
for addr in add_addrs:
__remote_servers['servers'].append(RemoteServer(addr))
current_addrs.add(addr)

# 获取节点列表并监听服务列表变更
for child in zk.get_children(__zk_root, watch=watch_servers):
node = zk.get(__zk_root + '/' + child)
addr = json.loads(node[0].decode('utf-8'))
current_addrs.add('%s:%d' % (addr['host'], addr['port']))
__remote_servers['servers'] = [RemoteServer(s) for s in current_addrs]
return __remote_servers['servers']


def random_server():
"""
随机获取一个服务节点
"""
if __remote_servers['servers'] is None:
if not get_servers(): # 初始化服务列表
return
return random.choice(__remote_servers['servers'])


def main():
for i in range(1, 1000):
server = random_server()
if not server:
break

try:
out, result = server.fib(i)
print(server.addr, out, result)
except Exception as e:
server.close() # 遇到错误,关闭连接
print(e)


if __name__ == '__main__':
main()

客户端在启动时,通过get_servers获得Zookeeper根节点/demo的所有节点,得到的是一个服务列表,有多个IP端口对。并在/demo节点设置了监听器watch_servers,当有节点被添加和删除时,此监听器会被调用,用于维护__remote_servers的服务列表。客户端调用random_server函数随机地从__remote_servers的服务列表地挑选任意的RPC服务节点进行连接。