package com.twq;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
//import com.google.common.base.Optional;
public class StreamingGoods implements Serializable{
// Directory where Spark Streaming writes checkpoint data (required by the
// stateful updateStateByKey transformation used in main).
public static String checkpointDir="checkDir";
// Raised by the parsing stage when an input record looks malformed; read by
// the foreachRDD sinks to skip persisting.  NOTE(review): this static is
// mutated from executor-side code and read on the driver — that only works
// because the app runs in local mode (single JVM); confirm before deploying
// to a real cluster.
public static boolean flag=false;
public static void main(String[] args) throws InterruptedException {
// Silence Spark/Hadoop log output so the console shows only our own prints.
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("com").setLevel(Level.OFF);
System.setProperty("spark.ui.showConsoleProgress","false");
Logger.getRootLogger().setLevel(Level.OFF);
// local[2]: at least two threads are needed — one for the socket receiver,
// one for actually processing the batches.
SparkConf sparkconf=new SparkConf().setAppName("StreamingGoods").setMaster("local[2]");
// Batch interval: incoming data is grouped into 5-second micro-batches.
JavaStreamingContext jsc=new JavaStreamingContext(sparkconf,new Duration(5000));
// Checkpoint directory, mandatory for the stateful updateStateByKey below.
jsc.checkpoint(checkpointDir);
// Ingest newline-delimited records from a local TCP socket.
JavaReceiverInputDStream<String> jds=jsc.socketTextStream("127.0.0.1", 9999);
// Pass-through map over the raw socket lines that prints a notice when an
// empty line arrives.
// NOTE(review): `mess` is never consumed below — splitmess reads `jds`
// directly — so because Spark transformations are lazy this map only executes
// if an action such as the commented-out print is re-enabled.
JavaDStream<String> mess=jds.map(new Function<String,String>(){
private static final long serialVersionUID=1L;
public String call(String arg0) throws Exception{
if(arg0.isEmpty()){
System.out.println("没有数据流进来。。。。。。");
}
return arg0;
}
});
//mess.print(30); // debug aid: print up to 30 received records per batch
// Split each record and compute the weighted "follow value" per item.
// Expected record layout (:: separated):
//   [0]=userID  [1]=itemID  [2]=views  [3]=dwellTime
//   [4]=favorites  [5]=purchases  [6]=rating
// TODO(review): field layout inferred from the weight comment and the itemID
// column written to MySQL — confirm against the data generator.
JavaPairDStream<String,Double> splitmess=jds.mapToPair(new PairFunction<String,String,Double>(){
    private static final long serialVersionUID=1L;
    @Override
    public Tuple2<String,Double> call(String line) throws Exception{
        String[] lineSplit=line.split("::");
        // BUGFIX: the original tested length==0, which String.split can never
        // produce, so malformed lines crashed the job with
        // ArrayIndexOutOfBoundsException instead of raising the flag.
        if(lineSplit.length<7){
            flag=true;
            // Neutral tuple: the foreachRDD sinks skip persisting while flag is set.
            return new Tuple2<String,Double>("invalid",0.0);
        }
        // Weights: views 0.15, dwell 0.15, favorites 0.2, purchases 0.2,
        // rating 0.3 — they sum to 1.
        // BUGFIX: the 0.3 rating term read lineSplit[4] (the favorites field)
        // a second time; it now reads lineSplit[6].
        Double followValue=Double.parseDouble(lineSplit[2])*0.15
                +Double.parseDouble(lineSplit[3])*0.15
                +Double.parseDouble(lineSplit[4])*0.2
                +Double.parseDouble(lineSplit[5])*0.2
                +Double.parseDouble(lineSplit[6])*0.3;
        return new Tuple2<String,Double>(lineSplit[1],followValue);
    }
});
// Maintain a running total of follow value per item across all batches.
// newValues holds this batch's contributions; the Optional carries the total
// accumulated so far (absent on first sight of a key).
JavaPairDStream<String,Double> updateFollowValue=splitmess.updateStateByKey(
    new Function2<List<Double>,Optional<Double>,Optional<Double>>(){
        @Override
        public Optional<Double> call(List<Double> batchValues,Optional<Double> previousTotal) throws Exception{
            double total=previousTotal.or(0.0);
            for(int i=0;i<batchValues.size();i++){
                total+=batchValues.get(i);
            }
            return Optional.of(total);
        }
    });
// Persist the cumulative follow values into MySQL (upsert keyed on itemID).
updateFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
    @Override
    public void call(JavaPairRDD<String, Double> pairRDD) throws Exception {
        // NOTE(review): flag is written by executor-side code; reading it here
        // only works in local mode (single JVM) — verify before clustering.
        if (pairRDD.isEmpty() || flag) {
            System.out.println("没有数据流入!!!");
        } else {
            // FIX: dropped the original identity mapToPair (it rebuilt every
            // tuple unchanged); iterate the RDD's partitions directly so each
            // partition shares one pooled connection.
            pairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Double>>>() {
                @Override
                public void call(Iterator<Tuple2<String, Double>> tuples) throws Exception {
                    Connection connection = null;
                    try {
                        connection = ConnectionPool.getConnection();
                        // Upsert: insert a new row, or refresh followValue on key conflict.
                        String sql = "insert into followData(itemID,followValue) values(?,?) on duplicate key update followValue=?";
                        // FIX: try-with-resources guarantees the statement is
                        // closed even when executeUpdate throws — the original
                        // only closed it on the happy path.
                        try (PreparedStatement pst = connection.prepareStatement(sql)) {
                            System.out.println();
                            while (tuples.hasNext()) {
                                Tuple2<String, Double> t2 = tuples.next();
                                // Echo each persisted (itemID, followValue) pair.
                                System.out.println(t2._1 + " " + t2._2);
                                pst.setInt(1, Integer.parseInt(t2._1));
                                pst.setDouble(2, t2._2);
                                pst.setDouble(3, t2._2);
                                pst.executeUpdate();
                            }
                        }
                    } finally {
                        // Always return the connection to the pool, even on failure.
                        ConnectionPool.returnConnection(connection);
                    }
                }
            });
        }
    }
});
// Sliding window over the per-batch pairs: 10-second window re-evaluated
// every 5 seconds (i.e. each window spans the last two micro-batches).
JavaPairDStream<String,Double> windowFollowValue=splitmess.window(new Duration(10000),new Duration(5000));
// Persist the windowed follow values; additionally append item 1's value to
// singleFollowValue each window (a zero is written when item 1 is absent so
// its time series has a point for every window).
windowFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
    @Override
    public void call(JavaPairRDD<String, Double> pairRDD) throws Exception {
        // NOTE(review): flag is executor-written; reliable in local mode only.
        if (pairRDD.isEmpty() || flag) {
            System.out.println("滑动窗口未满");
        } else {
            // FIX: dropped the original identity mapToPair (tuple rebuilt
            // unchanged); write straight from the RDD's partitions.
            pairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Double>>>() {
                @Override
                public void call(Iterator<Tuple2<String, Double>> tuples) throws Exception {
                    Connection connection = null;
                    try {
                        connection = ConnectionPool.getConnection();
                        String sql = "insert into windowFollowData(itemID,followValue) values(?,?) on duplicate key update followValue=?";
                        String singeSql = "insert into singleFollowValue(windowFollowValue) values(?)";
                        // FIX: try-with-resources closes both statements even
                        // when an executeUpdate throws — the original leaked
                        // them on any SQL failure.
                        try (PreparedStatement pst = connection.prepareStatement(sql);
                             PreparedStatement pst2 = connection.prepareStatement(singeSql)) {
                            System.out.println();
                            boolean hasItemOne = false;
                            while (tuples.hasNext()) {
                                Tuple2<String, Double> t2 = tuples.next();
                                // Echo each persisted (itemID, followValue) pair.
                                System.out.println(t2._1 + " " + t2._2);
                                pst.setInt(1, Integer.parseInt(t2._1));
                                pst.setDouble(2, t2._2);
                                pst.setDouble(3, t2._2);
                                pst.executeUpdate();
                                if (Integer.parseInt(t2._1) == 1) {
                                    pst2.setDouble(1, t2._2);
                                    pst2.executeUpdate();
                                    hasItemOne = true;
                                }
                            }
                            if (!hasItemOne) {
                                // Item 1 missing from this partition's window:
                                // record an explicit zero sample.
                                pst2.setDouble(1, 0);
                                pst2.executeUpdate();
                            }
                        }
                    } finally {
                        // Always return the connection to the pool, even on failure.
                        ConnectionPool.returnConnection(connection);
                    }
                }
            });
        }
    }
});
/*输出关注度
updateFollowValue.foreachRDD(new
没有合适的资源?快使用搜索试试~ 我知道了~
计算机课程毕设:基于spark的电商商品智能分析系统,采用流式计算电商商品关注度,实现商品智能推荐及关联分析.zip
共939个文件
xml:238个
crc:182个
class:147个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 104 浏览量
2023-09-16
20:16:52
上传
评论
收藏 5.49MB ZIP 举报
温馨提示
计算机类毕业设计源码
资源推荐
资源详情
资源评论
收起资源包目录
计算机课程毕设:基于spark的电商商品智能分析系统,采用流式计算电商商品关注度,实现商品智能推荐及关联分析.zip (939个子文件)
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 147B
_partitioner 0B
_partitioner 0B
_SUCCESS 0B
_SUCCESS 0B
_SUCCESS 0B
checkpoint-1512733020000.bk 4KB
checkpoint-1512733005000.bk 4KB
checkpoint-1512733010000.bk 4KB
checkpoint-1512733000000 4KB
checkpoint-1512733005000 4KB
checkpoint-1512733010000 4KB
checkpoint-1512733015000 4KB
checkpoint-1512733020000 4KB
checkpoint-1512733025000 4KB
checkpoint-1512733030000 4KB
testEvaluation$.class 14KB
trainEvaluation$.class 12KB
testMLlib$.class 11KB
trainAls$.class 9KB
WordCountJava8.class 6KB
fpg$.class 6KB
WsController.class 5KB
DataSimulator.class 5KB
productDataToAls$.class 4KB
WordCountScala$.class 4KB
productDataToFpg$.class 4KB
Chart$.class 4KB
trainEvaluation.class 4KB
StreamingGoods.class 4KB
testEvaluation.class 4KB
StreamingGoods$5$2.class 3KB
WordCountJava7.class 3KB
testEvaluation$$anonfun$evaluateParameter$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVI$sp$2.class 3KB
JDBCUtils.class 3KB
trainEvaluation$$anonfun$evaluateParameter$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVI$sp$2.class 3KB
UserItemDataSimulator.class 3KB
StreamingGoods$4$2.class 3KB
trainEvaluation$$anonfun$1$$anonfun$apply$1.class 3KB
testEvaluation$$anonfun$1$$anonfun$apply$1.class 3KB
trainEvaluation$$anonfun$1.class 2KB
testEvaluation$$anonfun$1.class 2KB
testGroupby.class 2KB
trainEvaluation$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.class 2KB
testEvaluation$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.class 2KB
testMLlib$$anonfun$saveModelToSql$1.class 2KB
testMysql.class 2KB
testMLlib.class 2KB
共 939 条
- 1
- 2
- 3
- 4
- 5
- 6
- 10
资源评论
学术菜鸟小晨
- 粉丝: 1w+
- 资源: 5462
下载权益
C知道特权
VIP文章
课程特权
开通VIP
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功