# 基于Spark2.x新闻网大数据实时分析可视化系统项目
## 一、业务需求分析
捕获用户浏览日志信息
实时分析前20名流量最高的新闻话题
实时统计当前线上已曝光的新闻话题
统计哪个时段用户浏览量最高
## 二、系统架构图设计
![](https://www.writebug.com/myres/static/uploads/2021/10/23/c7d933cbd244ea9e98820be2fb09d36b.writebug)
## 三、系统数据流程设计
![](https://www.writebug.com/myres/static/uploads/2021/10/23/778a4435079a96e12aa947f9c6321527.writebug)
## 四、集群资源规划设计
![](https://www.writebug.com/myres/static/uploads/2021/10/23/c03fc1f67c05f521b6dabe220bee9202.writebug)
## 五、步骤详解
考虑到实际情况,本人集群配置共三个节点(node5、node6、node7)。
### 5.1Zookeeper分布式集群部署
参考 [博客](https://blog.csdn.net/u011254180/article/details/79480234)
### 5.2. Hadoop2.X HA架构与部署
参考 [博客](https://blog.csdn.net/u011254180/article/details/77922331)
### 5.3. HBase分布式集群部署与设计
参考 [博客](https://blog.csdn.net/u011254180/article/details/80171500)
### 5.4. Kafka分布式集群部署
参考 [博客](https://blog.csdn.net/u011254180/article/details/79481088)
### 5.5.Flume部署及数据采集准备
node6与node7中flume数据采集到node5中,而且node6和node7的flume配置文件大致相同,node7中将a2改为a3,如下
```js
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/data/weblog-flume.log
a2.sources.r1.channels = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 1000
a2.channels.c1.keep-alive = 5
a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
a2.sinks.k1.hostname = node5
a2.sinks.k1.port = 5555
```
## 5.6Flume+HBase+Kafka集成与开发
#### 1.下载Flume源码并导入Idea开发工具
1)将apache-flume-1.7.0-src.tar.gz源码下载到本地解压
2)通过idea导入flume源码
打开idea开发工具,选择File——》Open
![](https://www.writebug.com/myres/static/uploads/2021/10/23/13116cdf97f73c834de5ae8f5ef17084.writebug)
然后找到flume源码解压文件,选中flume-ng-hbase-sink,点击ok加载相应模块的源码。\
![](https://www.writebug.com/myres/static/uploads/2021/10/23/354f466f056a87a0a30d247f3a0d7451.writebug)
#### 2.官方flume与hbase集成的参数介绍
![](https://www.writebug.com/myres/static/uploads/2021/10/23/51f290aa50a4a2c9ca3d99883892e9be.writebug)
#### 3.下载日志数据并分析
到搜狗实验室 下载用户查询日志
1)介绍
搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。为进行中文搜索引擎用户行为分析的研究者提供基准研究语料。
2)格式说明
数据格式为:访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID
![](https://www.writebug.com/myres/static/uploads/2021/10/23/e9eb560a751e2bf1dec0b56321f385b4.writebug)
#### 4.node5聚合节点与HBase和Kafka的集成配置
node5通过flume接收node6与node7中flume传来的数据,并将其分别发送至hbase与kafka中,配置内容如下
```js
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink
a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = node5
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5
# ****************************flume + hbase******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20
a1.sinks.hbaseSink.type = asynchbase
a1.sinks.hbaseSink.table = weblogs
a1.sinks.hbaseSink.columnFamily = info
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
# ****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20
a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = node5:9092,node6:9092,node7:9092
a1.sinks.kafkaSink.topic = weblogs
a1.sinks.kafkaSink.zookeeperConnect = node5:2181,node6:2181,node7:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
```
#### 5.对日志数据进行格式处理
1)将文件中的tab更换成逗号
```sh
cat weblog.log|tr "\t" "," > weblog.log
```
2)将文件中的空格更换成逗号
```sh
cat weblog2.log|tr " " "," > weblog.log
```
#### 6.自定义SinkHBase程序设计与开发
1)模仿SimpleAsyncHbaseEventSerializer自定义KfkAsyncHbaseEventSerializer实现类,修改一下代码即可。
```java
@Override
public List<PutRequest> getActions() {
List<PutRequest> actions = new ArrayList<PutRequest>();
if (payloadColumn != null) {
byte[] rowKey;
try {
/*---------------------------代码修改开始---------------------------------*/
// 解析列字段
String[] columns = new String(this.payloadColumn).split(",");
// 解析flume采集过来的每行的值
String[] values = new String(this.payload).split(",");
for(int i=0;i < columns.length;i++){
byte[] colColumn = columns[i].getBytes();
byte[] colValue = values[i].getBytes(Charsets.UTF_8);
// 数据校验:字段和值是否对应
if(colColumn.length != colValue.length) break;
// 时间
String datetime = values[0].toString();
// 用户id
String userid = values[1].toString();
// 根据业务自定义Rowkey
rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);
// 插入数据
PutRequest putRequest = new PutRequest(table, rowKey, cf,
colColumn, colValue);
actions.add(putRequest);
/*---------------------------代码修改结束---------------------------------*/
}
} catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
```
2)在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法
```java
public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
return (userid + "-" + datetime + "-" + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
}
```
#### 7.自定义编译程序打jar包
1)在idea工具中,选择File——》ProjectStructrue
![](https://www.writebug.com/myres/static/uploads/2021/10/23/3925a187dc892c30d6e5f4001808bc32.writebug)
2)左侧选中Artifacts,然后点击右侧的+号,最后选择JAR——》From modules with dependencies
![](https://www.writebug.com/myres/static/uploads/2021/10/23/3175462ce67877518374a62b90ae131f.writebug)
3)然后直接点击ok
![](https://www.writebug.com/myres/static/uploads/2021/10/23/6218feab2ff7a38f64c3ec29788f0d35.writebug)
4)删除其他依赖包,只把flume-ng-hbase-sink打成jar包就可以了。
![](https://www.writebug.com/myres/static/uploads/2021/10/23/981bebcae867dd110791f78ce9266298.writebug)
![](https://www.writebug.com/myre
基于Java实现Spark2x新闻网大数据实时分析可视化系统项目【100012794】
版权申诉
5星 · 超过95%的资源 147 浏览量
2023-06-20
14:23:25
上传
评论 3
收藏 3.44MB ZIP 举报
神仙别闹
- 粉丝: 2667
- 资源: 7640
最新资源
- 高性能量化工具 hikyuu 2.0.3 python3.9 ubuntu 安装包
- Cyclone Version 9.51
- 高性能量化回测工具 hikyuu 2.0.3 python 3.12 windows 安装包
- 省级城乡居民基本养老保险情况数据集(2010-2022年).xlsx
- 舞队填写版.cpp
- 基于BP神经网络的多输入单输出回归预测.zip
- 高性能量化回测工具 hikyuu 2.0.3 python 3.9 windows 安装包
- 省级城镇职工基本养老保险情况2000-2022年.xlsx
- 高性能量化回测工具 hikyuu 2.0.3 python 3.10 windows 安装包
- 算法部署-使用OpenVINO+C#部署PaddleOCR字符识别算法-项目源码-优质项目实战.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈