Introduction
A recent piece of work required the data in Redis to stay closely consistent with the data in MySQL; whenever the cache and the database disagree, clients run into problems. The approach the project settled on is to read and parse the binlog, publish the parsed change events to an MQ, and run a dedicated service that consumes those events and applies the updates to Redis, keeping Redis and MySQL consistent.

Alibaba's open-source tool canal provides exactly this binlog reading and parsing capability. This post records a simple demo that uses Canal to synchronize changes between the two data stores, with Kafka as the message-queue middleware.
Preliminary setup
Enable MySQL binlog replication
Canal masquerades as a MySQL replica to listen to the binlog, so MySQL must first be configured to produce a binlog for replication.

On this machine MySQL runs as a Docker container; open a shell inside it and edit the MySQL configuration file:

```bash
docker exec -it 71cd220d1f8eef6df43ece34b6b38fe480791c82f3ac434b903fc25b06933384 /bin/sh
vim etc/mysql/mysql.conf.d/mysqld.cnf
```
Add the following under the `[mysqld]` section:

```ini
log-bin = /var/lib/mysql/mysql-bin   # write the binlog to this path
binlog-format = ROW                  # log every row-level data change
server_id = 1                        # unique id for MySQL replication; must not be reused
```
After restarting MySQL, connect to it and run the following statement to check whether binary logging has taken effect:

```sql
show variables like '%log_bin%';
```
The output:

```
mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)
```
`log_bin` showing ON confirms that binary logging is enabled.
Finally, create a canal user in MySQL and grant it the required privileges; without this, canal cannot read the log:

```sql
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
```
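To sanity-check the new account before wiring up Canal, a quick JDBC connection as the canal user fails fast if the credentials or privileges are wrong. This is only a hypothetical helper, not part of the demo project; it assumes MySQL Connector/J on the classpath and MySQL listening on 127.0.0.1:3306 with the demo's test database:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CanalUserCheck {
    public static void main(String[] args) throws Exception {
        // connect with the freshly created canal account
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/test", "canal", "canal");
             Statement stmt = conn.createStatement();
             // reading the current binlog position requires the REPLICATION CLIENT privilege
             ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                System.out.println("binlog file: " + rs.getString("File")
                        + ", position: " + rs.getLong("Position"));
            }
        }
    }
}
```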
Configure and run Canal
The Canal repository on GitHub: https://github.com/alibaba/canal

I first tried pulling the image and running canal-server as a Docker container, but after many attempts the canal-server service was reachable while never picking up the binlog, so I switched to running canal-server directly on the host.

Download canal-deployer; the latest version, 1.1.5, is used here:

```
https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
```
Create a directory and extract the archive into it:

```bash
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal-server/
```
Edit the instance.properties configuration:

```bash
vim conf/example/instance.properties
```

The settings to change:

```properties
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

canal.instance.tsdb.enable=true

# credentials of the canal user created earlier
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

# table filter: watch all tables, but ignore MySQL's internal slave tables
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*

# mq config: topic and partition that Canal publishes to
canal.mq.topic=canaltopic
canal.mq.partition=0
```
Once the file is saved, go to the `bin` directory and start the service:

```bash
cd ../../bin
./startup.sh
```
Install and run Kafka
On macOS, ZooKeeper and Kafka can be installed directly with Homebrew:

```bash
brew install kafka
brew install zookeeper
```
Start Kafka as a background service:

```bash
brew services start kafka
```
Create a topic named canaltopic to receive the messages Canal produces:

```bash
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic
```
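If you prefer to create the topic from code, Kafka's Java AdminClient can do the same thing. A minimal sketch, assuming the kafka-clients library on the classpath and a broker at 127.0.0.1:9092:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCanalTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- matching the CLI command above
            NewTopic topic = new NewTopic("canaltopic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```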
Next, configure Canal's MQ-related settings.

In canal.properties:

```properties
# deliver parsed changes to kafka instead of serving TCP clients
canal.serverMode = kafka
canal.instance.parser.parallelThreadSize = 16
# kafka broker address
canal.mq.servers = 127.0.0.1:9092
canal.destinations = example
```
In instance.properties:

```properties
canal.mq.topic=canaltopic
```
Restart canal-server

With the configuration in place, go to the `bin` directory and run `./restart.sh` to restart the canal-server service.
Integration test
Test whether Canal can read and parse MySQL's binlog and deliver the parsed messages to the corresponding topic.

Start a consumer listening on canaltopic:

```bash
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
```
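The same inspection can also be done from Java with a bare KafkaConsumer. A throwaway sketch, assuming only the kafka-clients dependency; it is not part of the demo project:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CanalTopicInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inspector");
        // read from the start of the topic, like --from-beginning
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canaltopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```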
Execute some DML statements:

```sql
mysql> insert into xdual value (null, now());
Query OK, 1 row affected (0.01 sec)

mysql> delete from xdual where id = 23;
Query OK, 1 row affected (0.01 sec)
```
The consumer prints one flat-message JSON document per event; `type` marks the kind of DML, `data` carries the affected rows, `es` is the time of the binlog event, and `ts` the time Canal processed it:

```
(base) Simbas-MacBook-Pro:~ simba$ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
{"data":[{"ID":"23","X":"2021-06-22 15:37:59"}],"database":"test","es":1624347479000,"id":10,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624347479906,"type":"INSERT"}
{"data":[{"ID":"23","X":"2021-06-22 15:37:59"}],"database":"test","es":1624347492000,"id":11,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624347493103,"type":"DELETE"}
```
Integrating Canal, Redis, and Kafka in a project
Add the required dependencies:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.61</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <scope>test</scope>
</dependency>
```
Add the configuration to application.yaml:
```yaml
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: root123
    database: 0
    timeout: 0
    jedis:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: -1
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      acks: 1
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
    consumer:
      group-id: consumer-group1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      missing-topics-fatal: false
```
Wrap the Redis operations in a utility class:
```java
package com.simba.canalkafkaredis.redis;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Component
public class RedisClient {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public void setString(String key, String value) {
        setString(key, value, null);
    }

    public void setString(String key, String value, Long timeOut) {
        stringRedisTemplate.opsForValue().set(key, value);
        if (timeOut != null) {
            stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
        }
    }

    public String getString(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    public Boolean deleteKey(String key) {
        return stringRedisTemplate.delete(key);
    }
}
```
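A quick smoke test for the wrapper; this is a hypothetical test class, assuming Redis is reachable at the address configured above:

```java
package com.simba.canalkafkaredis;

import com.simba.canalkafkaredis.redis.RedisClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisClientTest {

    @Resource
    private RedisClient redisClient;

    @Test
    public void setGetDelete() {
        // write with a 60-second TTL, read back, then delete
        redisClient.setString("test-key", "test-value", 60L);
        assertEquals("test-value", redisClient.getString("test-key"));
        redisClient.deleteKey("test-key");
        assertNull(redisClient.getString("test-key"));
    }
}
```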
Create a model class to hold rows of the xdual table:
```java
package com.simba.canalkafkaredis.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Xdual {
    private Integer ID;
    private String X;
}
```
Based on the JSON format Canal sends to Kafka, create a CanalBean class to receive the messages:
```java
package com.simba.canalkafkaredis.canal;

import com.simba.canalkafkaredis.model.Xdual;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {
    private List<Xdual> data;
    private String database;
    private Long es;
    private Integer id;
    private Boolean isDdl;
    private MysqlType mysqlType;
    // previous column values for UPDATE events; kept here as a raw JSON string
    private String old;
    private List<String> pkNames;
    private String sql;
    private SqlType sqlType;
    private String table;
    private Long ts;
    private String type;
}
```
And the corresponding MysqlType and SqlType classes:
```java
package com.simba.canalkafkaredis.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MysqlType {
    private String ID;
    private String X;
}
```

```java
package com.simba.canalkafkaredis.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SqlType {
    private Integer ID;
    private Integer X;
}
```
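With CanalBean and its helper types in place, the mapping can be verified by feeding one of the flat messages captured during the integration test straight into fastjson. A minimal, hypothetical check, not part of the demo project:

```java
import com.alibaba.fastjson.JSONObject;
import com.simba.canalkafkaredis.canal.CanalBean;

public class CanalBeanParseCheck {
    public static void main(String[] args) {
        // a flat message captured from canaltopic earlier
        String value = "{\"data\":[{\"ID\":\"23\",\"X\":\"2021-06-22 15:37:59\"}],"
                + "\"database\":\"test\",\"es\":1624347479000,\"id\":10,\"isDdl\":false,"
                + "\"mysqlType\":{\"ID\":\"int(11)\",\"X\":\"timestamp(0)\"},\"old\":null,"
                + "\"pkNames\":[\"ID\"],\"sql\":\"\",\"sqlType\":{\"ID\":4,\"X\":93},"
                + "\"table\":\"xdual\",\"ts\":1624347479906,\"type\":\"INSERT\"}";
        CanalBean bean = JSONObject.parseObject(value, CanalBean.class);
        System.out.println(bean.getType());        // INSERT
        System.out.println(bean.getData().get(0)); // Xdual(ID=23, X=2021-06-22 15:37:59)
    }
}
```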
Create a consumer class to process the messages:
```java
package com.simba.canalkafkaredis.consumer;

import com.alibaba.fastjson.JSONObject;
import com.simba.canalkafkaredis.canal.CanalBean;
import com.simba.canalkafkaredis.model.Xdual;
import com.simba.canalkafkaredis.redis.RedisClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Component
public class CanalConsumer {

    // TTL (in seconds) for cached entries
    private static final Long TIME_OUT = 600L;

    @Resource
    private RedisClient redisClient;

    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic: {}, key: {}, partition: {}, offset: {}, value: {}",
                consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        Boolean isDdl = canalBean.getIsDdl();
        String type = canalBean.getType();
        // only row-level DML events carry data to sync
        if (!isDdl) {
            List<Xdual> items = canalBean.getData();
            if ("INSERT".equals(type)) {
                for (Xdual item : items) {
                    Integer id = item.getID();
                    redisClient.setString(String.valueOf(id), JSONObject.toJSONString(item), TIME_OUT);
                    log.info("insert --- id: {}, value: {}", id, item);
                }
            } else if ("UPDATE".equals(type)) {
                for (Xdual item : items) {
                    Integer id = item.getID();
                    redisClient.setString(String.valueOf(id), JSONObject.toJSONString(item), TIME_OUT);
                    log.info("update --- id: {}, value: {}", id, item);
                }
            } else if ("DELETE".equals(type)) {
                for (Xdual item : items) {
                    Integer id = item.getID();
                    redisClient.deleteKey(String.valueOf(id));
                    log.info("delete --- id: {}, value: {}", id, item);
                }
            } else {
                log.error("Unsupported type!");
            }
        }
    }
}
```
Start the project and test
Inserting data
SQL
```sql
mysql> insert into xdual value (null, now());
Query OK, 1 row affected (0.01 sec)
```
Log
```
2021-06-22 15:57:01.301  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : topic: canaltopic, key: null, partition: 0, offset: 10, value: {"data":[{"ID":"24","X":"2021-06-22 15:57:01"}],"database":"test","es":1624348621000,"id":12,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348621294,"type":"INSERT"}
2021-06-22 15:57:01.307  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : insert --- id: 24, value: Xdual(ID=24, X=2021-06-22 15:57:01)
```
Redis
```
127.0.0.1:6379> get 24
"{\"iD\":24,\"x\":\"2021-06-22 15:57:01\"}"
```

(The lowercased key names iD and x come from fastjson deriving property names from the Lombok-generated getters when serializing the Xdual object.)
Updating data
SQL
```sql
mysql> update xdual set X = now() where id = 24;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0
```
Log
```
2021-06-22 15:59:24.315  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : topic: canaltopic, key: null, partition: 0, offset: 11, value: {"data":[{"ID":"24","X":"2021-06-22 15:59:24"}],"database":"test","es":1624348764000,"id":13,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":[{"X":"2021-06-22 15:57:01"}],"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348764309,"type":"UPDATE"}
2021-06-22 15:59:24.332  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : update --- id: 24, value: Xdual(ID=24, X=2021-06-22 15:59:24)
```
Redis
```
127.0.0.1:6379> get 24
"{\"iD\":24,\"x\":\"2021-06-22 15:59:24\"}"
```
Deleting data
SQL
```sql
mysql> delete from xdual where id = 24;
Query OK, 1 row affected (0.01 sec)
```
Log
```
2021-06-22 16:01:48.637  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : topic: canaltopic, key: null, partition: 0, offset: 12, value: {"data":[{"ID":"24","X":"2021-06-22 15:59:24"}],"database":"test","es":1624348908000,"id":14,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348908632,"type":"DELETE"}
2021-06-22 16:01:48.646  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : delete --- id: 24, value: Xdual(ID=24, X=2021-06-22 15:59:24)
```
Redis
```
127.0.0.1:6379> get 24
(nil)
```
Summary
That is the whole demo, tested and working end to end.