跳到主要内容

高级

分布式缓存

基于 Redis 集群解决单机 Redis 存在的问题

单机 Redis 存在的四大问题

20240821223738

Redis持久化

Redis 持久化有两种方案:

  • RDB 持久化
  • AOF 持久化

RDB持久化

RDB 全称 Redis Database Backup file(Redis数据备份文件),也被叫做 Redis 数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当 Redis 实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为 RDB 文件,默认是保存在当前运行目录

执行时机

RDB 持久化在四种情况下会执行:

  • 执行 save 命令
  • 执行 bgsave 命令
  • Redis 停机时
  • 触发 RDB 条件时

save命令

执行下面的命令,可以立即执行一次 RDB:

20240821223757

save 命令会导致主进程执行 RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到

bgsave命令

下面的命令可以异步执行RDB:

20240821223809

这个命令执行后会开启独立进程完成RDB,主进程可以持续处理用户请求,不受影响

停机时

Redis停机时会执行一次save命令,实现RDB持久化

触发RDB条件

Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:

# 900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB
save 900 1
save 300 10
save 60 10000

RDB的其它配置也可以在redis.conf文件中设置:

# 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
rdbcompression yes

# RDB文件名称
dbfilename dump.rdb

# 文件保存的路径目录
dir ./
RDB原理

bgsave 开始时会 fork 主进程得到子进程,子进程共享主进程的内存数据。完成 fork 后读取内存数据并写入 RDB 文件 fork采用的是 copy-on-write 技术:

  • 当主进程执行读操作时,访问共享内存
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作

20240821223831

总结

RDB 方式 bgsave 的基本流程?

  • fork 主进程得到一个子进程,共享内存空间
  • 子进程读取内存数据并写入新的 RDB 文件
  • 用新 RDB 文件替换旧的 RDB 文件

RDB 会在什么时候执行?save 60 1000代表什么含义?

  • 默认是服务停止时
  • 代表 60 秒内至少执行 1000 次修改则触发 RDB

RDB 的缺点?

  • RDB 执行间隔时间长,两次 RDB 之间写入数据有丢失的风险
  • fork 子进程、压缩、写出 RDB 文件都比较耗时

AOF持久化

原理

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件

20240821223850

配置

AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:

# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"

AOF的命令记录的频率也可以通过redis.conf文件来配:

# 表示每执行一次写命令,立即记录到AOF文件
appendfsync always
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
appendfsync everysec
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no

三种策略对比:

20240821223906

文件重写

因为是记录命令,AOF 文件会比 RDB 文件大的多。而且 AOF 会记录对同一个 key 的多次写操作,但只有最后一次写操作才有意义。通过执行 bgrewriteaof 命令,可以让 AOF文件 执行重写功能,用最少的命令达到相同效果

20240821223915

如图,AOF 本来有三个命令,但是set num 123set num 666都是对 num 的操作,第二次会覆盖第一次的值,因此第一个命令记录下来没有意义

所以重写命令后,AOF 文件内容就是:mset name jack num 666

Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:

# AOF文件比上次文件 增长超过多少百分比则触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积最小多大以上才触发重写
auto-aof-rewrite-min-size 64mb

RDB和AOF对比

RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用

20240821223941

Redis主从集群

搭建主从架构

单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离

集群结构

20240821223956

共包含三个节点,一个主节点,两个从节点

这里在同一台虚拟机中开启3个redis实例,模拟主从集群,信息如下:

IPPORT角色
192.168.150.1017001master
192.168.150.1017002slave
192.168.150.1017003slave
准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录

1、创建目录

我们创建三个文件夹,名字分别叫7001、7002、7003:

# 进入/tmp目录
cd /tmp
# 创建目录
mkdir 7001 7002 7003

20240821224022

2、恢复原始配置

修改redis-6.2.4/redis.conf文件,将其中的持久化模式改为默认的 RDB 模式,AOF 保持关闭状态

# 开启RDB
# save ""
save 3600 1
save 300 100
save 60 10000

# 关闭AOF
appendonly no

3、拷贝配置文件到每个实例目录

然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):

# 方式一:逐个拷贝
cp redis-6.2.4/redis.conf 7001
cp redis-6.2.4/redis.conf 7002
cp redis-6.2.4/redis.conf 7003

# 方式二:管道组合命令,一键拷贝
echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf

4、修改每个实例的端口、工作目录

修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令):

sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf

5、修改每个实例的声明IP

虚拟机本身有多个IP,为了避免将来混乱,我们需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下:

# redis实例的声明 IP
replica-announce-ip 192.168.150.101

每个目录都要改,我们一键完成修改(在/tmp目录执行下列命令):

# 逐一执行
sed -i '1a replica-announce-ip 192.168.150.101' 7001/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7002/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7003/redis.conf

# 或者一键修改
printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 192.168.150.101' {}/redis.conf
启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

# 第1个
redis-server 7001/redis.conf
# 第2个
redis-server 7002/redis.conf
# 第3个
redis-server 7003/redis.conf

启动后:

20240821224225

如果要一键停止,可以运行下面命令:

printf '%s\n' 7001 7002 7003 | xargs -I{} -t redis-cli -p {} shutdown
开启主从关系

现在三个实例还没有任何关系,要配置主从可以使用 replicaof 或者 slaveof(5.0以前)命令

有临时和永久两种模式:

  • 修改配置文件(永久生效)
    • redis.conf中添加一行配置:slaveof <masterip> <masterport>
  • 使用 redis-cli 客户端连接到redis服务,执行 slaveof 命令(重启后失效):
    • slaveof <masterip> <masterport>

**注意:**在 5.0 以后新增命令 replicaof,与 salveof 效果一致

为了演示方便,使用方式二

通过 redis-cli 命令连接 7002,执行下面命令:

# 连接 7002
redis-cli -p 7002
# 执行slaveof
slaveof 192.168.150.101 7001

通过redis-cli命令连接7003,执行下面命令:

# 连接 7003
redis-cli -p 7003
# 执行slaveof
slaveof 192.168.150.101 7001

然后连接 7001节点,查看集群状态:

# 连接 7001
redis-cli -p 7001
# 查看状态
info replication

结果:

20240821224252

测试

执行下列操作以测试:

  • 利用 redis-cli 连接7001,执行 set num 123
  • 利用 redis-cli 连接7002,执行 get num,再执行 set num 666
  • 利用 redis-cli 连接7003,执行 get num,再执行 set num 888

可以发现,只有在 7001 这个 master 节点上可以执行写操作,7002 和 7003 这两个 slave 节点只能执行读操作

主从数据同步原理

全量同步

主从第一次建立连接时,会执行全量同步,将 master 节点的所有数据都拷贝给 slave 节点,流程:

20240821224321

这里有一个问题,master 如何得知 salve 是第一次来连接呢?

有几个概念,可以作为判断依据:

  • Replication Id:简称 replid,是数据集的标记,id 一致则说明是同一数据集。每一个 master 都有唯一的 replid,slave 则会继承 master 节点的 replid
  • offset:偏移量,随着记录在 repl_baklog 中的数据增多而逐渐增大。slave 完成同步时也会记录当前同步的 offset。如果slave的 offset 小于 master 的 offset,说明 slave 数据落后于 master,需要更新。

因此 slave 做数据同步,必须向 master 声明自己的 replication id 和 offset,master 才可以判断到底需要同步哪些数据

因为 slave 原本也是一个 master,有自己的 replid 和 offset,当第一次变成 slave,与 master 建立连接时,发送的 replid 和 offset 是自己的 replid 和 offset

master 判断发现 slave 发送来的 replid 与自己的不一致,说明这是一个全新的 slave,就知道要做全量同步了

maste r会将自己的 replid 和 offset 都发送给这个 slave,slave 保存这些信息。以后 slave 的 replid 就与 master 一致了 因此,master 判断一个节点是否是第一次同步的依据,就是看 replid 是否一致

如图:

20240821224340

完整流程描述:

  • slave 节点请求增量同步
  • master 节点判断 replid,发现不一致,拒绝增量同步
  • master 将完整内存数据生成 RDB,发送 RDB 到 slave
  • slave 清空本地数据,加载 master 的 RDB
  • master 将 RDB 期间的命令记录在 repl_baklog,并持续将 log 中的命令发送给 slave
  • slave 执行接收到的命令,保持与 master 之间的同步
增量同步

全量同步需要先做 RDB,然后将 RDB 文件通过网络传输个 slave,成本太高了。因此除了第一次做全量同步,其它大多数时候 slave 与 master 都是做增量同步

什么是增量同步?就是只更新 slave 与 master 存在差异的部分数据。如图:

20240821224707

repl_backlog原理

master 怎么知道 slave 与自己的数据差异在哪里呢?

这就要说到全量同步时的 repl_baklog 文件了

这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从 0 开始读写,这样数组头部的数据就会被覆盖

repl_baklog 中会记录 Redis 处理过的命令日志及 offset,包括 master 当前的 offset,和 slave 已经拷贝到的 offset:

20240821224733

slave 与 master 的 offset 之间的差异,就是 salve 需要增量拷贝的数据了 随着不断有数据写入,master 的 offset 逐渐变大,slave 也不断的拷贝,追赶 master 的 offset:

20240821224744

直到数组被填满:

20240821224752

此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到 slave 的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分

但是,如果 slave 出现网络阻塞,导致 master 的 offset 远远超过了 slave 的 offset:

20240821224803

如果 master 继续写入新数据,其 offset 就会覆盖旧的数据,直到将 slave 现在的 offset 也覆盖:

20240821224817

棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果 slave 恢复,需要同步,却发现自己的 offset 都没有了,无法完成增量同步了。只能做全量同步

20240821224826

主从同步优化

主从同步可以保证主从数据的一致性,非常重要

可以从以下几个方面来优化 Redis 主从就集群:

  • 在 master 中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘 IO。
  • Redis 单节点上的内存占用不要太大,减少 RDB 导致的过多磁盘IO
  • 适当提高 repl_baklog 的大小,发现 slave 宕机时尽快实现故障恢复,尽可能避免全量同步
  • 限制一个 master 上的 slave 节点数量,如果实在是太多 slave,则可以采用主-从-从链式结构,减少 master 压力

主从从架构图:

20240821224840

总结

简述全量同步和增量同步区别?

  • 全量同步:master 将完整内存数据生成 RDB,发送 RDB 到 slave。后续命令则记录在 repl_baklog,逐个发送给 slave。
  • 增量同步:slave 提交自己的 offset 到 master,master 获取 repl_baklog 中从 offset 之后的命令给 slave

什么时候执行全量同步?

  • slave 节点第一次连接 master 节点时
  • slave 节点断开时间太久,repl_baklog 中的 offset 已经被覆盖时

什么时候执行增量同步?

  • slave 节点断开又恢复,并且在 repl_baklog 中能找到 offset 时

Redis哨兵

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复

哨兵原理

集群结构和作用

哨兵的结构如图:

20240821224900

哨兵的作用如下:

  • 监控:Sentinel 会不断检查您的 master 和 slave 是否按预期工作
  • 自动故障恢复:如果 maste r故障,Sentinel 会将一个 slave 提升为 master。当故障实例恢复后也以新的master为主
  • 通知:Sentinel 充当 Redis 客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端
集群监控原理

Sentinel 基于心跳机制监测服务状态,每隔 1 秒向集群的每个实例发送 ping 命令:

  • 主观下线:如果某 sentinel 节点发现某实例未在规定时间响应,则认为该实例主观下线
  • 客观下线:若超过指定数量(quorum)的 sentinel 都认为该实例主观下线,则该实例客观下线。 quorum 值最好超过 Sentinel 实例数量的一半

20240821224916

集群故障恢复原理

一旦发现 master 故障,sentinel 需要在 salve 中选择一个作为新的 master,选择依据是这样的:

  • 首先会判断 slave 节点与 master 节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该 slave 节点
  • 然后判断 slave 节点的 slave-priority 值,越小优先级越高,如果是 0 则永不参与选举
  • 如果 slave-prority 一样,则判断 slave 节点的 offset 值,越大说明数据越新,优先级越高
  • 最后是判断 slave 节点的运行 id 大小,越小优先级越高

当选出一个新的 maste r后,该如何实现切换呢? 流程如下:

  • sentinel给备选的 slave1 节点发送 slaveof no one 命令,让该节点成为 master
  • sentinel 给所有其它 slave 发送 slaveof 192.168.150.101 7002 命令,让这些 slave 成为新 master 的从节点,开始从新的 master上同步数据。
  • 最后,sentinel 将故障节点标记为 slave,当故障节点恢复后会自动成为新的 master 的 slave 节点

20240821224944

总结

Sentinel 的三个作用是什么?

  • 监控
  • 故障转移
  • 通知

Sentinel 如何判断一个 redis 实例是否健康?

  • 每隔 1 秒发送一次 ping 命令,如果超过一定时间没有相向则认为是主观下线
  • 如果大多数 sentinel 都认为实例主观下线,则判定服务下线

故障转移步骤有哪些?

  • 首先选定一个 slave 作为新的master,执行 slaveof no one
  • 然后让所有节点都执行 slaveof 新 master
  • 修改故障节点配置,添加 slaveof 新 master

搭建哨兵集群

集群结构

这里我们搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集群。如图:

20240821224958

三个sentinel实例信息如下:

节点IPPORT
s1192.168.150.10127001
s2192.168.150.10127002
s3192.168.150.10127003
准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。 我们创建三个文件夹,名字分别叫s1、s2、s3:

# 进入/tmp目录
cd /tmp
# 创建目录
mkdir s1 s2 s3

如图:

20240821225012

然后我们在s1目录创建一个sentinel.conf文件,添加下面的内容:

port 27001
sentinel announce-ip 192.168.150.101
sentinel monitor mymaster 192.168.150.101 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"

解读:

  • port 27001:是当前sentinel实例的端口
  • sentinel monitor mymaster 192.168.150.101 7001 2:指定主节点信息
    • mymaster:主节点名称,自定义,任意写
    • 192.168.150.101 7001:主节点的ip和端口
    • 2:选举master时的quorum值

然后将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):

# 方式一:逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二:管道组合命令,一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf

修改s2、s3两个文件夹内的配置文件,将端口分别修改为27002、27003:

sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

# 第1个
redis-sentinel s1/sentinel.conf
# 第2个
redis-sentinel s2/sentinel.conf
# 第3个
redis-sentinel s3/sentinel.conf

启动后:

20240821225059

测试

尝试让master节点7001宕机,查看sentinel日志:

20240821225130

查看7003的日志:

20240821225140

查看7002的日志:

20240821225214

RedisTemplate

在 Sentinel 集群监管下的 Redis 主从集群,其节点会因为自动故障转移而发生变化,Redis 的客户端必须感知这种变化,及时更新连接信息。Spring 的 RedisTemplate 底层利用 lettuce 实现了节点的感知和自动切换

下面,我们通过一个测试来实现 RedisTemplate 集成哨兵机制

导入依赖

在项目的 pom 文件中引入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置Redis地址

然后在配置文件 application.yml 中指定 redis 的 sentinel 相关信息:

spring:
redis:
sentinel:
master: mymaster
nodes:
- 192.168.150.101:27001
- 192.168.150.101:27002
- 192.168.150.101:27003
配置读写分离

在项目的启动类中,添加一个新的bean:

@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

这个bean中配置的就是读写策略,包括四种:

  • MASTER:从主节点读取
  • MASTER_PREFERRED:优先从 master 节点读取,master 不可用才读取replica
  • REPLICA:从 slave(replica)节点读取
  • REPLICA _PREFERRED:优先从 slave(replica)节点读取,所有的 slave 都不可用才读取 master

Redis分片集群

搭建分片集群

集群结构

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:

  • 海量数据存储问题
  • 高并发写的问题

使用分片集群可以解决上述问题,如图:

20240821225246

分片集群特征:

  • 集群中有多个master,每个master保存不同数据
  • 每个master都可以有多个slave节点
  • master之间通过ping监测彼此健康状态
  • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点

这里我们会在同一台虚拟机中开启6个redis实例,模拟分片集群,信息如下:

IPPORT角色
192.168.150.1017001master
192.168.150.1017002master
192.168.150.1017003master
192.168.150.1018001slave
192.168.150.1018002slave
192.168.150.1018003slave
准备实例和配置

删除之前的7001、7002、7003这几个目录,重新创建出7001、7002、7003、8001、8002、8003目录:

# 进入/tmp目录
cd /tmp
# 删除旧的,避免配置干扰
rm -rf 7001 7002 7003
# 创建目录
mkdir 7001 7002 7003 8001 8002 8003

在/tmp下准备一个新的redis.conf文件,内容如下:

port 6379
# 开启集群功能
cluster-enabled yes
# 集群的配置文件名称,不需要我们创建,由redis自己维护
cluster-config-file /tmp/6379/nodes.conf
# 节点心跳失败的超时时间
cluster-node-timeout 5000
# 持久化文件存放目录
dir /tmp/6379
# 绑定地址
bind 0.0.0.0
# 让redis后台运行
daemonize yes
# 注册的实例ip
replica-announce-ip 192.168.150.101
# 保护模式
protected-mode no
# 数据库数量
databases 1
# 日志
logfile /tmp/6379/run.log

将这个文件拷贝到每个目录下:

# 进入/tmp目录
cd /tmp
# 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf

修改每个目录下的redis.conf,将其中的6379修改为与所在目录一致:

# 进入/tmp目录
cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf
启动

因为已经配置了后台启动模式,所以可以直接启动服务:

# 进入/tmp目录
cd /tmp
# 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf

通过ps查看状态:

ps -ef | grep redis

发现服务都已经正常启动:

20240821225317

如果要关闭所有进程,可以执行命令:

ps -ef | grep redis | awk '{print $2}' | xargs kill

或者(推荐这种方式):

printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown
创建集群

虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联

我们需要执行命令来创建集群,在Redis5.0之前创建集群比较麻烦,5.0之后集群管理命令都集成到了redis-cli中

1)Redis5.0之前

Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境

# 安装依赖
yum -y install zlib ruby rubygems
gem install redis

然后通过命令来管理集群:

# 进入redis的src目录
cd /tmp/redis-6.2.4/src
# 创建集群
./redis-trib.rb create --replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

2)Redis5.0以后

我们使用的是Redis6.2.4版本,集群管理以及集成到了redis-cli中,格式如下:

 redis-cli --cluster create --cluster-replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

命令说明:

  • redis-cli --cluster或者./redis-trib.rb:代表集群操作命令
  • create:代表是创建集群
  • --replicas 1或者--cluster-replicas 1 :指定集群中每个 master 的副本个数为 1,此时节点总数 ÷ (replicas + 1) 得到的就是 master 的数量。因此节点列表中的前 n 个就是 master,其它节点都是 slave 节点,随机分配到不同 master

运行后的样子:

20240821225348

这里输入yes,则集群开始创建:

20240821225358

通过命令可以查看集群状态:

redis-cli -p 7001 cluster nodes

20240821225431

测试

尝试连接7001节点,存储一个数据:

# 连接
redis-cli -p 7001
# 存储数据
set num 123
# 读取数据
get num
# 再次存储
set a 1

结果发送了错误

20240821225505

集群操作时,需要给redis-cli加上-c参数才可以:

redis-cli -c -p 7001

这次可以了:

20240821225513

散列插槽

插槽原理

Redis 会把每一个 master 节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到:

20240821225549

数据 key不是与节点绑定,而是与插槽绑定。Redis 会根据 key 的有效部分计算插槽值,分两种情况:

  • key 中包含"{}",且“{}”中至少包含1个字符,“{}”中的部分是有效部分
  • key 中不包含“{}”,整个 key 都是有效部分

例如:key 是 num,那么就根据 num 计算,如果是{itcast}num,则根据 itcast 计算。计算方式是利用 CRC16 算法得到一个 Hash 值,然后对16384取余,得到的结果就是 slot 值

20240821225601

如图,在7001这个节点执行set a 1时,对a做hash运算,对16384取余,得到的结果是15495,因此要存储到103节点

到了7003后,执行get num时,对 num 做hash运算,对16384取余,得到的结果是2765,因此需要切换到7001节点

总结

Redis 如何判断某个 key 应该在哪个实例?

  • 将16384个插槽分配到不同的实例
  • 根据key的有效部分计算哈希值,对16384取余
  • 余数作为插槽,寻找插槽所在实例即可

如何将同一类数据固定的保存在同一个 Redis 实例?

  • 这一类数据使用相同的有效部分,例如 key 都以{typeId}为前缀

集群伸缩

redis-cli --cluster提供了很多操作集群的命令,可以通过下面方式查看:

20240821225620

比如,添加节点的命令:

20240821225631

需求分析

需求:向集群中添加一个新的master节点,并向其中存储 num = 10

  • 启动一个新的 Redis 实例,端口为7004
  • 添加7004到之前的集群,并作为一个 master 节点
  • 给7004节点分配插槽,使得 num 这个 key 可以存储到7004实例

这里需要两个新的功能:

  • 添加一个节点到集群中
  • 将部分插槽分配到新插槽
创建新的Redis实例

创建一个文件夹:

mkdir 7004

拷贝配置文件:

cp redis.conf /7004

修改配置文件:

sed /s/6379/7004/g 7004/redis.conf

启动

redis-server 7004/redis.conf
添加新节点到Redis

添加节点的语法如下:

20240821225712

执行命令:

redis-cli --cluster add-node  192.168.150.101:7004 192.168.150.101:7001

通过命令查看集群:

redis-cli -p 7001 cluster nodes

如图,7004加入了集群,并且默认是一个 master 节点:

20240821225728

但是,可以看到7004节点的插槽数量为0,因此没有任何数据可以存储到7004上

转移插槽

要将num存储到7004节点,因此需要先看看 num 的插槽是多少:

20240821225742

如上图所示,num 的插槽为2765

可以将0~3000的插槽从7001转移到7004,命令格式如下:

20240821225751

具体命令如下:

建立连接:

20240821225802

得到下面的反馈:

20240821225812

询问要移动多少个插槽,计划是3000个:

新的问题来了:

20240821225826

那个 node 来接收这些插槽?

显然是7004,那么7004节点的id是多少呢?

20240821225841

复制这个id,然后拷贝到刚才的控制台后:

20240821225855

这里询问,你的插槽是从哪里移动过来的?

  • all:代表全部,也就是三个节点各转移一部分
  • 具体的 id:目标节点的id
  • done:没有了

这里我们要从7001获取,因此填写7001的 id:

20240821225906

填完后,点击 done,这样插槽转移就准备好了:

20240821225914

确认要转移吗?输入 yes:

然后,通过命令查看结果:

20240821225924

可以看到

20240821225934

需求完成

故障转移

集群初始状态是这样的:

20240821225955

其中7001、7002、7003都是 master,我们计划让7002宕机

自动故障转移

当集群中有一个 master 宕机会发生什么呢?

直接停止一个 Redis 实例,例如7002:

redis-cli -p 7002 shutdown

1、首先是该实例与其它实例失去连接

2、然后是疑似宕机:

20240821230016

3、最后是确定下线,自动提升一个 slave 为新的 master:

20240821230023

4、当7002再次启动,就会变为一个 slave 节点了:

20240821230032

手动故障转移

利用 cluster failover 命令可以手动让集群中的某个 master 宕机,切换到执行 cluster failover 命令的这个 slave 节点,实现无感知的数据迁移。其流程如下:

20240821230043

这种 failove r命令可以指定三种模式:

  • 缺省:默认的流程,如图1~6歩
  • force:省略了对 offset 的一致性校验
  • takeover:直接执行第5歩,忽略数据一致性、忽略 master 状态和其它 master 的意见

案例需求:在7002这个 slave 节点执行手动故障转移,重新夺回 master 地位

步骤如下:

1)利用 redis-cli 连接7002这个节点

2)执行 cluster failover 命令

如图:

20240821230056

效果:

20240821230105

RedisTemplate访问分片集群

RedisTemplate 底层同样基于 lettuce 实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:

1)引入 Redis 的 starter 依赖

2)配置分片集群地址

3)配置读写分离

与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:

spring:
redis:
cluster:
nodes:
- 192.168.150.101:7001
- 192.168.150.101:7002
- 192.168.150.101:7003
- 192.168.150.101:8001
- 192.168.150.101:8002
- 192.168.150.101:8003

多级缓存

什么是多级缓存

传统的缓存策略一般是请求到达 Tomcat 后,先查询 Redis,如果未命中则查询数据库,如图:

20240821230123

存在下面的问题:

  • 请求要经过 Tomcat 处理,Tomcat 的性能成为整个系统的瓶颈
  • Redis 缓存失效时,会对数据库产生冲击

多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻 Tomcat 压力,提升服务性能:

  • 浏览器访问静态资源时,优先读取浏览器本地缓存
  • 访问非静态资源(Ajax 查询数据)时,访问服务端
  • 请求到达 Nginx 后,优先读取 Nginx 本地缓存
  • 如果 Nginx 本地缓存未命中,则去直接查询 Redis(不经过Tomcat)
  • 如果 Redis 查询未命中,则查询 Tomcat
  • 请求进入 Tomcat 后,优先查询 JVM 进程缓存
  • 如果 JVM 进程缓存未命中,则查询数据库

20240821230136

在多级缓存架构中,Nginx 内部需要编写本地缓存查询、Redis 查询、Tomcat 查询的业务逻辑,因此这样的 Nginx 服务不再是一个反向代理服务器,而是一个编写业务的 Web 服务器了

因此这样的业务Nginx服务也需要搭建集群来提高并发,再有专门的 Nginx 服务来做反向代理,如图:

20240821230149

另外,Tomcat 服务将来也会部署为集群模式:

20240821230156

可见,多级缓存的关键有两个:

  • 一个是在 Nginx 中编写业务,实现 Nginx 本地缓存、Redis、Tomcat 的查询
  • 另一个就是在 Tomcat 中实现 JVM 进程缓存

其中 Nginx 编程则会用到 OpenResty 框架结合 Lua 这样的语言

JVM进程缓存

Caffeine

缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。把缓存分为两类:

  • 分布式缓存,例如 Redis:
    • 优点:存储容量更大、可靠性更好、可以在集群间共享
    • 缺点:访问缓存有网络开销
    • 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
  • 进程本地缓存,例如 HashMap、GuavaCache:
    • 优点:读取本地内存,没有网络开销,速度更快
    • 缺点:存储容量有限、可靠性较低、无法共享
    • 场景:性能要求较高,缓存数据量较小

**Caffeine **是一个基于 Java8 开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前 Spring 内部的缓存使用的就是 Caffeine 缓存使用的基本 API:

@Test
void testBasicOps() {
// 构建cache对象
Cache<String, String> cache = Caffeine.newBuilder().build();

// 存数据
cache.put("gf", "迪丽热巴");

// 取数据
String gf = cache.getIfPresent("gf");
System.out.println("gf = " + gf);

// 取数据,包含两个参数:
// 参数一:缓存的key
// 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
// 优先根据key查询JVM缓存,如果未命中,则执行参数二的Lambda表达式
String defaultGF = cache.get("defaultGF", key -> {
// 根据key去数据库查询数据
return "柳岩";
});
System.out.println("defaultGF = " + defaultGF);
}

Caffeine 既然是缓存的一种,肯定需要有缓存的清除策略,不然的话内存总会有耗尽的时候

Caffeine 提供了三种缓存驱逐策略:

基于容量:设置缓存的数量上限

// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
.maximumSize(1) // 设置缓存大小上限为 1
.build();

基于时间:设置缓存的有效时间

// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
// 设置缓存有效期为 10 秒,从最后一次写入开始计时
.expireAfterWrite(Duration.ofSeconds(10))
.build();

基于引用:设置缓存为软引用或弱引用,利用 GC 来回收缓存数据。性能较差,不建议使用

注意:在默认情况下,当一个缓存元素过期的时候,Caffeine,不会自动立即将其清理和驱逐,而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐

实现JVM进程缓存

需求

利用 Caffeine 实现下列需求:

  • 给根据 id 查询商品的业务添加缓存,缓存未命中时查询数据库
  • 给根据 id 查询商品库存的业务添加缓存,缓存未命中时查询数据库
  • 缓存初始大小为100
  • 缓存上限为10000
实现

首先,定义两个 Caffeine 的缓存对象,分别保存商品、库存的缓存数据

创建CaffeineConfig

package com.heima.item.config;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CaffeineConfig {

@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}

@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}

然后,修改 Controller 类,添加缓存逻辑:

@RestController
@RequestMapping("item")
public class ItemController {

@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;

// ...其它略

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id) {
return itemCache.get(id, key -> itemService.query()
.ne("status", 3).eq("id", key)
.one()
);
}

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id) {
return stockCache.get(id, key -> stockService.getById(key));
}
}

Lua语法

Nginx 编程需要用到 Lua 语言,因此我们必须先入门 Lua 的基本语法

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能

Lua 经常嵌入到 C 语言开发的程序中,例如游戏开发、游戏插件等

Nginx 本身也是 C 语言开发,因此也允许基于 Lua 做拓展

HelloWorld

CentOS7 默认已经安装了 Lua 语言环境,所以可以直接运行 Lua 代码

但是 Ubuntu 需要安装 Lua 环境

sudo apt install lua5.1

1、在 Linux 虚拟机的任意目录下,新建一个 hello.lua 文件

20240821230255

2、添加下面的内容

print("Hello World!")  

3、运行

20240821230312

变量和循环

Lua的数据结构

Lua 中支持的常见数据类型包括:

20240821230325

另外,Lua 提供了type()函数来判断一个变量的数据类型:

20240821230336

声明变量

Lua 声明变量的时候无需指定数据类型,而是用 local 来声明变量为局部变量:

-- 声明字符串,可以用单引号或双引号,
local str = 'hello'
-- 字符串拼接可以使用 ..
local str2 = 'hello' .. 'world'
-- 声明数字
local num = 21
-- 声明布尔类型
local flag = true

Lua 中的 table 类型既可以作为数组,又可以作为 Java 中的 map 来使用。数组就是特殊的 table,key 是数组角标而已:

-- 声明数组 ,key为角标的 table
local arr = {'java', 'python', 'lua'}
-- 声明table,类似java的map
local map = {name='Jack', age=21}

Lua 中的数组角标是从1开始,访问的时候与 Java 中类似:

-- 访问数组,lua数组的角标从1开始
print(arr[1])

Lua 中的 table 可以用 key 来访问:

-- 访问table
print(map['name'])
print(map.name)
循环

对于 table,我们可以利用 for 循环来遍历。不过数组和普通 table 遍历略有差异

遍历数组:

-- 声明数组 key为索引的 table
local arr = {'java', 'python', 'lua'}
-- 遍历数组
for index,value in ipairs(arr) do
print(index, value)
end

遍历普通 table

-- 声明map,也就是table
local map = {name='Jack', age=21}
-- 遍历table
for key,value in pairs(map) do
print(key, value)
end

条件控制、函数

Lua 中的条件控制和函数声明与 Java 类似

函数

定义函数的语法:

function 函数名( argument1, argument2..., argumentn)
-- 函数体
return 返回值
end

例如,定义一个函数,用来打印数组:

function printArr(arr)
for index, value in ipairs(arr) do
print(value)
end
end
条件控制

类似 Java 的条件控制,例如 if、else 语法:

if(布尔表达式)
then
--[ 布尔表达式为 true 时执行该语句块 --]
else
--[ 布尔表达式为 false 时执行该语句块 --]
end

与 Java不同,布尔表达式中的逻辑运算是基于英文单词:

20240821230507

案例

需求:自定义一个函数,可以打印 table,当参数为 nil 时,打印错误信息

function printArr(arr)
if not arr then
print('数组不能为空!')
end
for index, value in ipairs(arr) do
print(value)
end
end

实现多级缓存

多级缓存的实现离不开 Nginx 编程,而 Nginx 编程又离不开 OpenResty

安装OpenResty

OpenResty 是一个基于 Nginx 的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。具备下列特点:

  • 具备Nginx的完整功能
  • 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
  • 允许使用Lua自定义业务逻辑自定义库

在 Ubuntu 20.04 上通过 apt-get 安装 OpenResty - OpenResty 官方博客

为 OpenResty 应用编写你自己的 Lua 模块 - OpenResty 官方博客

快速入门

我们希望达到的多级缓存架构如图:

20240821230543

其中:

  • Windows 上的 Nginx 用来做反向代理服务,将前端的查询商品的 Ajax 请求代理到 OpenResty 集群
  • OpenResty 集群用来编写多级缓存业务
反向代理流程

请求地址是 localhost,端口是80,就被 Windows 上安装的 Nginx 服务给接收到了。然后代理给了OpenResty 集群:

20240821230556

需要在 OpenResty 中编写业务,查询商品数据并返回到浏览器

但是这次,我在 OpenResty 接收请求,返回假的商品数据

OpenResty监听请求

OpenResty 的很多功能都依赖于其目录下的 Lua 库,需要在 nginx.conf 中指定依赖库的目录,并导入依赖:

1、添加对 OpenResty 的 Lua 模块的加载

修改/usr/local/openresty/nginx/config/nginx.conf文件,在其中的 http 下面,添加下面代码:

#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

2、听/api/item路径

修改/usr/local/openresty/nginx/config/nginx.conf文件,在 nginx.conf 的 server 下面,添加对 /api/item 这个路径的监听:

location  /api/item {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件来决定
content_by_lua_file lua/item.lua;
}

这个监听,就类似于SpringMVC中的@GetMapping("/api/item")做路径映射

content_by_lua_file lua/item.lua则相当于调用 item.lua 这个文件,执行其中的业务,把结果返回给用户。相当于 Java 中调用 service

编写item.lua

1、在/usr/loca/openresty/nginx目录创建文件夹:lua

20240821230620

2、在/usr/loca/openresty/nginx/lua文件夹下,新建文件:item.lua

20240821230629

3、编写 item.lua,返回假数据

item.lua 中,利用ngx.say()函数返回数据到 Response 中

ngx.say('{"id":10001,"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

4、重写加载配置

nginx -s reload

请求参数处理

获取参数

OpenResty 中提供了一些 API 用来获取不同类型的前端请求参数:

20240821230648

获取参数并返回

1、获取商品 id

修改/usr/loca/openresty/nginx/nginx.conf文件中监听 /api/item 的代码,利用正则表达式获取 ID:

location ~ /api/item/(\d+) {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件来决定
content_by_lua_file lua/item.lua;
}

2、拼接 id 并返回

修改/usr/loca/openresty/nginx/lua/item.lua文件,获取 id 并拼接到结果中返回:

-- 获取商品id
local id = ngx.var[1]
-- 拼接并返回
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

3、重新加载并测试

运行命令以重新加载 OpenResty 配置:

nginx -s reload

查询Tomcat

根据商品 id 去 Tomcat 查询商品信息

20240821230712

发送HTTP请求的API

Nginx 提供了内部 API 用以发送 HTTP 请求:

local resp = ngx.location.capture("/path",{
method = ngx.HTTP_GET, -- 请求方式
args = {a=1,b=2}, -- get方式传参数
})

回的响应内容包括:

  • resp.status:响应状态码
  • resp.header:响应头,是一个 table
  • resp.body:响应体,就是响应数据

注意:这里的 path 是路径,并不包含 IP 和端口。这个请求会被 Nginx 内部的 server 监听并处理

但是希望这个请求发送到 Tomcat 服务器,所以还需要编写一个 server 来对这个路径做反向代理:

 location /path {
# 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态
proxy_pass http://192.168.150.1:8081;
}

原理如下:

20240821230729

封装HTTP工具

装一个发送 HTTP 请求的工具,基于ngx.location.capture来实现查询 Tomcat

1、添加反向代理,到 Windows 的 Java 服务

因为 item-service 中的接口都是 /item 开头,所以我们监听 /item 路径,代理到 Windows 上的 Tomcat 服务

修改/usr/local/openresty/nginx/conf/nginx.conf文件,添加一个 location:

location /item {
proxy_pass http://192.168.150.1:8081;
}

以后,只要我们调用ngx.location.capture("/item"),就一定能发送请求到 Windows 的 Tomcat 服务

2、封装工具类

之前我们说过,OpenResty 启动时会加载以下两个目录中的工具文件:

20240821230746

所以,自定义的 HTTP 工具也需要放到这个目录下

/usr/local/openresty/lualib目录下,新建一个 common.lua 文件:

vi /usr/local/openresty/lualib/common.lua

内容如下:

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http
}
return _M

这个工具将read_http函数封装到 _M 这个 table 类型的变量中,并且返回,这类似于导出

使用的时候,可以利用require('common')来导入该函数库,这里的 common 是函数库的文件名

3、实现商品查询

最后,修改/usr/local/openresty/lua/item.lua文件,利用刚刚封装的函数库实现对 Tomcat 的查询:

-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取read_http这个函数
local read_http = common.read_http
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

这里查询到的结果是 JSON 字符串,并且包含商品、库存两个 JSON 字符串,页面最终需要的是把两个 JSON 拼接为一个 JSON:

20240821230816

需要我们先把 JSON 变为 lua 的 table,完成数据整合后,再转为 JSON

CJSON工具类

OpenResty 提供了一个 cjson 的模块用来处理 JSON 的序列化和反序列化

1、引入 CJSON 模块

local cjson = require('cjson')

2、序列化

local obj = {
name = 'jack',
age = 21
}
-- 把 table 序列化为 json
local json = cjson.encode(obj)

3、反序列化

local json = '{"name": "jack", "age": 21}'
-- 反序列化 json为 table
local obj = cjson.decode(json);
print(obj.name)
实现Tomcat查询
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
-- 导入cjson库
local cjson = require('cjson')

-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)

-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
基于id负载均衡

OpenResty 需要对 Tomcat 集群做负载均衡

而默认的负载均衡规则是轮询模式,当我们查询/item/10001时:

  • 第一次会访问8081端口的 Tomcat 服务,在该服务内部就形成了 JVM 进程缓存
  • 第二次会访问8082端口的 Tomcat 服务,该服务内部没有 JVM 缓存(因为 JVM 缓存无法共享),会查询数据库
  • ...

因为轮询的原因,第一次查询8081形成的 JVM 缓存并未生效,直到下一次再次访问到8081时才可以生效,缓存命中率太低了。 怎么办?

如果能让同一个商品,每次查询时都访问同一个 Tomcat 服务,那么 JVM 缓存就一定能生效了

也就是说,需要根据商品 id 做负载均衡,而不是轮询

原理

Nginx 提供了基于请求路径做负载均衡的算法:

Nginx 根据请求路径做hash运算,把得到的数值对 Tomcat 服务的数量取余,余数是几,就访问第几个服务,实现负载均衡

例如:

  • 我们的请求路径是 /item/10001
  • Tomcat 总数为2台(8081、8082)
  • 对请求路径/item/1001做 Hash 运算求余的结果为1
  • 则访问第一个 Tomcat 服务,也就是8081

只要 id 不变,每次 Hash 运算结果也不会变,那就可以保证同一个商品,一直访问同一个 Tomcat 服务,确保 JVM 缓存生效

实现

修改/usr/local/openresty/nginx/conf/nginx.conf文件,实现基于 ID 做负载均衡

首先,定义 Tomcat 集群,并设置基于路径做负载均衡:

upstream tomcat-cluster {
hash $request_uri;
server 192.168.150.1:8081;
server 192.168.150.1:8082;
}

然后,修改对 Tomcat 服务的反向代理,目标指向 Tomcat 集群:

location /item {
proxy_pass http://tomcat-cluster;
}

重新加载 OpenResty

nginx -s reload

Redis缓存预热

Redis 缓存会面临冷启动问题:

冷启动:服务刚刚启动时,Redis 中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到 Redis 中

编写初始化类

缓存预热需要在项目启动时完成,并且必须是拿到 RedisTemplate 之后

这里我们利用InitializingBean接口来实现,因为InitializingBean可以在对象被 Spring 创建并且成员变量全部注入后执行

package com.heima.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}

// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
}

查询Redis缓存

20240821230917

当请求进入 OpenResty 之后:

  • 优先查询 Redis 缓存
  • 如果 Redis 缓存未命中,再查询 Tomcat
封装Redis工具

OpenResty 提供了操作 Redis 的模块,我们只要引入该模块就能直接使用。但是为了方便,将 Redis 操作封装到之前的 common.lua 工具库中

修改/usr/local/openresty/lualib/common.lua文件:

1、引入 Redis 模块,并初始化 Redis 对象

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

2、封装函数,用来释放 Redis 连接,其实是放入连接池

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end

3、封装函数,根据 key 查询 Redis 数据

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end

4、导出

-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M

完整的 common.lua:

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
实现Redis查询

可以去修改 item.lua 文件,实现对 Redis 的查询了

查询逻辑是:

  • 根据 id 查询 Redis
  • 如果查询失败则继续查询 Tomcat
  • 将查询结果返回

1、修改/usr/local/openresty/lua/item.lua文件,添加一个查询函数:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end

2、而后修改商品查询、库存查询的业务:20240821230957

3、完整的 item.lua 代码:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

Nginx本地缓存

本地缓存API

OpenResty 为 Nginx 提供了shard dict的功能,可以在 Nginx 的多个 worker 之间共享数据,实现缓存功能

1、开启共享字典,在 nginx.conf 的 http 下添加配置:

 # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
lua_shared_dict item_cache 150m;

2、操作共享字典

-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')
实现本地缓存查询

1、修改/usr/local/openresty/lua/item.lua文件,修改read_data查询函数,添加本地缓存逻辑:

-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
-- 查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
-- 查询redis
val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
end
-- 查询成功,把数据写入本地缓存
item_cache:set(key, val, expire)
-- 返回数据
return val
end

2、修改 item.lua 中查询商品和库存的业务,实现最新的read_data函数:

20240821231033

其实就是多了缓存时间参数,过期后 Nginx 缓存会自动删除,下次访问即可更新缓存

这里给商品基本信息设置超时时间为30分钟,库存为1分钟

因为库存更新频率较高,如果缓存时间过长,可能与数据库差异较大

3、完整的 item.lua 文件:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
-- 查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
-- 查询redis
val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
end
-- 查询成功,把数据写入本地缓存
item_cache:set(key, val, expire)
-- 返回数据
return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

缓存同步

数据同步策略

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高;
  • 场景:对一致性、时效性要求较高的缓存数据

**异步通知:**修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

而异步实现又可以基于 MQ 或者 Canal 来实现:

1)基于 MQ 的异步通知:

20240821231059

解读:

  • 商品服务完成对数据的修改后,只需要发送一条消息到 MQ 中
  • 缓存服务监听 MQ 消息,然后完成对缓存的更新

依然有少量的代码侵入

2)基于Canal的通知

20240821231113

解读:

  • 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
  • Canal 监听 MySQL 变化,当发现变化后,立即通知缓存服务
  • 缓存服务接收到 Canal 通知,更新缓存

代码零侵入

安装Canal

安装Canal.pdf

Canal 是阿里巴巴旗下的一款开源项目,基于 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费

Canal 是基于 MySQL 的主从同步来实现的,MySQL 主从同步的原理如下:

20240821231128

  • 1)MySQL master 将数据变更写入二进制日志(binary log),其中记录的数据叫做 binary log events
  • 2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而 Canal 就是把自己伪装成 MySQL 的一个 slave 节点,从而监听 master 的 binary log 变化。再把得到的变化信息通知给 Canal 的客户端,进而完成对其它数据库的同步

20240821231149

监听Canal

Canal 提供了各种语言的客户端,当 Canal 监听到 binlog 变化时,会通知 Canal 的客户端

20240821231158

使用 Canal 提供的 Java 客户端,监听 Canal 通知消息。当收到变化的消息时,完成对缓存的更新

不过这里使用 GitHub 上的第三方开源的 canal-starter 客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与 SpringBoot 完美整合,自动装配,比官方客户端要简单好用很多

引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
编写配置
canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.150.101:11111 # canal服务地址
修改Item实体类

通过@Id@Column等注解完成 Item 与数据库表字段的映射:

package com.heima.item.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import javax.persistence.Column;
import java.util.Date;

@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
编写监听器

通过实现EntryHandler<T>接口编写监听器,监听 Canal 消息。注意两点:

实现类通过@CanalTable("tb_item")指定监听的表信息

EntryHandler的泛型是与表对应的实体类

package com.heima.item.canal;

import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;

@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}

@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}

@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}

在这里对 Redis 的操作都封装到了 RedisHandler 这个对象中,这是之前做缓存预热时编写的一个类,内容如下:

package com.heima.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}

// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}

public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}

最佳实践

键值设计

优雅的Key结构

Redis 的 Key 虽然可以自定义,但最好遵循下面的几个最佳实践约定:

  • 遵循基本格式:[业务名称]:[数据名]:[id]
  • 长度不超过44字节
  • 不包含特殊字符

例如:登录业务,保存用户信息,其 key 可以设计成如下格式:

20240821231242

这样设计的好处:

  • 可读性强
  • 避免 key 冲突
  • 方便管理
  • 更节省内存: key 是 string 类型,底层编码包含 int、embstr 和 raw 三种。embstr 在小于44字节使用,采用连续内存空间,内存占用更小。当字节数大于44字节时,会转为 raw 模式存储,在 raw 模式下,内存空间不是连续的,而是采用一个指针指向了另外一段内存空间,在这段空间里存储 SDS 内容,这样空间不连续,访问的时候性能也就会收到影响,还有可能产生内存碎片

20240821231252

拒绝BigKey

BigKey 通常以 Key 的大小和 Key 中成员的数量来综合判定,例如:

  • Key 本身的数据量过大:一个 String 类型的 Key,它的值为5 MB
  • Key 中的成员数过多:一个 ZSET 类型的 Key,它的成员数量为10,000个
  • Key 中成员的数据量过大:一个 Hash 类型的 Key,它的成员数量虽然只有1,000个但这些成员的 Value(值)总大小为100 MB

那么如何判断元素的大小呢?Redis 也提供了命令

20240821231306

推荐值:

  • 单个 key 的 value 小于10KB
  • 对于集合类型的 key,建议元素数量小于1000
BigKey的危害
  • 网络阻塞
    • 对 BigKey 执行读请求时,少量的 QPS 就可能导致带宽使用率被占满,导致 Redis 实例,乃至所在物理机变慢
  • 数据倾斜
    • BigKey 所在的 Redis 实例内存使用率远超其他实例,无法使数据分片的内存资源达到均衡
  • Redis 阻塞
    • 对元素较多的 hash、list、zset 等做运算会耗时较旧,使主线程被阻塞
  • CPU 压力
    • 对 BigKey 的数据序列化和反序列化会导致 CPU 的使用率飙升,影响 Redis 实例和本机其它应用
如何发现BigKey

1、redis-cli --bigkeys

利用 redis-cli 提供的 --bigkeys 参数,可以遍历分析所有 key,并返回 Key 的整体统计信息与每个数据的 Top1 的 big key

命令:redis-cli -a 密码 --bigkeys

20240821231328

2、scan扫描

自己编程,利用 scan 扫描 Redis 中的所有 key,利用 strlen、hlen 等命令判断 key 的长度(此处不建议使用MEMORY USAGE

20240821231452

scan 命令调用完后每次会返回2个元素,第一个是下一次迭代的光标,第一次光标会设置为0,当最后一次scan 返回的光标等于0时,表示整个 scan 遍历结束了,第二个返回的是 List,一个匹配的 key 的数组

import com.heima.jedis.util.JedisConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JedisTest {
private Jedis jedis;

@BeforeEach
void setUp() {
// 1.建立连接
// jedis = new Jedis("192.168.150.101", 6379);
jedis = JedisConnectionFactory.getJedis();
// 2.设置密码
jedis.auth("123321");
// 3.选择库
jedis.select(0);
}

final static int STR_MAX_LEN = 10 * 1024;
final static int HASH_MAX_LEN = 500;

@Test
void testScan() {
int maxLen = 0;
long len = 0;

String cursor = "0";
do {
// 扫描并获取一部分key
ScanResult<String> result = jedis.scan(cursor);
// 记录cursor
cursor = result.getCursor();
List<String> list = result.getResult();
if (list == null || list.isEmpty()) {
break;
}
// 遍历
for (String key : list) {
// 判断key的类型
String type = jedis.type(key);
switch (type) {
case "string":
len = jedis.strlen(key);
maxLen = STR_MAX_LEN;
break;
case "hash":
len = jedis.hlen(key);
maxLen = HASH_MAX_LEN;
break;
case "list":
len = jedis.llen(key);
maxLen = HASH_MAX_LEN;
break;
case "set":
len = jedis.scard(key);
maxLen = HASH_MAX_LEN;
break;
case "zset":
len = jedis.zcard(key);
maxLen = HASH_MAX_LEN;
break;
default:
break;
}
if (len >= maxLen) {
System.out.printf("Found big key : %s, type: %s, length or size: %d %n", key, type, len);
}
}
} while (!cursor.equals("0"));
}

@AfterEach
void tearDown() {
if (jedis != null) {
jedis.close();
}
}

}

3、第三方工具

利用第三方工具,如 Redis-Rdb-Tools 分析 RDB 快照文件,全面分析内存使用情况

https://github.com/sripathikrishnan/redis-rdb-tools

4、网络监控

自定义工具,监控进出 Redis 的网络数据,超出预警值时主动告警

一般阿里云搭建的云服务器就有相关监控页面

20240821231511

如何删除BigKey

BigKey 内存占用较多,即便时删除这样的 key 也需要耗费很长时间,导致 Redis 主线程阻塞,引发一系列问题

redis 3.0 及以下版本:

如果是集合类型,则遍历 BigKey 的元素,先逐个删除子元素,最后删除 BigKey

20240821231523

Redis 4.0以后

Redis 在4.0后提供了异步删除的命令:unlink

恰当的数据类型

例1:存储一个 User 对象

方式一:JSON 字符串

user:1{"name": "Jack", "age": 21}

优点:实现简单粗暴

缺点:数据耦合,不够灵活

方式二:字段打散

user:1
Jack
user:1
21

优点:可以灵活访问对象任意字段

缺点:占用空间大、没办法做统一控制

方式三:Hash(推荐)

user:1namejack
age21

优点:底层使用 ziplist,空间占用小,可以灵活访问对象的任意字段

缺点:代码相对复杂

例2:有 Hash 类型的 Key,其中有 100 万对 field 和 value,field 是自增 id,这个 key 存在什么问题?如何优化?

keyfieldvalue
someKeyid:0value0
..........
id:999999value999999

存在的问题:

1、Hash 的 entry 数量超过500时,会使用哈希表而不是 ZipList,内存占用较多

20240821231548

2、可以通过hash-max-ziplist-entries配置 entry 上限。但是如果 entry 过多就会导致 BigKey 问题

方案一:拆分为 String 类型

keyvalue
id:0value0
..........
id:999999value999999

存在的问题:

1、string 结构底层没有太多内存优化,内存占用较多

20240821231600

2、想要批量获取这些数据比较麻烦

方案二:拆分为小的 Hash,将 id/100 作为 key,将 id%100 作为 field,这样每100个元素为一个 Hash

keyfieldvalue
key:0id:00value0
..........
id:99value99
key:1id:00value100
..........
id:99value199
....
key:9999id:00value999900
..........
id:99value999999

20240821231609

package com.heima.test;

import com.heima.jedis.util.JedisConnectionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanResult;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JedisTest {
private Jedis jedis;

@BeforeEach
void setUp() {
// 1.建立连接
// jedis = new Jedis("192.168.150.101", 6379);
jedis = JedisConnectionFactory.getJedis();
// 2.设置密码
jedis.auth("123321");
// 3.选择库
jedis.select(0);
}

@Test
void testSetBigKey() {
Map<String, String> map = new HashMap<>();
for (int i = 1; i <= 650; i++) {
map.put("hello_" + i, "world!");
}
jedis.hmset("m2", map);
}

@Test
void testBigHash() {
Map<String, String> map = new HashMap<>();
for (int i = 1; i <= 100000; i++) {
map.put("key_" + i, "value_" + i);
}
jedis.hmset("test:big:hash", map);
}

@Test
void testBigString() {
for (int i = 1; i <= 100000; i++) {
jedis.set("test:str:key_" + i, "value_" + i);
}
}

@Test
void testSmallHash() {
int hashSize = 100;
Map<String, String> map = new HashMap<>(hashSize);
for (int i = 1; i <= 100000; i++) {
int k = (i - 1) / hashSize;
int v = i % hashSize;
map.put("key_" + v, "value_" + v);
if (v == 0) {
jedis.hmset("test:small:hash_" + k, map);
}
}
}

@AfterEach
void tearDown() {
if (jedis != null) {
jedis.close();
}
}
}

总结

  • Key 的最佳实践
    • 固定格式:[业务名]:[数据名]:[id]
    • 足够简短:不超过44字节
    • 不包含特殊字符
  • Value 的最佳实践:
    • 合理的拆分数据,拒绝 BigKey
    • 选择合适数据结构
    • Hash 结构的 entry 数量不要超过1000
    • 设置合理的超时时间

批处理优化

Pipeline

客户端与 Redis 服务器是这样交互的

单个命令的执行流程

20240821231628

N条命令的执行流程

20240821231636

Redis 处理指令是很快的,主要花费的时候在于网络传输。于是乎很容易想到将多条指令批量的传输给 Redis

20240821231645

MSet

Redis 提供了很多 Mxxx 这样的命令,可以实现批量插入数据,例如:

  • mset
  • hmset

利用 mset 批量插入10万条数据

@Test
void testMxx() {
String[] arr = new String[2000];
int j;
long b = System.currentTimeMillis();
for (int i = 1; i <= 100000; i++) {
j = (i % 1000) << 1;
arr[j] = "test:key_" + i;
arr[j + 1] = "value_" + i;
if (j == 0) {
jedis.mset(arr);
}
}
long e = System.currentTimeMillis();
System.out.println("time: " + (e - b));
}
Pipeline

MSET 虽然可以批处理,但是却只能操作部分数据类型,因此如果有对复杂数据类型的批处理需要,建议使用 Pipeline

@Test
void testPipeline() {
// 创建管道
Pipeline pipeline = jedis.pipelined();
long b = System.currentTimeMillis();
for (int i = 1; i <= 100000; i++) {
// 放入命令到管道
pipeline.set("test:key_" + i, "value_" + i);
if (i % 1000 == 0) {
// 每放入1000条命令,批量执行
pipeline.sync();
}
}
long e = System.currentTimeMillis();
System.out.println("time: " + (e - b));
}

集群下的批处理

如 MSET 或 Pipeline 这样的批处理需要在一次请求中携带多条命令,而此时如果 Redis 是一个集群,那批处理命令的多个 key 必须落在一个插槽中,否则就会导致执行失败。大家可以想一想这样的要求其实很难实现,因为我们在批处理时,可能一次要插入很多条数据,这些数据很有可能不会都落在相同的节点上,这就会导致报错了

这个时候,可以找到 4 种解决方案

20240821231704

第一种方案:串行执行,所以这种方式没有什么意义,当然,执行起来就很简单了,缺点就是耗时过久

第二种方案:串行 slot,简单来说,就是执行前,客户端先计算一下对应的 key 的 slot,一样 slot 的 key 就放到一个组里边,不同的,就放到不同的组里边,然后对每个组执行 pipeline 的批处理,他就能串行执行各个组的命令,这种做法比第一种方法耗时要少,但是缺点呢,相对来说复杂一点,所以这种方案还需要优化一下

第三种方案:并行slot,相较于第二种方案,在分组完成后串行执行,第三种方案,就变成了并行执行各个命令,所以他的耗时就非常短,但是实现呢,也更加复杂

第四种:hash_tag,Redis 计算 key 的 slot 的时候,其实是根据 key 的有效部分来计算的,通过这种方式就能一次处理所有的 key,这种方式耗时最短,实现也简单,但是如果通过操作 key 的有效部分,那么就会导致所有的 key 都落在一个节点上,产生数据倾斜的问题,所以推荐使用第三种方式

串行化执行代码实践

代码实现:

public class JedisClusterTest {

private JedisCluster jedisCluster;

@BeforeEach
void setUp() {
// 配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(8);
poolConfig.setMaxIdle(8);
poolConfig.setMinIdle(0);
poolConfig.setMaxWaitMillis(1000);
HashSet<HostAndPort> nodes = new HashSet<>();
nodes.add(new HostAndPort("192.168.150.101", 7001));
nodes.add(new HostAndPort("192.168.150.101", 7002));
nodes.add(new HostAndPort("192.168.150.101", 7003));
nodes.add(new HostAndPort("192.168.150.101", 8001));
nodes.add(new HostAndPort("192.168.150.101", 8002));
nodes.add(new HostAndPort("192.168.150.101", 8003));
jedisCluster = new JedisCluster(nodes, poolConfig);
}

@Test
void testMSet() {
jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");

}

@Test
void testMSet2() {
Map<String, String> map = new HashMap<>(3);
map.put("name", "Jack");
map.put("age", "21");
map.put("sex", "Male");
//对Map数据进行分组。根据相同的slot放在一个分组
//key就是slot,value就是一个组
Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet()
.stream()
.collect(Collectors.groupingBy(
entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey()))
);
//串行的去执行mset的逻辑
for (List<Map.Entry<String, String>> list : result.values()) {
String[] arr = new String[list.size() * 2];
int j = 0;
for (int i = 0; i < list.size(); i++) {
j = i<<2;
Map.Entry<String, String> e = list.get(0);
arr[j] = e.getKey();
arr[j + 1] = e.getValue();
}
jedisCluster.mset(arr);
}
}

@AfterEach
void tearDown() {
if (jedisCluster != null) {
jedisCluster.close();
}
}
}
Spring集群下批处理代码
@Test
void testMSetInCluster() {
Map<String, String> map = new HashMap<>(3);
map.put("name", "Rose");
map.put("age", "21");
map.put("sex", "Female");
stringRedisTemplate.opsForValue().multiSet(map);


List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));
strings.forEach(System.out::println);

}

原理分析

在 RedisAdvancedClusterAsyncCommandsImpl 类中

首先根据 slotHash 算出来一个 partitioned的map,map 中的 key 就是 slot,而他的 value 就是对应的对应相同 slot 的 key 对应的数据 通过RedisFuture<String> mset = super.mset(op);进行异步的消息发送

@Override
public RedisFuture<String> mset(Map<K, V> map) {

Map<Integer, List<K>> partitioned = SlotHash.partition(codec, map.keySet());

if (partitioned.size() < 2) {
return super.mset(map);
}

Map<Integer, RedisFuture<String>> executions = new HashMap<>();

for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {

Map<K, V> op = new HashMap<>();
entry.getValue().forEach(k -> op.put(k, map.get(k)));

RedisFuture<String> mset = super.mset(op);
executions.put(entry.getKey(), mset);
}

return MultiNodeExecution.firstOfAsync(executions);
}

服务端优化

持久化配置

Redis 的持久化虽然可以保证数据安全,但也会带来很多额外的开销,因此持久化请遵循下列建议:

  • 用来做缓存的 Redis 实例尽量不要开启持久化功能
  • 建议关闭 RDB 持久化功能,使用 AOF 持久化
  • 利用脚本定期在 slave 节点做 RDB,实现数据备份
  • 设置合理的 rewrite 阈值,避免频繁的 bgrewrite
  • 配置no-appendfsync-on-rewrite = yes,禁止在 rewrite 期间做aof,避免因 AOF 引起的阻塞
  • 部署有关建议:
    • Redis 实例的物理机要预留足够内存,应对 fork 和 rewrite
    • 单个 Redis 实例内存上限不要太大,例如4G或8G。可以加快 fork 的速度、减少主从同步、数据迁移压力
    • 不要与 CPU 密集型应用部署在一起
    • 不要与高硬盘负载应用一起部署。例如:数据库、消息队列

慢查询优化

什么是慢查询

并不是很慢的查询才是慢查询,而是:在 Redis 执行时耗时超过某个阈值的命令,称为慢查询

慢查询的危害:由于 Redis 是单线程的,所以当客户端发出指令后,他们都会进入到 Redis 底层的 queue 来执行,如果此时有一些慢查询的数据,就会导致大量请求阻塞,从而引起报错,所以我们需要解决慢查询问题

20240821231749

慢查询的阈值可以通过配置指定:

slowlog-log-slower-than:慢查询阈值,单位是微秒。默认是10000,建议1000

慢查询会被放入慢查询日志中,日志的长度有上限,可以通过配置指定:

slowlog-max-len:慢查询日志(本质是一个队列)的长度。默认是128,建议1000

20240821231800

修改这两个配置可以使用:config set命令:

20240821231811

如何查看慢查询

如何去查看慢查询日志列表呢:

  • slowlog len:查询慢查询日志长度
  • slowlog get [n]:读取 n 条慢查询日志
  • slowlog reset:清空慢查询列表

20240821231821

命令及安全配置

Redis 会绑定在0.0.0.0:6379,这样将会将 Redis 服务暴露到公网上,而 Redis 如果没有做身份认证,会出现严重的安全漏洞

漏洞重现方式:https://cloud.tencent.com/developer/article/1039000

为什么会出现不需要密码也能够登录呢,主要是 Redis 考虑到每次登录都比较麻烦,所以 Redis 就有一种 ssh 免秘钥登录的方式,生成一对公钥和私钥,私钥放在本地,公钥放在 Redis 端,当登录时服务器,再登录时候,他会去解析公钥和私钥,如果没有问题,则不需要利用 Redis 的登录也能访问,这种做法本身也很常见,但是这里有一个前提,前提就是公钥必须保存在服务器上,才行,但是 Redis 的漏洞在于在不登录的情况下,也能把秘钥送到 Linux 服务器,从而产生漏洞

漏洞出现的核心的原因有以下几点:

  • Redis 未设置密码
  • 利用了 Redis 的config set命令动态修改 Redis 配置
  • 使用了 Root 账号权限启动 Redis

所以:如何解决呢?可以采用如下几种方案

为了避免这样的漏洞,这里给出一些建议:

  • Redis 一定要设置密码
  • 禁止线上使用下面命令:keys、flushall、flushdb、config set 等命令。可以利用 rename-command 禁用。
  • bind:限制网卡,禁止外网网卡访问
  • 开启防火墙
  • 不要使用 Root 账户启动 Redis
  • 尽量不是有默认的端口

Redis内存划分和内存配置

当 Redis 内存不足时,可能导致 Key 频繁被删除、响应时间变长、QPS 不稳定等问题。当内存使用率达到90%以上时就需要警惕,并快速定位到内存占用的原因

有关碎片问题分析

Redis 底层分配并不是这个 key 有多大,他就会分配多大,而是有他自己的分配策略,比如8,16,20等等,假定当前 key 只需要10个字节,此时分配 8 肯定不够,那么他就会分配 16 个字节,多出来的 6 个字节就不能被使用,这就是常说的碎片问题

进程内存问题分析:

这片内存,通常都可以忽略不计

缓冲区内存问题分析:

一般包括客户端缓冲区、AOF 缓冲区、复制缓冲区等。客户端缓冲区又包括输入缓冲区和输出缓冲区两种。这部分内存占用波动较大,所以这片内存也是需要重点分析的内存问题

内存占用说明
数据内存是 Redis 最主要的部分,存储 Redis 的键值信息。主要问题是 BigKey 问题、内存碎片问题
进程内存Redis 主进程本身运⾏肯定需要占⽤内存,如代码、常量池等等;这部分内存⼤约⼏兆,在⼤多数⽣产环境中与 Redis 数据占⽤的内存相⽐可以忽略。
缓冲区内存一般包括客户端缓冲区、AOF 缓冲区、复制缓冲区等。客户端缓冲区又包括输入缓冲区和输出缓冲区两种。这部分内存占用波动较大,不当使用 BigKey,可能导致内存溢出。

于是需要通过一些命令,可以查看到 Redis 目前的内存分配状态:

info memory:查看内存分配的情况

20240821231857

memory xxx:查看 key 的主要占用情况

20240821231908

看到了这些配置,最关键的缓存区内存如何定位和解决呢?

内存缓冲区常见的有三种:

  • 复制缓冲区:主从复制的 repl_backlog_buf,如果太小可能导致频繁的全量复制,影响性能。通过 replbacklog-size 来设置,默认1mb
  • AOF 缓冲区:AOF 刷盘之前的缓存区域,AOF 执行 rewrite 的缓冲区。无法设置容量上限
  • 客户端缓冲区:分为输入缓冲区和输出缓冲区,输入缓冲区最大1G且不能设置。输出缓冲区可以设置

以上复制缓冲区和 AOF 缓冲区 不会有问题,最关键就是客户端缓冲区的问题

客户端缓冲区:指的就是发送命令时,客户端用来缓存命令的一个缓冲区,也就是向 Redis 输入数据的输入端缓冲区和 Redis 向客户端返回数据的响应缓存区,输入缓冲区最大1G且不能设置,所以这一块根本不用担心,如果超过了这个空间,Redis 会直接断开,因为本来此时此刻就代表着 Redis 处理不过来了,需要担心的就是输出端缓冲区

20240821231922

在使用 Redis 过程中,处理大量的 big value,那么会导致输出结果过多,如果输出缓存区过大,会导致 Redis 直接断开,而默认配置的情况下, 其实他是没有大小的,这就比较坑了,内存可能一下子被占满,会直接导致 Redis 断开,所以解决方案有两个

1、设置一个大小

2、增加带宽的大小,避免出现大量数据从而直接超过了 Redis 的承受能力

集群优化

集群虽然具备高可用特性,能实现自动故障恢复,但是如果使用不当,也会存在一些问题:

  • 集群完整性问题
  • 集群带宽问题
  • 数据倾斜问题
  • 客户端性能问题
  • 命令的集群兼容性问题
  • lua和事务问题

问题1、在Redis的默认配置中,如果发现任意一个插槽不可用,则整个集群都会停止对外服务:

可以设想一下,如果有几个 slot 不能使用,那么此时整个集群都不能用了,在开发中,其实最重要的是可用性,所以需要把如下配置修改成 no,即有 slot 不能使用时,我们的 Redis 集群还是可以对外提供服务

20240821231936

问题2、集群带宽问题

集群节点之间会不断的互相 Ping 来确定集群中其它节点的状态。每次 Ping 携带的信息至少包括:

  • 插槽信息
  • 集群状态信息

集群中节点越多,集群状态信息数据量也越大,10个节点的相关信息可能达到1kb,此时每次集群互通需要的带宽会非常高,这样会导致集群中大量的带宽都会被 ping 信息所占用,这是一个非常可怕的问题,所以需要去解决这样的问题

解决途径:

  • 避免大集群,集群节点数不要太多,最好少于 1000,如果业务庞大,则建立多个集群。
  • 避免在单个物理机中运行太多 Redis 实例
  • 配置合适的 cluster-node-timeout 值

问题3、命令的集群兼容性问题

当使用批处理的命令时,Redis 要求 key 必须落在相同的 slot 上,然后大量的 key 同时操作时,是无法完成的,所以客户端必须要对这样的数据进行处理,这些方案之前已经探讨过了,所以不再这个地方赘述了

问题4、Lua和事务的问题

Lua 和事务都是要保证原子性问题,如果 key 不在一个节点,那么是无法保证 Lua 的执行和事务的特性的,所以在集群模式是没有办法执行 Lua和事务的

那我们到底是集群还是主从

单体 Redis(主从 Redis)已经能达到万级别的 QPS,并且也具备很强的高可用特性。如果主从能满足业务需求的情况下,所以如果不是在万不得已的情况下,尽量不搭建 Redis 集群


0%