基于zookeeper的分布式RPC(python)

  1. 客户端
  2. 服务端
  3. 测试

基于 zookeeper 实现分布式的 RPC 服务: 服务节点可以自由伸缩, 客户端能够动态地感知服务节点的变更

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/ python3
# -*- coding: utf-8 -*-
import json
import time
import struct
import socket
import random
from kazoo.client import KazooClient

# ZooKeeper parent node under which every RPC service instance registers.
zk_root = "/demo"
# Shared mutable state. "servers" holds the list of RemoteServer objects,
# or None until service discovery has run for the first time.
G = dict(servers=None)


def random_server():
    """Return a randomly chosen RemoteServer, or None if no node is available.

    Runs service discovery (get_servers) on the first call, while
    ``G["servers"]`` is still None.

    The ZooKeeper watch callback runs on another thread and may replace or
    shrink ``G["servers"]`` at any moment. The list is therefore copied before
    the emptiness check, so random.choice can never be handed a list that
    became empty after the check (which would raise IndexError).
    """
    if G["servers"] is None:
        # First call: initialise the server list.
        get_servers()
    servers = list(G["servers"] or ())
    if not servers:
        return None
    return random.choice(servers)


def get_servers():
    """Service discovery: populate G["servers"] from ZooKeeper and keep it current.

    Connects to ZooKeeper at 127.0.0.1:2181 and reads every child of
    ``zk_root``. Each child's data is JSON with "host" and "port" keys (the
    format written by the server's register_zk). One RemoteServer is kept per
    "host:port" address. The children watch is re-armed on every read, so
    when service nodes are added or removed, kazoo calls ``watch_servers``
    again and ``G["servers"]`` is replaced with an updated list.

    The ZooKeeper client is intentionally left running, because the watch
    needs it for the lifetime of the process.
    """
    import threading
    from kazoo.exceptions import NoNodeError

    zk = KazooClient(hosts="127.0.0.1:2181")
    zk.start()
    # Create the parent node if needed. Without it, a client started before
    # any server would fail with NoNodeError instead of waiting for nodes.
    zk.ensure_path(zk_root)
    # Serializes the initial pass (this thread) with watch callbacks (kazoo's
    # thread), so an older snapshot can never overwrite a newer one.
    lock = threading.Lock()
    # Publish an empty list before the watch is armed. The watch may fire
    # before the initial pass finishes, and random_server() treats None as
    # "discovery has not run yet".
    G["servers"] = []

    def watch_servers(*args):
        """Children watch: rebuild G["servers"] from the current node list."""
        with lock:
            new_addrs = set()
            # Passing watch= re-arms the one-shot children watch.
            for child in zk.get_children(zk_root, watch=watch_servers):
                try:
                    data, _stat = zk.get(zk_root + "/" + child)
                except NoNodeError:
                    # The node vanished between listing and reading it.
                    continue
                addr = json.loads(data.decode())
                new_addrs.add("{}:{}".format(addr["host"], addr["port"]))

            kept = []
            for server in G["servers"]:
                if server.addr in new_addrs:
                    kept.append(server)
                else:
                    # The node is gone; release its socket instead of leaking it.
                    server.close()
            known = {server.addr for server in kept}
            kept.extend(RemoteServer(addr) for addr in new_addrs - known)
            # Swap in the new list with a single assignment, so readers on
            # other threads never observe a half-updated list.
            G["servers"] = kept

    # Initial population; this also arms the first watch.
    watch_servers()


class RemoteServer:
    """Client-side handle for one RPC service node.

    The TCP connection is opened lazily on first use and reused by later calls.
    Each message is framed as a 4-byte unsigned length in native byte order
    (struct format "I") followed by that many bytes of UTF-8 JSON. Requests
    are {"in": name, "params": ...}; responses are {"out": ..., "result": ...}.
    """

    # Length-prefix codec shared by send and receive paths.
    _HEADER = struct.Struct("I")

    def __init__(self, addr):
        """Create a handle for ``addr``, a "host:port" string; does not connect."""
        self.addr = addr
        self._socket = None

    @property
    def socket(self):
        """The connected socket; connects on first access (lazy connection)."""
        if not self._socket:
            self.connect()
        return self._socket

    def connect(self):
        """Open a new TCP connection to ``self.addr``.

        Raises OSError (or ValueError for a malformed port). The half-created
        socket is closed on failure rather than leaked.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        host, port = self.addr.split(":")
        try:
            sock.connect((host, int(port)))
        except Exception:
            sock.close()
            raise
        self._socket = sock

    def reconnect(self):
        """Drop the current connection (if any) and open a fresh one."""
        self.close()
        self.connect()

    def close(self):
        """Close the connection if open; the next call reconnects lazily."""
        if self._socket:
            self._socket.close()
            self._socket = None

    @staticmethod
    def _recv_exact(sock, n):
        """Read exactly ``n`` bytes from ``sock``.

        recv() may return fewer bytes than requested, so keep reading until
        the full amount has arrived.

        Raises:
            ConnectionError: the peer closed the connection early.
        """
        chunks = []
        remaining = n
        while remaining:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    "connection closed after {} of {} bytes".format(n - remaining, n))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def rpc(self, in_, params):
        """Send one request and block until its response arrives.

        Args:
            in_: name of the remote handler (e.g. "ping").
            params: JSON-serialisable argument passed to the handler.

        Returns:
            The ``(out, result)`` pair from the server's response.

        Raises:
            OSError / ConnectionError: on network failure or early close.
        """
        sock = self.socket
        # Measure the length on the encoded bytes; that is what goes on the wire.
        body = json.dumps({"in": in_, "params": params}).encode()
        # sendall, unlike send, guarantees the whole frame is written.
        sock.sendall(self._HEADER.pack(len(body)) + body)
        length, = self._HEADER.unpack(self._recv_exact(sock, self._HEADER.size))
        response = json.loads(self._recv_exact(sock, length).decode())
        return response["out"], response["result"]

    def ping(self, message):
        """Call the remote "ping" handler with ``message``."""
        return self.rpc("ping", message)


if __name__ == '__main__':
    # Demo driver: send up to 100 pings, each to a randomly chosen live node.
    # The previous second call, server.pi(i), was removed: RemoteServer has no
    # "pi" method (and the server registers no "pi" handler), so it always
    # raised AttributeError and forced a needless disconnect each iteration.
    for i in range(100):
        server = random_server()
        if not server:
            break  # no service node is alive: stop
        time.sleep(1)
        try:
            out, result = server.ping("hello {}".format(i))
            print(server.addr, out, result)
        except Exception as ex:
            # The connection is in an unknown state. Close it so the next
            # call on this server reconnects lazily.
            server.close()
            print(ex)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/ python3
# -*- coding: utf-8 -*-
"""分布式多进程 RPC 服务"""
import os
import sys
import json
import errno
import struct
import signal
import socket
import asyncore
from io import BytesIO
from kazoo.client import KazooClient


class RPCServer(asyncore.dispatcher):
    """Pre-forking RPC server that registers itself in ZooKeeper.

    The listening socket is created and bound once, then ``workers`` child
    processes are forked. Each process (parent included) returns from the
    constructor and runs the asyncore loop on the shared listening socket.
    Only the parent registers an ephemeral, sequential ZooKeeper node under
    ``zk_root`` whose data is {"host": ..., "port": ...}.
    """
    zk_root = "/demo"
    zk_rpc = zk_root + "/rpc"

    def __init__(self, host, port, workers=10):
        """Bind ``host:port``, fork ``workers`` children and install signal handlers.

        ``workers`` defaults to 10, the value that was previously hard-coded.
        """
        asyncore.dispatcher.__init__(self)
        self.host = host
        self.port = port
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(1)
        self.child_pids = []
        # Create the worker processes.
        if self.prefork(workers):
            self.register_zk()  # parent: register the service in ZooKeeper
            self.register_parent_signal()  # parent: shutdown / reaping handlers
        else:
            self.register_child_signal()  # child: shutdown handlers

    def prefork(self, n):
        """Fork ``n`` children. Return True in the parent, False in each child.

        os.fork() raises OSError on failure instead of returning a negative
        pid, so no explicit error branch is needed.
        """
        for _ in range(n):
            pid = os.fork()
            if pid == 0:
                return False  # child process
            self.child_pids.append(pid)  # parent: remember the child's pid
        return True

    def register_zk(self):
        """Parent only: connect to ZooKeeper and register this server."""
        self.zk = KazooClient(hosts='127.0.0.1:2181')
        self.zk.start()
        # Create the parent node if it does not exist yet.
        self.zk.ensure_path(self.zk_root)
        value = json.dumps({"host": self.host, "port": self.port})
        # Ephemeral: removed when this session ends. Sequential: ZooKeeper
        # appends a unique index suffix to the node name.
        self.zk.create(self.zk_rpc, value.encode(), ephemeral=True, sequence=True)

    def register_parent_signal(self):
        """Install the parent's SIGINT/SIGTERM shutdown and SIGCHLD reaping handlers."""
        signal.signal(signal.SIGINT, self.exit_parent)
        signal.signal(signal.SIGTERM, self.exit_parent)
        # Reap children that exit unexpectedly so they do not become zombies.
        signal.signal(signal.SIGCHLD, self.reap_child)

    def exit_parent(self, sig, frame):
        """SIGINT/SIGTERM handler in the parent: close everything, stop and reap children."""
        self.zk.stop()  # end the ZooKeeper session (drops the ephemeral node)
        self.zk.close()  # release the stopped client's resources
        self.close()  # close the listening socket
        asyncore.close_all()  # close all client sockets
        pids = []
        # Iterate over a copy: reap_child (a signal handler) may remove
        # entries from self.child_pids while this loop runs.
        for pid in list(self.child_pids):
            print("before kill")
            try:
                os.kill(pid, signal.SIGINT)
            except ProcessLookupError:
                # The child already exited; os.kill reports ESRCH, not ECHILD.
                continue
            pids.append(pid)
            print("after kill ", pid)
        # Reap the signalled children so they do not linger as zombies.
        for pid in pids:
            while True:
                try:
                    os.waitpid(pid, 0)
                    break
                except ChildProcessError:
                    break  # already reaped, e.g. by reap_child
                except InterruptedError:
                    continue  # interrupted by another signal: retry
            print("wait over", pid)

    def reap_child(self, sig, frame):
        """SIGCHLD handler in the parent: reap every child that has exited.

        Several child exits can be coalesced into a single SIGCHLD delivery,
        so keep calling waitpid(-1, WNOHANG) until none are left to reap.
        """
        print("before reap")
        while True:
            try:
                pid, _status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                return  # no children remain at all
            except InterruptedError:
                continue  # interrupted by another signal: retry
            if pid == 0:
                return  # children exist, but none has exited
            try:
                self.child_pids.remove(pid)
            except ValueError:
                pass
            print("after reap", pid)

    def register_child_signal(self):
        """Install the child's SIGINT/SIGTERM shutdown handlers."""
        signal.signal(signal.SIGINT, self.exit_child)
        signal.signal(signal.SIGTERM, self.exit_child)

    def exit_child(self, sig, frame):
        """SIGINT/SIGTERM handler in a child: close all of this process's sockets.

        With every dispatcher closed, asyncore.loop() has nothing left to poll.
        """
        self.close()  # close the listening socket
        asyncore.close_all()  # close all client sockets
        print("all closed")

    def handle_accept(self):
        """Accept a connection and hand it to a new RPCHandler."""
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            RPCHandler(sock, addr)


class RPCHandler(asyncore.dispatcher_with_send):
"""处理请求"""
def __init__(self, sock, addr):
asyncore.dispatcher_with_send.__init__(self, sock)
self.addr = addr
self.handlers = {
"ping": self.ping,
}
self.rbuf = BytesIO()

def handle_connect(self):
print(self.addr, "comes")

def handle_read(self):
"""接收请求"""
while True:
connect = self.recv(1024)
if connect:
self.rbuf.write(connect)
if len(connect) < 1024:
break
self.handle_rpc()

def handle_rpc(self):
"""处理一个接收一个完整的请求"""
while True:
self.rbuf.seek(0)
length_prefix = self.rbuf.read(4)
if len(length_prefix) < 4:
break
length, = struct.unpack("I", length_prefix)
body = self.rbuf.read(length)
if len(body) < length:
break
request = json.loads(body.decode())
in_ = request['in']
params = request['params']
print(os.getpid(), in_, params)
handler = self.handlers[in_]
handler(params)
left = self.rbuf.getvalue()[length+4:]
self.rbuf = BytesIO()
self.rbuf.write(left)

def ping(self, params):
self.send_result("pong", params)

def send_result(self, out, result):
"""给客户端发送消息"""
response = {"out": out, "result": result}
body = json.dumps(response)
length_prefix = struct.pack("I", len(body))
self.send(length_prefix)
self.send(body.encode())

def handle_close(self):
print(self.addr, "bye")
self.close()


if __name__ == '__main__':
    # Usage: <script> HOST PORT
    bind_host, bind_port = sys.argv[1], int(sys.argv[2])
    # The constructor forks the workers; every process then continues here.
    RPCServer(bind_host, bind_port)
    # Run the event loop until all of this process's dispatchers are closed.
    asyncore.loop()

测试

image

  • 启动一个服务端 127.0.0.1:8080, 它就会把这个地址注册到 zookeeper
  • 启动两个客户端, 客户端会从 zookeeper 中取到服务端的地址 127.0.0.1:8080, 然后向服务端发送请求并收到响应
  • 再启动一个服务端 127.0.0.1:8081, 它同样会向 zookeeper 注册自己的节点
  • 两个客户端从 zookeeper 接收到新增的服务端 127.0.0.1:8081
  • 两个客户端随机的向两个服务端发送请求且接收到响应
  • 关闭一个服务端 127.0.0.1:8081, 它在 zookeeper 中注册的临时节点随会话结束而被删除, 即 127.0.0.1:8081 这个地址从 zookeeper 中移除
  • 两个客户端接收到服务地址的更改,只向 127.0.0.1:8080 发送请求
  • 重新启动 127.0.0.1:8081, 可以看到客户端的请求又能正常地发送到两个服务端

文章标题:基于zookeeper的分布式RPC(python)

文章字数:2.1k

本文作者:Waterandair

发布时间:2018-05-27, 09:24:06

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-05-27-rpc-distribute-zookeeper-python.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github