package com.twq;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
//import com.google.common.base.Optional;
public class StreamingGoods implements Serializable{
// Directory name passed to jsc.checkpoint() in main; Spark Streaming persists checkpoint
// data there (required by updateStateByKey). Resolved relative to Spark's default filesystem.
public static String checkpointDir="checkDir";
// Set to true by the scoring function when a received line splits into zero "::" fields.
// Nothing in the visible code resets it, so once true, both foreachRDD sinks print their
// "no data" message and skip all database writes for the rest of the run.
// NOTE(review): it is written inside Spark tasks and read on the driver, which only works
// when both run in the same JVM (e.g. local mode) — confirm this is intended.
public static boolean flag=false;
public static void main(String[] args) throws InterruptedException {
// Silence Spark / third-party logging so only this job's own console output is visible.
Logger.getRootLogger().setLevel(Level.OFF);
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("com").setLevel(Level.OFF);
System.setProperty("spark.ui.showConsoleProgress", "false");
// Local mode needs at least 2 threads: the socket receiver occupies one permanently,
// the remaining one(s) process the batches.
SparkConf sparkconf = new SparkConf();
sparkconf.setAppName("StreamingGoods");
sparkconf.setMaster("local[2]");
// Spark Streaming needs an explicit batch interval; here every 5000 ms.
JavaStreamingContext jsc = new JavaStreamingContext(sparkconf, new Duration(5000));
// Checkpoint (recovery) directory; also required by updateStateByKey below.
jsc.checkpoint(checkpointDir);
// Newline-delimited text records read from a local TCP socket.
JavaReceiverInputDStream<String> jds = jsc.socketTextStream("127.0.0.1", 9999);
// Pass-through view of the raw socket lines that prints a notice for every empty line.
// NOTE(review): no output operation uses `mess` in the code visible here (the print below
// is commented out), so Spark would never evaluate this transformation.
JavaDStream<String> mess = jds.map(new Function<String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String rawLine) throws Exception {
        boolean blank = rawLine.isEmpty();
        if (blank) {
            System.out.println("没有数据流进来。。。。。。");
        }
        return rawLine;
    }
});
// mess.print(30); // debug: print up to 30 received lines per batch
// Split each raw line on "::" and compute its weighted follow (attention) score, keyed by item.
// Field layout as read below (presumably produced by the data simulator — TODO confirm):
//   [1] itemID, [2] view count, [3] dwell time, [4] favourite, [5] purchase count, [6] rating
// Weights: views 0.15, dwell time 0.15, favourite 0.2, purchases 0.2, rating 0.3 (sum = 1).
JavaPairDStream<String, Double> splitmess = jds.mapToPair(new PairFunction<String, String, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(String line) throws Exception {
        String[] lineSplit = line.split("::");
        if (lineSplit.length == 0) {
            // e.g. a line consisting only of "::" separators; the driver-side sinks check this flag.
            flag = true;
        }
        if (lineSplit.length < 7) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Malformed record, expected at least 7 '::'-separated fields: " + line);
        }
        double followValue = Double.parseDouble(lineSplit[2]) * 0.15
                + Double.parseDouble(lineSplit[3]) * 0.15
                + Double.parseDouble(lineSplit[4]) * 0.2
                + Double.parseDouble(lineSplit[5]) * 0.2
                // Rating is field 6 (field 4 is the favourite count, already weighted above).
                + Double.parseDouble(lineSplit[6]) * 0.3;
        return new Tuple2<String, Double>(lineSplit[1], followValue);
    }
});
// Running total of the follow score per item across all batches so far
// (updateStateByKey keeps this state; it relies on the checkpoint directory set on jsc).
JavaPairDStream<String, Double> updateFollowValue = splitmess.updateStateByKey(
        new Function2<List<Double>, Optional<Double>, Optional<Double>>() {
            private static final long serialVersionUID = 1L;

            /**
             * @param newValues  scores received for this key in the current batch (may be empty)
             * @param startValue the key's total from previous batches, absent for a new key
             * @return the new total: previous total (or 0.0) plus all new scores
             */
            @Override
            public Optional<Double> call(List<Double> newValues, Optional<Double> startValue) throws Exception {
                // Accumulate in a primitive to avoid re-boxing on every addition.
                double total = startValue.or(0.0);
                for (Double value : newValues) {
                    total += value;
                }
                return Optional.of(total);
            }
        });
// After every batch, print and upsert each item's running total into MySQL table followData.
updateFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaPairRDD<String, Double> stringDoubleJavaPairRDD) throws Exception {
        if (stringDoubleJavaPairRDD.isEmpty() || flag) {
            System.out.println("没有数据流入!!!");
            return;
        }
        // One pooled connection and one prepared statement per partition, not per record.
        stringDoubleJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Double>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Iterator<Tuple2<String, Double>> tuple2Iterator) throws Exception {
                String sql = "insert into followData(itemID,followValue) values(?,?) on duplicate key update followValue=?";
                Connection connection = null;
                try {
                    connection = ConnectionPool.getConnection();
                    // try-with-resources closes the statement even if an insert throws.
                    try (PreparedStatement pst = connection.prepareStatement(sql)) {
                        System.out.println();
                        while (tuple2Iterator.hasNext()) {
                            Tuple2<String, Double> t2 = tuple2Iterator.next();
                            // Print the result to the console.
                            System.out.println(t2._1 + " " + t2._2);
                            pst.setInt(1, Integer.parseInt(t2._1));
                            pst.setDouble(2, t2._2);
                            pst.setDouble(3, t2._2);
                            pst.executeUpdate();
                        }
                    }
                } finally {
                    // Only return a connection we actually obtained; returning null could
                    // mask the exception thrown by getConnection().
                    if (connection != null) {
                        ConnectionPool.returnConnection(connection);
                    }
                }
            }
        });
    }
});
// Sliding window over the per-record scores: 10000 ms long, sliding every 5000 ms.
JavaPairDStream<String, Double> windowFollowValue = splitmess.window(new Duration(10000), new Duration(5000));
// For every window: upsert each item's window total into windowFollowData, then append
// exactly one row to singleFollowValue holding item 1's window total (0 if item 1 is absent).
windowFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaPairRDD<String, Double> stringDoubleJavaPairRDD) throws Exception {
        if (stringDoubleJavaPairRDD.isEmpty() || flag) {
            System.out.println("滑动窗口未满");
            return;
        }
        // The window holds one pair per received line, so an item can appear many times.
        // Sum per item first; otherwise the upsert keeps whichever record is written last.
        JavaPairRDD<String, Double> windowTotals = stringDoubleJavaPairRDD.reduceByKey(
                new Function2<Double, Double, Double>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Double call(Double left, Double right) throws Exception {
                        return left + right;
                    }
                });

        windowTotals.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Double>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Iterator<Tuple2<String, Double>> tuple2Iterator) throws Exception {
                String sql = "insert into windowFollowData(itemID,followValue) values(?,?) on duplicate key update followValue=?";
                Connection connection = null;
                try {
                    connection = ConnectionPool.getConnection();
                    // try-with-resources closes the statement even if an insert throws.
                    try (PreparedStatement pst = connection.prepareStatement(sql)) {
                        System.out.println();
                        while (tuple2Iterator.hasNext()) {
                            Tuple2<String, Double> t2 = tuple2Iterator.next();
                            // Print the result to the console.
                            System.out.println(t2._1 + " " + t2._2);
                            pst.setInt(1, Integer.parseInt(t2._1));
                            pst.setDouble(2, t2._2);
                            pst.setDouble(3, t2._2);
                            pst.executeUpdate();
                        }
                    }
                } finally {
                    if (connection != null) {
                        ConnectionPool.returnConnection(connection);
                    }
                }
            }
        });

        // Decide "is item 1 present?" over the whole RDD on the driver, so each batch appends
        // exactly one row; a per-partition check appends an extra 0 for every partition that
        // happens not to contain item 1.
        List<Double> itemOneTotals = windowTotals.filter(new Function<Tuple2<String, Double>, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Tuple2<String, Double> t2) throws Exception {
                return Integer.parseInt(t2._1) == 1;
            }
        }).values().collect();

        // NOTE(review): assumes ConnectionPool is usable on the driver as well as on executors.
        String singleSql = "insert into singleFollowValue(windowFollowValue) values(?)";
        Connection connection = null;
        try {
            connection = ConnectionPool.getConnection();
            try (PreparedStatement pst2 = connection.prepareStatement(singleSql)) {
                if (itemOneTotals.isEmpty()) {
                    pst2.setDouble(1, 0);
                    pst2.executeUpdate();
                } else {
                    for (Double total : itemOneTotals) {
                        pst2.setDouble(1, total);
                        pst2.executeUpdate();
                    }
                }
            }
        } finally {
            if (connection != null) {
                ConnectionPool.returnConnection(connection);
            }
        }
    }
});
/*输出关注度
updateFollowValue.foreachRDD(new
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
本资源中的源码都是经过本地编译过可运行的,下载后按照文档配置好环境就可以运行。资源项目的难度比较适中,内容都是经过助教老师审定过的,应该能够满足学习、使用需求,如果有需要的话可以放心下载使用。有任何问题也可以随时私信博主,博主会第一时间给您解答!!! 本资源中的源码都是经过本地编译过可运行的,下载后按照文档配置好环境就可以运行。资源项目的难度比较适中,内容都是经过助教老师审定过的,应该能够满足学习、使用需求,如果有需要的话可以放心下载使用。有任何问题也可以随时私信博主,博主会第一时间给您解答!!! 本资源中的源码都是经过本地编译过可运行的,下载后按照文档配置好环境就可以运行。资源项目的难度比较适中,内容都是经过助教老师审定过的,应该能够满足学习、使用需求,如果有需要的话可以放心下载使用。有任何问题也可以随时私信博主,博主会第一时间给您解答!!!
资源推荐
资源详情
资源评论
收起资源包目录
计算机课程毕设:基于spark的电商商品智能分析系统,采用流式计算电商商品关注度,实现商品智能推荐及关联分析.zip (939个子文件)
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 0B
_partitioner 0B
_SUCCESS 0B
_SUCCESS 0B
_SUCCESS 0B
checkpoint-1512733020000.bk 4KB
checkpoint-1512733005000.bk 4KB
checkpoint-1512733010000.bk 4KB
checkpoint-1512733000000 4KB
checkpoint-1512733005000 4KB
checkpoint-1512733010000 4KB
checkpoint-1512733015000 4KB
checkpoint-1512733020000 4KB
checkpoint-1512733025000 4KB
checkpoint-1512733030000 4KB
testEvaluation$.class 14KB
trainEvaluation$.class 12KB
testMLlib$.class 11KB
trainAls$.class 9KB
WordCountJava8.class 6KB
fpg$.class 6KB
WsController.class 5KB
DataSimulator.class 5KB
productDataToAls$.class 4KB
WordCountScala$.class 4KB
productDataToFpg$.class 4KB
Chart$.class 4KB
trainEvaluation.class 4KB
StreamingGoods.class 4KB
testEvaluation.class 4KB
StreamingGoods$5$2.class 3KB
WordCountJava7.class 3KB
testEvaluation$$anonfun$evaluateParameter$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVI$sp$2.class 3KB
JDBCUtils.class 3KB
trainEvaluation$$anonfun$evaluateParameter$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVI$sp$2.class 3KB
UserItemDataSimulator.class 3KB
StreamingGoods$4$2.class 3KB
trainEvaluation$$anonfun$1$$anonfun$apply$1.class 3KB
testEvaluation$$anonfun$1$$anonfun$apply$1.class 3KB
trainEvaluation$$anonfun$1.class 2KB
testEvaluation$$anonfun$1.class 2KB
testGroupby.class 2KB
trainEvaluation$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.class 2KB
testEvaluation$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.class 2KB
testMLlib$$anonfun$saveModelToSql$1.class 2KB
testMysql.class 2KB
testMLlib.class 2KB
共 939 条
- 1
- 2
- 3
- 4
- 5
- 6
- 10
资源评论
- 风眼里的平静2024-03-30资源太好了,解决了我当下遇到的难题,抱紧大佬的大腿~
白话机器学习
- 粉丝: 1w+
- 资源: 7673
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 8021X-2020.pdf
- Screenshot_2024-10-12-01-45-58-260_coding.yu.ccompiler.new.jpg
- 示波器实验报告,实验目的:掌握使用示波器和信号发生器的基本方法
- 示波器实验项目方案及报告(使用示波器观察与分析RC电路充放电过程).doc
- 易支付源代码易支付源代码易支付源代码易支付源代码易支付源代码易支付源代码易支付源代码易支付源代码
- 基于Jupyter Notebook的joyful-pandas数据分析与可视化设计源码
- 基于Java语言开发的智慧自助餐饮系统后端设计源码
- 基于若依框架的Java报修系统设计源码
- 基于Java和Kotlin的永州特产溯源系统设计源码
- 基于Java与Kotlin的居家生活交流社区SmallNest设计源码
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功