# Federation
Federation is a clustering mechanism that provides high availability and horizontal scaling.
In Federation mode, multiple gmqtt brokers can be grouped together and "act as one".
However, it is impossible to fulfill every requirement of the MQTT specification in a distributed environment.
There are some limitations:
1. A persistent session cannot be resumed from another node.
2. Clients with the same client id can connect to different nodes at the same time and will not be kicked out.

This is because session information is stored only on the local node and is not shared between nodes.
## Quick Start
The following commands start a two-node federation; the configuration files can be found [here](./examples).
Start node1 in Terminal1:
```bash
$ gmqttd start -c path/to/retry_join/node1_config.yml
```
Start node2 in Terminal2:
```bash
$ gmqttd start -c path/to/retry_join/node2_config2.yml
```
After node1 and node2 are started, they will join into one federation automatically.
We can test the federation with `mosquitto_pub` and `mosquitto_sub`.
Connect to node2 and subscribe to topicA:
```bash
$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
```
Connect to node1 and send a message to topicA:
```bash
$ mosquitto_pub -t topicA -m 123 -h 127.0.0.1 -p 1883
```
`mosquitto_sub` will receive "123" and print it in the terminal:
```bash
$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
123
```
## Join Nodes via REST API
Federation provides gRPC/REST APIs to join and leave nodes and to query member information; see [swagger](./swagger/federation.swagger.json) for details.
In addition to joining nodes on startup, you can add a node to the federation with the `Join` API.
Start node3 with a configuration whose `retry_join` is empty, which means the node will not join any nodes on startup:
```bash
$ gmqttd start -c path/to/retry_join/join_node3_config.yml
```
We can send a `Join` request to any node in the federation to make node3 join. For example, send the `Join` request to node1:
```bash
$ curl -X POST -d '{"hosts":["127.0.0.1:8932"]}' '127.0.0.1:8083/v1/federation/join'
{}
```
Then check the members of the federation:
```bash
$ curl http://127.0.0.1:8083/v1/federation/members
{
  "members": [
    {
      "name": "node1",
      "addr": "192.168.0.105:8902",
      "tags": {
        "fed_addr": "192.168.0.105:8901"
      },
      "status": "STATUS_ALIVE"
    },
    {
      "name": "node2",
      "addr": "192.168.0.105:8912",
      "tags": {
        "fed_addr": "192.168.0.105:8911"
      },
      "status": "STATUS_ALIVE"
    },
    {
      "name": "node3",
      "addr": "192.168.0.105:8932",
      "tags": {
        "fed_addr": "192.168.0.105:8931"
      },
      "status": "STATUS_ALIVE"
    }
  ]
}
```
You will see that 3 nodes are alive in the federation.
## Configuration
```go
package federation

import "time"

// Config is the configuration for the federation plugin.
type Config struct {
	// NodeName is the unique identifier of the node in the federation. Defaults to the hostname.
	NodeName string `yaml:"node_name"`
	// FedAddr is the listening address of the gRPC server used for internal federation communication.
	// Defaults to :8901.
	// If the port is missing, the default federation port (8901) is used.
	FedAddr string `yaml:"fed_addr"`
	// AdvertiseFedAddr overrides the federation gRPC server address that is advertised to other nodes in the cluster.
	// This is useful when there is a routable address that cannot be bound.
	// Defaults to FedAddr, or to the node's private IP address if the IP in FedAddr is 0.0.0.0.
	// If the port is missing, the default federation port (8901) is used.
	AdvertiseFedAddr string `yaml:"advertise_fed_addr"`
	// GossipAddr is the address the gossip protocol listens on; it is used for both UDP and TCP gossip.
	// Defaults to :8902.
	GossipAddr string `yaml:"gossip_addr"`
	// AdvertiseGossipAddr overrides the gossip address that is advertised to other nodes in the cluster.
	// Defaults to GossipAddr, or to the node's private IP address if the IP in GossipAddr is 0.0.0.0.
	// If the port is missing, the default gossip port (8902) is used.
	AdvertiseGossipAddr string `yaml:"advertise_gossip_addr"`
	// RetryJoin is the list of addresses of other nodes to join on startup.
	// If the port is missing, the default gossip port (8902) is used.
	RetryJoin []string `yaml:"retry_join"`
	// RetryInterval is the time to wait between join attempts. Defaults to 5s.
	RetryInterval time.Duration `yaml:"retry_interval"`
	// RetryTimeout is the maximum time to wait for all nodes in RetryJoin to be joined successfully.
	// If the timeout expires, the server exits with an error. Defaults to 1m.
	RetryTimeout time.Duration `yaml:"retry_timeout"`
	// SnapshotPath is passed to "SnapshotPath" in the Serf configuration.
	// When Serf is started with a snapshot,
	// it attempts to join all previously known nodes until one
	// succeeds, and it avoids replaying old user events.
	SnapshotPath string `yaml:"snapshot_path"`
	// RejoinAfterLeave is passed to "RejoinAfterLeave" in the Serf configuration.
	// It controls the interaction with the snapshot file.
	// When set to false (default), a leave causes Serf not to rejoin
	// the cluster until an explicit join is received. When set to
	// true, the leave is ignored and the node rejoins the cluster on start.
	RejoinAfterLeave bool `yaml:"rejoin_after_leave"`
}
```
## Implementation Details
### Inter-node Communication
Nodes in the same federation communicate with each other through the following gRPC APIs:
```proto
message Event {
  uint64 id = 1;
  oneof Event {
    Subscribe Subscribe = 2;
    Message message = 3;
    Unsubscribe unsubscribe = 4;
  }
}

service Federation {
  rpc Hello(ClientHello) returns (ServerHello){}
  rpc EventStream (stream Event) returns (stream Ack){}
}
```
In general, every node acts as both a Client and a Server of the `Federation` gRPC service.
* As a Client, the node sends subscribe, unsubscribe and message-published events to other nodes when necessary.
  Each event has an EventID, which is incremental and unique within a session.
* As a Server, when the node receives an event from a Client, it returns an acknowledgement after the event has been handled successfully.
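The Client/Server roles can be illustrated with a small in-process model. This is a sketch under simplifying assumptions: `Event`, `EventKind` and `Ack` are hypothetical Go types rather than the generated gRPC code, there is no network, and EventIDs are assumed to start at 1.

```go
package main

import "fmt"

// EventKind is a hypothetical stand-in for the oneof in the Event message.
type EventKind int

const (
	Subscribe EventKind = iota
	Message
	Unsubscribe
)

// Event is a simplified, hypothetical view of the Event proto message.
type Event struct {
	ID      uint64
	Kind    EventKind
	Payload string
}

// Ack acknowledges a single EventID.
type Ack struct{ EventID uint64 }

// client assigns incremental EventIDs within its session (starting at 1, an assumption).
type client struct{ lastID uint64 }

func (c *client) newEvent(kind EventKind, payload string) Event {
	c.lastID++
	return Event{ID: c.lastID, Kind: kind, Payload: payload}
}

// server handles an event first and only then returns an Ack for it.
type server struct{ handled []Event }

func (s *server) handle(e Event) Ack {
	s.handled = append(s.handled, e) // stand-in for applying the subscribe/unsubscribe/publish
	return Ack{EventID: e.ID}
}

func main() {
	c := &client{}
	s := &server{}
	for _, e := range []Event{
		c.newEvent(Subscribe, "topicA"),
		c.newEvent(Message, "topicA:123"),
	} {
		ack := s.handle(e)
		fmt.Println("acked", ack.EventID)
	}
}
```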
### Session State
Events are designed to be idempotent and are delivered at least once, just like QoS 1 messages in the MQTT protocol.
In order to implement QoS 1 protocol flows, the Client and Server need to associate state with a SessionID;
this is referred to as the Session State. The Server also stores the federation tree and retained messages as part of the Session State.
The Session State in the Client consists of:
* Events which have been sent to the Server, but have not been acknowledged.
* Events pending transmission to the Server.
The Session State in the Server consists of:
* The existence of a Session, even if the rest of the Session State is empty.
* The EventID of the next event that the Server is willing to accept.
* Events which have been received from the Client but have not been acknowledged yet.
The Session State is stored in memory only. When the Client starts, it generates a random UUID as its SessionID.
When the Client detects that a new node has joined, or when it reconnects to the Server, it sends a `Hello` request containing the SessionID to perform a handshake.
During the handshake, the Server checks whether a session for that SessionID exists.
* If the session does not exist, the Server responds with `clean_start=true`.
* If the session exists, the Server responds with `clean_start=false` and sets `next_event_id` to the EventID of the next event it is willing to accept.
After the handshake succeeds, the Client will start `EventStream`.