package cn.chongho.inf.flink.service.impl;
import cn.chongho.inf.flink.constants.Constant;
import cn.chongho.inf.flink.constants.DbColunmDataType;
import cn.chongho.inf.flink.model.connector.HoloConfig;
import cn.chongho.inf.flink.model.connector.JdbcConfig;
import cn.chongho.inf.flink.model.connector.MysqlCdcConfig;
import cn.chongho.inf.flink.service.DbSourceService;
import cn.chongho.inf.flink.service.DbTableColumnService;
import cn.chongho.inf.flink.service.DbTableService;
import cn.chongho.inf.flink.utils.DesUtils;
import cn.chongho.inf.flink.utils.StringUtils;
import com.alibaba.fastjson.JSON;
import cn.chongho.inf.flink.model.*;
import cn.chongho.inf.flink.model.connector.ElasticSearchConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author ming
*/
@Service
@Slf4j
public class CdcSqlServiceImpl {
@Value("${sys.des.secretkey}")
private String desSecretkey;
@Value("${sys.des.offset}")
private String desOffset;
@Resource
private DbTableColumnService dbTableColumnService;
@Autowired
private DbSourceService dbSourceService;
@Autowired
private DbTableService dbTableService;
private ThreadLocal<Map<String, String>> tableNameThreadLocal = new ThreadLocal<>();
/**
* 构造源flinksql
* @param cdcJob
* @param sourceInfo
* @return
*/
public String buildSourceSql(CdcJob cdcJob, SourceInfo sourceInfo){
DbSource db = dbSourceService.getDbSourceById(sourceInfo.getDbId());
DbTable table = dbTableService.getDbTableById(sourceInfo.getTableId());
String tableName = createTableName(cdcJob.getId() , db.getId() ,db.getName(), table.getTableName());
Map<String, String> tableNameMap = tableNameThreadLocal.get();
if(tableNameMap == null){
tableNameMap = new HashMap<>();
}
tableNameMap.put(createTableNameKey(db.getId() ,db.getName(), table.getTableName()) ,tableName);
tableNameThreadLocal.set(tableNameMap);
List<String> sourceColumnNameList = null;
Integer jobType = cdcJob.getJobType();
StringBuilder sourcePrimaryColumn = null;
//这里如果是单表同步,指定了需要同步的列,只查出需要同步的列
if(jobType.equals(Constant.CdcJobType.POINT_TO_POINT_SQL.getValue())){
List<ColumnAssociation> columnAssociationList = JSON.parseArray(cdcJob.getColumnAssociation() ,ColumnAssociation.class);
sourceColumnNameList = columnAssociationList.stream().map(ColumnAssociation::getSourceColumnName).collect(Collectors.toList());
//单表同步的主键列为目标表指定的主键列对应的列
Map<String, String> colMap = columnAssociationList.stream()
.collect(Collectors.toMap(ColumnAssociation::getTargetColumnName, ColumnAssociation::getSourceColumnName));
sourcePrimaryColumn = new StringBuilder();
for(String key : cdcJob.getPrimaryColumn().split(",")){
sourcePrimaryColumn.append(colMap.get(key));
sourcePrimaryColumn.append(",");
}
sourcePrimaryColumn.deleteCharAt(sourcePrimaryColumn.length() - 1);
}else if(jobType.equals(Constant.CdcJobType.CONFLUENCE_SQL.getValue())){
sourcePrimaryColumn = new StringBuilder(sourceInfo.getTablePriKey());
}
//列信息
List<DbTableColumn> columnList = dbTableColumnService.getDbTableColumnByNames(table.getId() , sourceColumnNameList);
StringBuilder sqlBuilder = new StringBuilder();
String tableSql = createTableSql(db.getProviderType(), tableName, columnList, sourcePrimaryColumn.toString());
sqlBuilder.append(tableSql).append(" WITH ");
db.getProviderType().toUpperCase();
Constant.DbType dbType = Enum.valueOf(Constant.DbType.class, db.getProviderType().toUpperCase());
String dbUsernameAfterDecrypt = DesUtils.decrypt(desSecretkey, desOffset, db.getUsername());
String dbPasswordAfterDecrypt = DesUtils.decrypt(desSecretkey, desOffset, db.getPassword());
switch (dbType){
case MYSQL:
MysqlCdcConfig cdcConfig = MysqlCdcConfig.createConfigByUrl(db.getUrl(), dbUsernameAfterDecrypt, dbPasswordAfterDecrypt);
cdcConfig.setDatabaseName(db.getName());
cdcConfig.setTableName(table.getTableName());
cdcConfig.setScanStartupMode(cdcJob.getScanStartupMode());
cdcConfig.setDebeziumSkippedOperations(cdcJob.getSkippedOperations());
sqlBuilder.append(cdcConfig.doConfigToSql());
break;
case ELASTICSEARCH:
break;
default:
log.warn("unsupported db type:{}", dbType);
}
return sqlBuilder.toString();
}
/**
* 构造目标flinksql
* @param cdcJob
* @return
*/
public String buildTargetSql(CdcJob cdcJob){
DbSource db = dbSourceService.getDbSourceById(cdcJob.getTargetDbId());
DbTable table = dbTableService.getDbTableById(cdcJob.getTargetTableId());
String tableName = createTableName(cdcJob.getId(), db.getId() ,db.getName(), table.getTableName());
Map<String, String> tableNameMap = tableNameThreadLocal.get();
tableNameMap.put(createTableNameKey(db.getId() ,db.getName(), table.getTableName()) ,tableName);
tableNameThreadLocal.set(tableNameMap);
List<String> targetColumnNameList = null;
Integer jobType = cdcJob.getJobType();
//这里如果是单表同步,指定了需要同步的列,只查出需要同步的列
if(jobType.equals(Constant.CdcJobType.POINT_TO_POINT_SQL.getValue())){
List<ColumnAssociation> columnAssociationList = JSON.parseArray(cdcJob.getColumnAssociation() ,ColumnAssociation.class);
targetColumnNameList = columnAssociationList.stream().map(ColumnAssociation::getTargetColumnName).collect(Collectors.toList());
}
//列信息
List<DbTableColumn> columnList = dbTableColumnService.getDbTableColumnByNames(table.getId() , targetColumnNameList);
String tableSql = createTableSql(db.getProviderType(), tableName, columnList, cdcJob.getPrimaryColumn());
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append(tableSql).append(" WITH ");
Constant.DbType dbType = Enum.valueOf(Constant.DbType.class, db.getProviderType().toUpperCase());
String dbUsernameAfterDecrypt = "";
String dbPasswordAfterDecrypt = "";
if (!StringUtils.isEmpty(db.getUsername())) {
dbUsernameAfterDecrypt = DesUtils.decrypt(desSecretkey, desOffset, db.getUsername());
dbPasswordAfterDecrypt = DesUtils.decrypt(desSecretkey, desOffset, db.getPassword());
}
switch (dbType){
case MYSQL:
JdbcConfig jdbcConfig = new JdbcConfig(db.getUrl(), dbUsernameAfterDecrypt, dbPasswordAfterDecrypt);
jdbcConfig.setTableName(table.getTableName());
sqlBuilder.append(jdbcConfig.doConfigToSql());
break;
case ELASTICSEARCH:
ElasticSearchConfig elasticSearchConfig = new ElasticSearchConfig(db.getUrl(), dbUsernameAfterDecrypt, dbPasswordAfterDecrypt);
elasticSearchConfig.setIndex(table.getTableName());
sqlBuilder.append(elasticSearchConfig.doConfigToSql());
break;
case HOLO:
HoloConfig config = new HoloConfig(null, null, null, dbUsernameAfterDecrypt, dbPasswordAfterDecrypt);
sqlBuilder.ap
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
基于Flink Rest API 二开的Web 控制台,支持作业管理、一键Savepoint及恢复,可视化实时数据同步功能(Binlog),多集群管理,开箱即用。基于 flink rest api 的 flink web admin,支持任务持久化管理,JOB编辑、提交、启动、停止。支持多集群发布任务,任务根据场景选择不同的集群。支持一键Savepoint及指定Savepoint恢复任务。支持数据源管理,加密保存数据源配置信息。支持一键同步表结构。支持可视化配置单表,多表,全量及增量数据库同步任务(FlinkCDC实现)。支持 mysql to mysql,es.
资源推荐
资源详情
资源评论
收起资源包目录
基于Flink Rest API 二开的Web 控制台,支持作业管理、一键Savepoint及恢复,可视化实时数据同步功能 (330个子文件)
ace.min.css 388KB
ace-part2.min.css 126KB
ace-rtl.min.css 114KB
bootstrap.min.css 113KB
ace-skins.min.css 76KB
font-awesome.min.css 27KB
font-awesome.min.css 27KB
bootstrap-datepicker3.min.css 20KB
bootstrap-editable.min.css 17KB
jquery-ui.min.css 15KB
select2.min.css 14KB
ui.jqgrid.min.css 13KB
fullcalendar.min.css 12KB
chosen.min.css 11KB
dropzone.min.css 9KB
ace-ie.min.css 9KB
bootstrap-datetimepicker.min.css 8KB
daterangepicker.min.css 5KB
bootstrap-colorpicker.min.css 4KB
jquery-ui.custom.min.css 3KB
bootstrap-timepicker.min.css 3KB
colorbox.min.css 3KB
jquery.gritter.min.css 2KB
bootstrap-duallistbox.min.css 1KB
bootstrap-multiselect.min.css 1KB
prettify.min.css 532B
fonts.googleapis.com.css 494B
fontawesome-webfont.eot 69KB
glyphicons-halflings-regular.eot 20KB
index.ftl 25KB
index.ftl 24KB
edit.ftl 23KB
index.ftl 22KB
menu.ftl 17KB
index.ftl 15KB
jars-list.ftl 14KB
index.ftl 12KB
index.ftl 12KB
addTable.ftl 11KB
index.ftl 10KB
index.ftl 9KB
edit.ftl 9KB
menu.ftl 7KB
login.ftl 7KB
edit.ftl 7KB
index.ftl 6KB
edit.ftl 6KB
password.ftl 5KB
role.ftl 5KB
edit.ftl 4KB
edit.ftl 4KB
authority.ftl 4KB
edit.ftl 4KB
edit.ftl 3KB
nologin.ftl 255B
303.ftl 204B
.gitignore 395B
.gitignore 395B
.gitignore 63B
CdcSqlServiceImpl.java 13KB
CdcJobServiceImpl.java 12KB
JobServiceImpl.java 9KB
SqlJobServiceImpl.java 9KB
SyncJobStatusTask.java 9KB
SyncCheckPoint.java 7KB
CdcJobController.java 6KB
JobController.java 6KB
SyncCheckPointTask.java 6KB
AdminUserServiceImpl.java 6KB
DataAuthorityServiceImpl.java 6KB
ClusterServiceImpl.java 5KB
JarsController.java 5KB
SqlJobController.java 5KB
RoleController.java 5KB
UserController.java 5KB
DbSourceController.java 5KB
DbInfoServiceImpl.java 5KB
Constant.java 4KB
JobApi.java 4KB
SecurityAspect.java 4KB
DbTableServiceImpl.java 4KB
JarApi.java 4KB
MysqlCdcConfig.java 4KB
RoleMenuServiceImpl.java 3KB
DbTableColumnController.java 3KB
DbTableController.java 3KB
DesUtils.java 3KB
AdminController.java 3KB
DbTableColumnServiceImpl.java 3KB
DbSourceServiceImpl.java 3KB
CheckPointInfoServiceImpl.java 3KB
ClusterController.java 3KB
MybatisConfiguration.java 3KB
ClusterStatusCheck.java 3KB
MenuServiceImpl.java 3KB
DruidDataSourceConfig.java 3KB
DataAuthorityController.java 3KB
WebResult.java 3KB
DbColunmDataType.java 2KB
MenuController.java 2KB
共 330 条
- 1
- 2
- 3
- 4
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 7361
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功