监听数据变更一般有两种方式,一种是数据变更后发送MQ, 下游系统监听MQ,然后处理对应的变更逻辑。先简单说说MQ方式不足
1)需要处理消息丢失,重复消费等问题
2)需要上游保证事务提交后才发送MQ。
3)无法感知表结构变更
4)消息内容没有规范,难以统一处理
今天介绍另外一种基于binlog 方式监听数据变更。本文选择Canal 进行处理,并进行了简易封装,使其尽可能通用。
第一步Canal环境配置1)mysql 配置
mysql 开启binlog , binlog-format 选择ROW 格式
授权 canal 链接 MySQL 账号具有作为 MySQL REPLICATION的权限
2) Canal
下载canal.deployer-1.0.17.tar.gz , 并解压
修改 conf/example/instance.properties 相关信息
3)jdk 安装
canal 依赖jdk , 版本至少1.5 ,可以查看canal 启动脚本 startup.sh
安装后需要配置环境变量,jdk安装可以自行安装。
然后启动 canal , 执行startup.sh 即可。
放码过来定义一个处理接口 BinLogListenergetSupportTable() //处理的表
insert(List<CanalEntry.Column> columns) 插入记录处理方法
update(List<CanalEntry.Column>beforeColumns,List<CanalEntry.Column>afterColumns)更新记录处理方法
delete(List<CanalEntry.Column> columns) 删除记录处理方法
定义一个模板类 AbstractBinLogListener将变更内容转换用JSON转换为对象,具体实现类只需要操作具体对象即可
protected T columnToObj(List<CanalEntry.Column> columns){ JSONObject jsonObject = new JSONObject(); for(CanalEntry.Column column : columns){ jsonObject.put(column.getName(),column.getValue()); } return jsonObject.toJavaObject(getTargetClass());}通用的处理类 BinLogResolveService属性介绍
CanalConnector 用于连接拉取binlog 数据
List binLogListeners 所有的binlog处理类
初始化方法
建立Canal连接,订阅binlog
并开启线程异步处理
核心处理逻辑1)根据变更的表,获取对应的 BinLogListener
String tableName = entry.getHeader().getTableName();BinLogListener binLogListener = binLogLisenerMap.get(tableName);2)根据变更类型调用不同的处理方法
if (eventType == CanalEntry.EventType.DELETE) { binLogListener.delete(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) { binLogListener.insert(rowData.getAfterColumnsList());} else { binLogListener.update(rowData.getBeforeColumnsList(),rowData.getAfterColumnsList());}资源释放,有始有终
停止处理线程
断开Canal连接
具体Binlog处理类
测试效果
项目结构树
不足之处欢迎大佬指点江山,激扬文字,不胜感激。
限于篇幅,文章省略部分非核心步骤或代码