Canal监听MySQL的实现步骤

目录

1、Mysql数据库开启binlog模式

2、Docker下Canal容器安装

3、Canal Client项目搭建

1、Mysql数据库开启binlog模式

注意:Mysql容器,此处Mysql版本为5.7

#进入容器 docker exec -it mysql /bin/bash #进入配置目录 cd /etc/mysql/mysql.conf.d #修改配置文件 vi mysqld.cnf

(1) 修改mysqld.cnf配置文件,添加如下配置:

log-bin=mysql-bin server-id=12345

(2) 创建账号 用于测试使用,使用root账号创建用户并授予权限

create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;

(3) 重启mysql容器

docker restart mysql 2、Docker下Canal容器安装

(1)拉取canal镜像

docker pull docker.io/canal/canal-server

(2)创建Canal容器

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

(3)进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。

#进入容器 docker exec -it canal /bin/bash cd canal-server/conf/ #修改 canal.properties vi canal.properties cd example/ #修改 instance.properties vi instance.properties

修改canal.properties的id,不能和mysql的server-id重复,如下图:

修改instance.properties,配置数据库连接地址:

这里的canal.instance.filter.regex有多种配置,如下:

可以参考地址如下: https://github.com/alibaba/canal/wiki/AdminGuide

mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 

常见例子:

1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

配置完成后,设置开机启动,并记得重启canal。

docker update --restart=always canal docker restart canal 3、Canal Client项目搭建

(1)引入依赖

<parent> <artifactId>spring-boot-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <!--canal依赖--> <dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies>

注意:canal依赖stater在中央仓库是不存在的,需要手动放进本地仓库或者你公司里面的nexus

这里以放进本地仓库为例:

首先解压spring-boot-starter-canal-master.zip

在spring-boot-starter-canal-master\starter-canal文件夹下执行mvn clean install

此时在本地仓库就会存在jar包

引入依赖

<!--canal依赖--> <dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>

(2)启动类编写

@SpringBootApplication @EnableCanalClient public class CanalApplication { public static void main(String[] args) { SpringApplication.run(CanalApplication.class,args); } }

(3)监听器编写

@CanalEventListener public class CanalDataEventListener { /*** * 增加数据监听 * @param eventType * @param rowData */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); } /*** * 修改数据监听 * @param rowData */ @UpdateListenPoint public void onEventUpdate(CanalEntry.RowData rowData) { System.out.println("UpdateListenPoint"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); } /*** * 删除数据监听 * @param eventType */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType) { System.out.println("DeleteListenPoint"); } /*** * 自定义数据修改监听 * @param eventType * @param rowData */ @ListenPoint(destination = "example", schema = "torlesse_test", table = {"tb_user", "tb_order"}, eventType = CanalEntry.EventType.UPDATE) public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.err.println("DeleteListenPoint"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); } @ListenPoint(destination = "example", schema = "test_canal", //所要监听的数据库名 table = {"tb_user"}, //所要监听的数据库表名 eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) public void onEventCustomUpdateForTbUser(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ getChangeValue(eventType,rowData); } public static void getChangeValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ if(eventType == CanalEntry.EventType.DELETE){ rowData.getBeforeColumnsList().forEach(column -> { //获取删除前的数据 System.out.println(column.getName() + " == " + column.getValue()); }); }else { rowData.getBeforeColumnsList().forEach(column -> { //打印改变前的字段名和值 System.out.println(column.getName() + " == " + column.getValue()); }); rowData.getAfterColumnsList().forEach(column -> { //打印改变后的字段名和值 System.out.println(column.getName() + " == " + column.getValue()); }); } } }

到此就可以实现Canal监听Mysql

项目gitee地址:test-canal

详情可以查看:

https://github.com/alibaba/canal (阿里官方)

https://github.com/alibaba/canal/wiki/AdminGuide (阿里官方)

https://github.com/chenqian56131/spring-boot-starter-canal (自制starter)

到此这篇关于Canal监听MySQL的实现步骤的文章就介绍到这了,更多相关Canal监听MySQL内容请搜索易知道(ezd.cc)以前的文章或继续浏览下面的相关文章希望大家以后多多支持易知道(ezd.cc)!

推荐阅读

    mac怎么安装mysql| macbook怎么下载mysql

    mac怎么安装mysql| macbook怎么下载mysql,标签,学习python 不得不学习的就是数据库,那么在开始学习数据库之前,那么今天我们先说下怎么安装m

    MySQL更新更新页面1/4。

    MySQL更新更新页面1/4。,,插入和替换 插入和替换语句的功能是将新数据插入表中,这两个语句的语法相似,它们之间的主要区别是如何处理重复的

    mysql数据库启动失败

    mysql数据库启动失败,报错,非正常,1、原因 公司服务器故障,非正常停机导致数据库启动失败。 报错信息 [ [email protected] dmp]# /etc/ini

    mysql长整型是什么

    mysql长整型是什么,整型,数据类型,语法,用户,填充,版本,MySQL长整型是“BIGINT”,是MySQL中最常用的数据类型之一,其可以用来存储较大的整数值,与

    深入理解MySQL分区

    深入理解MySQL分区,查询,数据,列表,索引,操作,按键,MySQL数据库是一个开源的关系型数据库管理系统。在一些大型数据环境中,为了更好地管理数据、

    mysql 如何查询

    mysql 如何查询,查询,数据,语句,条件,选取,排序,MySQL是一款常用的关系型数据库管理系统,被广泛应用于各种网站和应用开发。在MySQL中查询数据是

    MySQL中怎么实现分页操作

    MySQL中怎么实现分页操作,数据,显示,偏移量,分页,查询,初始,一、 背景什么是分页,就是查询时候数据量太大,一次性返回所有查询结果既耗费网络资源

    mysql怎么连接数据库

    mysql怎么连接数据库,服务,启动,数据库,命令,登录,步骤,mysql连接数据库的方法:1、通过计算机管理方式或通过命令行方式执行“net start mysql”

    mysql 查询拼接

    mysql 查询拼接,函数,字段,字符串,查询,分隔符,连接,MySQL 查询拼接在使用 MySQL 进行查询时,有时需要将多个字段或多张表中的字段进行拼接,这时

    怎么启动 mysql

    怎么启动 mysql,启动,服务器,输入,命令提示符,终端,命令,MySQL是一种广泛使用的关系型数据库管理系统。它是一种可扩展性强、性能卓越、跨平台

    mysql 如何卸载

    mysql 如何卸载,卸载,服务,命令,软件包,数据,安装,MySQL是一种广泛使用的数据库管理系统,用于管理数据的存储、检索和更新。在某些情况下,您需要

    mysql视图是什么

    mysql视图是什么,视图,查询,数据,替换,年龄,引用,mysql视图是一个虚拟表,其内容由查询定义;视图包含系列带有名称的列和行数据,而行和列数据来自定