# 使用`protobuf`和`gRPC`实现消息订阅系统
## 一、 实现过程
### 1.1 准备工作
本次作业我使用了3.6.9版本的`python`作为编程语言。在终端使用以下指令在`python`环境中安装`grpc`工具:
```shell
sudo pip3 install grpcio-tools
```
### 1.2 `proto`文件的编写和处理
`Protobuf`是一套类似`Json`或者`XML`的数据传输格式和规范,用于不同应用或进程之间进行通信时使用。通信时所传递的信息是通过`Protobuf`定义的`message`数据结构进行打包,然后编译成二进制的码流再进行传输或者存储。
`Protobuf`的消息结构是通过一种叫做`Protocol Buffer Language`的语言进行定义和描述的。编辑文件`pubsub.proto`,代码如下:
```protobuf
syntax = "proto3";
package rpc_package;
// 定义服务
service pubsub {
// 定义服务的接口
rpc pubsubServe (mes2server) returns (mes2client) {}
}
// 定义上述接口的参数数据类型
message mes2server {
string mes1 = 1;
}
message mes2client {
string mes2 = 1;
}
```
其中,`service pubsub`定义了需要编写的服务的名称,其接口为`rpc pubsubServe (mes2server) returns (mes2client) {}`,即通信时,客户端向服务器发送消息`mes2serve`,服务器返回消息`mes2client`给客户端。这两个消息在之后生成的代码中会以结构体的形式保存。之后的`message`定义了`mes2server`和`mes2client`结构体的数据,二者都为字符串。
在终端运行以下指令,使用`gRPC protobuf`生成工具生成对应语言的库函数:
```shell
python3 -m grpc_tools.protoc -I=./ --python_out=./ --grpc_python_out=./ ./pubsub.proto
```
各个参数的功能如下:
- `-m grpc_tools.protoc`表示使用`grpc_tools.protoc`的库模块,是之前`grpcio-tools`安装的内容
- `-I=./`设定源路径为当前文件夹下
- `--python_out=./`表示输出的`pb2`模块为`py`文件,输出位置为当前文件夹
- `--grpc_python_out=./`表示输出的`pb2_grpc`模块为`py`文件,输出位置为当前文件夹
- 最后的`./pubsub.proto`指出了`proto`文件所在的路径
指令执行后,会在当前文件夹下生成文件`pubsub_pb2.py`和`pubsub_pb2_grpc.py`。
我将当前文件夹命名为`rpc`,并在上一级目录下编写之后的服务器和客户端程序。
### 1.3 客户端程序的编写
文件组织形式如下:
```
.
├── rpc
│ ├── pubsub_pb2.py
│ ├── pubsub_pb2_grpc.py
│ └── pubsub.proto
├── client.py
└── server.py
```
客户端需要从之前生成的`pubsub_pb2.py`和`pubsub_pb2_grpc.py`获得消息的定义以及发送消息封装好的函数,因此需要导入:
```python
from rpc.pubsub_pb2 import mes2client, mes2server
from rpc.pubsub_pb2_grpc import pubsubStub
```
通过`grpc.insecure_channel`可以配置通信的服务器的IP地址和端口。这里设置本机地址和端口`50000`:
```python
with grpc.insecure_channel('localhost:50000') as channel:
```
通过之前导入的`pubsubStub`和配置好的服务器地址和端口创建客户端存根:
```python
stub = pubsubStub(channel)
```
之后在循环中调用之前声明好的服务`pubsubServe`即可。考虑到在消息订阅系统中的本质是服务器能向订阅的客户端统一发送消息的群法功能,我实现的程序中实现的是服务器发送消息而连接的客户端集体各自接受消息并打印,客户端不需要向服务器发送消息。这里为了演示,客户端将向服务器发送`'client'`字符串作为演示。
```python
mes = stub.pubsubServe(mes2server(mes1='client'), timeout=500)
print(mes)
```
其中,`mes2server`为之前的`proto`文件定义的结构体,其中的`mes1`为字符串。传入参数时不能直接传入`'client'`而必须使用赋值的形式`mes1='client'`,否则会产生以下错误:
```python
TypeError: No positional arguments allowed
```
依据题目要求,使用参数`timeout=500`,从而能够控制消息在服务器端存储的时间。
客户端完整代码如下:
```python
import logging
import grpc
from rpc.pubsub_pb2 import mes2client, mes2server
from rpc.pubsub_pb2_grpc import pubsubStub
def run():
# 创建通信信道
with grpc.insecure_channel('localhost:50000') as channel:
# 客户端通过stub来实现rpc通信
stub = pubsubStub(channel)
# 客户端必须使用定义好的类型
while 1:
try:
mes = stub.pubsubServe(mes2server(mes1='client'), timeout=500)
print(mes)
# 遇到ctrl+c时推出
except KeyboardInterrupt:
exit(0)
if __name__ == "__main__":
logging.basicConfig()
run()
```
### 1.4 服务器程序的编写
同样,服务器也要从之前生成的模块中导入消息定义和相关的函数:
```python
from rpc.pubsub_pb2_grpc import add_pubsubServicer_to_server, pubsubServicer
from rpc.pubsub_pb2 import mes2client, mes2server
```
首先创建服务器实例:
```python
server = grpc.server(ThreadPoolExecutor(max_workers=3))
```
`ThreadPoolExecutor`创建线程池,`max_workers=3`表示只有3个线程,换句话说,最多有三个客户端与服务器相连。`max_workers`的参数配置实现了题目要求的控制访问请求的数量。
然后# 将对应的任务处理函数添加到`rpc server`中:
```python
add_pubsubServicer_to_server(PubsubServer(), server)
```
其中`PubsubServer`为实现服务功能的、能在客户端调用其中功能的类,将会在下文详细介绍。
最后设置IP值和端口,开放服务器即可:
```python
server.add_insecure_port('[::]:50000')
server.start()
```
在类`class PubsubServer()`中,需要实现之前在`proto`文件中定义的服务。定义的服务为:
```protobuf
rpc pubsubServe (mes2server) returns (mes2client) {}
```
因此在该类中对应这一个成员功能:
```python
def pubsubServe(self, request, context)
```
参数`request`和 `context`是在之前生成的模块中给出的,不能自己修改。按照之前`proto`文件的定义,该函数需要返回一个`mes2client`类型的数据。
依据之前的讨论,消息订阅系统相当于服务器将消息广播给连接的客户端的能力。我准备实现的基本功能是服务器能够随时输入一个字符串,输入完成后所有的客户端都能收到该字符串。而每个服务器-客户端的连接由一个子线程完成(主线程一直处于循环等待中,不参与服务器与客户端的信息交互),必须要考虑多线程的影响。要能做到:
- 只有这些子线程能从服务器发消息到客户端
- 服务器输入消息之后,所有的子线程将该消息发给各自负责的客户端
- 服务器没有输入消息时,所有的线程需要阻塞,等待服务器的消息输入
- 只有有客户端等待消息时,服务器才需要发送消息。换句话说,服务器输入消息是由子线程发起的
如果每个子线程都能要求服务器输入消息发给子线程对应的客户端,群发消息就不能做到。各个线程会依次要求输入消息,每条消息只能发给一个客户端。因此考虑将消息作为共享变量,通过对消息输入的互斥机制来控制消息的输入和发送。
在`class PubsubServer`创建构造函数如下:
```python
def __init__(self):
self.threadLock = threading.Lock()
self.n = 0
self.mes = "default"
```
其中`threadLock`为实现线程互斥机制的锁,之后通过锁来控制对消息的输入和线程的阻塞等待输入。`n`为标志位,起到类似信号量的作用,表示消息是否能输入。`n==1`表示消息已经被输入,否则`n==0`,消息还没有输入,需要一个线程发起消息输入的命令,其他所有线程阻塞等待。等`n==1`时,输入完成,所有线程将输入的消息发给对应的客户端并�
![avatar](https://profile-avatar.csdnimg.cn/default.jpg!1)
shejizuopin
- 粉丝: 1w+
- 资源: 1305
最新资源
- 单相Boost PFC双闭环控制仿真模型:高精度功率因数与详细数据测量注释,单相Boost PFC双闭环控制仿真模型:高功率因数0.9995下的电压外环PI与电感电流滞环控制,详细数据测量及模块注释
- 基于Vue框架的消防一体化系统设计源码
- 衢州市乡镇边界,shp格式
- hotgo-移动应用开发资源
- unisrc-单片机开发资源
- 固态继电器电路.zip
- 光控照明灯自动开关.zip
- 光控式道路施工闪烁警示灯控制电路.zip
- 光电传感器与应用电路.zip
- 安川伺服电机与S7-200SMART PLC及MCGS7.7触摸屏联机程序例程:含CAD图纸、参数详解及运行效果视频说明书,安川伺服电机与西门子S7-200SMART PLC及MCGS7.7触摸屏联机
- 红外测量控器的发射与接收.zip
- 红外探测自动开关.zip
- 红外线集成器件sNS9201在延时开关中的应用.zip
- 霍尔传感器与应用电路.zip
- 继电器电路.zip
- 家用彩色幻灯电路.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
![feedback](https://img-home.csdnimg.cn/images/20220527035711.png)
![feedback](https://img-home.csdnimg.cn/images/20220527035711.png)
![feedback-tip](https://img-home.csdnimg.cn/images/20220527035111.png)
- 1
- 2
前往页