# syslogStreaming
基于Spark的实时日志分析及异常检测系统 Flume + Kafka + Hbase + Spark-Streaming + Scala
## 1,Flume
#### flume 配置文件
// 为该Flume Agent的source、channel以及sink命名
agent_log.sources = syslog_source1
agent_log.sinks = kafka_sink
agent_log.channels = memory_channel
// 配置Syslog源
agent_log.sources.syslog_source1.type = exec
agent_log.sources.syslog_source1.command = tail -F /home/hadoop/syslogStreaming/data/switchlog1.log
// Kafka Sink的相关配置
agent_log.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent_log.sinks.kafka_sink.channel = memory_channel
agent_log.sinks.kafka_sink.kafka.topic = syslog_kafka_topic
agent_log.sinks.kafka_sink.kafka.bootstrap.servers = spark0:9092,spark1:9092,spark2:9092
agent_log.sinks.kafka_sink.kafka.flumeBatchSize = 20
agent_log.sinks.kafka_sink.kafka.producer.acks = 1
#agent_log.sinks.kafka_sink.kafka.producer.linger.ms = 1
#agent_log.sinks.kafka_sink.kafka.producer.compression.type = snappy
// Channel基于内存作为缓存
agent_log.channels.memory_channel.type = memory
agent_log.channels.memory_channel.capacity = 1000
agent_log.channels.memory_channel.transactionCapacity = 1000
// 将Source以及Sink绑定至Channel
agent_log.sources.syslog_source1.channels = memory_channel
agent_log.sinks.kafka_sink.channel = memory_channel
#### 启动flume连接到kafka
// flume-ng agent --conf "配置文件文件目录" --conf-file "配置文件" --name "配置文件里agent的名字"
flume-ng agent -n agent_log -c $FLUME_HOME/conf/ -f /home/hadoop/syslogStreaming/syslog_kafka.conf -Dflume.root.logger=INFO,console
## 2,kafka
#### 配置文件
修改server.properties文件
kafka安装配置参考:https://www.cnblogs.com/zhaojiankai/p/7181910.html?utm_source=itdadao&utm_medium=referral
#### 启动kafka broker
bin/kafka-server-start.sh -daemon config/server.properties &
#### 创建topic
bin/kafka-topics.sh --create --zookeeper spark0:2181 --replication-factor 3 --partitions 1 --topic syslog_kafka_topic
#### 创建后,查看topic情况
bin/kafka-topics.sh --list --zookeeper spark0:2181
bin/kafka-topics.sh --describe --zookeeper spark0:2181 --topic syslog_kafka_topic
#### 启动kafka消费者接受flume数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.52:9092 --topic syslog_kafka_topic --from-beginning
## 3, kafka -----> spark streaming
#### maven相关依赖porm文件
#### 定义好kafka参数
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
#### 订阅相应的topic:
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)
## 4, spark streaming ------> HBase
#### 引入Hbase相关依赖
#### 将数据存储为HBase对应的格式
// 随机产生某个uuid为行键 (示例)
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
// 将列簇,列明,列值添加进对应结构
put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_name"), Bytes.toBytes("column_value"))
#### 插入HBase
// 表名
val tablename = "table_name"
// 创建初始配置
val hbaseconf = HBaseConfiguration.create()
// 创建链接
val conn = ConnectionFactory.createConnection(hbaseconf)
// 指定表
val table: HTable = new HTable(hbaseconf, Bytes.toBytes(tablename))
// 提交事务,插入数据
table.put(put)
没有合适的资源?快使用搜索试试~ 我知道了~
基于Spark的实时日志分析及异常检测系统 Flume + Kafka + Hbase + Spark-Streaming + ...
共15个文件
xml:7个
scala:3个
class:2个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 83 浏览量
2024-12-17
12:36:46
上传
评论
收藏 18KB ZIP 举报
温馨提示
【资源说明】 基于Spark的实时日志分析及异常检测系统 Flume + Kafka + Hbase + Spark-Streaming + Scala文档+源码+优秀项目+全部资料.zip 【备注】 1、该项目是个人高分项目源码,已获导师指导认可通过,答辩评审分达到95分 2、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 3、本项目适合计算机相关专业(人工智能、通信工程、自动化、电子信息、物联网等)的在校学生、老师或者企业员工下载使用,也可作为毕业设计、课程设计、作业、项目初期立项演示等,当然也适合小白学习进阶。 4、如果基础还行,可以在此代码基础上进行修改,以实现其他功能,也可直接用于毕设、课设、作业等。 欢迎下载,沟通交流,互相学习,共同进步!
资源推荐
资源详情
资源评论
收起资源包目录
基于Spark的实时日志分析及异常检测系统 Flume + Kafka + Hbase + Spark-Streaming + Scala文档+源码+优秀项目+全部资料.zip (15个子文件)
syslogStreaming-master
pom.xml 5KB
src
main
scala
syslogStreaming
App.scala 52B
com
seven
spark
syslogInfo.scala 209B
syslogStreaming.scala 2KB
.idea
uiDesigner.xml 9KB
workspace.xml 19KB
misc.xml 513B
scala_compiler.xml 289B
compiler.xml 536B
encodings.xml 135B
target
classes
com
seven
spark
wordCoundStreaming$.class 7KB
wordCoundStreaming.class 764B
test-classes
META-INF
syslogStreaming.kotlin_module 16B
README.md 4KB
CSDN
软件
项目授权码.txt 268B
共 15 条
- 1
资源评论
Yuki-^_^
- 粉丝: 3100
- 资源: 1817
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 第七章:循环控制语句 包含循环写星星 循环写乘法表 循环累加计算
- 图神经网络进行视频字幕的动作知识
- BLE蓝牙单片机CC2540、CC2541带OSAL操作系统的例程-蓝牙从机广播功率配置修改.zip
- BLE蓝牙单片机CC2540、CC2541带OSAL操作系统的例程-看门狗WachDog例程.zip
- chapter7-Pandas数据分析实战.zip
- Python电影票售票系统
- BLE蓝牙单片机CC2540、CC2541带OSAL操作系统的例程-封装蓝牙模块AT指令.zip
- BLE蓝牙单片机CC2540、CC2541带OSAL操作系统的例程-对蓝牙传输的数据进行加密、解密传输.zip
- BLE蓝牙单片机CC2540、CC2541带OSAL操作系统的例程-串口双工收发.zip
- BLE蓝牙单片机CC2540、CC2541带OSAL操作系统的例程-按键操作.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功