1.缓存机制

数据一致性:先修改数据库再删除redis缓存。先删缓再删数据发生线程安全的问题更大,因为数据库修改数据耗时长,可能删了缓存后在数据库信息修改还没好之前,又读取了该信息并缓存了。

2.缓存穿透

客户端访问数据库中不存在的数据,缓存永远不生效,可能会被恶意攻击数据库导致瘫痪。
方法1.缓存空对象 2.布隆过滤

3.缓存雪崩

指的是同一时间段内,redis中大量的缓存的key同时失效或者redis服务器宕机了,导致有大量请求到达数据库,带来巨大压力
解决方法:1.给TTL添加随机值 2.利用redis集群 3.给缓存业务添加降级限流策略(微服务内容)4.添加多级缓存

4.缓存击穿

指的是某一个被高并发访问并且该缓存重建业务较复杂的key突然失效了,导致大量请求到达数据库,带来巨大冲击。
解决方法:1.添加互斥锁到redis中 2.逻辑过期

实战

通用的redis缓存工具
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
* 缓存工具类
* */
@Component
public class CacheClient {
@Autowired
StringRedisTemplate stringRedisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
//通用写入缓存方法
public void set(String key, String value, Long expireTime, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, value, expireTime, unit);
}
//通用写入缓存方法 -- 逻辑过期
public void setWithExpire(String key, Object value, Long expireTime, TimeUnit unit){
//创建一个逻辑过期对象通用类
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expireTime)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}

//通用读缓存方法 -- 解决缓存穿透
public <ID, R> R queryPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> function, Long expireTime, TimeUnit unit) {
//1.先查redis缓存
String key = keyPrefix + id;
String str = stringRedisTemplate.opsForValue().get(key);
//2.若redis中存在则直接返回
if(StrUtil.isNotBlank(str)){
R r = JSONUtil.toBean(str, type);
return r;
}

//3.若redis中没有,则访问数据库
R r = function.apply(id);
if(r == null){
//3.1数据库中不存在,空值写入redis,防止缓存穿透
stringRedisTemplate.opsForValue().set(key, "", expireTime, unit);
return null;
}
//3.2.若数据库中存在,则存入redis中, 缓存时间为30分钟
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(r), expireTime, unit);
//4.返回
return r;
}

//通用获取对象 -- 解决缓存击穿 -- 逻辑过期

public <ID, R> R queryWithLogicExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> function, String lockPrefix, Long prolongTime, TimeUnit unit) {
//1.先查redis缓存
String key = CACHE_SHOP_KEY + id;
String strJson = stringRedisTemplate.opsForValue().get(key);
//2.若未命中,则直接返回null, 因为逻辑过期场景下,redis中的数据是提前写好的,没有就不需要查数据库了
if(!StrUtil.isNotBlank(strJson)){
return null;
}
//3.若redis命中,则判断是否逻辑过期
RedisData redisData = JSONUtil.toBean(strJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
//因为redisObject是一个通用类,所以data类型是Object,这种情况下需要先强转为JSONObject,再使用JSONUtil.toBean()
JSONObject data = (JSONObject) redisData.getData();
//4.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
//4.1未过期,直接返回
return JSONUtil.toBean(data, type);
}
//4.2已过期,需缓存重建
//5.缓存重建

//5.1获取互斥锁
boolean lock = tryLock(lockPrefix+id);
//5.2判断是否获取成功
if(lock){
//5.3成功
//5.4二次判断redis当中此时是否已经更新过过期时间了
strJson = stringRedisTemplate.opsForValue().get(key);
redisData = JSONUtil.toBean(strJson, RedisData.class);
expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
data = (JSONObject)redisData.getData();
return JSONUtil.toBean(data, type);
}
//5.5开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->
{

//5.6查询数据库
R r = function.apply(id);
setWithExpire(key, r, prolongTime, unit);
//5.7释放锁
releaseLock(lockPrefix+id);
});
}
//5.7返回过期的店铺信息
return JSONUtil.toBean(data, type);

}
//获取互斥锁
public boolean tryLock(String key){
//尝试设置redis内容,若设置成功则表明获取到了锁,若失败,则说明已经有人拿到了这个锁
//设置过期时间是怕获取到锁的线程发生问题时独占锁
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(b);
}

//释放互斥锁
public void releaseLock(String key){
stringRedisTemplate.delete(key);
}

}

上面的通用方法写的太神了,很好的利用到了泛型和消费者机制,把原本写在serviceImpl里的方法直接提取出来了,以shop模块的查询功能为例,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Result queryById(Long id) {
//解决缓存穿透方案
//Result result = queryByIdByPassThrough();

//解决缓存击穿方案--互斥锁,包含了解决缓存穿透
//Shop shop = queryWithMutex(id);

//解决缓存击穿方案2--逻辑过期
//Shop shop = queryWithLogicExpire(id);
//将以上方法提炼到通用方法,使用通用方法结果如下
Shop shop = cacheClient.queryWithLogicExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, LOCK_SHOP_KEY, 20L, TimeUnit.SECONDS);
if(shop == null){
return Result.fail("商铺id不存在");
}
return Result.ok(shop);

}

PS: 查看方法参数的快捷键是ctrl+P,我现在才知道啊。。。。。

全局唯一id方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class RedisIdWorker {
@Autowired
StringRedisTemplate stringRedisTemplate;
//开始时间戳
public static int COUNT_BIT = 32;
//2025年3月8号做为起始时间点
public static long BEGIN_TIMESTAMP = 1741464196;
public Long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long dateSeconds = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = dateSeconds - BEGIN_TIMESTAMP;
//2.生成序列号
//2.1获取当前时间,精确到天,做为当天自增长key的一部分
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2自增长
Long id = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + date);
//3.拼接并返回, 时间戳(每秒都不一样)在前32位,自增长id在后32位,这样子可以支持每秒产生最多2^32次个不同的id,当然,这一天最多只能产生2^32次个id
return timestamp << COUNT_BIT | id;
}
}
优惠券秒杀机制

这里很好的用到了锁和事务的内容

1.乐观锁,即每次访问数据库时再次校验数据库内容是否发生变更,在尚庭公寓项目里是利用了version字段来实现,而这里因为是对优惠券的库存操作,每次库存减一对应版本号加一,所以可用让库存充当版本号的作用,即每次更新库存时,都加上之前获取到的库存值判断。

但又因为这样子并发情况又会出问题。假设两个线程一开始获取的库存数都是100,线程一先修改列库存,变成99,此时线程二取修改,发现库存不对,就失败,导致明明有库存,但获取失败的情况。

所以判断条件改为了stock > 0。

2.悲观锁,解决一人一单问题,一人一单用的是查询操作,所以无法使用乐观锁。锁的对象是常量池中的字符串,而不能是每次访问都会new的对象(这样子的话,锁完全没用)

3.事务失效需要交给代理,所以需要获取代理对象来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询秒杀券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null){
return Result.fail("秒杀券不存在!");
}
//2.判断秒杀券是否开始
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀券是否结束
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if(voucher.getStock() < 1){
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//给用户id加锁,intern是指锁的对象是常量池中的字符串
//因为每次toString创建的都是新对象,加锁没有用,所以需要intern()方法
//同时,锁需要在事务提交之后才能释放,保证线程安全
synchronized (userId.toString().intern()){
//因为事务生效实际上是由代理对象执行的,所以this.createVoucherOrder实际上并不会开启线程,所以需要获取事务的原始对象的代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId, userId);
}
}
@Transactional
public synchronized Result createVoucherOrder(Long voucherId, Long userId) {
//5.一人一单逻辑
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0){
return Result.fail("用户已经购买过一次!");
}
//5.扣减库存,需要再次校验库存是否充足,保证线程安全
boolean success = seckillVoucherService.update().setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if(!success){
//库存不足
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder order = new VoucherOrder();
//6.1获取订单id
long orderId = redisIdWorker.nextId("order:");
order.setId(orderId);
//6.2用户id
order.setUserId(userId);
//6.3代金券id
order.setVoucherId(voucherId);
this.save(order);
return Result.ok(orderId);
}

5.分布式锁

上面的锁只能锁单个服务器,当面临多个集群部署时,没办法锁住全部的服务器,所以需要使用分布式锁来解决。

而redis恰好可以实现分布式锁(当多个集群公用一个redis时)。

下面是自定义分布式锁工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleRedisLock {

StringRedisTemplate stringRedisTemplate;
private String name;
//uuid用于区分进程
private static String uuid = UUID.randomUUID().toString();
public static String keyPrefix = "lock:";

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}


public boolean tryLock(Long expireSecond){
//获取线程id
long id = Thread.currentThread().getId();
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(keyPrefix + name, uuid+id, expireSecond, TimeUnit.SECONDS);
//防止b是null时拆箱出错
return BooleanUtil.isTrue(b);
}

public void unlock(){
//获取redis锁中的值
String val = stringRedisTemplate.opsForValue().get(keyPrefix + name);
//判断锁中的uuid+线程id是否与当前线程一致 uuid用于标识不同进程, 线程id用于标识同一进程下的不同id.因为不同进程的线程id可能相同
//比较是为了防止当前线程阻塞且锁过期时,第二个线程获取锁后,当前线程又复活了,防止当前线程删除第二个线程的锁。
if(val.equals(uuid + Thread.currentThread().toString())){
stringRedisTemplate.delete(keyPrefix + name);
}*/
}

}

然而上面代码依然会有线程安全的问题,因为unlock()方法获取redis锁中的值和释放锁是两步操作,极端情况下,可能获取到锁中的值是当前线程的,但在删除锁时遭遇阻塞,可能期间就已经过期,并且还被其他线程获取到锁了,当该线程最终继续执行删除时,可能就把别的线程的锁给释放了。这就是原子性问题了。

6.Lua脚本

使用lua语言写脚本可以保证原子性(上面查询redis中的锁和释放锁不是一起进行的,仍然会有线程安全问题,应该保证查询和释放一起进行),Lua本身不具备原子性,而是因为redis执行命令是单线程的,会把lua脚本做为一个命令执行

1.无参脚本

可以在redis中直接写

1
eval "return redis.call('命令/例如get', 'name' ,'ldy')"0

参数解释

eval: redis中执行lua脚本的命令

return: 返回结果

redis.call(‘命令’, ‘key值’, ‘value值’) 参数个数 (参数…)

2.有参脚本

这里KEYSP和ARGV数组(必须大写)用于接收后面的参数

注意, 索引是从1开始的

示例:

1
2
3
4
127.0.0.1:6379> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 name hei
OK
127.0.0.1:6379> get name
"hei"

编写释放锁脚本

无参,写死了

1
2
3
4
5
6
7
8
9
10
11
12
-- 锁的key
local key = "lock:order:5"
-- 当前线程标识
local threadid = "faefjeapff-33"

-- 获取锁中的线程标识
local id = redis.call('get', key)
-- 比较线程标识
if(id == threadid) then
-- 释放锁 del key
redis.call('del', key)
end

传参,通过外部传参给KEYS和ARGV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 获取锁中的线程标识
local id = redis.call('get', KEYS[1])
-- 比较线程标识
if(id == ARGV[1]) then
-- 释放锁 del key
redis.call('del', KEYS[1])
end

-- 更简写的方式
-- 比较线程标识
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
redis.call('del', KEYS[1])
end

java调用lua脚本

1.下载插件EmmyLua

image-20250312161100974

2.编写lua脚本

image-20250312161135922

image-20250312161229537

3.调用lua脚本

tip, ctrl + h 查看类的继承/实现关系,可以看到,我这里按了ctrl+h后,显示了RedisScript类的继承类,直接使用实现类。

image-20250312161713668

执行lua脚本由RedisTemplateexecute方法实现

实战代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public class SimpleRedisLock {
StringRedisTemplate stringRedisTemplate;
private String name;
//uuid用于区分进程
private static String uuid = UUID.randomUUID().toString();
public static String keyPrefix = "lock:";
//这里把脚本当作定值预先加载了,避免每次获取锁都要重新读取脚本内容而浪费时间
public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
//new ClassPathResource()即获取resource目录下的文件信息
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
}

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

public boolean tryLock(Long expireSecond){
//获取线程id
long id = Thread.currentThread().getId();
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(keyPrefix + name, uuid+id, expireSecond, TimeUnit.SECONDS);
//防止b是null时拆箱出错
return BooleanUtil.isTrue(b);
}

public void unlock(){
/*使用lua脚本
Collections用于把一个对象转化为一个集合, 因为这里要求keys是一个集合
*/
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(keyPrefix + name),
uuid + Thread.currentThread().getId()
);
}

}

7.秒杀优化

之前的秒杀机制大部分操作都直接会用到数据库,在性能方面比较差。

image-20250313202012520

优化思路:

  1. 新增秒杀券的同时,将优惠券信息保存到Redi中。
  2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。
  3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。
  4. 开启线程任务,不断从阻塞队列中获取数据,实现异步下单。

1.基于redis进行库存和一人一单校验

image-20250313202241375

库存:秒杀优惠券在新增时就将库存加入redis中。

一人一单:使用set类型存储对应优惠券订单的用户。使用sismember命令判断用户id是否已存在于set集合中。

编写lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--获取用户id
local userId = ARGV[1]
--获取优惠券id
local voucherId = ARGV[2]
-- 获取库存key
local stockKey = "seckill:stock:" .. voucherId
-- 获取订单key
local orderKey = "seckill:order:" .. voucherId

-- 1.判断库存呢是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 库存不足返回1
return 1
end

-- 2.判断是否已经下单
if(redis.call('sismember', orderKey, userId) == 1 ) then
-- 已下单,返回2
return 2
end

-- 3.扣减库存
redis.call('incrby', stockKey, -1)

-- 4.将当前userId存入优惠券set集合
redis.call("sadd", orderKey, userId)

-- 5.返回0
return 0

重写seckillVoucher方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

public Result seckillVoucher(Long voucherId) throws InterruptedException {
//1.执行lua脚本
Long execute = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
UserHolder.getUser().getId().toString(),
voucherId.toString()
);
//2.判断结果是否为0
if(execute != 0){
//2.1不为0返回异常信息
return execute == 1 ? Result.fail("库存不足!") : Result.fail("不能重复下单");
}
//3.创建订单,异步下单
//3.1 获取订单id
Long orderId = redisIdWorker.nextId(SECKILL_ORDER_KEY);
//3.2将用户id, 优惠券id和 订单id存入阻塞队列

//3.3返回订单id
return Result.ok(orderId);
}

2.使用消息队列异步创建订单

使用java自带的阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// 创建阻塞队列
private static BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue<>(1024);
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//在类初始化完成后就开启子线程实时获取阻塞队列中的元素
@PostConstruct
private void init(){
executor.submit(new VoucherHanler());
}
private class VoucherHanler implements Runnable{

@Override
public void run() {
try {
//1.获取订单,阻塞队列没有元素则会一直阻塞等待
VoucherOrder order = blockingQueue.take();
//2.创建订单
handleVoucherOrder(order);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}


public void handleVoucherOrder(VoucherOrder order) {
//1.获取用户、优惠券id
Long userId = order.getUserId();
Long voucherId = order.getVoucherId();
//2.获取锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//3.获取锁
boolean b = lock.tryLock();
//4.判断锁是否获取成功
if(!b){
//获取锁失败,输出日志
log.error("发送异常,没获取到锁");
}
try{
proxy.createVoucherOrder(order);
}finally {
//释放锁
lock.unlock();
}
}
//代理对象才能开启事务
@Override
@Transactional
public void createVoucherOrder(VoucherOrder order) {
Long userId = order.getUserId();
Long voucherId = order.getVoucherId();
//5.一人一单逻辑
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0){
log.error("用户已经下过单");
}
//5.扣减库存,需要再次校验库存是否充足,保证线程安全
boolean success = seckillVoucherService.update().setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if(!success){
//库存不足
log.error("库存不足");
}
this.save(order);
}
//子线程不能创建代理对象,所以该由主线程创建并赋值给全局代理对象供子线程使用
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) throws InterruptedException {

//1.执行lua脚本
Long userId = UserHolder.getUser().getId();
Long execute = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
userId.toString(),
voucherId.toString()
);
//2.判断结果是否为0
if(execute != 0){
//2.1不为0返回异常信息
return execute == 1 ? Result.fail("库存不足!") : Result.fail("不能重复下单");
}
//3.创建订单,异步下单
//3.1 获取订单id
Long orderId = redisIdWorker.nextId(SECKILL_ORDER_KEY);
//3.2将用户id, 优惠券id和 订单id存入阻塞队列
proxy = (IVoucherOrderService) AopContext.currentProxy();
VoucherOrder order = new VoucherOrder();
order.setVoucherId(voucherId);
order.setUserId(userId);
order.setId(orderId);
blockingQueue.add(order);
//3.3返回订单id
return Result.ok(orderId);
}

8.消息队列

Redis实现消息队列

image-20250314175932374

1.基于List实现

2.基于PubSub实现

3.基于stream实现

单消费者

XADD

XREAD

消费者组

实战