package cn.edu.hust.session;
import cn.edu.hust.conf.ConfigurationManager;
import cn.edu.hust.constant.Constants;
import cn.edu.hust.dao.TaskDao;
import cn.edu.hust.dao.factory.DaoFactory;
import cn.edu.hust.domain.*;
import cn.edu.hust.mockData.MockData;
import cn.edu.hust.util.*;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.*;
/**
* 用户可以查询的范围包含
* 1。用户的职业
* 2。用户的性别
* 3。用户城市
* 4。用户年龄
* 5。获取搜索词
* 6。获取点击品类
*/
public class UserVisitAnalyze {
public static void main(String[] args)
{
args=new String[]{"1"};
/**
* 构建spark上下文
*/
SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
JavaSparkContext context=new JavaSparkContext(conf);
SQLContext sc=getSQLContext(context.sc());
//生成模拟数据
mock(context,sc);
//拿到相应的Dao组建
TaskDao dao= DaoFactory.getTaskDao();
//从外部传入的参数获取任务的id
Long taskId=ParamUtils.getTaskIdFromArgs(args);
//从数据库中查询出相应的task
Task task=dao.findTaskById(taskId);
JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
//获取指定范围内的Sesssion
JavaRDD<Row> sessionRangeDate=getActionRDD(sc,jsonObject);
//这里增加一个新的方法,主要是映射
JavaPairRDD<String,Row> sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
//重复用到的RDD进行持久化
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
//上面的两个RDD是
//按照Sesson进行聚合
JavaPairRDD<String,String> sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
//通过条件对RDD进行筛选
// 重构,同时统计
Accumulator<String> sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
//在进行accumulator之前,需要aciton动作,不然会为空
JavaPairRDD<String,String> filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
//重复用到的RDD进行持久化
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
//获取符合过滤条件的全信息公共RDD
JavaPairRDD<String, Row> commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
//重复用到的RDD进行持久化
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
//session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
/**
* 重构实现的思路:
* 1。不要去生成任何的新RDD
* 2。不要去单独遍历一遍sesion的数据
* 3。可以在聚合数据的时候可以直接计算session的访问时长和访问步长
* 4。在以前的聚合操作中,可以在以前的基础上进行计算加上自己实现的Accumulator来进行一次性解决
* 开发Spark的经验准则
* 1。尽量少生成RDD
* 2。尽量少对RDD进行蒜子操作,如果可能,尽量在一个算子里面,实现多个需求功能
* 3。尽量少对RDD进行shuffle算子操作,比如groupBykey、reduceBykey、sortByKey
* shuffle操作,会导致大量的磁盘读写,严重降低性能
* 有shuffle的算子,和没有shuffle的算子,甚至性能相差极大
* 有shuffle的算子,很容易造成性能倾斜,一旦数据倾斜,简直就是性能杀手
* 4。无论做什么功能,性能第一
* 在大数据项目中,性能最重要。主要是大数据以及大数据项目的特点,决定了大数据的程序和项目速度,都比较满
* 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是
* 一场灾难。
*/
/**
* 使用CountByKey算子实现随机抽取功能
*/
randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD);
//在使用Accumulutor之前,需要使用Action算子,否则获取的值为空,这里随机计算
//filteredSessionRDD.count();
//计算各个session占比,并写入MySQL
calculateAndPersist(sessionAggrStatAccumulator.value(),taskId);
//获取热门品类数据Top10
List<Tuple2<CategorySortKey,String>> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
//获取热门每一个品类点击Top10session
getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
//关闭spark上下文
context.close();
}
/**
* 用于判断是否是生产环境
* @param sc
* @return
*/
public static SQLContext getSQLContext(SparkContext sc)
{
boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
if(local)
{
return new SQLContext(sc);
}
return new HiveContext(sc);
}
private static void mock(JavaSparkContext context,SQLContext sc)
{
boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
if(local)
{
MockData.mock(context,sc);
}
}
/**
* 获取指定日期范围内的数据
* @param sc
* @param taskParam
* @return
*/
private static JavaRDD<Row> getActionRDD(SQLContext sc, JSONObject taskParam)
{
String startTime=ParamUtils.getParam(taskParam,Constants.PARAM_STARTTIME);
String endTime=ParamUtils.getParam(taskParam,Constants.PARAM_ENDTIME);
String sql="select *from user_visit_action where date>='"+startTime+"' and date<='"+endTime+"'";
DataFrame df=sc.sql(sql);
return df.javaRDD();
}
/**
* 将数据进行映射成为Pair,键为SessionId,Value为Row
* @param sessionRangeDate
* @return
*/
private static JavaPairRDD<String,Row> getSessonInfoPairRDD(JavaRDD<Row> sessionRangeDate) {
return sessionRangeDate.mapToPair(new PairFunction<Row, String, Row>() {
@Override
public Tuple2<String, Row> call(Row row) throws Exception {
return new Tuple2<String, Row>(row.getString(2),row);
}
});
}
/**
* session粒度的聚合
* @param sc
* @param sessionInfoPairRDD
* @return
*/
private static JavaPairRDD<String,String> aggregateBySessionId(SQLContext sc, JavaPairRDD<String, Row> sessionInfoPairRDD) {
/**
* 先将数据映射成map格式
*/
/**
*
代码重构
JavaPairRDD<String,Row> sessionActionPair=sessionRangeDate.mapToPair(new PairFunction<Row, String,Row>() {
@Override
public Tuple2<String, Row> call(Row row) throws Exception {
return new Tuple2<String, Row>(row.getString(2),row);
}
});*/
/**
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
1.基于Spark开发的平台 2.需要有spark基础 3.有很多高级知识和设计模式 4.电商用户行为分析大数据平台(项目名称) 5.访问行为,购物行为,广告点击行为,对这些行为进行分析,使用大数据技术来帮助公司提升业绩。 6.主要的功能模块有用户session分析,页面单跳转化率统计,热门商品离线统计,广告流量实时统计等4个业务模块。 7.所使用的知识点是spark core,spark SQL,spark streaming等三个技术框架。 8.主要是数据倾斜,线上故障,性能调优,troubleshooting等经验。 9.使用模拟数据,希望达到的效果。 10.需求分析,方案设计,数据设计,编码实现,测试以及性能调优等环节。
资源推荐
资源详情
资源评论
收起资源包目录
基于Spark的电商用户行为分析大数据平台项目源码+数据集.rar (236个子文件)
.gitattributes 68B
.gitignore 294B
UserActionAnalyzePlatform.iml 18KB
UserVisitAnalyze.java 53KB
SessionAggrStat.java 6KB
MockData.java 5KB
JDBCHelper.java 5KB
CategorySortKey.java 4KB
DateUtils.java 4KB
SessionDetail.java 3KB
ValidUtils.java 3KB
Constants.java 3KB
SessionAggrStatDaoImpl.java 3KB
StringUtils.java 2KB
Task.java 2KB
SessionDetailDaoImpl.java 2KB
SessionAggrStatAccumulator.java 2KB
ConfigurationManager.java 2KB
SessionRandomExtract.java 2KB
TaskDaoImpl.java 1KB
Top10CategoryDaoImpl.java 1KB
JDBCHelperTest.java 1KB
Top10Category.java 1KB
Top10CategorySession.java 1KB
Singleton.java 1KB
SessionRandomExtractDaoImpl.java 991B
DaoFactory.java 959B
Top10CategorySessionDaoImpl.java 949B
SessionRandomExtractDaoTest.java 895B
ParamUtils.java 884B
ParamUtilsTest.java 514B
NumberUtils.java 462B
SessionAggrDao.java 432B
SessionDetailTest.java 427B
FastJsonTest.java 407B
TaskDaoTest.java 349B
SessionAggrStatDao.java 309B
SessionDetailDao.java 297B
ConfigurationManagerTest.java 280B
Top10CategoryDao.java 247B
Top10CategorySessionDao.java 228B
SessionRandomExtractDao.java 228B
TaskDao.java 127B
LICENSE 11KB
.name 25B
任务总体流程.png 514KB
map端内存缓冲与reduce端占比.png 205KB
shuffle示意图.png 154KB
合并map端输出文件.png 153KB
map端合并文件.png 150KB
2019_02_22_1933083452.png 24KB
conf.properties 205B
pom.xml 5KB
workspace.xml 3KB
Maven__org_apache_hadoop_hadoop_mapreduce_client_jobclient_2_6_4.xml 706B
Maven__org_apache_directory_server_apacheds_kerberos_codec_2_0_0_M15.xml 704B
Maven__com_fasterxml_jackson_module_jackson_module_scala_2_10_2_4_4.xml 694B
Maven__org_apache_hadoop_hadoop_mapreduce_client_shuffle_2_6_4.xml 692B
Maven__org_eclipse_jetty_orbit_javax_servlet_3_0_0_v201112011016.xml 688B
Maven__org_apache_hadoop_hadoop_mapreduce_client_common_2_6_4.xml 685B
compiler.xml 677B
Maven__org_apache_hadoop_hadoop_mapreduce_client_core_2_6_4.xml 671B
Maven__org_apache_hadoop_hadoop_mapreduce_client_app_2_6_4.xml 664B
Maven__org_apache_calcite_calcite_avatica_1_2_0_incubating.xml 661B
Maven__org_apache_parquet_parquet_format_2_3_0_incubating.xml 654B
Maven__org_apache_calcite_calcite_linq4j_1_2_0_incubating.xml 654B
Maven__org_apache_curator_curator_client_2_1_0_incubating.xml 654B
Maven__org_apache_spark_spark_network_shuffle_2_10_1_5_1.xml 653B
Maven__org_apache_spark_spark_streaming_kafka_2_10_1_5_1.xml 653B
Maven__org_apache_hadoop_hadoop_yarn_server_common_2_6_4.xml 650B
Maven__org_apache_spark_spark_network_common_2_10_1_5_1.xml 646B
Maven__com_fasterxml_jackson_core_jackson_annotations_2_4_3.xml 644B
Maven__org_apache_calcite_calcite_core_1_2_0_incubating.xml 640B
Maven__org_spark_project_hive_hive_metastore_1_2_1_spark.xml 635B
Maven__org_apache_directory_server_apacheds_i18n_2_0_0_M15.xml 634B
Maven__commons_beanutils_commons_beanutils_core_1_8_0.xml 629B
Maven__org_tachyonproject_tachyon_underfs_local_0_7_1.xml 626B
Maven__commons_configuration_commons_configuration_1_6.xml 624B
Maven__com_fasterxml_jackson_core_jackson_databind_2_4_3.xml 623B
Maven__org_codehaus_jackson_jackson_mapper_asl_1_9_13.xml 620B
Maven__org_tachyonproject_tachyon_underfs_hdfs_0_7_1.xml 619B
Maven__commons_collections_commons_collections_3_2_2.xml 616B
Maven__org_apache_directory_api_api_asn1_api_1_0_0_M20.xml 615B
Maven__org_apache_spark_spark_streaming_2_10_1_5_1.xml 611B
Maven__com_esotericsoftware_reflectasm_reflectasm_shaded_1_07.xml 608B
Maven__org_codehaus_jackson_jackson_core_asl_1_9_13.xml 606B
Maven__org_apache_spark_spark_launcher_2_10_1_5_1.xml 604B
Maven__org_apache_spark_spark_catalyst_2_10_1_5_1.xml 604B
Maven__net_hydromatic_eigenbase_properties_1_1_5.xml 603B
Maven__io_dropwizard_metrics_metrics_graphite_3_1_2.xml 603B
Maven__org_apache_hadoop_hadoop_annotations_2_6_4.xml 601B
Maven__org_apache_hadoop_hadoop_yarn_common_2_6_4.xml 601B
Maven__org_apache_hadoop_hadoop_yarn_client_2_6_4.xml 601B
Maven__org_datanucleus_datanucleus_api_jdo_3_2_6.xml 600B
Maven__org_spark_project_hive_hive_exec_1_2_1_spark.xml 600B
Maven__com_twitter_parquet_hadoop_bundle_1_6_0.xml 598B
Maven__org_apache_curator_curator_framework_2_4_0.xml 598B
Maven__org_apache_parquet_parquet_generator_1_7_0.xml 598B
Maven__org_uncommons_maths_uncommons_maths_1_2_2a.xml 595B
Maven__com_fasterxml_jackson_core_jackson_core_2_4_3.xml 595B
共 236 条
- 1
- 2
- 3
资源评论
shangjg3
- 粉丝: 1033
- 资源: 101
下载权益
C知道特权
VIP文章
课程特权
开通VIP
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 物模块模型代码,前往设计物模块所属
- Java面试手册,助力大家面试过五关斩六将,面试成功
- HITK0303MP-VB一款P-Channel沟道SOT23的MOSFET晶体管参数介绍与应用说明
- mybatis动态sql之xml增删改查批量操作示例EmpMapper.xml
- C/C++内存检测工具Sanitizers
- HITK0302MP-VB一款N-Channel沟道SOT23的MOSFET晶体管参数介绍与应用说明
- 宝塔批量建站工具,很优秀的宝塔管理工具,基于宝塔api
- HITK0204MP-VB一款N-Channel沟道SOT23的MOSFET晶体管参数介绍与应用说明
- azeryhgtfxhj
- 操作系统实验页面置换算法
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功