基于 Canel 数据实时同步到 Redis

简介

最近一个业务中,要求 Redis 中的数据和 MySQL 中的数据保持较高的一致性,如果缓存和数据库内容不一致,就会导致客户端出现一些问题。针对这个问题,该项目打算使用的解决办法是通过读取 binlog 并进行解析,将解析后的信息放入 MQ 中,然后专门提供一个服务消费 MQ 中的信息,将更新操作同步到 Redis,实现 Redis 和 MySQL 数据的一致性。
阿里刚好提供了开源工具 canal 能够实现读取 binlog 并解析的功能,这篇文章记录了使用 Canal 实现两个数据库消息同步的简单 demo,消息队列中间件使用的是 Kafka。

前期配置

MySQL 开启 binlog 日志主从同步

因为 Canal 是伪装成从服务来监听 MySQL 的 binlog,所以首先需要让 MySQL 允许 binlog 日志主从同步

  1. 本机上的 MySQL 是以 Docker 容器的方式运行的,通过以下命令打开并编辑 MySQL 的配置文件
1
2
docker exec -it 71cd220d1f8eef6df43ece34b6b38fe480791c82f3ac434b903fc25b06933384 /bin/sh
vim etc/mysql/mysql.conf.d/mysqld.cnf
  1. [mysqld]标签下添加以下内容
1
2
3
log-bin         = /var/lib/mysql/mysql-bin #日志记录到指定位置
binlog-format = ROW #记录只要数据发生修改,就记录到日志中
server_id = 1 #mysql主从复制的唯一id,不允许重复

重启 MySQL 后可以进入 MySQL 输入以下语句查看 binlog 日志主从同步是否生效

1
show variables like '%log_bin%';

输出结果为

1
2
3
4
5
6
7
8
9
10
11
12
mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)

可以看到 log_bin 为 ON 就是成功开启
3. 最后,还要为 MySQL 添加一个 canal 用户并授予相应权限,canal 才能监听日志

1
2
3
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;

Canal 配置并运行

Canal 仓库的 Github 地址为:https://github.com/alibaba/canal
最初尝试直接 docker 拉取镜像运行容器,但是尝试了很多次,能监听到 canal-server 服务,但是服务无法监听到 binlog,最后还是改为宿主机本地运行 canal-server 服务。

  1. 下载 canal-deployer 代码,此处是使用的最新的 1.1.5 版本
1
https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  1. 创建文件夹
1
mkdir canal-server
  1. 解压到新建文件夹下
1
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal-server/
  1. 修改 instance.properties 配置
1
vim conf/example/instance.properties

需要修改的内容为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306 # 此处为运行 MySQL 的 IP 地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal # 数据库 canal 的用户名,可以监听 binlog 日志
canal.instance.dbPassword=canal # 数据库 canal 的密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..* # 监听所有数据库
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canaltopic # 监听的 kafka topic 名称
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

  1. 修改完成后进入 bin 目录,启动服务
1
2
cd ../../bin
./startup.sh

Kafka 安装并运行

  1. Mac 上直接使用 Homebrew 进行 ZooKeeper 和 Kafka 的安装
1
2
brew install kafka
brew install zookeeper
  1. 以服务的方式启动 kafka
1
brew services start kafka
  1. 创建一个 topic 来接收 canal 生产的信息,名称为 canaltopic
1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic

然后要对 canal 进行 MQ 相关功能的配置
4. canal.properties

1
2
3
4
5
6
7
8
# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example
  1. instance.properties
1
2
#MQ队列名称
canal.mq.topic=canaltopic
  1. 重启 canal-server
    配置完成后到 bin 目录下执行 ./restart.sh 重启 canal-server 服务

整合测试

测试 canal 是否能够顺利读取和解析 MySQL 的 binlog 并将信息发送到对应 topic

  1. 启动一个消费者来监听 canaltopic
1
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
  1. 执行 DML 语句
1
2
3
4
5
mysql> insert into xdual value(null,now());
Query OK, 1 row affected (0.01 sec)

mysql> delete from xdual where id=23;
Query OK, 1 row affected (0.01 sec)
  1. 消费者端输出
1
2
3
(base) Simbas-MacBook-Pro:~ simba$ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
{"data":[{"ID":"23","X":"2021-06-22 15:37:59"}],"database":"test","es":1624347479000,"id":10,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624347479906,"type":"INSERT"}
{"data":[{"ID":"23","X":"2021-06-22 15:37:59"}],"database":"test","es":1624347492000,"id":11,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624347493103,"type":"DELETE"}

Canal, Redis 和 Kafka 整合项目

  1. 引入所需的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<!-- 所用的 Spring Boot 版本为 2.2.1 -->
<!-- 引入 Spring-Kafka 依赖 -->
<!-- 已经内置 kafka-clients 依赖,所以无需重复引入 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
<!-- 实现对 Spring Data Redis 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<!-- 去掉对 Lettuce 的依赖,因为 Spring Boot 优先使用 Lettuce 作为 Redis 客户端 -->
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 引入 Jedis 的依赖,这样 Spring Boot 实现对 Jedis 的自动化配置 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- 使用 fastjson 作为 JSON 序列化的工具 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.61</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
  1. 在 application.yaml 中添加配置信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
spring:
# 对应 RedisProperties 类
redis:
host: 127.0.0.1
port: 6379
password: root123 # Redis 服务器密码,默认为空。生产中,一定要设置 Redis 密码!
database: 0 # Redis 数据库号,默认为 0 。
timeout: 0 # Redis 连接超时时间,单位:毫秒。
# 对应 RedisProperties.Jedis 内部类
jedis:
pool:
max-active: 8 # 连接池最大连接数,默认为 8 。使用负数表示没有限制。
max-idle: 8 # 默认连接数最小空闲的连接数,默认为 8 。使用负数表示没有限制。
min-idle: 0 # 默认连接池最小空闲的连接数,默认为 0 。允许设置 0 和 正数。
max-wait: -1 # 连接池最大阻塞等待时间,单位:毫秒。默认为 -1 ,表示不限制。
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Producer 配置项
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 value 的序列化
batch-size: 65536 # 每次批量发送消息的最大数量
buffer-memory: 524288 # 每次批量发送消息的最大内存
# Kafka Consumer 配置项
consumer:
# 指定一个默认的组名
group-id: consumer-group1
#auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消息的 key 的反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消息的 value 的反序列化
# Kafka Consumer Listener 监听器配置
listener:
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
  1. 封装一个操作 Redis 的工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.simba.canalkafkaredis.redis;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Component
public class RedisClient {

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 设置redis的key-value
*/
public void setString(String key, String value) {
setString(key, value, null);
}

/**
* 设置redis的key-value,带过期时间
* 时间单位:秒
*/
public void setString(String key, String value, Long timeOut) {
stringRedisTemplate.opsForValue().set(key, value);
if (timeOut != null) {
stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
}
}

/**
* 获取redis中key对应的值
*/
public String getString(String key) {
return stringRedisTemplate.opsForValue().get(key);
}

/**
* 删除redis中key对应的值
*/
public Boolean deleteKey(String key) {
return stringRedisTemplate.delete(key);
}

}
  1. 新建一个 model 对象来保存 Xdual 表的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.simba.canalkafkaredis.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Xdual {

private Integer ID;

private String X;

}

  1. 根据 Canal 发送给 Kafka 的 json 数据格式,创建一个 CanalBean 来接收数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.simba.canalkafkaredis.canal;

import com.simba.canalkafkaredis.model.Xdual;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {

private List<Xdual> data;

private String database;

private Long es;

private Integer id;

private Boolean isDdl;

private MysqlType mysqlType;

private String old;

private List<String> pkNames;

private String sql;

private SqlType sqlType;

private String table;

private Long ts; //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等

private String type;

}

  1. 对应的 MysqlType 和 SqlType 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.simba.canalkafkaredis.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MysqlType {

private String ID;

private String X;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.simba.canalkafkaredis.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SqlType {

private Integer ID;

private Integer X;

}
  1. 创建消费者类来消费信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.simba.canalkafkaredis.consumer;

import com.alibaba.fastjson.JSONObject;
import com.simba.canalkafkaredis.canal.CanalBean;
import com.simba.canalkafkaredis.model.Xdual;
import com.simba.canalkafkaredis.redis.RedisClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Component
public class CanalConsumer {

private static final Long TIME_OUT = 600L;

@Resource
private RedisClient redisClient;

@KafkaListener(topics = "canaltopic")
public void receive(ConsumerRecord<?,?> consumer) {

String value = (String)consumer.value();
log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
// 转换为 javabean
CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
// 是否是 DDL 语句
Boolean isDdl = canalBean.getIsDdl();
// 获取语句类型
String type = canalBean.getType();
if (!isDdl) {
List<Xdual> items = canalBean.getData();
if ("INSERT".equals(type)) {
//新增语句
for (Xdual item : items) {
Integer id = item.getID();
//新增到redis中,过期时间是10分钟
redisClient.setString(String.valueOf(id), JSONObject.toJSONString(item), TIME_OUT);
log.info("插入---id号:{},值:{}", id, item);
}
} else if ("UPDATE".equals(type)) {
//更新语句
for (Xdual item : items) {
Integer id = item.getID();
//更新到redis中,过期时间是10分钟
redisClient.setString(String.valueOf(id), JSONObject.toJSONString(item), TIME_OUT);
log.info("更新---id号:{},值:{}", id, item);
}
} else if ("DELETE".equals(type)){
//删除语句
for (Xdual item : items) {
Integer id = item.getID();
//从redis中删除
redisClient.deleteKey(String.valueOf(id));
log.info("删除---id号:{},值:{}", id, item);
}
} else {
log.error("Unsupported type!");
}
}
}
}

启动项目并测试

  1. 添加数据
    1. SQL
1
2
mysql> insert into xdual value(null,now());
Query OK, 1 row affected (0.01 sec)
  1. 日志
1
2
2021-06-22 15:57:01.301  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer             : topic名称:canaltopic,key:null,分区位置:0,下标:10,value:{"data":[{"ID":"24","X":"2021-06-22 15:57:01"}],"database":"test","es":1624348621000,"id":12,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348621294,"type":"INSERT"}
2021-06-22 15:57:01.307 INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : 插入---id号:24,值:Xdual(ID=24, X=2021-06-22 15:57:01)
  1. Redis
1
2
127.0.0.1:6379> get 24
"{\"iD\":24,\"x\":\"2021-06-22 15:57:01\"}"
  1. 修改数据
    1. SQL
1
2
3
mysql> update xdual set X=now() where id=24;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
  1. 日志
1
2
3
2021-06-22 15:59:24.315  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer             : topic名称:canaltopic,key:null,分区位置:0,下标:11,value:{"data":[{"ID":"24","X":"2021-06-22 15:59:24"}],"database":"test","es":1624348764000,"id":13,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":[{"X":"2021-06-22 15:57:01"}],"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348764309,"type":"UPDATE"}
2021-06-22 15:59:24.332 INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : 更新---id号:24,值:Xdual(ID=24, X=2021-06-22 15:59:24)

  1. Redis
1
2
127.0.0.1:6379> get 24
"{\"iD\":24,\"x\":\"2021-06-22 15:59:24\"}"
  1. 删除数据
    1. SQL
1
2
mysql> delete from xdual where id=24;
Query OK, 1 row affected (0.01 sec)
  1. 日志
1
2
2021-06-22 16:01:48.637  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer             : topic名称:canaltopic,key:null,分区位置:0,下标:12,value:{"data":[{"ID":"24","X":"2021-06-22 15:59:24"}],"database":"test","es":1624348908000,"id":14,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348908632,"type":"DELETE"}
2021-06-22 16:01:48.646 INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : 删除---id号:24,值:Xdual(ID=24, X=2021-06-22 15:59:24)
  1. Redis
1
2
127.0.0.1:6379> get 24
(nil)

总结

以上就是一个试验成功的简单 demo

参考

基于阿里巴巴Canal框架,TCP协议实现监听Mysql数据变化
详细讲解!Canal+Kafka实现MySQL与Redis数据同步!