/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.util.DonAsynchron;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.sub.Subscription;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Created by ashvayka on 27.03.18.
*/
@Service
@Slf4j
public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptionService {
@Autowired
private TelemetryWebSocketService wsService;
@Autowired
private AttributesService attrService;
@Autowired
private TimeseriesService tsService;
@Autowired
private ClusterRoutingService routingService;
@Autowired
private ClusterRpcService rpcService;
@Autowired
@Lazy
private DeviceStateService stateService;
private ExecutorService tsCallBackExecutor;
private ExecutorService wsCallBackExecutor;
@PostConstruct
public void initExecutor() {
tsCallBackExecutor = Executors.newSingleThreadExecutor();
wsCallBackExecutor = Executors.newSingleThreadExecutor();
}
@PreDestroy
public void shutdownExecutor() {
if (tsCallBackExecutor != null) {
tsCallBackExecutor.shutdownNow();
}
if (wsCallBackExecutor != null) {
wsCallBackExecutor.shutdownNow();
}
}
private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new ConcurrentHashMap<>();
private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new ConcurrentHashMap<>();
@Override
public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
Optional<ServerAddress> server = routingService.resolveById(entityId);
Subscription subscription;
if (server.isPresent()) {
ServerAddress address = server.get();
log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
subscription = new Subscription(sub, true, address);
tellNewSubscription(address, sessionId, subscription);
} else {
log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
subscription = new Subscription(sub, true);
}
registerSubscription(sessionId, entityId, subscription);
}
@Override
public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
cleanupLocalWsSessionSubscriptions(sessionId);
}
@Override
public void removeSubscription(String sessionId, int subscriptionId) {
log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.get(sessionId);
if (sessionSubscriptions != null) {
Subscription subscription = sessionSubscriptions.remove(subscriptionId);
if (subscription != null) {
processSubscriptionRemoval(sessionId, sessionSubscriptions, subscription);
} else {
log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
}
} else {
log.debug("[{}] No session subscriptions found!", sessionId);
}
}
@Override
public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
saveAndNotify(entityId, ts, 0L, callback);
}
@Override
public void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = tsService.save(entityId, ts, ttl);
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeseriesUpdate(entityId, ts));
}
@Override
public void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = attrService.save(entityId, scope, attributes);
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
}
@Override
public void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
, System.currentTimeMillis())), callback);
}
@Override
pub
没有合适的资源?快使用搜索试试~ 我知道了~
thingsboard:开源物联网平台thingsboard源码分析
共1718个文件
java:985个
js:274个
html:177个
5星 · 超过95%的资源 43 下载量 179 浏览量
2021-03-22
21:54:26
上传
评论 1
收藏 4.82MB ZIP 举报
温馨提示
精尽thingsboard原始解析
资源详情
资源评论
资源推荐
收起资源包目录
thingsboard:开源物联网平台thingsboard源码分析 (1718个子文件)
.babelrc 98B
install.bat 3KB
upgrade.bat 1KB
install_dev_db.bat 871B
uninstall.bat 142B
actor-system.conf 8KB
thingsboard.conf 2KB
system-data.cql 804KB
schema.cql 22KB
schema_update.cql 9KB
schema_update.cql 3KB
schema_update.cql 3KB
system-test.cql 2KB
rulenode-core-config.css 2KB
Dockerfile 3KB
Dockerfile 799B
Dockerfile 775B
Dockerfile 775B
Dockerfile 712B
tb.env 658B
.eslintrc 287B
dashboard.gif 257KB
.gitignore 417B
.gitignore 43B
.gitignore 27B
.gitignore 12B
build.gradle 4KB
extension-form-mqtt.tpl.html 100KB
extension-form-modbus.tpl.html 80KB
extension-form-opc.tpl.html 50KB
extension-form-http.tpl.html 36KB
dashboard.tpl.html 22KB
widget-config.tpl.html 19KB
widget-editor.tpl.html 18KB
entity-filter.tpl.html 16KB
rulechain.tpl.html 16KB
dashboard-settings.tpl.html 14KB
attribute-table.tpl.html 11KB
widget-action-dialog.tpl.html 10KB
datasource-entity.tpl.html 9KB
extension-table.tpl.html 9KB
relation-table.tpl.html 8KB
manage-dashboard-states.tpl.html 8KB
dashboard.tpl.html 7KB
grid.tpl.html 7KB
alarms-table-widget.tpl.html 7KB
node-script-test.tpl.html 7KB
datasource-func.tpl.html 6KB
add-attribute-dialog.tpl.html 6KB
outgoing-mail-settings.tpl.html 5KB
entities-table-widget.tpl.html 5KB
alarm-details-dialog.tpl.html 5KB
devices.tpl.html 5KB
manage-widget-actions.tpl.html 5KB
rulechains.tpl.html 5KB
entity-aliases.tpl.html 5KB
extension-dialog.tpl.html 5KB
timeseries-table-widget.tpl.html 5KB
select-widget-type.tpl.html 5KB
timewindow-panel.tpl.html 5KB
assets.tpl.html 5KB
add-widget-to-dashboard-dialog.tpl.html 5KB
home.tpl.html 4KB
device-fieldset.tpl.html 4KB
user-fieldset.tpl.html 4KB
add-dashboards-to-customer.tpl.html 4KB
assign-to-customer.tpl.html 4KB
entity-alias-dialog.tpl.html 4KB
assign-to-customer.tpl.html 4KB
add-devices-to-customer.tpl.html 4KB
dashboard-fieldset.tpl.html 4KB
add-assets-to-customer.tpl.html 4KB
audit-log-table.tpl.html 4KB
dashboard-layout.tpl.html 4KB
relation-dialog.tpl.html 4KB
customers.tpl.html 4KB
import-dialog.tpl.html 4KB
edit-attribute-value.tpl.html 4KB
dashboard-state-dialog.tpl.html 4KB
asset-fieldset.tpl.html 3KB
device-credentials.tpl.html 3KB
manage-dashboard-layouts.tpl.html 3KB
rulenode-fieldset.tpl.html 3KB
relation-filters.tpl.html 3KB
datakey-config.tpl.html 3KB
add-widget.tpl.html 3KB
profile.tpl.html 3KB
tenants.tpl.html 3KB
widget-library.tpl.html 3KB
make-dashboard-public-dialog.tpl.html 3KB
material-icons-dialog.tpl.html 3KB
link-fieldset.tpl.html 3KB
change-password.tpl.html 3KB
reset-password.tpl.html 3KB
create-password.tpl.html 3KB
save-widget-type-as.tpl.html 3KB
alarm-table.tpl.html 3KB
kv-map.tpl.html 3KB
login.tpl.html 3KB
event-row-debug-rulenode.tpl.html 3KB
共 1718 条
- 1
- 2
- 3
- 4
- 5
- 6
- 18
穆庭秋
- 粉丝: 25
- 资源: 4671
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- Sora AI-关于文生视频的使用场景说明
- suno AI文生视频的相关教程和介绍使用
- 什么是后端开发-关于后端开发的一些小介绍分享
- Jurassic Pack Vol. II Dinosaurs 侏罗纪包卷恐龙二号Unity游戏模型资源unitypackage
- Jurassic Pack Vol. III Dinosaurs 侏罗纪包卷恐龙三号Unity游戏模型资源unitypackag
- Ultimate Seating Controller 终极座椅控制器Unity游戏开发插件资源unitypackage
- 什么是人工智能-关于人工智能的相关介绍说明
- Figma Converter for Unity适用Unity的Figma转换器Unity游戏开发插件unitypackage
- Creepy Animatronic Anims 令人毛骨悚然的电子动画Unity游戏动画插件资源unitypackage
- Rankings & Leaderboards 排名和排行榜Unity游戏开发插件资源unitypackage
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论5