package com.erik.sparkproject.spark.session;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import com.alibaba.fastjson.JSONObject;
import com.erik.sparkproject.conf.ConfigurationManager;
import com.erik.sparkproject.constant.Constants;
import com.erik.sparkproject.dao.ISessionAggrStatDAO;
import com.erik.sparkproject.dao.ISessionDetailDAO;
import com.erik.sparkproject.dao.ISessionRandomExtractDAO;
import com.erik.sparkproject.dao.ITaskDAO;
import com.erik.sparkproject.dao.ITop10CategoryDAO;
import com.erik.sparkproject.dao.ITop10SessionDAO;
import com.erik.sparkproject.dao.factory.DAOFactory;
import com.erik.sparkproject.domain.SessionAggrStat;
import com.erik.sparkproject.domain.SessionDetail;
import com.erik.sparkproject.domain.SessionRandomExtract;
import com.erik.sparkproject.domain.Task;
import com.erik.sparkproject.domain.Top10Category;
import com.erik.sparkproject.domain.Top10Session;
import com.erik.sparkproject.test.MockData;
import com.erik.sparkproject.util.*;
import com.google.common.base.Optional;
import scala.Tuple2;
/**
* 用户访问session分析spark作业
*
* 接收用户创建的分析任务,用户可能指定的条件如下:
* 1.时间范围:起始日期-结束日期
* 2.性别:男或女
* 3.年龄范围
* 4.职业:多选
* 5.城市:多选
* 6.搜索词:多个搜索词,只要某个session中的任何一个
* action搜索过指定的关键词,那么session就符合条件
* 7.点击品类:多个品类,只要某个session中的任何一个
* action点击过某个品类,那么session就符合条件
*
* 我们的Spark作业如何接受用户创建的任务呢?
* J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,
* 任务参数以JSON格式封装在task_param字段中
* 接着J2EE平台执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本
* spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数传递给spark作业的main函数
* 参数就封装在main函数得到args数组中
*
* 这是spark本事提供的特性
*
*
* @author Erik
*
*/
public class UserVisitSessionAnalyzeSpark {
public static void main(String[] args) {
args = new String[]{"2"};
//构建spark上下文
//首先在Constants.java中设置spark作业相关的常量
//String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";
//保存Constants.java配置
SparkConf conf = new SparkConf()
.setAppName(Constants.SPARK_APP_NAME)
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = getSQLContext(sc.sc());
//生成模拟测试数据
mockData(sc, sqlContext);
//创建需要使用的DAO组件
ITaskDAO taskDAO = DAOFactory.getTaskDAO();
//那么就首先得查询出来指定的任务,并获取任务的查询参数
long taskid = ParamUtils.getTaskIdFromArgs(args);
Task task = taskDAO.findById(taskid);
JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
System.out.println(taskid);
System.out.println(taskParam);
//如果要进行session粒度的数据聚合,
//首先要从user_visit_action表中,查询出来指定日期范围内的数据
JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);
JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
//聚合
//首先,可以将行为数据按照session_id进行groupByKey分组
//此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join
//然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息
//到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,
//clickCategoryIds,age,professional,city,sex)>
JavaPairRDD<String, String> sessionid2AggrInfoRDD =
aggregateBySession(sqlContext, actionRDD);
//接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
//相当于我们自己编写的算子,是要访问外面的任务参数对象的
//匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的
//重构,同时进行过滤和统计
Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
"", new SesssionAggrStatAccumulator());
JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(
sessionid2AggrInfoRDD, taskParam, sessionAggrStatAccumulator);
//生成公共RDD:通过筛选条件的session的访问明细数据
JavaPairRDD<String, Row> sessionid2detailRDD = getSessionid2detailRDD(
filteredSessionid2AggrInfoRDD, sessionid2actionRDD);
//对于Accumulator这种分布式累加计算的变量的使用,有一个重要的说明
//从Accumulator中,获取数据,插入数据库的时候,一定要是在某一个action操作以后
//如果没有action的话,那么整个程序根本不会运行
//而且必须把能够触发job执行的操作,放在最终写入MySQL方法之前
//计算出的结果,在J2EE中,是以两张柱状图显示的
/**
* 特别说明
* 我们知道,要将一个功能的session聚合统计数据获取到,就必须是在一个action操作触发job之后才能
* 从Accmulator中获取数据,否则是获取不到数据的,因为没有job执行,Accumulator的值为空
* 所以,我们在这里,将随机抽取功能的实现代码放在session聚合统计功能的最终计算和写库之前
* 因为随机抽取功能中,有一个countByKey算子,是action操作,会触发job
*
*/
randomExtractSession(task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2actionRDD);
//计算出各个范围的session占比,并写入MySQL
calculateAndPersistAggrStat(sessionAggrStatAccumulator.value(), task.getTaskid());
/**
* session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例)
*
* 如果不进行重构,直接来实现,思路;
* 1.sessionRDD,映射成<sessioinid,Row>的格式
* 2.按session聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD
* 3.遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中对应的值
* 4.使用自定义Accumulator中的统计值,去计算各个区间的比例
* 5.将最后计算出来的结果,写入MySQL对应的表中
*
* 普通实现思路的问题:
* 1.为什么还要用actionRDD去映射?其实之前在session聚合的时候已经做过映射了,多次一举
* 2.是不是一定要为了session的聚合这个功能单独去遍历一遍session?其实没必要,已经有session数据
* 之前过滤session的时候,其实相当于是在遍历session了,那么这里就没
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
java+《大型电商项目实战》源码.zip (111个子文件)
UserVisitSessionAnalyzeSpark.class 25KB
MockData.class 6KB
JdbcCRUD.class 5KB
JDBCHelper.class 4KB
SessionAggrStat.class 4KB
UserVisitSessionAnalyzeSpark$3.class 4KB
UserVisitSessionAnalyzeSpark$6.class 4KB
UserVisitSessionAnalyzeSpark$9.class 4KB
UserVisitSessionAnalyzeSpark$25.class 4KB
UserVisitSessionAnalyzeSpark$27.class 4KB
DateUtils.class 3KB
CategorySortKey.class 3KB
SessionDetail.class 3KB
UserVisitSessionAnalyzeSpark$5.class 3KB
UserVisitSessionAnalyzeSpark$28.class 3KB
UserVisitSessionAnalyzeSpark$10.class 3KB
UserVisitSessionAnalyzeSpark$23.class 3KB
UserVisitSessionAnalyzeSpark$24.class 3KB
UserVisitSessionAnalyzeSpark$11.class 3KB
UserVisitSessionAnalyzeSpark$22.class 3KB
StringUtils.class 2KB
Constants.class 2KB
UserVisitSessionAnalyzeSpark$17.class 2KB
UserVisitSessionAnalyzeSpark$20.class 2KB
UserVisitSessionAnalyzeSpark$12.class 2KB
SesssionAggrStatAccumulator.class 2KB
Task.class 2KB
ValidUtils.class 2KB
SessionAggrStatDAOImpl.class 2KB
UserVisitSessionAnalyzeSpark$8.class 2KB
TaskDAOImpl$1.class 2KB
UserVisitSessionAnalyzeSpark$7.class 2KB
UserVisitSessionAnalyzeSpark$14.class 2KB
UserVisitSessionAnalyzeSpark$26.class 2KB
UserVisitSessionAnalyzeSpark$16.class 2KB
UserVisitSessionAnalyzeSpark$19.class 2KB
UserVisitSessionAnalyzeSpark$13.class 2KB
ConfigurationManager.class 2KB
SessionDetailDAOImpl.class 2KB
UserVisitSessionAnalyzeSpark$4.class 2KB
UserVisitSessionAnalyzeSpark$2.class 1KB
UserVisitSessionAnalyzeSpark$1.class 1KB
SessionRandomExtract.class 1KB
DAOFactory.class 1KB
SessinoRandomExtractDAOImpl.class 1KB
TaskDAOImpl.class 1KB
UserVisitSessionAnalyzeSpark$18.class 1KB
UserVisitSessionAnalyzeSpark$15.class 1KB
UserVisitSessionAnalyzeSpark$21.class 1KB
Top10Category.class 1KB
Top10CategoryDAOImpl.class 1KB
Top10SessionDAOImpl.class 1KB
FastjsonTest.class 1KB
ParamUtils.class 1KB
JDBChelperTest.class 1KB
Top10Session.class 1KB
TaskDAOTest.class 1022B
ConfigurationManagerTest.class 829B
AppTest.class 629B
NumberUtils.class 609B
Singleton.class 584B
App.class 557B
JDBCHelper$QueryCallback.class 324B
ISessionRandomExtractDAO.class 229B
ISessionAggrStatDAO.class 214B
ISessionDetailDAO.class 208B
ITop10CategoryDAO.class 208B
ITop10SessionDAO.class 205B
ITaskDAO.class 183B
UserVisitSessionAnalyzeSpark.java 64KB
JdbcCRUD.java 8KB
JDBCHelper.java 7KB
MockData.java 6KB
SessionAggrStat.java 4KB
SesssionAggrStatAccumulator.java 4KB
CategorySortKey.java 4KB
DateUtils.java 3KB
ValidUtils.java 3KB
StringUtils.java 2KB
SessionDetail.java 2KB
JDBChelperTest.java 2KB
Constants.java 2KB
Singleton.java 2KB
Task.java 2KB
SessionAggrStatDAOImpl.java 2KB
DAOFactory.java 1KB
TaskDAOImpl.java 1KB
ConfigurationManager.java 1KB
SessionRandomExtract.java 1KB
SessionDetailDAOImpl.java 1KB
Top10Category.java 930B
ParamUtils.java 894B
SessinoRandomExtractDAOImpl.java 885B
Top10Session.java 785B
Top10CategoryDAOImpl.java 724B
Top10SessionDAOImpl.java 709B
AppTest.java 649B
FastjsonTest.java 627B
TaskDAOTest.java 533B
NumberUtils.java 472B
共 111 条
- 1
- 2
资源评论
GeekyGuru
- 粉丝: 1727
- 资源: 1099
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功