同样在services.py 中自定义需要实现消息协议、传输控制,并且实现客户端存根clientStub和服务器端存根serverStub,服务器定义以及channel的定义。
此时,添加DistributedChannel分布式的channel,并在ThreadServer中添加了registry方法。
```
import threading
import random
import struct
import time
from io import BytesIO
import socket
import json
from kazoo.client import KazooClient
class InvalidOperation(BaseException):
def __init__(self, message = None):
self.message = message or 'involid operation'
class MethodProtocol(object):
''''
解读方法名
'''
def __init__(self, connection):
self.conn = connection
def _read_all(self, size):
"""
帮助我们读取二进制数据
:param size: 想要读取的二进制数据大小
:return: 二进制数据bytes
"""
# self.conn
if isinstance(self.conn, BytesIO):
buff = self.conn.read(size)
return buff
else:
# 有时候长度大于每次读取的长度
have = 0
buff = b''
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客户端已经关闭了
raise EOFError
return buff
def get_method_name(self):
# 读取字符串长度
buff = self._read_all(4)
length = struct.unpack('!I',buff)[0]
# 读取字符串
buff = self._read_all(length)
name = buff.decode()
return name
class DivideProtocol(object):
"""
divide过程消息协议转换工具
"""
def args_encode(self, num1, num2=1):
"""
将原始调用的请求参数转换打包成二进制消息数据
:param num1: int
:param num2: int
:return: bytes 二进制消息数据
"""
name = 'divide'
# 处理函数名
buff = struct.pack('!I', 6) # 无符号int
buff += name.encode()
# 处理参数1
buff2 = struct.pack('!B', 1) # 无符号byte
buff2 += struct.pack('!i', num1)
# 处理参数2
if num2 != 1:
# 没有传参的时候
buff2 += struct.pack('!B', 2)
buff2 += struct.pack('!i', num2)
# 处理参数边界和组合成完整数据
buff += struct.pack('!I',len(buff2))
buff += buff2
return buff
def _read_all(self, size):
"""
帮助我们读取二进制数据
:param size: 想要读取的二进制数据大小
:return: 二进制数据bytes
"""
# self.conn
if isinstance(self.conn, BytesIO):
buff = self.conn.read(size)
return buff
else:
# 有时候长度大于每次读取的长度
have = 0
buff = b''
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客户端已经关闭了
raise EOFError
return buff
def args_decode(self, connection):
"""
接受调用请求数据病进行解析
:param connection: 链接请求数据 socket BytesIO
:return: 因为有多个参数,定义为字典
"""
param_len_map = {
1:4,
2:4,
}
param_fmt_map = {
1:'!i',
2:'!i',
}
param_name_map = {
1: 'num1',
2: 'num2',
}
# 保存用来返回的参数字典
args = {}
self.conn = connection
# 处理方法的名字,已经提前被处理,稍后处理
# 处理消息边界
# 1) 读取二进制数据----read , ------ByteIO.read
# 2) 将二进制数据转换为python的数据类型
buff = self._read_all(4)
length = struct.unpack('!I',buff)[0]
# 记录已经读取的长度值
have = 0
# 处理第一个参数
# 解析参数序号
buff = self._read_all(1)
have += 1
param_seq = struct.unpack('!B', buff)[0]
# 解析参数值
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
have += param_len
param_fmt = param_fmt_map[param_seq]
param = struct.unpack(param_fmt,buff)[0]
# 设置解析后的字典
param_name = param_name_map[param_seq]
args[param_name] = param
if have >= length:
return args
# 处理第二个参数
# 解析参数序号
buff = self._read_all(1)
param_seq = struct.unpack('!B', buff)[0]
# 解析参数值
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
param_fmt = param_fmt_map[param_seq]
param = struct.unpack(param_fmt, buff)[0]
# 设置解析后的字典
param_name = param_name_map[param_seq]
args[param_name] = param
return args
def result_encode(self, result):
"""
将原始结果数据转换为消息协议二进制数据
:param result:
:return:
"""
if isinstance(result,float):
# 处理返回值类型
buff = struct.pack('!B', 1)
buff += struct.pack('!f', result)
return buff
else:
buff = struct.pack('!B', 2)
# 处理返回值
length = len(result.message)
# 处理字符串长度
buff += struct.pack('!I', length)
buff += result.message.encode()
return buff
def result_decode(self, connection):
"""
将返回值消息数据转换为原始返回值
:param connection: socket BytesIo
:return: float InvalidOperation对象
"""
self.conn = connection
# 处理返回值类型
buff = self._read_all(1)
result_type = struct.unpack('!B', buff)[0]
if result_type == 1:
#正常情况
buff = self._read_all(4)
val = struct.unpack('!f', buff)[0]
return val
else:
buff = self._read_all(4)
length = struct.unpack('!I', buff)[0]
# 读取字符串
buff = self._read_all(length)
message = buff.decode()
return InvalidOperation(message)
class Channel(object):
"""
用于客户端建立网络链接
"""
def __init__(self, host, port):
self.host = host
self.port = port
def get_connection(self):
"""
获取链接对象
:return: 与服务器通讯的socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
return sock
class DistributedChannel(object):
"""
支持分布式的zookeeper的RPC客户端链接工具
"""
def __init__(self):
# 创建kazoo对象,用来跟zookeeper链接,获取信息
zk = KazooClient('127.0.0.1:2181')
zk.start()
self.zk = zk
self._servers = []
self._get_servers() # 第一次,手动开启
def _get_servers(self, event=None):
"""
从zookeeper中获取所有可用的RPC服务器的地址
:return:
"""
self._servers = [] # 每次重新获取地址信息
# 从zookeeper中获取/rpc节点下的所有可用的rpc服务器节点
servers = self.zk.get_children('/rpc', watch=self._get_servers) # 监视的回调函数为自身
for server in servers:
addr_data = self.zk.get('/rpc/' + server)[0]
addr = json.loads(addr_data.decode())
self._servers.append(addr)
def _get_server(self):
"""
从可用的服务器列表中选出一台服务器
:return: {"host":xxx,"port":xxx}
"""
return random.choice(self._servers)
def get_connection(self):
"""
提供一个具体的与RPC服务器的链接socket
:return:
"""
while True:
addr = self._get_server()
print(addr)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((addr['host'], addr['port']))
except ConnectionRefusedError:
time.sleep(1)
continue
else:
return sock
class ThreadServer(object):
"""
多线成RPC服务器
"""
def __init__(self, host, port, handlers):
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 地址复用
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
# 绑定地址
sock.bind((self.host, self.port))
# 因为在启动的方法中才开启监听,所以不在此处开启
# sock.listen(128)
self.sock = sock
self.handlers = handlers
def serve(self):
"""
开启服务器运行,提供RPC服务
:return:
"""
# 开启服务器的监听,等待客户端的链接请求
self.sock.listen(128)
print("服务器开启监听,ip地址为%s,port为%d..." % (self.host,self.port))
# 注册到zookeeper
self.register_zookeeper()
while True:
# 不断的接收客户端的链接请求
client_sock, client_addr = self.sock.accept()
print("与客户端%s建立连接" % str(client_addr))
t = threading.Thread(target= self.handle, args=(client_sock,))
t.start()
def register_zookeeper(self):
"""
在zookeeper中心注册本服务器的地址信息
:return:
"""
# 创建kazoo的客户端
zk = KazooClient('127.0.0.1:2181')
# 建立与zookeeper的链接
zk.start()
# 在zookeeper中创建节点保存数据
zk.ensure_path('/rpc')
data = json.dumps({'host':self.host,'port':self.port})
zk.create('/rpc/server', data.encode(), ephemeral=True, sequence=True)
# 子线程函数
def handle(self,client_sock):
"""
子线程调用的方法,用来处理一个客户段的请求
:return:
"""
# 交个ServerStub,完成客户端的具体的RPC的调用请求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
# 不断的接收
stub.process()
except EOFError:
# 表示客户端关闭了连接
print('客户端关闭了连接')
client_sock.close()
class Server(object):
"""
RPC服务器
"""
def __init__(self, host, port, handlers):
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 地址复用
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
# 绑定地址
sock.bind((self.host, self.port))
# 因为在启动的方法中才开启监听,所以不在此处开启
# sock.listen(128)
self.sock = sock
self.handlers = handlers
def serve(self):
"""
开启服务器运行,提供RPC服务
:return:
"""
# 开启服务器的监听,等待客户端的链接请求
self.sock.listen(128)
print("服务器开启监听,ip地址为%s,port为%d..." % (self.host,self.port))
while True:
# 不断的接收客户端的链接请求
client_sock, client_addr = self.sock.accept()
print("与客户端%s建立连接" % str(client_addr))
# 交个ServerStub,完成客户端的具体的RPC的调用请求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
# 不断的接收
stub.process()
except EOFError:
# 表示客户端关闭了连接
print('客户端关闭了连接')
client_sock.close()
class ClientStub(object):
"""
用来帮助客户端完成远程过程调用 RPC调用
stub = ClientStub()
stub.divide(200, 100)
"""
def __init__(self, channel):
self.channel = channel
self.conn = self.channel.get_connection()
def divide(self, num1, num2 = 1):
# 将调用的参数打包成消息协议的数据
proto = DivideProtocol()
args = proto.args_encode(num1, num2)
# 将消息数据通过网络发送给服务器
self.conn.sendall(args)
# 接受服务器返回的消息数据,并进行解析
result = proto.result_decode(self.conn)
# 将结果之(正常float 或 异常InvalidOperation)返回给客户端
if isinstance(result,float):
return result
else:
raise result
class ServerStub(object):
"""
服务端存根
帮助服务端完成远端过程调用
"""
def __init__(self, connection, handlers):
"""
:param connection: 与客户端的链接
:param handlers: 真正的本地函数路由
此处不以map的形式处理,实现类的形式
class Handler:
@staticmethod
def divide():
pass
@staticmethod
def add():
pass
"""
self.conn = connection
self.method_proto = MethodProtocol(self.conn)
self.process_map = {
'divide': self._process_divide,
'add': self._process_add
}
self.handlers = handlers
def process(self):
"""
当服务端接受了客户的链接,建立好链接后,完成远端调用的处理
:return:
"""
# 接收消息数据,并解析方法的名字
name = self.method_proto.get_method_name()
# 根据解析获得的方法名,调用相应的过程协议,接收并解析消息数据
self.process_map[name]()
def _process_divide(self):
"""
处理除法过程调用
:return:
"""
proto = DivideProtocol()
args = proto.args_decode(self.conn)
# args = {'num1':xxx, 'num2':xxx}
# 除法过程的本地调用------------------->>>>>>>>>
# 将本地调用过程的返回值(包括可能的异常)打包成消息协议的数据,通过网络返回给客户端
try:
val = self.handlers.divide(**args)
except InvalidOperation as e:
ret_message = proto.result_encode(e)
else:
ret_message = proto.result_encode(val)
self.conn.sendall(ret_message)
def _process_add(self):
"""
处理加法过程调用
此方法暂时不识闲
:return:
"""
pass
if __name__ == '__main__':
# 目的:消息协议测试,模拟网络传输
# 构造消息数据
proto = DivideProtocol()
# 测试一
# divide(200,100)
# message = proto.args_encode(200,100)
# 测试二
message = proto.args_encode(200)
conn = BytesIO()
conn.write(message)
conn.seek(0)
# 解析消息数据
method_proto = MethodProtocal(conn)
name = method_proto.get_method_name()
print(name)
args = proto.args_decode(conn)
print(args)
```
接下来,修改server.py文件
server.py
```
from services import InvalidOperation
# from services import Server
from services import ThreadServer
import sys
class Handlers:
@staticmethod
def divide(num1, num2 = 1):
if num2 == 0:
raise InvalidOperation('ck_god_err')
val = num1/num2
return val
if __name__ == '__main__':
# 开启服务器
# _server = Server('127.0.0.1', 8000, Handlers)
# _server.serve()
# 从启动命令中提取服务器运行的ip地址和端口号,启动的多线程服务器
host = sys.argv[1]
port = int(sys.argv[2])
_server = ThreadServer(host, port, Handlers)
_server.serve()
```
最后,将 client.py文件也稍作修改。
```
import time
from services import ClientStub
# from services import Channel
from services import DistributedChannel
from services import InvalidOperation
# 创建与服务器的连接
# channel = Channel('127.0.0.1', 8000)
channel = DistributedChannel()
# 进行调用
for i in range(50):
try:
# 创建用于rpc调用的工具
stub = ClientStub(channel) # 初始化的时候才真正的创建连接了,所以放到里面
val = stub.divide(i * 100,100)
# val = stub.divide(i * 100)
# val = stub.divide( 100, 0)
except InvalidOperation as e:
print(e.message)
else:
print(val)
time.sleep(1)
```
ctrl + shift + T在pycharm中打开多个Terminal窗口
右键运行客户端,可以看到不断地随机切换服务器。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。