package com.twq;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
//import com.google.common.base.Optional;
public class StreamingGoods implements Serializable{
//define file for saving pre rdd data
public static String checkpointDir="checkDir";
public static boolean flag=false;
public static void main(String[] args) throws InterruptedException {
//setLogger
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("com").setLevel(Level.OFF);
System.setProperty("spark.ui.showConsoleProgress","false");
Logger.getRootLogger().setLevel(Level.OFF);
//setmaster>=2
SparkConf sparkconf=new SparkConf().setAppName("StreamingGoods").setMaster("local[2]");
//Spark Streaming需要指定处理数据的时间间隔
JavaStreamingContext jsc=new JavaStreamingContext(sparkconf,new Duration(5000));
//恢复点
jsc.checkpoint(checkpointDir);
JavaReceiverInputDStream<String> jds=jsc.socketTextStream("127.0.0.1", 9999);
//获取数据
JavaDStream<String> mess=jds.map(new Function<String,String>(){
private static final long serialVersionUID=1L;
public String call(String arg0) throws Exception{
if(arg0.isEmpty()){
System.out.println("没有数据流进来。。。。。。");
}
return arg0;
}
});
//mess.print(30); //打印接收到的数据
//分割数据以及计算关注度
JavaPairDStream<String,Double> splitmess=jds.mapToPair(new PairFunction<String,String,Double>(){
private static final long serialVersionUID=1L;
public Tuple2<String,Double> call(String line) throws Exception{
String[] lineSplit=line.toString().split("::");
if(lineSplit.length==0){
flag=true;
}
//权重计算:浏览次数0.15 停留时间0.15 收藏0.2 购买件数0.2 评分0.3 合为1
Double followValue=Double.parseDouble(lineSplit[2])*0.15+Double.parseDouble(lineSplit[3])*0.15+
Double.parseDouble(lineSplit[4])*0.2+Double.parseDouble(lineSplit[5])*0.2+
Double.parseDouble(lineSplit[4])*0.3;
return new Tuple2<String,Double>(lineSplit[1],followValue);
}
});
//更新关注度
JavaPairDStream<String,Double> updateFollowValue=splitmess.updateStateByKey(
new Function2<List<Double>,Optional<Double>,Optional<Double>>(){
public Optional<Double> call(List<Double> newValues,Optional<Double> startValue) throws Exception{
Double updateValue=startValue.or(0.0);
for(Double values : newValues) {
updateValue+=values;
}
return Optional.of(updateValue);
}
});
//将updateFollowValue写进数据库
updateFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
@Override
public void call(JavaPairRDD<String, Double> stringDoubleJavaPairRDD) throws Exception {
if(stringDoubleJavaPairRDD.isEmpty()||flag){
System.out.println("没有数据流入!!!");
}else{
JavaPairRDD<String,Double> printdata=stringDoubleJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Double>, String, Double>() {
@Override
public Tuple2<String, Double> call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
//System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
return new Tuple2<String, Double>(stringDoubleTuple2._1,stringDoubleTuple2._2);
}
});
printdata.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Double>>>() {
@Override
public void call(Iterator<Tuple2<String, Double>> tuple2Iterator) throws Exception {
Connection connection=null;
try{
connection=ConnectionPool.getConnection();
String sql="insert into followData(itemID,followValue) values(?,?) on duplicate key update followValue=?";
PreparedStatement pst=connection.prepareStatement(sql);
System.out.println();
while(tuple2Iterator.hasNext()){
Tuple2<String,Double> t2=tuple2Iterator.next();
//打印结果
System.out.println(t2._1+" "+t2._2);
pst.setInt(1,Integer.parseInt(t2._1));
pst.setDouble(2,t2._2);
pst.setDouble(3,t2._2);
pst.executeUpdate();
}
pst.close();
}finally {
ConnectionPool.returnConnection(connection);
}
}
});
/*printdata.foreach(new VoidFunction<Tuple2<String, Double>>() {
@Override
public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
}
});*/
}
}
});
//滑动窗口
JavaPairDStream<String,Double> windowFollowValue=splitmess.window(new Duration(10000),new Duration(5000));
windowFollowValue.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
@Override
public void call(JavaPairRDD<String, Double> stringDoubleJavaPairRDD) throws Exception {
if(stringDoubleJavaPairRDD.isEmpty()||flag){
System.out.println("滑动窗口未满");
}else{
JavaPairRDD<String,Double> printdata=stringDoubleJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Double>, String, Double>() {
@Override
public Tuple2<String, Double> call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
//System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
return new Tuple2<String, Double>(stringDoubleTuple2._1,stringDoubleTuple2._2);
}
});
printdata.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Double>>>() {
@Override
public void call(Iterator<Tuple2<String, Double>> tuple2Iterator) throws Exception {
Connection connection=null;
try{
connection=ConnectionPool.getConnection();
String sql="insert into windowFollowData(itemID,followValue) values(?,?) on duplicate key update followValue=?";
String singeSql="insert into singleFollowValue(windowFollowValue) values(?)";
PreparedStatement pst=connection.prepareStatement(sql);
PreparedStatement pst2=connection.prepareStatement(singeSql);
System.out.println();
boolean has=false;
while(tuple2Iterator.hasNext()){
Tuple2<String,Double> t2=tuple2Iterator.next();
//打印结果
System.out.println(t2._1+" "+t2._2);
pst.setInt(1,Integer.parseInt(t2._1));
pst.setDouble(2,t2._2);
pst.setDouble(3,t2._2);
pst.executeUpdate();
if(Integer.parseInt(t2._1)==1){
pst2.setDouble(1,t2._2);
pst2.executeUpdate();
has=true;
}
}
if(has==false){
pst2.setDouble(1,0);
pst2.executeUpdate();
}
pst.close();
pst2.close();
}finally {
ConnectionPool.returnConnection(connection);
}
}
});
/*printdata.foreach(new VoidFunction<Tuple2<String, Double>>() {
@Override
public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
}
});*/
}
}
});
/*输出关注度
updateFollowValue.foreachRDD(new
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
收起资源包目录
![package](https://csdnimg.cn/release/downloadcmsfe/public/img/package.f3fc750b.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
共 939 条
- 1
- 2
- 3
- 4
- 5
- 6
- 10
资源评论
![avatar-default](https://csdnimg.cn/release/downloadcmsfe/public/img/lazyLogo2.1882d7f4.png)
![avatar](https://profile-avatar.csdnimg.cn/e908d3f559a440a89ffab26bc7ee7a71_weixin_38343072.jpg!1)
马coder
- 粉丝: 1211
- 资源: 6602
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助
![voice](https://csdnimg.cn/release/downloadcmsfe/public/img/voice.245cc511.png)
![center-task](https://csdnimg.cn/release/downloadcmsfe/public/img/center-task.c2eda91a.png)
最新资源
- MSFRosslerAll (2).for
- vs2022安装包在线安装包
- Microsoft C++ 生成工具
- 74LS190实现10以内、100以内十进制加/减计数器-由multisim软件电路仿真设计
- 毕业项目:网上图书订阅管理系统的设计与开发(ASP.NET技术+程序源代码+毕业文档)
- zjk-1.ipynb
- Delphi 12 控件之EhLib.v10.2.42 for Delphi 5-12.src.rar
- cloudreve3.8.3的docker镜像
- RB108A-SOT23-5封装 单节锂电池保护IC 深圳市可芯电子有限公司.pdf
- Apache Maven的安装与配置.pdf
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
![feedback](https://img-home.csdnimg.cn/images/20220527035711.png)
![feedback](https://img-home.csdnimg.cn/images/20220527035711.png)
![feedback-tip](https://img-home.csdnimg.cn/images/20220527035111.png)
安全验证
文档复制为VIP权益,开通VIP直接复制
![dialog-icon](https://csdnimg.cn/release/downloadcmsfe/public/img/green-success.6a4acb44.png)