canal监听binlog从环境搭建到封装一个简易的处理器全过程

香薇说科技世界 2024-12-05 22:56:55
背景

监听数据变更一般有两种方式,一种是数据变更后发送MQ, 下游系统监听MQ,然后处理对应的变更逻辑。先简单说说MQ方式不足

1)需要处理消息丢失,重复消费等问题

2)需要上游保证事务提交后才发送MQ。

3)无法感知表结构变更

4)消息内容没有规范,难以统一处理

今天介绍另外一种基于binlog 方式监听数据变更。本文选择Canal 进行处理,并进行了简易封装,使其尽可能通用。

第一步Canal环境配置

1)mysql 配置

mysql 开启binlog , binlog-format 选择ROW 格式

授权 canal 链接 MySQL 账号具有作为 MySQL REPLICATION的权限

2) Canal

下载canal.deployer-1.0.17.tar.gz , 并解压

修改 conf/example/instance.properties 相关信息

3)jdk 安装

canal 依赖jdk , 版本至少1.5 ,可以查看canal 启动脚本 startup.sh

安装后需要配置环境变量,jdk安装可以自行安装。

然后启动 canal , 执行startup.sh 即可。

放码过来

定义一个处理接口 BinLogListener

getSupportTable() //处理的表

insert(List<CanalEntry.Column> columns) 插入记录处理方法

update(List<CanalEntry.Column>beforeColumns,List<CanalEntry.Column>afterColumns)更新记录处理方法

delete(List<CanalEntry.Column> columns) 删除记录处理方法

定义一个模板类 AbstractBinLogListener

将变更内容转换用JSON转换为对象,具体实现类只需要操作具体对象即可

protected T columnToObj(List<CanalEntry.Column> columns){ JSONObject jsonObject = new JSONObject(); for(CanalEntry.Column column : columns){ jsonObject.put(column.getName(),column.getValue()); } return jsonObject.toJavaObject(getTargetClass());}通用的处理类 BinLogResolveService

属性介绍

CanalConnector 用于连接拉取binlog 数据

List binLogListeners 所有的binlog处理类

初始化方法

建立Canal连接,订阅binlog

并开启线程异步处理

核心处理逻辑

1)根据变更的表,获取对应的 BinLogListener

String tableName = entry.getHeader().getTableName();BinLogListener binLogListener = binLogLisenerMap.get(tableName);

2)根据变更类型调用不同的处理方法

if (eventType == CanalEntry.EventType.DELETE) { binLogListener.delete(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) { binLogListener.insert(rowData.getAfterColumnsList());} else { binLogListener.update(rowData.getBeforeColumnsList(),rowData.getAfterColumnsList());}

资源释放,有始有终

停止处理线程

断开Canal连接

具体Binlog处理类

测试效果

项目结构树

不足之处欢迎大佬指点江山,激扬文字,不胜感激。

限于篇幅,文章省略部分非核心步骤或代码

0 阅读:0

香薇说科技世界

简介:感谢大家的关注