package com.data.binlog.listener;
import cn.hutool.core.util.ObjectUtil;
import com.data.binlog.config.DataSourceConfig;
import com.data.binlog.context.TableContext;
import com.data.binlog.listener.biz.BizTableRouteProcess;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.Serializable;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Main binlog listener.
 *
 * <p>Receives every event from the {@link BinaryLogClient} and:
 * <ul>
 *   <li>on {@code TABLE_MAP}: records the table-name/table-id mapping in {@link TableContext}
 *       and loads the table's column metadata if it is not cached yet;</li>
 *   <li>on {@code QUERY}: parses the SQL and, for {@code ALTER} statements, evicts the cached
 *       column metadata of the altered table;</li>
 *   <li>on update rows: builds one {@link BinLogItem} per row (before/after images plus the
 *       changed columns) and hands it to {@link BizTableRouteProcess#route};</li>
 *   <li>on delete rows: hands a {@link BinLogItem} carrying only table name, event type and
 *       timestamp to {@link BizTableRouteProcess#route};</li>
 *   <li>on write (insert) rows: does nothing.</li>
 * </ul>
 */
@Component
@Slf4j
public class BinLogEventListener implements BinaryLogClient.EventListener {

    @Resource
    private BizTableRouteProcess bizTableRouteProcess;

    @Resource
    private DataSourceConfig dataSourceConfig;

    /**
     * Dispatches a binlog event to the handler for its type. Event types that are not handled
     * are ignored.
     *
     * @param event the binlog event delivered by the client
     * @throws RuntimeException if column metadata for a newly seen table cannot be loaded
     */
    @Override
    public void onEvent(Event event) {
        EventHeader header = event.getHeader();
        EventType eventType = header.getEventType();
        log.debug("监听的事件类型:{}", eventType);

        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            handleTableMap(tableData);
        } else if (eventType == EventType.QUERY) {
            QueryEventData queryData = event.getData();
            handleQuery(queryData);
        } else if (EventType.isWrite(eventType)) {
            // Insert events are intentionally not routed: the original implementation only
            // read the event body and did nothing with it.
            log.debug("Ignoring write rows event: {}", eventType);
        } else if (EventType.isUpdate(eventType)) {
            UpdateRowsEventData updateData = event.getData();
            handleUpdate(event, updateData);
        } else if (EventType.isDelete(eventType)) {
            DeleteRowsEventData deleteData = event.getData();
            handleDelete(event, deleteData);
        }
    }

    /**
     * Registers the table-name/table-id mapping and loads column metadata when none is cached
     * for the table id.
     */
    private void handleTableMap(TableMapEventData tableData) {
        String db = tableData.getDatabase();
        String table = tableData.getTable();
        String tableId = String.valueOf(tableData.getTableId());

        TableContext.getTableNameMap().put(table, tableId);
        TableContext.getTableIdMap().put(tableId, table);

        Map<String, Map<String, ColumnInfo>> tableIdColumnMap = TableContext.getTableIdColumnMap();
        // Skip the metadata query when the structure is already cached.
        if (tableIdColumnMap.get(tableId) != null) {
            return;
        }
        try {
            tableIdColumnMap.put(tableId, getColMap(db, table));
        } catch (Exception e) {
            throw new RuntimeException("Failed to load column metadata for " + db + "." + table, e);
        }
    }

    /**
     * Evicts cached column metadata when the executed statement is an {@code ALTER}, so that it
     * is reloaded on the next {@code TABLE_MAP} event for that table.
     */
    private void handleQuery(QueryEventData queryEventData) {
        String execSql = queryEventData.getSql();
        Statement statement;
        try {
            statement = CCJSqlParserUtil.parse(execSql);
        } catch (JSQLParserException e) {
            // Statements the parser does not understand are expected in a binlog stream;
            // they cannot be ALTERs we care about, so just report and move on.
            log.warn("{} sql语句格式错误: {}", execSql, e.getMessage());
            return;
        }
        if (!(statement instanceof Alter)) {
            return;
        }
        // The parsed name may still carry MySQL backtick quoting, while the cache is keyed by
        // the bare table name reported in TABLE_MAP events.
        String tableName = stripBackticks(((Alter) statement).getTable().getName());
        String tableId = TableContext.getTableNameMap().get(tableName);
        if (tableId != null) {
            TableContext.getTableIdColumnMap().remove(tableId);
        }
        log.info("数据结构变化: {}", tableName);
    }

    /**
     * Builds and routes one {@link BinLogItem} per updated row, including the before image, the
     * after image and the map of columns whose value changed.
     */
    private void handleUpdate(Event event, UpdateRowsEventData data) {
        EventType eventType = event.getHeader().getEventType();
        String tableId = String.valueOf(data.getTableId());
        Map<String, ColumnInfo> columnInfoMap = TableContext.getTableIdColumnMap().get(tableId);
        if (columnInfoMap == null) {
            // Without metadata the row arrays cannot be mapped to column names.
            log.warn("No column metadata cached for table id {}; skipping update event", tableId);
            return;
        }
        String tableName = TableContext.getTableIdMap().get(tableId);

        for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
            Serializable[] beforeRow = row.getKey();
            Serializable[] afterRow = row.getValue();
            Map<String, Serializable> before = Maps.newHashMap();
            Map<String, Serializable> after = Maps.newHashMap();
            Map<String, Object[]> change = Maps.newHashMap();

            for (Map.Entry<String, ColumnInfo> entry : columnInfoMap.entrySet()) {
                String column = entry.getKey();
                int idx = entry.getValue().getIdx();
                if (idx < 0 || idx >= beforeRow.length || idx >= afterRow.length) {
                    // Cached metadata does not line up with this row image (e.g. the schema
                    // changed after the event was written); skip rather than fail the event.
                    log.warn("Column {} (index {}) is outside the row image of table {}",
                            column, idx, tableName);
                    continue;
                }
                Serializable beforeValue = beforeRow[idx];
                Serializable afterValue = afterRow[idx];
                before.put(column, beforeValue);
                after.put(column, afterValue);
                if (!valuesEqual(beforeValue, afterValue)) {
                    change.put(column, new Object[]{beforeValue, afterValue});
                }
            }

            BinLogItem binLogItem = new BinLogItem();
            binLogItem.setTableName(tableName);
            binLogItem.setEventType(eventType);
            binLogItem.setTimestamp(event.getHeader().getTimestamp());
            binLogItem.setBefore(before);
            binLogItem.setAfter(after);
            binLogItem.setColumnChangeMap(change);
            binLogItem.setColumnInfoMap(columnInfoMap);
            bizTableRouteProcess.route(binLogItem);
        }
        log.debug("{}", data);
    }

    /**
     * Routes a {@link BinLogItem} for a delete event. Only the table name, event type and
     * timestamp are populated; row contents are not forwarded.
     */
    private void handleDelete(Event event, DeleteRowsEventData data) {
        BinLogItem binLogItem = new BinLogItem();
        binLogItem.setTableName(TableContext.getTableIdMap().get(String.valueOf(data.getTableId())));
        binLogItem.setEventType(event.getHeader().getEventType());
        binLogItem.setTimestamp(event.getHeader().getTimestamp());
        bizTableRouteProcess.route(binLogItem);
        log.debug("{}", data);
    }

    /**
     * Compares two column values. {@code byte[]} values are compared by content, because a
     * plain {@code equals} on arrays compares references and would flag every binary column
     * as changed.
     */
    private static boolean valuesEqual(Serializable left, Serializable right) {
        if (left instanceof byte[] && right instanceof byte[]) {
            return java.util.Arrays.equals((byte[]) left, (byte[]) right);
        }
        return ObjectUtil.equals(left, right);
    }

    /** Removes MySQL backtick quoting from an identifier; {@code null} is returned unchanged. */
    private static String stripBackticks(String identifier) {
        return identifier == null ? null : identifier.replace("`", "");
    }

    /**
     * Loads the column metadata of a table from {@code INFORMATION_SCHEMA.COLUMNS}.
     *
     * <p>A new JDBC connection is opened from the values in {@link DataSourceConfig} and is
     * closed again before the method returns.
     *
     * @param db    schema (database) name
     * @param table table name
     * @return column metadata keyed by column name; each entry holds the zero-based column
     *         position; empty when the query returns no rows
     * @throws Exception if the JDBC driver class cannot be loaded or the metadata query fails
     */
    public Map<String, ColumnInfo> getColMap(String db, String table) throws Exception {
        Map<String, ColumnInfo> map = new HashMap<>();
        Class.forName("com.mysql.jdbc.Driver");

        String jdbcUrl = "jdbc:mysql://" + dataSourceConfig.getUrl() + ":" + dataSourceConfig.getPort();
        // ORDINAL_POSITION is 1-based in INFORMATION_SCHEMA; subtract 1 to get an array index.
        String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
                "DATA_TYPE, ORDINAL_POSITION -1 as ORDINAL_POSITION , case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
                " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ? order by ordinal_position";

        try (Connection connection = DriverManager.getConnection(
                jdbcUrl, dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());
             PreparedStatement ps = connection.prepareStatement(preSql)) {
            ps.setString(1, db);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String schema = rs.getString("TABLE_SCHEMA");
                    String tableName = rs.getString("TABLE_NAME");
                    String column = rs.getString("COLUMN_NAME");
                    int idx = rs.getInt("ORDINAL_POSITION");
                    String dataType = rs.getString("DATA_TYPE");
                    String isPKC = rs.getString("IS_PKC");
                    ColumnInfo columnInfo = new ColumnInfo(idx, schema, tableName, column, dataType, "Y".equals(isPKC));
                    map.put(column, columnInfo);
                }
            }
        } catch (SQLException e) {
            // Propagate instead of returning a partial/empty map, which the caller would
            // otherwise cache as the table's structure.
            log.error("Failed to load column metadata for {}.{}", db, table, e);
            throw e;
        }
        return map;
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
binlog数据库自定义表同步附件
共12个文件
java:12个
需积分: 0 0 下载量 195 浏览量
2024-04-25
17:59:04
上传
评论
收藏 10KB ZIP 举报
温馨提示
附件是基于mysql-binlog-connector-java实现的简单binlog同步,插件可热插拔,用户可自定义自己的表加工逻辑
资源推荐
资源详情
资源评论
收起资源包目录
binlog.zip (12个子文件)
binlog
param
BinLogItem.java 592B
ColumnInfo.java 661B
annotation
BinLogTable.java 378B
Starter.java 2KB
context
SpringContextHolder.java 2KB
TableContext.java 731B
listener
BinLogEventListener.java 8KB
biz
BizTableListener.java 230B
user
UserTableListener.java 1KB
BizTableRouteProcess.java 2KB
config
DataSourceConfig.java 450B
BinaryLogClientConfig.java 1KB
共 12 条
- 1
资源评论
zhaoyonghenghcl
- 粉丝: 70
- 资源: 1
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功