## News Portal Big Data Real-Time System Based on Spark 2.2
### 1. Overview
The [project code](https://github.com/pkeropen/BigData-News) follows [基于Spark2.x新闻网大数据实时分析可视化系统项目](https://blog.csdn.net/u011254180/article/details/80172452) and [大数据项目实战之新闻话题的实时统计分析](http://www.raincent.com/content-10-11077-1.html). Many thanks to the authors for sharing their experience!
### 2. Environment Setup
##### 2.1 CDH 5.14.2 (for installation steps see [this guide](https://blog.51cto.com/kaliarch/2122467)). Use whichever version matches your actual deployment; CDH versions are highly compatible with each other.
|Service | hadoop01 | hadoop02 | hadoop03
|:----------|:----------|:----------|:---------
|HDFS | NameNode | DataNode | DataNode
|HBase | HMaster, HRegionServer | HRegionServer | HRegionServer
|Hive | Hive | |
|Flume | Flume | Flume | Flume
|Kafka | Kafka | |
|YARN | ResourceManager | NodeManager | NodeManager
|Oozie | Oozie | |
|Hue | Hue | |
|Spark2 | Spark | |
|Zookeeper | Zookeeper | |
|MySQL | MySQL | |
##### 2.2 Host Specifications
```
1. Hadoop01: 4 cores, 16 GB RAM, CentOS 7.2
2. Hadoop02: 2 cores, 8 GB RAM, CentOS 7.2
3. Hadoop03: 2 cores, 8 GB RAM, CentOS 7.2
```
##### 2.3 Project Architecture
![Project architecture diagram](https://github.com/pkeropen/BigData-News/blob/master/pic/Architecture.png)
##### 2.4 Install Dependencies
```
# yum -y install psmisc MySQL-python at bc bind-libs bind-utils cups-client cups-libs cyrus-sasl-gssapi cyrus-sasl-plain ed fuse fuse-libs httpd httpd-tools keyutils-libs-devel krb5-devel libcom_err-devel libselinux-devel libsepol-devel libverto-devel mailcap noarch mailx mod_ssl openssl-devel pcre-devel postgresql-libs python-psycopg2 redhat-lsb-core redhat-lsb-submod-security x86_64 spax time zlib-devel wget psmisc
# chmod +x /etc/rc.d/rc.local
# echo "echo 0 > /proc/sys/vm/swappiness" >>/etc/rc.d/rc.local
# echo "echo never > /sys/kernel/mm/transparent_hugepage/defrag" >>/etc/rc.d/rc.local
# echo 0 > /proc/sys/vm/swappiness
# echo never > /sys/kernel/mm/transparent_hugepage/defrag
# yum -y install rpcbind
# systemctl start rpcbind
# echo "systemctl start rpcbind" >> /etc/rc.d/rc.local
# install perl support
yum install perl*     # perl packages installed via yum
yum install cpan      # CPAN library support required by perl
```
### 3. Writing the Data Generation Simulator
##### 3.1 Simulate the log stream that nginx would produce. Data source: user query logs from Sogou Labs ([download](https://www.sogou.com/labs/resource/q.php)); the dataset covers roughly one month (June 2008) of web query records from the Sogou search engine, including query terms and user click behavior.
##### 3.2 Data Cleaning
Record format: access time\tuser ID\t[query term]\trank of the URL in the results\tsequence number of the user's click\tclicked URL. The user ID is assigned automatically from the browser cookie used to access the search engine, so different queries issued from the same browser session share the same user ID.
1. Replace the tabs in the file with commas
```
cat weblog.log|tr "\t" "," > weblog2.log
```
2. Replace the spaces in the file with commas (a combined one-liner follows these two steps)
```
cat weblog2.log|tr " " "," > weblog.log
```
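The two passes can also be combined into a single command; this is an equivalent variant of the steps above that overwrites weblog.log in place via a temporary file:
```
# map both tabs and spaces to commas in one pass, then replace the original file
cat weblog.log | tr "\t " "," > weblog2.log && mv weblog2.log weblog.log
```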
##### 3.3 Key Code
```
public static void readFileByLines(String fileName) {
    FileInputStream fis = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    String tempString = null;
    try {
        System.out.println("Reading the file line by line:");
        fis = new FileInputStream(fileName);
        // open a byte stream for the source file and decode it as GBK
        // (the Sogou query log is GBK-encoded)
        isr = new InputStreamReader(fis, "GBK");
        br = new BufferedReader(isr);
        int count = 0;
        while ((tempString = br.readLine()) != null) {
            count++;
            // throttle the output to simulate a real-time log stream
            Thread.sleep(300);
            // tempString is already a decoded Java string; print the row number and append the record
            System.out.println("row:" + count + ">>>>>>>>" + tempString);
            writeFile(writeFileName, tempString);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // closing the BufferedReader also closes the wrapped reader and input stream
        if (br != null) {
            try {
                br.close();
            } catch (IOException e1) {
                // ignore close failures
            }
        }
    }
}
```
##### 3.4 Package the program as weblogs.jar ([packaging steps](https://blog.csdn.net/xuemengrui12/article/details/74984731)) and write the shell script weblog-shell.sh
```
#!/bin/bash
echo "start log......"
# first argument: source log file; second argument: output file for the generated log
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log
```
##### 3.5 Make weblog-shell.sh executable
```
chmod 777 weblog-shell.sh
```
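The generator can then be started on each node where Flume tails its output. A usage sketch follows; placing the script under /opt/shell/ mirrors section 4.3 and is an assumption:
```
# run the generator in the background; it keeps appending records to the file Flume tails
nohup sh /opt/shell/weblog-shell.sh > /dev/null 2>&1 &
# confirm that new records are arriving
tail -f /opt/datas/weblog-flume.log
```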
### 4. Flume Data Collection Configuration
##### 4.1 Flume on hadoop02 and hadoop03 forwards the collected data to hadoop01; the Flume configuration files on hadoop02 and hadoop03 are largely identical
```
# flume-collect-conf.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =exec
a1.sources.r1.command= tail -F /opt/datas/weblog-flume.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 5
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
##### 4.2 Flume on hadoop01 receives the data sent by the Flume agents on hadoop02 and hadoop03 and writes it to both HBase and Kafka. The configuration is as follows:
```
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink
a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5
#****************************flume + hbase******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20
a1.sinks.hbaseSink.type = asynchbase
## HBase table name
a1.sinks.hbaseSink.table = weblogs
## HBase column family
a1.sinks.hbaseSink.columnFamily = info
## custom serializer for asynchronous writes to HBase
a1.sinks.hbaseSink.serializer = main.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
## column names of the HBase table
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20
a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = hadoop01:9092
a1.sinks.kafkaSink.topic = webCount
a1.sinks.kafkaSink.zookeeperConnect = hadoop01:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
```
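Before starting this agent, the HBase table and Kafka topic referenced above need to exist. A minimal sketch of the prerequisite commands on hadoop01, assuming the CDH command wrappers are on the PATH; partition and replication settings and the properties file name are illustrative:
```
# create the HBase table 'weblogs' with the column family 'info' used by the hbaseSink
echo "create 'weblogs', 'info'" | hbase shell

# create the Kafka topic 'webCount' that the kafkaSink publishes to
kafka-topics --create --zookeeper hadoop01:2181 \
  --replication-factor 1 --partitions 1 --topic webCount

# start the aggregating agent on hadoop01 (properties file name is illustrative)
flume-ng agent --conf /etc/flume-ng/conf \
  --conf-file /opt/shell/flume-kfk-hbase-conf.properties \
  --name a1 -Dflume.root.logger=INFO,console
```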
##### 4.3 Shell script to start Flume
```
#!/bin/bash
# flume-collect-start.sh, distributed to /opt/shell/ on hadoop02 and hadoop03
echo "flume-collect start ......"
sh /bin/flume-ng agent --conf c