# Federation
Federation is a kind of clustering mechanism which provides high-availability and horizontal scaling. In Federation
mode, multiple gmqtt brokers can be grouped together and "act as one". However, it is impossible to fulfill all
requirements in MQTT specification in a distributed environment. There are some limitations:
1. Persistent session cannot be resumed from another node.
2. Clients with same client id can connect to different nodes at the same time and will not be kicked out.
This is because session information only stores in local node and does not share between nodes.
## Quick Start
The following commands will start a two nodes federation, the configuration files can be found [here](./examples).
Start node1 in Terminal1:
```bash
$ mqttd start -c path/to/retry_join/node1_config.yml
```
Start node2 in Terminate2:
```bash
$ mqttd start -c path/to/retry_join/node2_config2.yml
```
After node1 and node2 is started, they will join into one federation atomically.
We can test the federation with `mosquitto_pub/sub`:
Connect to node2 and subscribe topicA:
```bash
$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
```
Connect to node1 and send a message to topicA:
```bash
$ mosquitto_pub -t topicA -m 123 -h 127.0.0.1 -p 1883
```
The `mosquitto_sub` will receive "123" and print it in the terminal.
```bash
$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
123
```
## Join Nodes via REST API
Federation provides gRPC/REST API to join/leave and query members information,
see [swagger](./swagger/federation.swagger.json) for details. In addition to join nodes upon starting up, you can join a
node into federation by using `Join` API.
Start node3 with the configuration with empty `retry_join` which means that the node will not join any nodes upon
starting up.
```bash
$ mqttd start -c path/to/retry_join/join_node3_config.yml
```
We can send `Join` request to any nodes in the federation to get node3 joined, for example, sends `Join` request to
node1:
```bash
$ curl -X POST -d '{"hosts":["127.0.0.1:8932"]}' '127.0.0.1:57091/v1/federation/join'
{}
```
And check the members in federation:
```bash
curl http://127.0.0.1:57091/v1/federation/members
{
"members": [
{
"name": "node1",
"addr": "192.168.0.105:8902",
"tags": {
"fed_addr": "192.168.0.105:8901"
},
"status": "STATUS_ALIVE"
},
{
"name": "node2",
"addr": "192.168.0.105:8912",
"tags": {
"fed_addr": "192.168.0.105:8911"
},
"status": "STATUS_ALIVE"
},
{
"name": "node3",
"addr": "192.168.0.105:8932",
"tags": {
"fed_addr": "192.168.0.105:8931"
},
"status": "STATUS_ALIVE"
}
]
}%
```
You will see there are 3 nodes ara alive in the federation.
## Configuration
```go
// Config is the configuration for the federation plugin.
type Config struct {
// NodeName is the unique identifier for the node in the federation. Defaults to hostname.
NodeName string `yaml:"node_name"`
// FedAddr is the gRPC server listening address for the federation internal communication.
// Defaults to :8901.
// If the port is missing, the default federation port (8901) will be used.
FedAddr string `yaml:"fed_addr"`
// AdvertiseFedAddr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
// Defaults to "FedAddr" or the private IP address of the node if the IP in "FedAddr" is 0.0.0.0.
// However, in some cases, there may be a routable address that cannot be bound.
// If the port is missing, the default federation port (8901) will be used.
AdvertiseFedAddr string `yaml:"advertise_fed_addr"`
// GossipAddr is the address that the gossip will listen on, It is used for both UDP and TCP gossip. Defaults to :8902
GossipAddr string `yaml:"gossip_addr"`
// AdvertiseGossipAddr is used to change the gossip server address that we advertise to other nodes in the cluster.
// Defaults to "GossipAddr" or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
// If the port is missing, the default gossip port (8902) will be used.
AdvertiseGossipAddr string `yaml:"advertise_gossip_addr"`
// RetryJoin is the address of other nodes to join upon starting up.
// If port is missing, the default gossip port (8902) will be used.
RetryJoin []string `yaml:"retry_join"`
// RetryInterval is the time to wait between join attempts. Defaults to 5s.
RetryInterval time.Duration `yaml:"retry_interval"`
// RetryTimeout is the timeout to wait before joining all nodes in RetryJoin successfully.
// If timeout expires, the server will exit with error. Defaults to 1m.
RetryTimeout time.Duration `yaml:"retry_timeout"`
// SnapshotPath will be pass to "SnapshotPath" in serf configuration.
// When Serf is started with a snapshot,
// it will attempt to join all the previously known nodes until one
// succeeds and will also avoid replaying old user events.
SnapshotPath string `yaml:"snapshot_path"`
// RejoinAfterLeave will be pass to "RejoinAfterLeave" in serf configuration.
// It controls our interaction with the snapshot file.
// When set to false (default), a leave causes a Serf to not rejoin
// the cluster until an explicit join is received. If this is set to
// true, we ignore the leave, and rejoin the cluster on start.
RejoinAfterLeave bool `yaml:"rejoin_after_leave"`
}
```
## Implementation Details
### Inner-node Communication
Nodes in the same federation communicate with each other through a couple of gRPC streaming apis:
```proto
message Event {
uint64 id = 1;
oneof Event {
Subscribe Subscribe = 2;
Message message = 3;
Unsubscribe unsubscribe = 4;
}
}
service Federation {
rpc Hello(ClientHello) returns (ServerHello){}
rpc EventStream (stream Event) returns (stream Ack){}
}
```
In general, a node is both Client and Server which implements the `Federation` gRPC service.
* As Client, the node will send subscribe, unsubscribe and message published events to other nodes if necessary.
Each event has a EventID, which is incremental and unique in a session.
* As Server, when receives a event from Client, the node returns an acknowledgement after the event has been handled
successfully.
### Session State
The event is designed to be idempotent and will be delivered at least once, just like the QoS 1 message in MQTT
protocol. In order to implement QoS 1 protocol flows, the Client and Server need to associate state with a SessionID,
this is referred to as the Session State. The Server also stores the federation tree and retained messages as part of
the Session State.
The Session State in the Client consists of:
* Events which have been sent to the Server, but have not been acknowledged.
* Events pending transmission to the Server.
The Session State in the Server consists of:
* The existence of a Session, even if the rest of the Session State is empty.
* The EventID of the next event that the Server is willing to accept.
* Events which have been received from the Client, but have not sent acknowledged yet.
The Session State stores in memory only. When the Client starts, it generates a random UUID as SessionID. When the
Client detects a new node is joined or reconnects to the Server, it sends the `Hello` request which contains the
SessionID to perform a handshake. During the handshake, the Server will check whether the session for the SessionID
exists.
* If the session not exists, the Server sends response with `clean_start=true`.
* If the session exists, the Server sends response with `clean_start=false` and sets the next EventID that it is willing
to accept to `next_event_id`.
After handshake succeed, the Client will start
没有合适的资源?快使用搜索试试~ 我知道了~
Golang编写的超轻量级物联网平台,具有轻量级、快速、极低的内存占用等特性,特别适用于个人开发者或初创公司承接中小型物联网项目
共668个文件
go:544个
png:20个
yml:13个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 172 浏览量
2023-12-11
11:09:29
上传
评论
收藏 19.74MB ZIP 举报
温馨提示
hummingbird(蜂鸟)是由Golang编写的超轻量级物联网平台,具有轻量级、快速、极低的内存占用等特性,特别适用于个人开发者或初创公司承接中小型物联网项目
资源推荐
资源详情
资源评论
收起资源包目录
Golang编写的超轻量级物联网平台,具有轻量级、快速、极低的内存占用等特性,特别适用于个人开发者或初创公司承接中小型物联网项目 (668个子文件)
CURRENT.bak 16B
extfile.cnf 78B
openssl.conf 113B
ca.crt 1KB
server.csr 798B
main.21abe5ff.css 96KB
CURRENT 16B
core.db 6.32MB
configuration.yml.dist 7KB
configuration.toml.dist 836B
Dockerfile 1KB
Dockerfile 1KB
.gitignore 159B
docs.go 149KB
server.go 42KB
client.go 41KB
federation.pb.go 41KB
subscription.pb.go 34KB
alertapp.go 30KB
client.pb.go 28KB
thinkmodel.go 26KB
servicemanager.go 24KB
sceneapp.go 22KB
properties.go 21KB
client.go 21KB
account.pb.go 21KB
client.go 21KB
test_suite.go 20KB
jwtauth.go 19KB
client.go 19KB
thingmodeltemplate.go 19KB
thingmodelapp.go 18KB
deviceapp.go 18KB
manager.go 17KB
federation.go 17KB
account.pb.gw.go 17KB
stats.go 17KB
packets.go 16KB
subscription.pb.gw.go 16KB
test_suite.go 15KB
federation.pb.gw.go 15KB
hook.go 14KB
federation_grpc.pb.go 13KB
client.pb.gw.go 13KB
device.go 13KB
service_mock.go 13KB
deviceservice.go 12KB
alertrule.go 12KB
deviceaction.go 12KB
parser.go 12KB
init.go 12KB
publish.pb.go 12KB
en.go 11KB
variables.go 11KB
device.go 11KB
dataresource.go 11KB
zh.go 11KB
client.go 11KB
trie_db.go 11KB
product.go 10KB
client.go 10KB
device.go 10KB
device.go 10KB
snowflake.go 10KB
account_grpc.pb_mock.go 9KB
logger.go 9KB
productapp.go 9KB
devicelibrary.go 9KB
product.go 9KB
product.go 9KB
redis.go 9KB
db.go 9KB
devicealert.go 9KB
subscription_test.go 9KB
config.go 8KB
subscription_grpc.pb.go 8KB
client.go 8KB
timerapp.go 8KB
alertrule.go 8KB
alertrule.go 8KB
api_registrar.go 8KB
account_grpc.pb.go 8KB
gateway.go 8KB
ruleengineapp.go 8KB
thinkmodelmessage.go 8KB
federation_grpc.pb_mock.go 8KB
peer.go 7KB
devicelibrary.go 7KB
topic_trie_test.go 7KB
user.go 7KB
config.go 7KB
driverlib.go 7KB
manage.go 7KB
hooks.go 7KB
subscription_mock.go 7KB
device.go 7KB
mem.go 7KB
server.go 6KB
subscription.go 6KB
queue_mock.go 6KB
共 668 条
- 1
- 2
- 3
- 4
- 5
- 6
- 7
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6713
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功