package com.wjm.app.func;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.wjm.bean.TableProcess;
import com.wjm.common.GmallConfig;
import com.wjm.util.DruidPhoenixDSUtil;
import com.wjm.util.PhoenixUtil;
import lombok.val;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.SQLException;
import java.util.*;
/**
* @author 魏锦淼
* @create 2022/9/6
**/
public class MyBroadcastFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
private DruidDataSource druidDataSource;
public MyBroadcastFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.mapStateDescriptor = mapStateDescriptor;
}
@Override
public void open(Configuration parameters) throws Exception {
druidDataSource = DruidPhoenixDSUtil.getDataSource();
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
try {
// TODO 1 解析配置流数据
// 如果配置表删除数据
JSONObject jsonObject = JSON.parseObject(value);
String op = jsonObject.getString("op");
if ("d".equals(op)) {
// 只需要删除对应状态 不再往hbase中写数据 不删除hbase数据
JSONObject before = jsonObject.getJSONObject("before");
String sourceTable = before.getString("source_table");
broadcastState.remove(sourceTable);
} else {
// TODO 2 检查并创建表格
String after = jsonObject.getString("after");
TableProcess tableProcess = JSON.parseObject(after, TableProcess.class);
String sinkTable = tableProcess.getSinkTable();
String sinkColumns = tableProcess.getSinkColumns();
String sinkPk = tableProcess.getSinkPk();
String sinkExtend = tableProcess.getSinkExtend();
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
// TODO 3 将内容写入到状态
broadcastState.put(tableProcess.getSourceTable(), tableProcess);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 检查表格是否存在并创建表格
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null){
sinkExtend = "";
}
String[] cols = sinkColumns.split(",");
// create table if not exists t (colName type...)
DruidPooledConnection connection = null;
try {
connection = druidDataSource.getConnection();
StringBuilder createSql = new StringBuilder();
createSql.append("create table if not exists ")
.append(GmallConfig.HBASE_SCHEMA)
.append(".")
.append(sinkTable)
.append(" ( ");
for (int i = 0; i < cols.length; i++) {
if (cols[i].equals(sinkPk)){
createSql.append(cols[i])
.append(" varchar not null primary key");
}else {
createSql.append(cols[i])
.append(" varchar ");
}
if (i < cols.length - 1){
createSql.append(",");
}
}
createSql.append(")")
.append(sinkExtend);
System.out.println(createSql.toString());
PhoenixUtil.sqlExecute(connection, createSql.toString());
} catch (SQLException throwables) {
throwables.printStackTrace();
System.out.println("获取连接错误");
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
// TODO 1 读取对应表格的配置状态
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String tableName = value.getString("table");
// 添加下游数据给type类型和sinkTable字段用来在维度数据修改的时候删除redis中的旁路缓存
String type = value.getString("type");
TableProcess tableProcess = broadcastState.get(tableName);
if (tableProcess != null){
// 不为空则是维度表
// TODO 2 根据状态中的输出列过滤字段
JSONObject data = value.getJSONObject("data");
String sinkColumns = tableProcess.getSinkColumns();
filterColumns(sinkColumns,data);
// TODO 3 添加sinkTable写出
data.put("sink_table",tableProcess.getSinkTable());
data.put("type",type);
out.collect(data);
}
}
/**
*
* @param sinkColumns id,activity_name,activity_type,activity_desc,start_time,end_time,create_time
* @param data
* "id": 1566313645614206994,
* "user_id": 100,
* "nick_name": null,
* "head_img": null,
* "sku_id": 10,
* "spu_id": 3,
* "order_id": 27003,
* "appraise": "1204",
* "comment_txt": "评论内容:83249288619696249353591922127276181426328491334656",
* "create_time": "2022-02-21 14:34:08",
* "operate_time": null
*/
private void filterColumns(String sinkColumns, JSONObject data) {
List<String> cols = Arrays.asList(sinkColumns.split(","));
Set<Map.Entry<String, Object>> entries = data.entrySet();
// entries.removeIf(next -> !cols.contains(next.getKey()));
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
while (iterator.hasNext()){
Map.Entry<String, Object> next = iterator.next();
if (!cols.contains(next.getKey())){
iterator.remove();
}
}
}
}