0%

配置文件

电台项目中消费 binlog 的服务需要对 MySQL 的配置文件进行修改,修改参数如下

1
2
3
4
5
6
[mysqld]
log-bin=mysql-bin # 日志记录到指定位置
binlog-format=ROW # 只要记录发生修改,就写到 binlog 中
binlog_row_image=full
log_slave_updates=1
server-id=1

log_slave_updates=1 是因为 redis 同步服务实际消费的是从节点的 binlog,而从节点是通过消费主节点的 binlog 来实现主从同步。log_slave_updates=1 所代表的就是从服务器将在主服务器收到的更新记录记录到从服务器自己的 binlog 中,由此 redis 同步服务才能消费到对应 MySQL 服务器的 binlog
如果没有该配置,会造成的情况是,主节点的数据变更后,从节点能够正常同步主节点的信息,但是 redis 同步服务却无法获取到更新信息,因为从节点没有记录 binlog
binlog_row_image=full 记录的信息最为全面
数据库中有前镜像和后镜像的概念:

  • 前镜像(before image): 记录修改前的内容
  • 后镜像(after image): 记录修改后的内容
    对 insert 来说,只有后镜像没有前镜像;
    对 update 来说,有前镜像和后镜像;
    对 delete 来说,只有前镜像没有后镜像
    binlog_row_image=full 可以覆盖到所有修改信息

Binlog 日志模式

ROW Level
详细记录每一行数据被修改的细节。优点是数据准确,缺点是产生的 binlog 数据量巨大,尤其是发生批量修改时
Statement Level
记录的是修改记录的那条 SQL 语句。优点是数据量小,缺点是对函数和存储过程这类特殊功能的复制不友好
Mixed
会根据每一条 SQL 记录来选择适合的日志格式

简介

最近一个业务中,要求 Redis 中的数据和 MySQL 中的数据保持较高的一致性,如果缓存和数据库内容不一致,就会导致客户端出现一些问题。针对这个问题,该项目打算使用的解决办法是通过读取 binlog 并进行解析,将解析后的信息放入 MQ 中,然后专门提供一个服务消费 MQ 中的信息,将更新操作同步到 Redis,实现 Redis 和 MySQL 数据的一致性。
阿里刚好提供了开源工具 canal 能够实现读取 binlog 并解析的功能,这篇文章记录了使用 Canal 实现两个数据库消息同步的简单 demo,消息队列中间件使用的是 Kafka。

前期配置

MySQL 开启 binlog 日志主从同步

因为 Canal 是伪装成从服务来监听 MySQL 的 binlog,所以首先需要让 MySQL 允许 binlog 日志主从同步

  1. 本机上的 MySQL 是以 Docker 容器的方式运行的,通过以下命令打开并编辑 MySQL 的配置文件
1
2
docker exec -it 71cd220d1f8eef6df43ece34b6b38fe480791c82f3ac434b903fc25b06933384 /bin/sh
vim etc/mysql/mysql.conf.d/mysqld.cnf
  1. [mysqld]标签下添加以下内容
1
2
3
log-bin         = /var/lib/mysql/mysql-bin #日志记录到指定位置
binlog-format = ROW #记录只要数据发生修改,就记录到日志中
server_id = 1 #mysql主从复制的唯一id,不允许重复

重启 MySQL 后可以进入 MySQL 输入以下语句查看 binlog 日志主从同步是否生效

1
show variables like '%log_bin%';

输出结果为

1
2
3
4
5
6
7
8
9
10
11
12
mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)

可以看到 log_bin 为 ON 就是成功开启
3. 最后,还要为 MySQL 添加一个 canal 用户并授予相应权限,canal 才能监听日志

1
2
3
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;

Canal 配置并运行

Canal 仓库的 Github 地址为:https://github.com/alibaba/canal
最初尝试直接 docker 拉取镜像运行容器,但是尝试了很多次,能监听到 canal-server 服务,但是服务无法监听到 binlog,最后还是改为宿主机本地运行 canal-server 服务。

  1. 下载 canal-deployer 代码,此处是使用的最新的 1.1.5 版本
1
https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  1. 创建文件夹
1
mkdir canal-server
  1. 解压到新建文件夹下
1
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal-server/
  1. 修改 instance.properties 配置
1
vim conf/example/instance.properties

需要修改的内容为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306 # 此处为运行 MySQL 的 IP 地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal # 数据库 canal 的用户名,可以监听 binlog 日志
canal.instance.dbPassword=canal # 数据库 canal 的密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..* # 监听所有数据库
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canaltopic # 监听的 kafka topic 名称
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

  1. 修改完成后进入 bin 目录,启动服务
1
2
cd ../../bin
./startup.sh

Kafka 安装并运行

  1. Mac 上直接使用 Homebrew 进行 ZooKeeper 和 Kafka 的安装
1
2
brew install kafka
brew install zookeeper
  1. 以服务的方式启动 kafka
1
brew services start kafka
  1. 创建一个 topic 来接收 canal 生产的信息,名称为 canaltopic
1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic

然后要对 canal 进行 MQ 相关功能的配置
4. canal.properties

1
2
3
4
5
6
7
8
# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example
  1. instance.properties
1
2
#MQ队列名称
canal.mq.topic=canaltopic
  1. 重启 canal-server
    配置完成后到 bin 目录下执行 ./restart.sh 重启 canal-server 服务

整合测试

测试 canal 是否能够顺利读取和解析 MySQL 的 binlog 并将信息发送到对应 topic

  1. 启动一个消费者来监听 canaltopic
1
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
  1. 执行 DML 语句
1
2
3
4
5
mysql> insert into xdual value(null,now());
Query OK, 1 row affected (0.01 sec)

mysql> delete from xdual where id=23;
Query OK, 1 row affected (0.01 sec)
  1. 消费者端输出
1
2
3
(base) Simbas-MacBook-Pro:~ simba$ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
{"data":[{"ID":"23","X":"2021-06-22 15:37:59"}],"database":"test","es":1624347479000,"id":10,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624347479906,"type":"INSERT"}
{"data":[{"ID":"23","X":"2021-06-22 15:37:59"}],"database":"test","es":1624347492000,"id":11,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624347493103,"type":"DELETE"}

Canal, Redis 和 Kafka 整合项目

  1. 引入所需的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<!-- 所用的 Spring Boot 版本为 2.2.1 -->
<!-- 引入 Spring-Kafka 依赖 -->
<!-- 已经内置 kafka-clients 依赖,所以无需重复引入 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
<!-- 实现对 Spring Data Redis 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<!-- 去掉对 Lettuce 的依赖,因为 Spring Boot 优先使用 Lettuce 作为 Redis 客户端 -->
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 引入 Jedis 的依赖,这样 Spring Boot 实现对 Jedis 的自动化配置 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- 使用 fastjson 作为 JSON 序列化的工具 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.61</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
  1. 在 application.yaml 中添加配置信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
spring:
# 对应 RedisProperties 类
redis:
host: 127.0.0.1
port: 6379
password: root123 # Redis 服务器密码,默认为空。生产中,一定要设置 Redis 密码!
database: 0 # Redis 数据库号,默认为 0 。
timeout: 0 # Redis 连接超时时间,单位:毫秒。
# 对应 RedisProperties.Jedis 内部类
jedis:
pool:
max-active: 8 # 连接池最大连接数,默认为 8 。使用负数表示没有限制。
max-idle: 8 # 默认连接数最小空闲的连接数,默认为 8 。使用负数表示没有限制。
min-idle: 0 # 默认连接池最小空闲的连接数,默认为 0 。允许设置 0 和 正数。
max-wait: -1 # 连接池最大阻塞等待时间,单位:毫秒。默认为 -1 ,表示不限制。
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Producer 配置项
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 value 的序列化
batch-size: 65536 # 每次批量发送消息的最大数量
buffer-memory: 524288 # 每次批量发送消息的最大内存
# Kafka Consumer 配置项
consumer:
# 指定一个默认的组名
group-id: consumer-group1
#auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消息的 key 的反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消息的 value 的反序列化
# Kafka Consumer Listener 监听器配置
listener:
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
  1. 封装一个操作 Redis 的工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.simba.canalkafkaredis.redis;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Component
public class RedisClient {

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 设置redis的key-value
*/
public void setString(String key, String value) {
setString(key, value, null);
}

/**
* 设置redis的key-value,带过期时间
* 时间单位:秒
*/
public void setString(String key, String value, Long timeOut) {
stringRedisTemplate.opsForValue().set(key, value);
if (timeOut != null) {
stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
}
}

/**
* 获取redis中key对应的值
*/
public String getString(String key) {
return stringRedisTemplate.opsForValue().get(key);
}

/**
* 删除redis中key对应的值
*/
public Boolean deleteKey(String key) {
return stringRedisTemplate.delete(key);
}

}
  1. 新建一个 model 对象来保存 Xdual 表的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.simba.canalkafkaredis.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Xdual {

private Integer ID;

private String X;

}

  1. 根据 Canal 发送给 Kafka 的 json 数据格式,创建一个 CanalBean 来接收数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.simba.canalkafkaredis.canal;

import com.simba.canalkafkaredis.model.Xdual;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {

private List<Xdual> data;

private String database;

private Long es;

private Integer id;

private Boolean isDdl;

private MysqlType mysqlType;

private String old;

private List<String> pkNames;

private String sql;

private SqlType sqlType;

private String table;

private Long ts; //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等

private String type;

}

  1. 对应的 MysqlType 和 SqlType 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.simba.canalkafkaredis.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MysqlType {

private String ID;

private String X;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.simba.canalkafkaredis.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SqlType {

private Integer ID;

private Integer X;

}
  1. 创建消费者类来消费信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.simba.canalkafkaredis.consumer;

import com.alibaba.fastjson.JSONObject;
import com.simba.canalkafkaredis.canal.CanalBean;
import com.simba.canalkafkaredis.model.Xdual;
import com.simba.canalkafkaredis.redis.RedisClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Component
public class CanalConsumer {

private static final Long TIME_OUT = 600L;

@Resource
private RedisClient redisClient;

@KafkaListener(topics = "canaltopic")
public void receive(ConsumerRecord<?,?> consumer) {

String value = (String)consumer.value();
log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
// 转换为 javabean
CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
// 是否是 DDL 语句
Boolean isDdl = canalBean.getIsDdl();
// 获取语句类型
String type = canalBean.getType();
if (!isDdl) {
List<Xdual> items = canalBean.getData();
if ("INSERT".equals(type)) {
//新增语句
for (Xdual item : items) {
Integer id = item.getID();
//新增到redis中,过期时间是10分钟
redisClient.setString(String.valueOf(id), JSONObject.toJSONString(item), TIME_OUT);
log.info("插入---id号:{},值:{}", id, item);
}
} else if ("UPDATE".equals(type)) {
//更新语句
for (Xdual item : items) {
Integer id = item.getID();
//更新到redis中,过期时间是10分钟
redisClient.setString(String.valueOf(id), JSONObject.toJSONString(item), TIME_OUT);
log.info("更新---id号:{},值:{}", id, item);
}
} else if ("DELETE".equals(type)){
//删除语句
for (Xdual item : items) {
Integer id = item.getID();
//从redis中删除
redisClient.deleteKey(String.valueOf(id));
log.info("删除---id号:{},值:{}", id, item);
}
} else {
log.error("Unsupported type!");
}
}
}
}

启动项目并测试

  1. 添加数据
    1. SQL
1
2
mysql> insert into xdual value(null,now());
Query OK, 1 row affected (0.01 sec)
  1. 日志
1
2
2021-06-22 15:57:01.301  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer             : topic名称:canaltopic,key:null,分区位置:0,下标:10,value:{"data":[{"ID":"24","X":"2021-06-22 15:57:01"}],"database":"test","es":1624348621000,"id":12,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348621294,"type":"INSERT"}
2021-06-22 15:57:01.307 INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : 插入---id号:24,值:Xdual(ID=24, X=2021-06-22 15:57:01)
  1. Redis
1
2
127.0.0.1:6379> get 24
"{\"iD\":24,\"x\":\"2021-06-22 15:57:01\"}"
  1. 修改数据
    1. SQL
1
2
3
mysql> update xdual set X=now() where id=24;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
  1. 日志
1
2
3
2021-06-22 15:59:24.315  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer             : topic名称:canaltopic,key:null,分区位置:0,下标:11,value:{"data":[{"ID":"24","X":"2021-06-22 15:59:24"}],"database":"test","es":1624348764000,"id":13,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":[{"X":"2021-06-22 15:57:01"}],"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348764309,"type":"UPDATE"}
2021-06-22 15:59:24.332 INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : 更新---id号:24,值:Xdual(ID=24, X=2021-06-22 15:59:24)

  1. Redis
1
2
127.0.0.1:6379> get 24
"{\"iD\":24,\"x\":\"2021-06-22 15:59:24\"}"
  1. 删除数据
    1. SQL
1
2
mysql> delete from xdual where id=24;
Query OK, 1 row affected (0.01 sec)
  1. 日志
1
2
2021-06-22 16:01:48.637  INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer             : topic名称:canaltopic,key:null,分区位置:0,下标:12,value:{"data":[{"ID":"24","X":"2021-06-22 15:59:24"}],"database":"test","es":1624348908000,"id":14,"isDdl":false,"mysqlType":{"ID":"int(11)","X":"timestamp(0)"},"old":null,"pkNames":["ID"],"sql":"","sqlType":{"ID":4,"X":93},"table":"xdual","ts":1624348908632,"type":"DELETE"}
2021-06-22 16:01:48.646 INFO 91336 --- [ntainer#0-0-C-1] c.s.c.consumer.CanalConsumer : 删除---id号:24,值:Xdual(ID=24, X=2021-06-22 15:59:24)
  1. Redis
1
2
127.0.0.1:6379> get 24
(nil)

总结

以上就是一个试验成功的简单 demo

参考

基于阿里巴巴Canal框架,TCP协议实现监听Mysql数据变化
详细讲解!Canal+Kafka实现MySQL与Redis数据同步!

Redis 实现

Redis 分布式锁的实现主要基于 SETNX key value 这个指令

  • 如果 key 不存在,就创建对应的键值对,创建成功,返回 1
  • 如果 key 存在,创建失败,返回 0
1
2
3
4
5
6
> setnx 1 xin
(integer) 1
> setnx 1 xin2
(integer) 0
> setnx 2 xin
(integer) 1

除此之外还需要解决键值对长期有效的问题,还需要为键值对设置一个过期时间,对应的 redis 指令为 EXPIRE key seconds,到期之后键值会自动删除

1
2
3
4
5
6
7
> get 1
"xin"
> expire 1 2
(integer) 1
// 2秒后
> get 1
(nil)

在 redis 2.6.12 以后的版本,可以用组合指令来实现上述的分布式锁
SET key value [EX seconds] NX

1
2
3
4
5
6
7
8
9
> set 1 xin ex 10 nx
OK
> get 1
"xin"
> ttl 1
(integer) 4
// 10秒后
> ttl 1
(integer) -2

ttl key 命令用于查看键值对的剩余过期时间

  • 对于设置了过期时间并且尚未过期的键值对,ttl 命令返回剩余过期时间;
  • 对于已过期不存在的键值对,ttl 命令返回 -2;
  • 对于没有过期时间的键值对,ttl 命令返回 -1;

Jedis 实现

在 Jedis 中可以实现一个 setNxAndExp 方法来实现分布式锁的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Integer setNxAndExp(String key, String val, int expSecond) {
// 如果键值对创建成功,设置一个过期时间
if (jedisCluster.setNx(key, val) > 0) {
if (expSecond > 0) {
jedisCluster.expire(key, expSecond);
}
return 1;
} else {
// 如果创建失败,并且已有的键值对没有过期时间,为其设置一个过期时间,一段时间后释放锁
if (jedisCluster.ttl(key).longValue() == -1) {
jedisCluster.expire(key, expSecond);
}
}
// 创建成功返回 1,创建失败返回 0,与 setNx 返回值相同
return 0;
}

参考

redis 基于SETNX和EXPIRE的用法 实现redis 分布式锁

只要提到 Maven 就离不开 pom.xml,POM 全称为 Project Object Model,这个文件在项目中常常是被用来配置各种需要用到的依赖,也可以配置不同环境下项目的参数信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<project xmlns = "http://maven.apache.org/POM/4.0.0"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">

<!-- 模型版本 -->
<modelVersion>4.0.0</modelVersion>
<!-- 公司或者组织的唯一标志,并且配置时生成的路径也是由此生成, 如com.companyname.project-group,maven会将该项目打成的jar包放本地路径:/com/companyname/project-group -->
<groupId>com.companyname.project-group</groupId>

<!-- 项目的唯一ID,一个groupId下面可能多个项目,就是靠artifactId来区分的 -->
<artifactId>project</artifactId>

<!-- 版本号 -->
<version>1.0</version>
</project>

基本元素

  • project 是 pom.xml 文件描述符的根
  • modelVersion 指定 pom.xml 符合哪个版本的描述符,maven 2 和 3 只能为 4.0.0
  • parent 用来表示当前 pom 文件继承自的父类 pom.xml 文件

依赖配置

groupIdartifactIdversion 与基本配置中的意义相同
maven 根据 groupId、artifactId 和 version 组成的 groupId:artifactId:version 来唯一识别一个 jar 包
其中,version 表示一个项目的特定版本,在使用 maven 进行版本管理的时候有几个特殊的关键字:
snapshot 快照,alpha 内部测试,beta 公测,release 稳定,GA 正式发布
Scope 有 5 种可用的限定范围:

  • compile: maven 的默认范围,要求编译、测试、运行阶段都需要模块对应的 jar 包在 classpath 中
  • provided: 模块对应的 jar 包在编译、测试的 classpath 中
  • runtime: 模块对应的 jar 包在测试、运行时的 classpath 中
  • test: 模块对应的 jar 包在测试的 classpath 中
  • system: 模块对应的 jar 包在编译、测试的 classpath 中,与本机系统关联,可移植性差

继承

引入 parent 标签可以让当前 POM 继承自一个父 POM,父 POM 中包含一些公共依赖并统一管理这些依赖。如果需要改变公共依赖的版本信息,只需要修改父 POM 包,继承了父包的子 POM 都能同步该更新。

参考

Maven 教程之 pom.xml 详解

本文展示了在 Spring Boot 项目中 AOP 的简单实现

Pom 依赖

为了实现 AOP,需要在项目中引入 aspectjweaveraspectjrt 两个依赖,其中 aspectjweaver 是 aspectj 的织入包, aspectjrt 是 aspectj 的运行时包

1
2
3
4
5
6
7
8
9
10
<!-- AOP 依赖 -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>

AOP 层类

在类上添加 @Aspect 注释表明这是一个 AOP 类,@Component 注释表明这个类可以作为一个 JavaBean 被注入到 Spring 容器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.ziqian.yudao.aop;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class LoginAspect {

@Pointcut("execution( * com.ziqian.yudao.controller.*.*(..)) && @annotation(Login)")
public void cut(){}

@Before("cut()")
public void before() {
System.out.println("已经记录下操作日志@Before 方法执行前");
}

@Around("cut()")
public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{
System.out.println("已经记录下操作日志@Around 方法执行前");
Object object = proceedingJoinPoint.proceed();
System.out.println("已经记录下操作日志@Around 方法执行后");
return object;
}

@After("cut()")
public void after() {
System.out.println("已经记录下操作日志@After 方法执行后");
}

}

切入 AOP 代码需要一个切入点 PointCut,代码中 @Pointcut("execution( * com.ziqian.yudao.controller.*.*(..)) && @annotation(Login)") 声明的切入点是 com.ziqian.yudao.controller 下面的类中的所有带有 @Login 注释的方法。也就是 AOP 代码会被插入到满足上述条件的方法中。引入注解作为触发 AOP 的条件后可以更方便地在需要的地方切入代码,声明注解的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
package com.ziqian.yudao.aop;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Login {
boolean loginRequired() default true;
}

在切入代码时我们也需要控制代码插入的位置,为此 Spring 提供了 @Before@After@Around 三个注解来帮助控制代码插入的大致位置。其中:

  • @Before 在所拦截方法执行前执行一段逻辑
  • @After 在所拦截方法执行后执行一段逻辑
  • @Around 可以同时在所拦截方法前后执行一段逻辑
    同样是在拦截方法的前后,三个注解也存在优先关系。为了搞清楚三者的先后关系,实现一个 http 接口进行测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.ziqian.yudao.controller;

import com.ziqian.yudao.aop.Login;
import com.ziqian.yudao.service.UserService;
import com.ziqian.yudao.vo.UserVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/users")
public class UserController {

@Autowired
UserService userService;

/**
* 查询用户列表
*
* @return 用户列表
*/
@Login
@GetMapping("/list") // URL 修改成 /list
public List<UserVO> list() {
// 查询列表
UserVO user1 = new UserVO(); user1.setId(1L); user1.setUserName("yudaoyuanma");
UserVO user2 = new UserVO(); user2.setId(2L); user2.setUserName("woshiyutou");
UserVO user3 = new UserVO(); user3.setId(3L); user3.setUserName("chifanshuijiao");
List<UserVO> result = new ArrayList<>();
result.add(user1);
result.add(user2);
result.add(user3);
// 返回列表
return result;
}

}

最后命令行的打印结果为:

已经记录下操作日志@Around 方法执行前
已经记录下操作日志@Before 方法执行前
已经记录下操作日志@After 方法执行后
已经记录下操作日志@Around 方法执行后

可见 @Around 中的逻辑在最外层

需要注意的是,@Around 注解下的 around() 方法中,proceedingJoinPoint.proceed(); 相当于切入的 list() 方法的方法体。如果 around() 的返回值为空,list() 对应的 /list 接口的返回值也为空。所以一定要为 around() 方法设定一个返回值。其中 Object object = proceedingJoinPoint.proceed(); 中的 obejct 对象就是原接口返回的值。

参考

spring AOP @Around @Before @After 区别
SpringBoot—集成AOP详解(面向切面编程Aspect)

三种注解的区别和使用场景

@RequestParam

用来处理 Content-Type 为 application/x-www-form-urlencoded 编码的内容
GET 和 POST 请求的参数会自动赋值给 @RequestParam 注解的变量
例如,@RequestParam(value=“username”) String userName 会直接将请求中名称为 username 变量的值赋值给 userName 参数
如果不加该注解,则必须保证请求中的参数名与函数的参数名一致,即 函数中的参数要是 String username

@RequestBody

一般用来处理 HttpEntity 传来的 Content-Type 非 application/x-www-form-urlencoded 编码的数据

GET 请求没有 HttpEntity,所以 @RequestBody 不适用于 GET 请求
POST 请求通过 HttpEntity 传输的参数,要在请求中声明 Content-Type 类型,Spring 会解析 HttpEntity 中的数据,并绑定到相应的 bean 上

@ModelAttribute

一般用于 GET 请求,将参数和 Model 对象参数绑定,变量名要和对象的属性值相对应


前端使用 GET 或 POST 方式提交数据时,数据编码格式由请求头的 Content-Type 指定,分为以下几种情况:

  1. application/x-www-form-urlencoded,这种情况的数据@RequestParam、@ModelAttribute可以处理,@RequestBody也可以处理。
  2. multipart/form-data,@RequestBody不能处理这种格式的数据。(form表单里面有文件上传时,必须要指定enctype属性值为multipart/form-data,意思是以二进制流的形式传输文件。)
  3. application/json、application/xml等格式的数据,必须使用@RequestBody来处理。

参考

@RequestParam、@RequestBody和@ModelAttribute区别

如果不在 Service 层进行判断而是直接使用 INSERT 语句添加时

唯一索引

查看当前表已有索引

1
SHOW INDEX FROM `table`

添加唯一索引

1
ALTER TABLE `table` ADD UNIQUE(`column`)

INSERT 语句

加入 ignore 字段

1
INSERT IGNORE INTO table(value1, value2) values('value1', 'value2');

Excel 文件导入导出的相关操作

引入 poi 相关依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.16</version>
</dependency>

<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.16</version>
</dependency>

解析 Excel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.poi.xssf.usermodel.*;

@CrossOrigin
@RequestMapping(value = {"/parseExcel"}, method = RequestMethod.POST, produces = {"application/json;charset=UTF-8"})
@ResponseBody
public Result<Boolean> parseExcel(@RequestParam("file") MultipartFile file) {
// @RequestParam("file")MultipartFile file 用来接受前端传过来的 Excel 文件
// 创建 Workbook 对象,读取整个文档
InputStream inputStream = file.getInputStream();
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(inputStream);
// 读取页脚 sheet
XSSFSheet xssfSheet = xssfWorkbook.getSheetAt(0);
// 循环取每行的数据
// rowIndex 为 1 是为了跳过标题行
for (int rowIndex = 1; rowIndex < xssfSheet.getPhysicalNumberOfRows(); rowIndex++) {
XSSFRow xssfRow = xssfSheet.getRow(rowIndex);
if (xssfRow == null) {
continue;
}
// 获取单元格数据
Long id = ExcelUtils.getLong(xssfRow.getCell(0));
String str = ExcelUtils.getString(xssfRow.getCell(1)));
Integer num = ExcelUtils.getInteger(xssfRow.getCell(2));
Boolean isFalse = ExcelUtils.getBoolean(xssfRow.getCell(3));
}
}

导出 Excel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.poi.xssf.usermodel.*;

@ApiOperation(value = "exportExcel")
@RequestMapping(value = {"/exportExcel"}, method = RequestMethod.GET)
@ResponseBody
public Result<Boolean> exportExcel(HttpServletResponse response) {
// 设置要导出的文件的名字
String fileName = "data" + new Date() + ".xlsx";
try {
XSSFWorkbook workbook = new XSSFWorkbook();
XSSFSheet sheet = workbook.createSheet("信息表");
// headers表示excel表中第一行的表头 在excel表中添加表头
String[] headers = { "id", "名称", "类型"};
XSSFRow head = sheet.createRow(0);
for(int i = 0; i < headers.length; i++){
XSSFCell cell = head.createCell(i);
XSSFRichTextString text = new XSSFRichTextString(headers[i]);
cell.setCellValue(text);
}
// 新增数据行,并且设置单元格数据
XSSFRow row =sheet.createRow(1);
row.createCell(0).setCellValue(id);
row.createCell(1).setCellValue(name);
row.createCell(2).setCellValue(type);
response.setContentType("application/octet-stream");
response.setHeader("Content-disposition", "attachment;filename=" + fileName);
response.flushBuffer();
workbook.write(response.getOutputStream());
}

单元格数据类型转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.poi.xssf.usermodel.*;

// 把单元格值转为 String
public static String getString(XSSFCell xssfCell) {
if (xssfCell == null) {
return "";
}
if (xssfCell.getCellTypeEnum() == CellType.NUMERIC) {
return String.valueOf(xssfCell.getNumericCellValue());
} else if (xssfCell.getCellTypeEnum() == CellType.BOOLEAN) {
return String.valueOf(xssfCell.getBooleanCellValue());
} else {
return xssfCell.getStringCellValue();
}
}

// 切割是为了去除小数点
public static Long getLong(XSSFCell xssfCell) {
String s = getString(xssfCell);
return Long.valueOf(s.substring(0, s.length() - 2));
}

public static Integer getInteger(XSSFCell xssfCell) {
String s = getString(xssfCell);
return Integer.valueOf(s.substring(0, s.length() - 2));
}

public static Boolean getBoolean(XSSFCell xssfCell) {
String s = getString(xssfCell);
return Boolean.valueOf(s.substring(0, s.length() - 2));
}

近期做了一个项目迁移工作,用到了 Guava cache 实现缓存功能

Guava cache 是 Google Guava 下的一个缓存模块
缓存分为本地缓存和远端缓存。Guava cache 则属于本地缓存,数据存储在 JVM 内存中

数据操作

一般在业务中操作数据时,都会操作缓存和数据源 (DB) 两部分

  • put 操作,先插入数据,再删除缓存;
  • get 操作,先查缓存,命中则返回,没有命中则查库,然后把结果放入缓存

Guava cache 的优势

  • 封装 get / put 操作,能集成数据源;
  • 实现了线程安全 (具体实现:与 CocurrentHashMap 类似,但是添加更多元素失效策略);
  • Guava Cache提供了三种基本的缓存回收方式:基于容量回收、定时回收和基于引用回收。定时回收有两种:按照写入时间,最早写入的最先回收;按照访问时间,最早访问的最早回收;
  • 监控缓存的 加载 / 命中 情况

和 Redis 相比

  • 因为应用和缓存在同一个进程内,请求缓存速度更加迅速
  • 单线程下 Guava cache 优于 Redis;并发下 Redis 读取时间下降较明显,Guava cache 影响不大
    过期清理机制
    项目达到过期时间后不会马上清除,会在写操作和读操作时顺带清理

Java 实例

  1. 在 pom.xml 中添加以下内容,引入 jar 包
1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
  1. 在代码中根据需求创建缓存对象
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 缓存
*/
private LoadingCache<String, Map<String, String> cache = CacheBuilder
.newBuilder()
.refreshAfterWrite(EXPIRE_TIME_MINUTES, TimeUnit.MINUTES)
.build(new CacheLoader<String, Map<String, String>>() {
@Override
public Map<String, String> load(String key) {
return getFromDb();
}
});
  1. 获取缓存的方法

1
cache.getUnchecked("");

加载机制

CacheLoader

从 LoadingCache 查询缓存的正规方法是 get(key) 方法,要么返回已经缓存的值,要么使用 CacheLoader 向缓存原子地添加新值

Callable

如果没有合理的默认方法来加载或计算与键关联的值,或者想要覆盖默认的加载运算,同时保留“获取缓存-如果没有-则计算”[get-if-absent-compute]的原子语义。
所有类型的Guava Cache,不管有没有自动加载功能,都支持get(K, Callable)方法。这个方法实现的是,如果有缓存,返回相应的值;如果缓存不存在,用给定的 Callable 运算把结果加入到缓存

Cache.put

cache.put(key, value) 可以直接向缓存中插入值,会覆盖掉 key 之前映射的 value

参考

Guava cache 使用总结
Guava cache 和 Redis 性能对比
Google Guava Cache 全解析

Java虚拟机


一、运行时数据区域


JVM执行Java程序时会把内存分为多个数据区域,其中:

堆、方法区(包括运行时常量池)和直接内存被线程共享

每个线程有自己的程序计数器、本地方法栈和虚拟机栈

阅读全文 »