package com.scy.canal.client;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.scy.canal.entity.CanalRowChange;
import com.scy.canal.process.BaseProcess;
/**
* 数据消费
* @author suicy
*
*/
public class CanalConsumer {
private static final Logger logger = LoggerFactory.getLogger(CanalConsumer.class);
//数据处理类key为表名,value为对应的处理类
private Map<String, BaseProcess> processor;
public static volatile boolean running = true;
private String destination;
private String zkServers;
public void init(){
// 创建链接
logger.error("--------CanalConsumer destination:"+destination+" start------------");
Thread thread = new Thread(new Runnable() {
public void run() {
CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, null, null);
int batchSize = 1000;
try
{
connector.connect();
connector.subscribe();
while ( running ) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
boolean res = parseData(message.getEntries(),destination);
if(res){
connector.ack(batchId);
}else{
connector.rollback(batchId);
}
}
}
} finally {
connector.disconnect();
}
}
});
thread.start();
}
/**
* 数据处理
* @param entrys
* @param destination2
* @return
*/
private boolean parseData(List<Entry> entrys, String destination2) {
for (Entry entry : entrys) {
if ( EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) || EntryType.TRANSACTIONEND.equals(entry.getEntryType()) ) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
/*1. 输出数据变更日志*/
if( logger.isErrorEnabled() ){
if(eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE ){
logger.error("================> binlog["+entry.getHeader().getLogfileName()+":"+ entry.getHeader().getLogfileOffset()+"] , "
+ "name["+schemaName+":"+tableName+"] , eventType : "+eventType);
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else if (eventType == EventType.UPDATE){
logger.error("-------> before");
printColumn(rowData.getBeforeColumnsList());
logger.error("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
/*2. 数据处理*/
if( EventType.INSERT.equals(eventType)||EventType.DELETE.equals(eventType)||EventType.UPDATE.equals(eventType)){
//构造CanalRowChange对象
CanalRowChange rowChange = bulidCanalRowChange(schemaName, tableName, eventType, rowChage.getRowDatasList());
/*根据表明获取数据处理类*/
if(processor.containsKey(tableName.toLowerCase())){
boolean res = false;
//根据事件类型调用相应的数据处理方法
if (EventType.UPDATE.equals(eventType)) {
res = processor.get(tableName).processUpdate(rowChange);
} else if (EventType.INSERT.equals(eventType)) {
res = processor.get(tableName).processInsert(rowChange);
} else if (EventType.DELETE.equals(eventType)) {
res = processor.get(tableName).processDelete(rowChange);
}
if(!res){
logger.error("================> binlog["+entry.getHeader().getLogfileName()+":"+ entry.getHeader().getLogfileOffset()+"] , "
+ "name["+schemaName+":"+tableName+"] , eventType : "+eventType);
return res;
}
}
}
}
return true;
}
private CanalRowChange bulidCanalRowChange(String schemaName, String tableName, EventType eventType, List<RowData> rowData){
CanalRowChange change = new CanalRowChange();
change.setSchemaName(schemaName);
change.setEventType(eventType);
change.setRowData(rowData);
change.setTableName(tableName);
return change;
}
private void printColumn(List<Column> columns) {
for (Column column : columns) {
logger.error(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
protected void stop() {
if (!running) {
return;
}
running = false;
}
public Map<String, BaseProcess> getProcessor() {
return processor;
}
public void setProcessor(Map<String, BaseProcess> processor) {
this.processor = processor;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getZkServers() {
return zkServers;
}
public void setZkServers(String zkServers) {
this.zkServers = zkServers;
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
canal+spring的集成
共51个文件
xml:15个
class:11个
java:8个
4星 · 超过85%的资源 需积分: 31 81 下载量 30 浏览量
2018-04-25
09:31:22
上传
评论
收藏 53KB ZIP 举报
温馨提示
canal+spring的集成和封装,灵活使用,整合内容包含服务方,消费方 canal消费方采用策略模式,将不同表的数据分配到相应的Process类中。定义BaseProcess抽象类,包含processInsert、processUpdate、processDelete三个抽象方法,分别用于处理三种类型的数据操作。
资源推荐
资源详情
资源评论
收起资源包目录
canal_demo.zip (51个子文件)
canalDemo
.project 1KB
pom.xml 8KB
target
test-classes
canalDemo
Test.class 516B
m2e-wtp
web-resources
META-INF
maven
com.scy
canalDemo
pom.xml 8KB
pom.properties 213B
MANIFEST.MF 106B
classes
props
canal.properties 3KB
com
scy
canal
server
CanalServer$1.class 1KB
CanalServer.class 3KB
client
CanalConsumer.class 8KB
CanalConsumer$1.class 2KB
process
BaseProcess.class 4KB
GoodsProcess.class 2KB
BaseProcess$1.class 713B
entity
Goods.class 2KB
CanalRowChange.class 2KB
DateConverter.class 3KB
log4j.xml 2KB
spring
spring-config-canal-client.xml 850B
spring-config-canal-server.xml 527B
spring-config-canal-instance.xml 9KB
spring-config.xml 1KB
dbinstance
A1
instance.properties 773B
.settings
.jsdtscope 639B
org.eclipse.core.resources.prefs 153B
org.eclipse.wst.common.component 756B
org.eclipse.wst.validation.prefs 50B
org.eclipse.jdt.core.prefs 430B
org.eclipse.wst.common.project.facet.core.xml 252B
org.eclipse.wst.jsdt.ui.superType.container 49B
org.eclipse.m2e.core.prefs 90B
org.eclipse.wst.jsdt.ui.superType.name 6B
src
test
java
canalDemo
Test.java 267B
resource
log4j.xml 2KB
main
resources
props
canal.properties 3KB
log4j.xml 2KB
spring
spring-config-canal-client.xml 850B
spring-config-canal-server.xml 527B
spring-config-canal-instance.xml 9KB
spring-config.xml 1KB
dbinstance
A1
instance.properties 773B
java
com
scy
canal
server
CanalServer.java 2KB
client
CanalConsumer.java 6KB
process
BaseProcess.java 3KB
GoodsProcess.java 2KB
entity
Goods.java 1KB
CanalRowChange.java 1KB
DateConverter.java 2KB
webapp
WEB-INF
web.xml 919B
index.jsp 418B
.classpath 1KB
共 51 条
- 1
资源评论
- wxb19832018-07-09确实是一样的东西
- 乐观的Terry2019-01-08惠民,不想某些傻逼,要50积分
- jingshenbusi2018-06-01把别人的资源改个名字再发出来,有何意义??
- binibaichi2222018-05-21这是我找得东西,不错!
七夜天书
- 粉丝: 1
- 资源: 17
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功