<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<p align="center">
<a href="https://www.fireframework.cn/">
<img src="./img/fire-framework-logo.jpeg" width="600" height="600">
</a>
<p align="center">
<a href='https://gitee.com/fire-framework/fire/stargazers'><img src='https://gitee.com/fire-framework/fire/badge/star.svg?theme=gray' alt='star'></img></a>
<a href='https://gitee.com/fire-framework/fire/members'><img src='https://gitee.com/fire-framework/fire/badge/fork.svg?theme=gray' alt='fork'></img></a>
</p>
</p>
# Fire框架
Fire框架是由**中通大数据**自主研发并开源的、专门用于进行**Spark**和**Flink**任务开发的大数据框架。该框架屏蔽技术细节,提供大量简易API帮助开发者更快的构建实时计算任务。同时Fire框架也内置了平台化的功能,用于与实时平台集成。基于Fire框架的任务在中通每天处理的数据量高达**几千亿以上**,覆盖了**Spark计算**(离线&实时)、**Flink计算**等众多计算场景。
## 一、就这么简单!
### 1.1 Flink开发示例
```scala
@Config(
"""
|state.checkpoints.num-retained=30 # 支持任意Flink调优参数、Fire框架参数、用户自定义参数等
|state.checkpoints.dir=hdfs:///user/flink/checkpoint
|""")
@Hive("thrift://localhost:9083") // 配置连接到指定的hive
@Streaming(interval = 100, unaligned = true) // 100s做一次checkpoint,开启非对齐checkpoint
@Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire")
object FlinkDemo extends FlinkStreaming {
@Process
def kafkaSource: Unit = {
val dstream = this.fire.createKafkaDirectStream() // 使用api的方式消费kafka
sql("""create table statement ...""")
sql("""insert into statement ...""")
}
}
```
### 1.2 Spark开发示例
```scala
@Config(
"""
|spark.shuffle.compress=true # 支持任意Spark调优参数、Fire框架参数、用户自定义参数等
|spark.ui.enabled=true
|""")
@Hive("thrift://localhost:9083") // 配置连接到指定的hive
@Streaming(interval = 100, maxRatePerPartition = 100) // 100s一个Streaming batch,并限制消费速率
@Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire")
object SparkDemo extends SparkStreaming {
@Process
def kafkaSource: Unit = {
val dstream = this.fire.createKafkaDirectStream() // 使用api的方式消费kafka
sql("""select * from xxx""").show()
}
}
```
***说明:structured streaming、spark core、flink sql、flink批任务均支持,代码结构与上述示例一致。***
## *[二、开发文档](./docs/index.md)*
## 三、亮点多多!
### 3.1 兼容主流版本
fire框架适配了不同的spark与flink版本,支持spark2.x及以上所有版本,flink1.10及以上所有版本,支持基于scala2.11或scala2.12进行编译。
```shell
# 可根据实际需要选择不同的引擎版本进行fire框架的构建
mvn clean install -DskipTests -Pspark-3.0 -Pflink-1.14 -Pscala-2.12
```
| Apache Spark | Apache Flink |
| ------------ | ------------ |
| 2.3.x | 1.12.x |
| 2.4.x | 1.13.x |
| 3.0.x | 1.14.x |
| 3.1.x | 1.15.x |
| 3.2.x | 1.16.x |
| 3.3.x | |
### **3.2 简单好用**
Fire框架高度封装,屏蔽大量技术细节,许多connector仅需一行代码即可完成主要功能。同时Fire框架统一了spark与flink两大引擎常用的api,使用统一的代码风格即可实现spark与flink的代码开发。
- **HBase API**
```scala
// 读取HBase中指定rowkey数据并将结果集封装为DataFrame返回
val studentDF: DataFrame = this.fire.hbaseGetDF(hTableName, classOf[Student], getRDD)
// 将指定数据集分布式插入到指定HBase表中
this.fire.hbasePutDF(hTableName, studentDF, classOf[Student])
```
- **JDBC API**
1. **通过注解配置数据源:**
```java
@Jdbc(url = "jdbc:mysql://mysql-server:3306/fire", username = "root", password = "root")
```
2. **Spark示例:**
```scala
// 将DataFrame中指定几列插入到关系型数据库中,每100条一插入
df.jdbcBatchUpdate(insertSql, Seq("name", "age", "createTime", "length", "sex"), batch = 100)
// 将查询结果通过反射映射到DataFrame中
val df: DataFrame = this.fire.jdbcQueryDF(querySql, Seq(1, 2, 3), classOf[Student])
```
3. **Flink示例:**
```scala
val dstream = this.fire.createKafkaDirectStream().map(t => JSONUtils.parseObject[Student](t))
val sql =
s"""
|insert into spark_test(name, age, createTime) values(?, ?, ?)
|ON DUPLICATE KEY UPDATE age=18
|""".stripMargin
// sinkJdbc只需指定sql语句即可,fire会自动推断sql中占位符与JavaBean中成员变量的对应关系
dstream.sinkJdbc(sql)
dstream.sinkJdbcExactlyOnce(sql, keyNum = 2)
```
### **3.3 灵活的配置方式**
支持基于接口、apollo、配置文件以及注解等多种方式配置,支持将spark&flink等**引擎参数**、**fire框架参数**以及**用户自定义参数**混合配置,支持运行时动态修改配置。几种常用配置方式如下([配置手册](./docs/config.md)):
1. **基于配置文件:** 创建类名同名的properties文件进行参数配置
2. **基于接口配置:** fire框架提供了配置接口调用,通过接口获取所需的配置,可用于平台化的配置管理
3. **基于注解配置:** 通过注解的方式实现集群环境、connector、调优参数的配置,常用注解如下:
```scala
@Config(
"""
|# 支持Flink调优参数、Fire框架参数、用户自定义参数等
|state.checkpoints.num-retained=30
|state.checkpoints.dir=hdfs:///user/flink/checkpoint
|""")
@Hive("thrift://localhost:9083")
@Checkpoint(interval = 100, unaligned = true)
@Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire")
@RocketMQ(brokers = "bigdata_test", topics = "fire", groupId = "fire", tag = "*", startingOffset = "latest")
@Jdbc(url = "jdbc:mysql://mysql-server:3306/fire", username = "root", password = "..root726")
@HBase("localhost:2181")
```
**配置获取:**
Fire框架封装了统一的配置获取api,基于该api,无论是spark还是flink,无论是在Driver | JobManager端还是在Executor | TaskManager端,都可以一行代码获取所需配置。这套配置获取api,无需再在flink的map等算子中复写open方法了,用起来十分方便。
```scala
this.conf.getString("my.conf")
this.conf.getInt("state.checkpoints.num-retained")
...
```
### **3.4 多集群支持**
Fire框架的配置支持N多集群,比如同一个任务中可以同时配置多个HBase、Kafka数据源,使用不同的数值后缀即可区分(**keyNum**):
```scala
// 假设基于注解配置HBase多集群如下:
@HBase("localhost:2181")
@HBase2(cluster = "192.168.0.1:2181", storageLevel = "DISK_ONLY")
// 代码中使用对应的数值后缀进行区分
this.fire.hbasePutDF(hTableName, studentDF, classOf[Student]) // 默认keyNum=1,表示使用@HBase注解配置的集群信息
this.fire.hbasePutDF(hTableName2, studentDF, classOf[Student], keyNum=2) // keyNum=2,表示使用@HBase2注解配置的集群信息
```
### **3.5 常用connector支持**
支持kafka、rocke
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
fire-master.zip (983个子文件)
CNAME 20B
CNAME 20B
org.apache.flink.table.factories.Factory 981B
org.apache.flink.table.factories.Factory 850B
org.apache.flink.table.factories.Factory 69B
org.apache.flink.table.factories.Factory 69B
org.apache.flink.table.factories.Factory 69B
.gitignore 382B
.gitignore 370B
.gitignore 370B
.gitignore 124B
index.html 764B
RocksDB.java 207KB
RocksDB.java 207KB
RocksDB.java 207KB
StreamExecutionEnvironment.java 110KB
RocksDB.java 108KB
StreamExecutionEnvironment.java 108KB
CheckpointCoordinator.java 99KB
CheckpointCoordinator.java 98KB
CheckpointCoordinator.java 92KB
CheckpointCoordinator.java 92KB
TableEnvironmentImpl.java 91KB
CheckpointCoordinator.java 90KB
TableEnvironmentImpl.java 87KB
TableEnvironmentImpl.java 84KB
TableEnvironmentImpl.java 79KB
FilterProtos.java 74KB
TableEnvironmentImpl.java 69KB
FlinkKafkaConsumerBase.java 58KB
FlinkKafkaConsumerBase.java 58KB
FlinkKafkaConsumerBase.java 58KB
RocksDBStateBackend.java 49KB
EmbeddedRocksDBStateBackend.java 46KB
EmbeddedRocksDBStateBackend.java 45KB
EmbeddedRocksDBStateBackend.java 43KB
EmbeddedRocksDBStateBackend.java 43KB
RocketMQSourceFunction.java 30KB
ExceptionUtils.java 29KB
ExceptionUtils.java 28KB
ExceptionUtils.java 26KB
ExceptionUtils.java 26KB
ExceptionUtils.java 26KB
RowDeserializationSchema.java 24KB
ClickHouseCatalog.java 23KB
ClickHouseCatalog.java 23KB
ClickHouseCatalog.java 23KB
ClickHouseCatalog.java 23KB
EnvironmentInformation.java 22KB
EnvironmentInformation.java 22KB
EnvironmentInformation.java 22KB
EnvironmentInformation.java 21KB
EnvironmentInformation.java 21KB
ClickHouseCatalog.java 20KB
RocketMQCatalog.java 20KB
RetractableTopNFunction.java 19KB
RetractableTopNFunction.java 19KB
ApplicationDispatcherBootstrap.java 19KB
FlinkKafkaConsumer.java 18KB
RocketMQPartitionSplitReader.java 17KB
RocketMQSourceEnumerator.java 17KB
FlinkKafkaConsumer.java 17KB
FlinkKafkaConsumer.java 17KB
ApplicationDispatcherBootstrap.java 16KB
ReflectionUtils.java 16KB
RocksDBFullRestoreOperation.java 16KB
RowKeyValueDeserializationSchema.java 16KB
RocketMQSourceWithTag.java 16KB
RocketMQSourceWithTag.java 16KB
RocketMQSourceWithTag.java 16KB
RocketMQSourceWithTag.java 16KB
RocketMQSourceWithTag.java 16KB
GlobalConfiguration.java 16KB
GlobalConfiguration.java 15KB
GlobalConfiguration.java 15KB
RocketMQSource.java 15KB
GlobalConfiguration.java 15KB
ApplicationDispatcherBootstrap.java 14KB
ApplicationDispatcherBootstrap.java 14KB
GlobalConfiguration.java 14KB
SchedulerManager.java 13KB
ConnectionFactory.java 13KB
SqlCreateTable.java 12KB
ParameterTool.java 12KB
AbstractClickHouseInputFormat.java 11KB
AbstractClickHouseInputFormat.java 11KB
AbstractClickHouseInputFormat.java 11KB
AbstractClickHouseInputFormat.java 11KB
RocketMQDynamicTableSink.java 11KB
ClickHouseRowConverter.java 10KB
ClickHouseRowConverter.java 10KB
ClickHouseRowConverter.java 10KB
ClickHouseRowConverter.java 10KB
RocketMQDynamicTableSourceFactory.java 10KB
RocketMQScanTableSource.java 10KB
ClickHouseRowConverter.java 10KB
SparkSQLPushDownFilter.java 10KB
FilterPushDownHelper.java 9KB
FilterPushDownHelper.java 9KB
FilterPushDownHelper.java 9KB
共 983 条
- 1
- 2
- 3
- 4
- 5
- 6
- 10
资源评论
m0_72731342
- 粉丝: 2
- 资源: 1832
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功