package com.scy.canal.client;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.scy.canal.entity.CanalRowChange;
import com.scy.canal.process.BaseProcess;
/**
 * Consumes row-change entries from a canal destination and hands them to per-table processors.
 *
 * <p>{@link #init()} starts one background thread that connects through a cluster connector,
 * subscribes, and then repeatedly fetches batches. A batch is acknowledged when every entry was
 * processed successfully and rolled back as soon as one processor reports failure.
 *
 * <p>The {@link #running} flag is static, so clearing it stops the polling loop of every
 * {@code CanalConsumer} instance in the JVM, not only this one.
 *
 * @author suicy
 */
public class CanalConsumer {

    private static final Logger logger = LoggerFactory.getLogger(CanalConsumer.class);

    /** Number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 1000;

    /** Pause, in milliseconds, before polling again when a fetch returned nothing to process. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    /**
     * Table processors: the key is the table name, the value is the handler for that table.
     * Lookups use the lower-cased table name, so keys must be lower case.
     */
    private Map<String, BaseProcess> processor;

    /** Loop flag read by the consumer thread; set to {@code false} to stop polling. */
    public static volatile boolean running = true;

    private String destination;
    private String zkServers;

    /**
     * Starts the background thread that connects to canal and consumes batches until
     * {@link #running} becomes {@code false} or the thread is interrupted.
     */
    public void init() {
        // NOTE(review): informational messages in this class are logged at ERROR level,
        // presumably so they show up under a restrictive log configuration; levels and
        // message texts are intentionally left as they were - confirm before changing.
        logger.error("--------CanalConsumer destination:"+destination+" start------------");
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                consume();
            }
        }, "canal-consumer-" + destination);
        thread.start();
    }

    /**
     * Connects, subscribes and polls for batches; always disconnects on the way out.
     */
    private void consume() {
        // The two null arguments are the username and password slots of newClusterConnector.
        CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, null, null);
        try {
            connector.connect();
            connector.subscribe();
            while (running) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                List<Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    // Nothing to process: back off before the next poll.
                    try {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // An interrupt is a request to stop: keep the flag set and leave the loop
                        // instead of printing the stack trace and polling on.
                        Thread.currentThread().interrupt();
                        logger.warn("CanalConsumer destination:" + destination + " interrupted, stopping");
                        break;
                    }
                } else if (parseData(entries, destination)) {
                    connector.ack(batchId);
                } else {
                    connector.rollback(batchId);
                }
            }
        } catch (RuntimeException e) {
            // Record the failure in the application log before the thread dies; rethrown so the
            // thread still terminates exactly as it did before.
            logger.error("CanalConsumer destination:" + destination + " terminated unexpectedly", e);
            throw e;
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Logs and dispatches every INSERT/UPDATE/DELETE entry of a batch.
     *
     * @param entrys entries of one fetched batch
     * @param destination2 canal destination the batch came from (currently unused)
     * @return {@code false} as soon as one processor reports failure, {@code true} otherwise
     * @throws RuntimeException if an entry's stored value cannot be parsed as a row change
     */
    private boolean parseData(List<Entry> entrys, String destination2) {
        for (Entry entry : entrys) {
            EntryType entryType = entry.getEntryType();
            if (entryType == EntryType.TRANSACTIONBEGIN || entryType == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            if (!isRowMutation(eventType)) {
                // Only INSERT / DELETE / UPDATE are logged and dispatched.
                continue;
            }
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();

            /* 1. Log the data change. */
            if (logger.isErrorEnabled()) {
                logger.error(describe(entry, schemaName, tableName, eventType));
                logRows(eventType, rowChage.getRowDatasList());
            }

            /* 2. Dispatch to the processor registered for this table, if any. */
            // Resolve the handler once with the lower-cased key. Previously the existence check
            // used the lower-cased name but the lookup used the raw name, so a table name with
            // upper-case characters passed the check and then failed with a NullPointerException.
            BaseProcess handler = processor == null
                    ? null
                    : processor.get(tableName.toLowerCase(Locale.ROOT));
            if (handler == null) {
                continue;
            }

            CanalRowChange rowChange = buildCanalRowChange(schemaName, tableName, eventType, rowChage.getRowDatasList());
            boolean res = dispatch(handler, eventType, rowChange);
            if (!res) {
                logger.error(describe(entry, schemaName, tableName, eventType));
                return res;
            }
        }
        return true;
    }

    /** Tells whether the event is one of the row mutations this consumer handles. */
    private static boolean isRowMutation(EventType eventType) {
        return eventType == EventType.INSERT
                || eventType == EventType.DELETE
                || eventType == EventType.UPDATE;
    }

    /** Calls the processor method that matches the event type and returns its result. */
    private static boolean dispatch(BaseProcess handler, EventType eventType, CanalRowChange rowChange) {
        if (eventType == EventType.UPDATE) {
            return handler.processUpdate(rowChange);
        }
        if (eventType == EventType.INSERT) {
            return handler.processInsert(rowChange);
        }
        if (eventType == EventType.DELETE) {
            return handler.processDelete(rowChange);
        }
        return false;
    }

    /** Builds the one-line description (binlog file and offset, schema, table, event type) of an entry. */
    private static String describe(Entry entry, String schemaName, String tableName, EventType eventType) {
        return "================> binlog["+entry.getHeader().getLogfileName()+":"+ entry.getHeader().getLogfileOffset()+"] , "
                + "name["+schemaName+":"+tableName+"] , eventType : "+eventType;
    }

    /** Logs the columns of each row: before-image for DELETE, after-image for INSERT, both for UPDATE. */
    private void logRows(EventType eventType, List<RowData> rows) {
        for (RowData rowData : rows) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else if (eventType == EventType.UPDATE) {
                logger.error("-------> before");
                printColumn(rowData.getBeforeColumnsList());
                logger.error("-------> after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }

    /** Packs schema, table, event type and row data into a {@link CanalRowChange}. */
    private CanalRowChange buildCanalRowChange(String schemaName, String tableName, EventType eventType, List<RowData> rowData) {
        CanalRowChange change = new CanalRowChange();
        change.setSchemaName(schemaName);
        change.setEventType(eventType);
        change.setRowData(rowData);
        change.setTableName(tableName);
        return change;
    }

    /** Logs name, value and updated flag of every column. */
    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            logger.error(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    /** Asks the polling loop to stop by clearing the shared {@link #running} flag. */
    protected void stop() {
        running = false;
    }

    public Map<String, BaseProcess> getProcessor() {
        return processor;
    }

    public void setProcessor(Map<String, BaseProcess> processor) {
        this.processor = processor;
    }

    public String getDestination() {
        return destination;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public String getZkServers() {
        return zkServers;
    }

    public void setZkServers(String zkServers) {
        this.zkServers = zkServers;
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
canal与spring整合demo,整合内容包含服务方和消费方。canal消费方采用策略模式,将不同表的数据分配到相应的Process类中:定义BaseProcess抽象类,包含processInsert、processUpdate、processDelete三个抽象方法,分别用于处理三种类型的数据操作。
资源推荐
资源详情
资源评论
收起资源包目录
canalDemo.zip (51个子文件)
canalDemo
.project 1KB
src
test
resource
log4j.xml 2KB
java
canalDemo
Test.java 267B
main
webapp
WEB-INF
web.xml 919B
index.jsp 418B
resources
log4j.xml 2KB
props
canal.properties 3KB
spring-config.xml 1KB
dbinstance
A1
instance.properties 773B
spring
spring-config-canal-client.xml 850B
spring-config-canal-server.xml 527B
spring-config-canal-instance.xml 9KB
java
com
scy
canal
client
CanalConsumer.java 6KB
process
BaseProcess.java 3KB
GoodsProcess.java 2KB
entity
CanalRowChange.java 1KB
DateConverter.java 2KB
Goods.java 1KB
server
CanalServer.java 2KB
target
m2e-wtp
web-resources
META-INF
MANIFEST.MF 106B
maven
com.scy
canalDemo
pom.properties 213B
pom.xml 8KB
classes
log4j.xml 2KB
props
canal.properties 3KB
spring-config.xml 1KB
dbinstance
A1
instance.properties 773B
com
scy
canal
client
CanalConsumer.class 8KB
CanalConsumer$1.class 2KB
process
BaseProcess$1.class 713B
GoodsProcess.class 2KB
BaseProcess.class 4KB
entity
Goods.class 2KB
DateConverter.class 3KB
CanalRowChange.class 2KB
server
CanalServer.class 3KB
CanalServer$1.class 1KB
spring
spring-config-canal-client.xml 850B
spring-config-canal-server.xml 527B
spring-config-canal-instance.xml 9KB
test-classes
canalDemo
Test.class 516B
.settings
org.eclipse.wst.jsdt.ui.superType.container 49B
org.eclipse.wst.common.project.facet.core.xml 252B
org.eclipse.m2e.core.prefs 90B
org.eclipse.jdt.core.prefs 430B
org.eclipse.wst.validation.prefs 50B
org.eclipse.wst.jsdt.ui.superType.name 6B
org.eclipse.core.resources.prefs 153B
org.eclipse.wst.common.component 756B
.jsdtscope 639B
pom.xml 8KB
.classpath 1KB
共 51 条
- 1
隋春羽
- 粉丝: 14
- 资源: 7
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
前往页