package com.ucar.datalink.biz.service.impl;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.ucar.datalink.biz.dal.MediaDAO;
import com.ucar.datalink.biz.dal.MediaSourceDAO;
import com.ucar.datalink.biz.service.MediaSourceService;
import com.ucar.datalink.biz.service.SyncRelationService;
import com.ucar.datalink.biz.utils.DataLinkFactory;
import com.ucar.datalink.biz.utils.DataSourceFactory;
import com.ucar.datalink.biz.utils.ddl.DdlUtils;
import com.ucar.datalink.biz.utils.ddl.DdlSqlUtils;
import com.ucar.datalink.biz.utils.ddl.SQLStatementHolder;
import com.ucar.datalink.common.errors.DatalinkException;
import com.ucar.datalink.domain.media.*;
import com.ucar.datalink.domain.media.parameter.MediaSrcParameter;
import com.ucar.datalink.domain.media.parameter.rdb.RdbMediaSrcParameter;
import com.ucar.datalink.domain.media.parameter.sddl.SddlMediaSrcParameter;
import com.ucar.datalink.domain.relationship.*;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.ddlutils.model.Table;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.util.*;
import java.util.stream.Collectors;
/**
* Created by lubiao on 2017/5/23.
*/
@Service
public class SyncRelationServiceImpl implements SyncRelationService {
private static final String ALL_MAPPING = "ALL_MAPPING";
private LoadingCache<String, List<MediaMappingInfo>> mediaMappingCache = CacheBuilder.newBuilder().build(
new CacheLoader<String, List<MediaMappingInfo>>() {
@Override
public List<MediaMappingInfo> load(String key) throws Exception {
List<MediaMappingInfo> list = mediaDAO.getAllMediaMappings();
return list == null ? Lists.newArrayList() : list;
}
}
);
private LoadingCache<Long, List<MediaMappingInfo>> taskMediaMappingCache = CacheBuilder.newBuilder().build(
new CacheLoader<Long, List<MediaMappingInfo>>() {
@Override
public List<MediaMappingInfo> load(Long taskId) throws Exception {
List<MediaMappingInfo> allList = mediaMappingCache.getUnchecked(ALL_MAPPING);
return allList.stream().filter(i -> i.getTaskId().equals(taskId)).collect(Collectors.toList());
}
}
);
private LoadingCache<String, List<SyncNode>> treesCache = CacheBuilder.newBuilder().build(
new CacheLoader<String, List<SyncNode>>() {
@Override
public List<SyncNode> load(String mediaName) throws Exception {
return generateTrees(mediaName);
}
}
);
@Override
public void clearSyncRelationCache() {
mediaMappingCache.invalidateAll();
taskMediaMappingCache.invalidateAll();
treesCache.invalidateAll();
}
@Autowired
MediaDAO mediaDAO;
@Autowired
MediaSourceDAO mediaSourceDAO;
@Override
public List<SyncNode> getSyncRelationTrees(Long mediaSourceId, String mediaName) {
Long sddlMediaSourceId = isSDDLSubDB(mediaSourceId);
Long realMediaSourceId;
if (sddlMediaSourceId != null) {
realMediaSourceId = sddlMediaSourceId;
} else {
realMediaSourceId = mediaSourceId;
}
List<SyncNode> list = treesCache.getUnchecked(mediaName);
return list.stream().filter(i -> isDbInNodeTree(realMediaSourceId, mediaName, i, true)).collect(Collectors.toList());
}
@Override
public List<SqlCheckResult> checkSqls(Long mediaSourceId, String sqls) {
Long sddlMediaSourceId = isSDDLSubDB(mediaSourceId);
if (sddlMediaSourceId != null) {
mediaSourceId = sddlMediaSourceId;
}
MediaSourceInfo mediaSourceInfo = mediaSourceDAO.getById(mediaSourceId);
List<SQLStatementHolder> holders = DdlSqlUtils.buildSQLStatement(mediaSourceInfo.getType(), sqls);
List<SqlCheckResult> list = new ArrayList<>();
if (!holders.isEmpty()) {
for (SQLStatementHolder holder : holders) {
SqlCheckResult sqlCheckResult = checkOneSql(mediaSourceInfo, holder);
if (sqlCheckResult != null) {
list.add(sqlCheckResult);
}
}
}
return list;
}
private SqlCheckResult checkOneSql(final MediaSourceInfo mediaSourceInfo, final SQLStatementHolder holder) {
SqlCheckResult sqlCheckResult = new SqlCheckResult(holder.getSqlString());
holder.check();
if (holder.getSqlType().equals(SqlType.CreateTable)) {
holder.getSqlCheckItems().forEach(i -> {
List<SqlCheckTree> trees = buildSqlCheckTrees(i, mediaSourceInfo);
if (!trees.isEmpty()) {
sqlCheckResult.getSqlCheckTrees().addAll(trees);
trees.forEach(t -> {
t.setSqlExeDirection(SqlExeDirection.Negative);
t.getSqlCheckNotes().add(new SqlCheckNote(
"请按照同步关系的反向顺序执行脚本.",
SqlCheckNote.RoleType.DBA,
SqlCheckNote.NoteLevel.INFO));
if (containsSddlDataSource(t.getRootNode())) {
t.getSqlCheckNotes().add(new SqlCheckNote(
"SDDL数据源内部先执行非0号库,再执行0号库.",
SqlCheckNote.RoleType.DBA,
SqlCheckNote.NoteLevel.INFO));
}
if (containsSddlDataSource(t.getRootNode())) {
t.getSqlCheckNotes().add(new SqlCheckNote(
"SDDL增加数据表,请在sddladmin中做对应的配置变更.",
SqlCheckNote.RoleType.DLA,
SqlCheckNote.NoteLevel.WARN));
}
t.getSqlCheckNotes().addAll(buildSqlCheckNotes(mediaSourceInfo, i, t.getRootNode(), true));
});
}
}
);
} else if (holder.getSqlType().equals(SqlType.AlterTable)) {
holder.getSqlCheckItems().forEach(i -> {
List<SqlCheckTree> trees = buildSqlCheckTrees(i, mediaSourceInfo);
if (!trees.isEmpty()) {
sqlCheckResult.getSqlCheckTrees().addAll(trees);
trees.forEach(t -> {
if (i.isContainsTableRename()) {
t.getSqlCheckNotes().add(new SqlCheckNote(
"参与数据同步的表,不支持[Table-Rename]操作.",
SqlCheckNote.RoleType.ALL,
SqlCheckNote.NoteLevel.ERROR));
}
if (i.isContainsColumnRename()) {
t.getSqlCheckNotes().add(new SqlCheckNote(
"参与数据同步的表,不支持[Column-Rename]操作",
SqlCheckNote.RoleType.ALL,
SqlCheckNo
没有合适的资源?快使用搜索试试~ 我知道了~
DataLink是一个满足各种异构数据源之间的实时增量同步分布式可扩展的数据交换平台
共1038个文件
java:639个
js:90个
jsp:80个
需积分: 50 28 下载量 164 浏览量
2019-08-08
04:52:24
上传
评论
收藏 8.79MB ZIP 举报
温馨提示
DataLink是一个满足各种异构数据源之间的实时增量同步,分布式、可扩展的数据交换平台
资源推荐
资源详情
资源评论
收起资源包目录
DataLink是一个满足各种异构数据源之间的实时增量同步分布式可扩展的数据交换平台 (1038个子文件)
startup.bat 1KB
ace.min.css 388KB
ace-part2.min.css 126KB
ace-rtl.min.css 114KB
bootstrap.min.css 113KB
ace-skins.min.css 76KB
font-awesome.min.css 27KB
font-awesome.min.css 27KB
jsoneditor.css 21KB
bootstrap-datepicker3.min.css 20KB
bootstrap-editable.min.css 17KB
jquery-ui.min.css 15KB
select2.min.css 14KB
ui.jqgrid.min.css 13KB
fullcalendar.min.css 12KB
chosen.min.css 11KB
dropzone.min.css 9KB
ace-ie.min.css 9KB
bootstrap-datetimepicker.min.css 8KB
daterangepicker.min.css 5KB
bootstrap-colorpicker.min.css 4KB
jquery-ui.custom.min.css 3KB
bootstrap-timepicker.min.css 3KB
colorbox.min.css 3KB
jquery.gritter.min.css 2KB
bootstrap-duallistbox.min.css 1KB
bootstrap-multiselect.min.css 1KB
prettify.min.css 532B
fonts.googleapis.com.css 494B
sigar-amd64-winnt.dll 393KB
sigar-x86-winnt.dll 260KB
fontawesome-webfont.eot 69KB
glyphicons-halflings-regular.eot 20KB
.gitattributes 87B
.gitignore 280B
HEADER 601B
sqljdbc4-4.0.jar 571KB
SyncRelationServiceImpl.java 32KB
GroupCoordinator.java 31KB
DLConfig.java 29KB
WorkerController.java 26KB
EsClient.java 25KB
WorkerKeeper.java 25KB
MediaMappingController.java 25KB
DdlUtils.java 23KB
AbstractHandler.java 21KB
WorkerConfig.java 21KB
MediaServiceImpl.java 19KB
HBaseTaskController.java 19KB
SQLStatementHolder.java 18KB
RdbEventRecordLoader.java 17KB
MysqlTaskController.java 16KB
HBasePluginUtil.java 16KB
ElasticSearchUtil.java 16KB
RDBMSUtil.java 15KB
MediaSourceController.java 15KB
MysqlTaskReader.java 15KB
MediaMetaController.java 15KB
StandaloneWorkerKeeper.java 14KB
BuiltInRdbEventRecordMerger.java 14KB
SqlUtils.java 14KB
MessageParser.java 14KB
UCarAlarmServiceImpl.java 14KB
WorkerCoordinator.java 14KB
TaskMonitorController.java 14KB
SddlMediaSourceController.java 14KB
BaseRecordHandler.java 13KB
MediaSourceServiceImpl.java 13KB
MetaMapping.java 12KB
ZkMediaSourceController.java 12KB
DelayedOperationPurgatory.java 12KB
RecordGroupHolder.java 11KB
HBaseMediaSourceController.java 11KB
MonitorServiceImpl.java 10KB
CustomCanalInstanceWithManager.java 10KB
EsMediaSourceController.java 10KB
DdlEventInterceptor.java 10KB
BatchContentBuilder.java 10KB
CanalConfigGenerator.java 10KB
RdbEventRecordHandler.java 10KB
WorkerTask.java 10KB
ReplicateHRegionServer.java 9KB
RdbEventRecordHandler.java 9KB
MonitorController.java 9KB
HRecordHandler.java 9KB
DataSourceFactory.java 9KB
WorkerCombinedTaskWriter.java 9KB
WorkerTaskReader.java 9KB
HDFSMediaSourceController.java 9KB
HBaseUtil.java 9KB
HBaseMetaResource.java 9KB
WorkerBootStrap.java 9KB
Assert.java 9KB
SessionHandler.java 9KB
TaskConfigServiceImpl.java 8KB
WorkerGroupMember.java 8KB
TimingWheel.java 8KB
HRecordBatchContentBuilder.java 8KB
ModeUtils.java 8KB
FileStreamHolder.java 8KB
共 1038 条
- 1
- 2
- 3
- 4
- 5
- 6
- 11
资源评论
weixin_39840650
- 粉丝: 409
- 资源: 1万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功