package com.atguigu.iceberg.warhouse.service;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class DwdIcebergSerivce {
public void readOdsData(StreamExecutionEnvironment env) {
DataStream<String> baseadDS = env.readTextFile("hdfs://mycluster/ods/baseadlog.log");
DataStream<RowData> baseadInput = baseadDS.map(item -> {
JSONObject jsonObject = JSONObject.parseObject(item);
GenericRowData rowData = new GenericRowData(3);
rowData.setField(0, jsonObject.getIntValue("adid"));
rowData.setField(1, StringData.fromString(jsonObject.getString("adname")));
rowData.setField(2, StringData.fromString(jsonObject.getString("dn")));
return rowData;
});
TableLoader dwdbaseadTable = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/dwd_base_ad");
FlinkSink.forRowData(baseadInput).tableLoader(dwdbaseadTable).overwrite(true).build();
DataStream<String> basewebsiteDS = env.readTextFile("hdfs://mycluster/ods/baswewebsite.log");
DataStream<RowData> basewebsiteInput = basewebsiteDS.map(item -> {
JSONObject jsonObject = JSONObject.parseObject(item);
GenericRowData rowData = new GenericRowData(7);
rowData.setField(0, jsonObject.getIntValue("siteid"));
rowData.setField(1, StringData.fromString(jsonObject.getString("sitename")));
rowData.setField(2, StringData.fromString(jsonObject.getString("siteurl")));
rowData.setField(3, jsonObject.getIntValue("delete"));
LocalDateTime localDateTime = LocalDate.parse(jsonObject.getString("createtime"), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.atStartOfDay();
rowData.setField(4, TimestampData.fromLocalDateTime(localDateTime));
rowData.setField(5, StringData.fromString(jsonObject.getString("creator")));
rowData.setField(6, StringData.fromString(jsonObject.getString("dn")));
return rowData;
});
TableLoader dwdbasewebsiteTable = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/dwd_base_website");
FlinkSink.forRowData(basewebsiteInput).tableLoader(dwdbasewebsiteTable).overwrite(true).build();
DataStream<String> memberDS = env.readTextFile("hdfs://mycluster/ods/member.log");
DataStream<RowData> memberInput = memberDS.map(item -> {
JSONObject jsonObject = JSONObject.parseObject(item);
GenericRowData rowData = new GenericRowData(19);
rowData.setField(0, jsonObject.getIntValue("uid"));
rowData.setField(1, jsonObject.getIntValue("ad_id"));
rowData.setField(2, StringData.fromString(jsonObject.getString("birthday")));
rowData.setField(3, StringData.fromString(jsonObject.getString("email")));
rowData.setField(4, StringData.fromString(jsonObject.getString("fullname")));
rowData.setField(5, StringData.fromString(jsonObject.getString("iconurl")));
rowData.setField(6, StringData.fromString(jsonObject.getString("lastlogin")));
rowData.setField(7, StringData.fromString(jsonObject.getString("mailaddr")));
rowData.setField(8, StringData.fromString(jsonObject.getString("memberlevel")));
rowData.setField(9, StringData.fromString(jsonObject.getString("password")));
rowData.setField(10, StringData.fromString(jsonObject.getString("paymoney")));
rowData.setField(11, StringData.fromString(jsonObject.getString("phone")));
rowData.setField(12, StringData.fromString(jsonObject.getString("qq")));
rowData.setField(13, StringData.fromString(jsonObject.getString("register")));
rowData.setField(14, StringData.fromString(jsonObject.getString("regupdatetime")));
rowData.setField(15, StringData.fromString(jsonObject.getString("unitname")));
rowData.setField(16, StringData.fromString(jsonObject.getString("userip")));
rowData.setField(17, StringData.fromString(jsonObject.getString("zipcode")));
rowData.setField(18, StringData.fromString(jsonObject.getString("dt")));
return rowData;
});
TableLoader dwdmemberTable = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/dwd_member");
FlinkSink.forRowData(memberInput).tableLoader(dwdmemberTable).overwrite(true).build();
DataStream<String> memberregtypDS = env.readTextFile("hdfs://mycluster/ods/memberRegtype.log");
DataStream<RowData> memberregtypInput = memberregtypDS.map(item -> {
JSONObject jsonObject = JSONObject.parseObject(item);
GenericRowData rowData = new GenericRowData(10);
rowData.setField(0, jsonObject.getIntValue("uid"));
rowData.setField(1, StringData.fromString(jsonObject.getString("appkey")));
rowData.setField(2, StringData.fromString(jsonObject.getString("appregurl")));
rowData.setField(3, StringData.fromString(jsonObject.getString("bdp_uuid")));
LocalDateTime localDateTime = LocalDate.parse(jsonObject.getString("createtime"), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.atStartOfDay();
rowData.setField(4, TimestampData.fromLocalDateTime(localDateTime));
rowData.setField(5, StringData.fromString(jsonObject.getString("isranreg")));
rowData.setField(6, StringData.fromString(jsonObject.getString("regsource")));
rowData.setField(7, StringData.fromString(jsonObject.getString("regsource")));
rowData.setField(8, jsonObject.getIntValue("websiteid"));
rowData.setField(9, StringData.fromString(jsonObject.getString("dt")));
return rowData;
});
TableLoader dwdmemberregtypeTable = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/dwd_member_regtype");
FlinkSink.forRowData(memberregtypInput).tableLoader(dwdmemberregtypeTable).overwrite(true).build();
DataStream<String> memviplevelDS = env.readTextFile("hdfs://mycluster/ods/pcenterMemViplevel.log");
DataStream<RowData> memviplevelInput = memviplevelDS.map(item -> {
JSONObject jsonObject = JSONObject.parseObject(item);
GenericRowData rowData = new GenericRowData(10);
rowData.setField(0, jsonObject.getIntValue("vip_id"));
rowData.setField(1, StringData.fromString(jsonObject.getString("vip_level")));
LocalDateTime start_timeDate = LocalDate.parse(jsonObject.getString("start_time"), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.atStartOfDay();
LocalDateTime end_timeDate = LocalDate.parse(jsonObject.getString("end_time"), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.atStartOfDay();
LocalDateTime last_modify_timeDate = LocalDate.parse(jsonObject.getString("last_modify_time"), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.atStartOfDay();
rowData.setField(2, TimestampData.fromLocalDateTime(start_timeDate));
rowData.setField(3, TimestampData.fromLocalDateTime(end_timeDate));
rowData.setField(4, TimestampData.fromLocalDateTime(last_modify_timeDate));
没有合适的资源？快使用搜索试试～ 我知道了～
资源推荐
资源详情
资源评论
收起资源包目录
iceberg-demo.zip (49个子文件)
iceberg-demo
.DS_Store 6KB
pom.xml 6KB
src
test
java
main
resources
java
iceberg-demo.iml 81B
iceberg-spark-demo
pom.xml 4KB
src
test
java
main
resources
core-site.xml 1KB
hdfs-site.xml 3KB
hive-site.xml 2KB
log4j.properties 606B
yarn-site.xml 5KB
java
scala
com
atguigu
iceberg
spark
sql
TableOperations.scala 4KB
structuredstreaming
TestTopicOperators.scala 2KB
TestProducer.java 1KB
warehouse
controller
DwsIcebergController.scala 1012B
DwdIcebergController.scala 995B
AdsIcebergController.scala 1014B
dao
DwDIcebergDao.scala 2KB
DwsIcebergDao.scala 494B
service
AdsIcebergService.scala 2KB
DwsIcebergService.scala 6KB
DwdIcebergService.scala 4KB
util
bean
Models.scala 5KB
.idea
jarRepositories.xml 845B
codeStyles
codeStyleConfig.xml 149B
Project.xml 263B
workspace.xml 3KB
misc.xml 469B
scala_compiler.xml 212B
compiler.xml 622B
.gitignore 176B
iceberg-flink-demo
.DS_Store 6KB
pom.xml 6KB
src
.DS_Store 6KB
test
java
main
.DS_Store 6KB
resources
core-site.xml 1KB
hdfs-site.xml 3KB
hive-site.xml 2KB
log4j.properties 606B
yarn-site.xml 5KB
java
com
atguigu
iceberg
flink
sql
TableOperations.java 2KB
streaming
TestTopic.java 3KB
warhouse
controller
AdsIcebergController.java 722B
DwdIcebergController.java 583B
DwsIcebergController.java 761B
dao
DwsIcbergDao.java 1KB
DwdIcebergDao.java 4KB
service
DwsIcebergService.java 4KB
AdsIcebergService.java 3KB
DwdIcebergSerivce.java 10KB
bean
QueryResult.java 2KB
共 49 条
- 1
资源评论
数据与后端架构提升之路
- 粉丝: 1w+
- 资源: 42
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益，开通VIP直接复制
信息提交成功