package com.gzyj.flink.gps.map.beidoualarm;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import com.gzyj.flink.dto.alarm.AlarmALLObject;
import com.gzyj.flink.dto.alarm.Kafka_VehicleNormalAlarm;
import com.gzyj.flink.dto.alarm.MonitorClientAlarm;
import com.gzyj.flink.dto.utils.ToolUtils;
import com.gzyj.flink.gps.config.RedisKeyPrefixConfig;
import com.gzyj.flink.gps.factory.MySQLMybatisSessionFactory;
import com.gzyj.flink.gps.factory.RedisSessionFactory;
import com.gzyj.flink.gps.mapper.mysql.MonYanzhongweiguiSetMapper;
import com.gzyj.flink.gps.utils.TimeUtils;
import com.gzyj.flink.pojo.gps.BaobiaoAlarmsummary;
import com.gzyj.flink.pojo.mysql.set.MonYanzhongweiguiSet;
import com.gzyj.flink.veh.VehicleIncrement;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 北斗终端报警数据处理
*/
public class BeidouZongduanDataProcess extends RichFlatMapFunction<Kafka_VehicleNormalAlarm, Object> {
// MyBatis session factory; assigned in open() and used by flatMap() to open a SqlSession.
SqlSessionFactory factory;
// Lettuce connection factory; assigned in open() and handed to getRedisTemplate() from flatMap().
LettuceConnectionFactory redisFactory;
/**
 * Initialization hook: looks up the MyBatis session factory and the Redis
 * connection factory and stores them in this function's fields.
 *
 * @param parameters the Flink configuration handed to this function; not read here
 * @throws Exception propagated from either factory lookup
 */
@Override
public void open(Configuration parameters) throws Exception {
    this.factory = MySQLMybatisSessionFactory.getSqlSessionFactory();
    this.redisFactory = RedisSessionFactory.getRedisFactory();
}
/**
 * Creates a fully initialized {@link RedisTemplate} bound to the given connection factory.
 * Keys and hash keys are serialized as plain strings; values and hash values are
 * serialized through fastjson.
 *
 * @param redisSessionFactory the Lettuce connection factory the template should use
 * @return a new template on which {@code afterPropertiesSet()} has already been called
 */
private RedisTemplate<String, Object> getRedisTemplate(LettuceConnectionFactory redisSessionFactory) {
    StringRedisSerializer keySerializer = new StringRedisSerializer();
    FastJsonRedisSerializer<Object> valueSerializer = new FastJsonRedisSerializer<>(Object.class);
    StringRedisSerializer hashKeySerializer = new StringRedisSerializer();
    FastJsonRedisSerializer<Object> hashValueSerializer = new FastJsonRedisSerializer<>(Object.class);

    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisSessionFactory);
    template.setKeySerializer(keySerializer);
    template.setValueSerializer(valueSerializer);
    template.setHashKeySerializer(hashKeySerializer);
    template.setHashValueSerializer(hashValueSerializer);
    template.afterPropertiesSet();
    return template;
}
/**
 * Looks up the cached vehicle record for a vehicle id.
 * <p>
 * Searches Redis for keys matching
 * {@code RedisKey_VehicleInfo + "*:" + vehId + "*"}, reads the value stored under the
 * first matching key, and converts it to a {@link VehicleIncrement} by round-tripping
 * it through JSON.
 * <p>
 * NOTE(review): the lookup relies on a key pattern search ({@code keys(...)}) and the
 * trailing wildcard may also match ids that merely start with {@code vehId} — confirm
 * both are acceptable for the real key layout and key count.
 *
 * @param vehId                the vehicle id embedded in the Redis key
 * @param clusterRedisTemplate the template used for the key search and the value read
 * @return the decoded vehicle record, or {@code null} when no key matches or the first
 *         matching key has no value
 */
public VehicleIncrement getVehicleInfo(String vehId, RedisTemplate<String, Object> clusterRedisTemplate) {
    String keyPattern = RedisKeyPrefixConfig.RedisKey_VehicleInfo + "*:" + vehId + "*";
    Set<String> matchedKeys = clusterRedisTemplate.keys(keyPattern);
    if (!CollectionUtil.isNotEmpty(matchedKeys)) {
        return null;
    }

    // Only the first key in the set's iteration order is consulted.
    String firstKey = matchedKeys.iterator().next();
    Object cached = clusterRedisTemplate.opsForValue().get(firstKey);
    if (cached == null) {
        return null;
    }

    String cachedJson = JSON.toJSONString(cached, SerializerFeature.DisableCircularReferenceDetect);
    return JSONObject.parseObject(cachedJson, VehicleIncrement.class);
}
/**
 * Removes every trailing occurrence of {@code suffix} from {@code inStr}.
 * <p>
 * For example {@code trimend("a,b,,", ",")} yields {@code "a,b"}.
 *
 * @param inStr  the string to trim; returned unchanged when {@code null}
 * @param suffix the suffix to strip repeatedly; when {@code null} or empty nothing is
 *               stripped (an empty suffix matches the end of every string, so looping
 *               on it would never terminate)
 * @return {@code inStr} without any trailing repetitions of {@code suffix}
 */
public String trimend(String inStr, String suffix) {
    if (inStr == null || suffix == null || suffix.isEmpty()) {
        return inStr;
    }
    int step = suffix.length();
    int end = inStr.length();
    // Walk the end index backwards instead of allocating a new substring per stripped suffix.
    while (end >= step && inStr.startsWith(suffix, end - step)) {
        end -= step;
    }
    return inStr.substring(0, end);
}
@Override
public void flatMap(Kafka_VehicleNormalAlarm VehAlarm, Collector<Object> collector) throws Exception {
SqlSession session = factory.openSession(true);
List<Object> list=new ArrayList<>();
try {
MonYanzhongweiguiSetMapper monYanzhongweiguiSetMapper = session.getMapper(MonYanzhongweiguiSetMapper.class);
MonYanzhongweiguiSet AlarmDeploy = monYanzhongweiguiSetMapper.selectByVehId(VehAlarm.getVehicleId());
session.commit();
RedisTemplate<String, Object> clusterRedisTemplate = getRedisTemplate(redisFactory);
VehicleIncrement vehinfo = getVehicleInfo(VehAlarm.getVehicleId(), clusterRedisTemplate);
Map<Object, Object> tmp = clusterRedisTemplate.opsForHash().entries(RedisKeyPrefixConfig.RedisKey_CmAlarm + VehAlarm.getVehicleId());
// 获取开始报警数据
ConcurrentHashMap<Object, Object> VehicleAlarm = new ConcurrentHashMap<>(tmp);
String[] sNote = trimend(VehAlarm.Note, ",").split(",");
if (VehAlarm.Note.length() > 0) {
//开始报警
Iterator<Map.Entry<Object, Object>> mapiter = VehicleAlarm.entrySet().iterator();
while (mapiter.hasNext()) {
Map.Entry<Object, Object> entry = mapiter.next();
String sAlarmNote = (String) entry.getKey();
if (sAlarmNote.equalsIgnoreCase("疲劳驾驶")) sAlarmNote = "疲劳驾驶报警";
if (!VehAlarm.Note.contains(sAlarmNote)) {
String sLastalarm = (String) entry.getValue();
AlarmALLObject Lastalarm = JSON.parseObject(sLastalarm, AlarmALLObject.class);
if (VehAlarm.Velocity > Lastalarm.MaxSpeed) Lastalarm.MaxSpeed = VehAlarm.Velocity;
if (VehAlarm.Latitude > 0 && VehAlarm.Longitude > 0) {
Lastalarm.Distance += ToolUtils.GetDistance(Lastalarm.Lon, Lastalarm.Lat, VehAlarm.Longitude, VehAlarm.Latitude);
Lastalarm.Lat = VehAlarm.Latitude;
Lastalarm.Lon = VehAlarm.Longitude;
}
// 结束报警
double KeepTime = TimeUtils.getMinusSecond(VehAlarm.Time, Lastalarm.BeginTime);
int ChaoSuBi = 0;
int status = 1;
if (KeepTime < 0) KeepTime = 0;
// 疲劳驾驶严重违规计算
if (sAlarmNote.equalsIgnoreCase("疲劳驾驶报警")) {
int plsj = -1;
int plsj1 = -1;
int plsj2 = -1;
if (AlarmDeploy != null) {
plsj = AlarmDeploy.getPlcxsj() == null ? -1 : AlarmDeploy.getPlcxsj();
plsj1 = AlarmDeploy.getPlcxsj1() == null ? -1 : AlarmDeploy.getPlcxsj1();
plsj2 = AlarmDeploy.getPlcxsj2() == null ? -1 : AlarmDeploy.getPlcxsj2();
if (plsj1 >= 0) {
plsj1 = plsj1 * 60;
if (KeepTime >= plsj1) status = 1;
}
if (plsj2 >= 0) {
plsj2 = plsj2 * 60;
if (KeepTime >= plsj2) status = 2;
}
if (plsj >= 0) {
plsj = plsj * 60;
if (KeepTime >= plsj) status = 3;
}
}
}
// 超速报警严重违规计算
if (sAlarmNote.equalsIgnoreCase("超速报警")) {
int cssj = -1;
int cssj1 = -1;
int cssj2 = -1;
double csbfb = -1;
double csbfb1 = -1;
double csbfb2 = -1;
if (AlarmDeploy != null) {
// 计算严重违规
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
该项目是一款基于Java语言的公路运输协议数据处理设计源码,包含274个文件,其中包括244个Java源代码文件、24个XML配置文件、3个Git忽略文件以及2个属性文件,旨在实现高效的协议数据处理功能。
资源推荐
资源详情
资源评论
收起资源包目录
基于Java语言的公路运输协议数据处理设计源码 (274个子文件)
.gitignore 496B
.gitignore 496B
.gitignore 496B
BeidouZongduanDataProcess.java 25KB
AnbiaoVehicle.java 20KB
GPSApp.java 16KB
HexBytesUtil.java 15KB
TimeUtils.java 14KB
TimeUtils.java 14KB
TimeUtils.java 14KB
BeidouZongDuanAlarm2KafkaVehicleNormalAlarmFun.java 13KB
IsInDatabase.java 13KB
GPSPosition.java 13KB
GPSPosition.java 13KB
MysqlVehicleSource.java 12KB
VehicleSource.java 12KB
ZdaqService.java 12KB
Server808Byte2MsgInHandler.java 12KB
TCmd020064Process.java 11KB
GPSService.java 11KB
BeidouPlatformAlarmProcess.java 11KB
AbstractResultDeserializer.java 11KB
AnbiaoVehicle.java 10KB
DaoludengjiBroadcastFun.java 9KB
YjGpsAddrService.java 8KB
JTT808_0x0100_Handler.java 8KB
JTT808_0x0200.java 8KB
JTT808_0x0200_0x64.java 6KB
JTT808_0x0200_0x65.java 6KB
BeidouZongDuanAlarmFlow.java 6KB
JTT808_0x0200_0x64.java 6KB
TCmd020065Process.java 6KB
TsSafeAlarmSet.java 6KB
JTT808_0x0200.java 6KB
Package.java 6KB
JTT808_0x0200_0x65.java 6KB
Package.java 6KB
Package.java 6KB
JTT808_0x0200_Handler.java 5KB
JTT808_0x0702.java 5KB
JTT808_0x0702.java 5KB
BeidouAlarmProcess.java 5KB
JTT808_0x0200_0x70.java 4KB
ZdaqFlow.java 4KB
JTT808_0x0702_Handler.java 4KB
GnssDataBasicInfo.java 4KB
GnssDataBasicInfo.java 4KB
JTT808_0x0200.java 4KB
KafkaConsumerConfig.java 4KB
JTT808_0x0200_0x67.java 4KB
JTT808_0x0200_0x67.java 4KB
RoadUtils.java 4KB
CommonService.java 4KB
DriverInfoSink.java 3KB
JTT808_0x0200_0x66.java 3KB
JTT808_0x0200_0x66.java 3KB
GnssDataBasicInfo.java 3KB
AlarmFlag.java 3KB
RedisConfig.java 3KB
DriverLoginInfoSource.java 3KB
AlarmFlag.java 3KB
BaobiaoAlarmsummary.java 3KB
AlarmFlag.java 3KB
ProtocolDecoder.java 3KB
MonDaoludengjiSetSource.java 3KB
Netty808Server.java 3KB
LogConfig.java 3KB
DriverApp.java 3KB
JTT808_0x0100.java 3KB
VehInfoProcessFun.java 3KB
JTT808_0x0100.java 3KB
PointUtils.java 3KB
PointUtils.java 3KB
PackageHeader.java 3KB
JTT808_0x8300.java 3KB
RedisSessionFactory.java 3KB
RedisSessionFactory.java 3KB
PackageHeader.java 3KB
JTT808_0x0100.java 3KB
VehicleIncrement.java 3KB
PackageHeader.java 2KB
DispatcherHandler.java 2KB
ProtocolServer.java 2KB
NetWorkSerrvice.java 2KB
VehicleIncrement.java 2KB
JTT808_0x9208.java 2KB
NettyConfig.java 2KB
JTT808_0x0702.java 2KB
MonitorClientAlarm.java 2KB
MonYanzhongweiguiSet.java 2KB
HeaderMsgBodyProperty.java 2KB
JTT808_0x0200_0x11.java 2KB
Common808HandlerFactory.java 2KB
MybatisGPSSink.java 2KB
AlarmIdentificationProperty.java 2KB
AlarmIdentificationProperty.java 2KB
Kafka_VehicleNormalAlarm.java 2KB
YjGpsAddr.java 2KB
HeaderMsgBodyProperty.java 2KB
HeaderMsgBodyProperty.java 2KB
共 274 条
- 1
- 2
- 3
资源评论
csbysj2020
- 粉丝: 2593
- 资源: 5497
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功