package spark.skynet;
import MockData.MockData;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Optional;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import spark.conf.ConfigurationManager;
import spark.constant.Constants;
import spark.dao.IMonitorDAO;
import spark.dao.ITaskDAO;
import spark.dao.factory.DAOFactory;
import spark.domain.MonitorState;
import spark.domain.Task;
import spark.domain.TopNMonitor2CarCount;
import spark.domain.TopNMonitorDetailInfo;
import spark.skynet.accumlator.MonitorAndCameraStateAccumulator;
import spark.skynet.sort4key.SpeedSortKey;
import utils.ParamUtils;
import utils.SparkUtils;
import utils.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* 【卡口流量监控模块】
* 1.检测卡扣状态
* 2.获取车流排名前N的卡扣号
* 3.数据库保存累加器5个状态(正常卡扣数,异常卡扣数,正常摄像头数,异常摄像头数,异常摄像头的详细信息)
* 4.topN 卡口的车流量具体信息存库
* 5.获取高速通过的TOPN卡扣
* 6.获取车辆高速通过的TOPN卡扣,每一个卡扣中车辆速度最快的前10名
* 7.区域碰撞分析
* 8.卡扣碰撞分析
*
* 【作业提交】
* ./spark-submit --master spark://mini:7077
* --class spark.skynet.MonitorFlowAnalyze
* --driver-class-path ../lib/mysql-connector-java-5.1.6.jar:../lib/fastjson-1.2.11.jar
* --jars ../lib/mysql-connector-java-5.1.6.jar,../lib/fastjson-1.2.11.jar
* ../lib/ProduceData2Hive.jar
* 1
*/
public class MonitorFlowAnalyze {
public static void main(String[] args) {
//构建Spark运行时的环境参数
SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_SESSION);
//指定Master
SparkUtils.setMaster(conf);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
/**
* 查看配置文件是否是本地测试,若是本地测试那么创建一个SQLContext 如果是集群测试HiveContext
*/
SQLContext sqlContext = SparkUtils.getSQLContext(sparkContext);
/**
* 基于本地测试生成模拟测试数据,如果在集群中运行的话,直接操作Hive中的表就可以
* 本地模拟数据注册成一张临时表
* monitor_flow_action 数据表:监控车流量所有数据
* monitor_camera_info 标准表:卡扣对应摄像头标准表
*/
if(ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)){
//本地
MockData.mock(sparkContext,sqlContext);
}else{
//集群
sqlContext.sql("use traffic");
}
/**------------------------------------------------------------------------------------*/
/**------------------------------------------------------------------------------------*/
/**
* 给定一个时间段,统计出卡口数量的正常数量,异常数量,还有通道数
* 异常数:每一个卡口都会有n个摄像头对应每一个车道,如果这一段时间内卡口的信息没有第N车道的信息的话就说明这个卡口存在异常。
* 这需要拿到一份数据(每一个卡口对应的摄像头的编号),模拟数据在monitor_camera_info临时表中
*
* 从配置文件my.properties中拿到spark.local.taskId.monitorFlow的taskId
*/
long taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_TASKID_MONITOR);
if(taskId == 0L){
System.out.println("args is null.....");
return;
}
/**
* 获取ITaskDAO的对象,通过taskId查询出来的数据封装到Task(自定义)对象
*/
ITaskDAO taskDAO = DAOFactory.getTaskDAO();
Task task = taskDAO.findTaskById(taskId);
if (task == null) return;
/**
* task.getTaskParams()是一个string格式的字符串 封装到taskParamsJsonObject
* 将 task_parm字符串转换成json格式数据。
*/
JSONObject jsonObject = JSONObject.parseObject(task.getTaskParams());
/**
* 通过params(json字符串)查询monitor_flow_action
*
* 获取指定日期内检测的monitor_flow_action中车流量数据,返回JavaRDD<Row>
*/
JavaRDD<Row> cameraRDD = SparkUtils.getCameraRDDByDateRange(sqlContext, jsonObject);
/**
* 持久化
*/
cameraRDD = cameraRDD.cache();
/**
* 自定义累加器,维护多个值
*/
Accumulator<String> accumulator = sparkContext.accumulator("", new MonitorAndCameraStateAccumulator());
/**
* 将row类型的RDD 转换成kv格式的RDD k:monitor_id v:row
*/
JavaPairRDD<String, Row> monitor2DetailRDD = getMonitor2DetailRDD(cameraRDD);
/**
* monitor2DetailRDD进行持久化
*/
monitor2DetailRDD = monitor2DetailRDD.cache();
/**
* 按照卡扣号分组,对应的数据是:每个卡扣号(monitor)对应的Row信息
* 由于一共有9个卡扣号,这里groupByKey后一共有9组数据。
*/
JavaPairRDD<String, Iterable<Row>> stringIterableJavaPairRDD = monitor2DetailRDD.groupByKey();
//持久化
stringIterableJavaPairRDD = stringIterableJavaPairRDD.cache();
/**
* 遍历分组后的RDD,拼接字符串
* 数据中一共就有9个monitorId信息,那么聚合之后的信息也是9条
* monitor_id=|cameraIds=|area_id=|camera_count=|carCount=
* 例如:("0005","monitorId=0005|areaId=02|camearIds=09200,03243,02435,03232|cameraCount=4|carCount=100")
*/
JavaPairRDD<String, String> aggregateMonitorId2DetailRDD = aggreagteByMonitor(stringIterableJavaPairRDD);
/**
* 检测卡扣状态
* carCount2MonitorRDD
* K:car_count V:monitor_id
* RDD(卡扣对应车流量总数,对应的卡扣号)
* 自定义累加器计算路口摄像头各种情况
*/
JavaPairRDD<Integer, String> carCount2MonitorRDD = checkMonitorState(sparkContext,sqlContext,aggregateMonitorId2DetailRDD,taskId,jsonObject,accumulator);
//持久化
carCount2MonitorRDD = carCount2MonitorRDD.cache();
//action 类算子触发以上操作,接下来需要读取累加器数据
carCount2MonitorRDD.count();
/**
* 往数据库表 monitor_state 中保存 累加器累加的五个状态
*/
saveMonitorState(taskId,accumulator);
/**------------------------------------------------------------------------------------*/
/**------------------------------------------------------------------------------------*/
/**
* 获取车流排名前N的卡扣号
* 并放入数据库表 topn_monitor_car_count 中
* return KV格式的RDD K:monitor_id V:monitor_id
* 返回的是topN的(monitor_id,monitor_id)
*/
JavaPairRDD<String, String> topNMonitor2CarFlow = getTopNMonitorCarFlow(sparkContext,taskId,jsonObject,carCount2MonitorRDD);
/**
* 获取topN卡口的车流量具体信息,存入数据库表 topn_monitor_detail_info 中
*/
getTopNDetails(taskI
基于Spark+hive的交通智能研判系统.zip
需积分: 0 141 浏览量
2023-09-03
20:58:25
上传
评论
收藏 947KB ZIP 举报
程皮
- 粉丝: 267
- 资源: 2567
最新资源
- student.sql
- 一个简单的卷积神经网络(Convolutional Neural Network, CNN)示例
- EXP9-1-dataPageDisplay.php
- buildType设计模式课程设计
- 省市区下拉选择:3个el-select(json)
- 堆排序(Heap Sort)是一种基于比较的排序算法
- ebatis 是一个简单方便上手的声明式 Elasticsearch ORM 框架
- 威纶通触摸屏编程软件Easy builder pro V6.09.02安装包(2024.06).txt
- ES查询客户端,elasticsearch可视化工具 elasticsearch查询客户端
- html css js网页制作实例 dldtdd实现列表功能
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈