1.缓存机制 数据一致性:先修改数据库再删除redis缓存。先删缓再删数据发生线程安全的问题更大,因为数据库修改数据耗时长,可能删了缓存后在数据库信息修改还没好之前,又读取了该信息并缓存了。
2.缓存穿透 客户端访问数据库中不存在的数据,缓存永远不生效,可能会被恶意攻击数据库导致瘫痪。 方法1.缓存空对象 2.布隆过滤
3.缓存雪崩 指的是同一时间段内,redis中大量的缓存的key同时失效或者redis服务器宕机了,导致有大量请求到达数据库,带来巨大压力 解决方法:1.给TTL添加随机值 2.利用redis集群 3.给缓存业务添加降级限流策略(微服务内容)4.添加多级缓存
4.缓存击穿 指的是某一个被高并发访问并且该缓存重建业务较复杂的key突然失效了,导致大量请求到达数据库,带来巨大冲击。 解决方法:1.添加互斥锁到redis中 2.逻辑过期
实战 通用的redis缓存工具 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 @Component public class CacheClient { @Autowired StringRedisTemplate stringRedisTemplate; private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 ); public void set (String key, String value, Long expireTime, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, value, expireTime, unit); } public void setWithExpire (String key, Object value, Long expireTime, TimeUnit unit) { RedisData redisData = new RedisData (); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expireTime))); stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } public <ID, R> R queryPassThrough (String keyPrefix, ID id, Class<R> type, Function<ID, R> function, Long expireTime, TimeUnit unit) { String key = keyPrefix + id; String str = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(str)){ R r = JSONUtil.toBean(str, type); return r; } R r = function.apply(id); if (r == null ){ stringRedisTemplate.opsForValue().set(key, "" , expireTime, unit); return null ; } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(r), expireTime, unit); return r; } public <ID, R> R queryWithLogicExpire (String keyPrefix, ID id, Class<R> type, Function<ID, R> function, String lockPrefix, Long prolongTime, TimeUnit unit) { String key = CACHE_SHOP_KEY + id; String strJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isNotBlank(strJson)){ return null ; } RedisData redisData = JSONUtil.toBean(strJson, RedisData.class); LocalDateTime expireTime = redisData.getExpireTime(); JSONObject data = (JSONObject) redisData.getData(); if (expireTime.isAfter(LocalDateTime.now())){ return JSONUtil.toBean(data, type); } boolean lock = tryLock(lockPrefix+id); if (lock){ strJson = stringRedisTemplate.opsForValue().get(key); redisData = JSONUtil.toBean(strJson, RedisData.class); expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())){ data = (JSONObject)redisData.getData(); return JSONUtil.toBean(data, type); } CACHE_REBUILD_EXECUTOR.submit(()-> { R r = function.apply(id); setWithExpire(key, r, prolongTime, unit); releaseLock(lockPrefix+id); }); } return JSONUtil.toBean(data, type); } public boolean tryLock (String key) { Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(b); } public void releaseLock (String key) { stringRedisTemplate.delete(key); } }
上面的通用方法写的太神了,很好的利用到了泛型和消费者机制,把原本写在serviceImpl里的方法直接提取出来了,以shop模块的查询功能为例,结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public Result queryById (Long id) { Shop shop = cacheClient.queryWithLogicExpire(CACHE_SHOP_KEY, id, Shop.class, this ::getById, LOCK_SHOP_KEY, 20L , TimeUnit.SECONDS); if (shop == null ){ return Result.fail("商铺id不存在" ); } return Result.ok(shop); }
PS: 查看方法参数的快捷键是ctrl+P ,我现在才知道啊。。。。。
全局唯一id方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component public class RedisIdWorker { @Autowired StringRedisTemplate stringRedisTemplate; public static int COUNT_BIT = 32 ; public static long BEGIN_TIMESTAMP = 1741464196 ; public Long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long dateSeconds = now.toEpochSecond(ZoneOffset.UTC); long timestamp = dateSeconds - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); Long id = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + date); return timestamp << COUNT_BIT | id; } }
优惠券秒杀机制 这里很好的用到了锁和事务的内容
1.乐观锁,即每次访问数据库时再次校验数据库内容是否发生变更,在尚庭公寓项目里是利用了version字段来实现,而这里因为是对优惠券的库存操作,每次库存减一对应版本号加一,所以可用让库存充当版本号的作用,即每次更新库存时,都加上之前获取到的库存值判断。
但又因为这样子并发情况又会出问题。假设两个线程一开始获取的库存数都是100,线程一先修改列库存,变成99,此时线程二取修改,发现库存不对,就失败,导致明明有库存,但获取失败的情况。
所以判断条件改为了stock > 0。
2.悲观锁,解决一人一单问题,一人一单用的是查询操作,所以无法使用乐观锁。锁的对象是常量池中的字符串,而不能是每次访问都会new的对象(这样子的话,锁完全没用)
3.事务失效需要交给代理,所以需要获取代理对象来执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher == null ){ return Result.fail("秒杀券不存在!" ); } if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足!" ); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()){ IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId, userId); } } @Transactional public synchronized Result createVoucherOrder (Long voucherId, Long userId) { Integer count = this .query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ){ return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update().setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ).update(); if (!success){ return Result.fail("库存不足!" ); } VoucherOrder order = new VoucherOrder (); long orderId = redisIdWorker.nextId("order:" ); order.setId(orderId); order.setUserId(userId); order.setVoucherId(voucherId); this .save(order); return Result.ok(orderId); }
5.分布式锁 上面的锁只能锁单个服务器,当面临多个集群部署时,没办法锁住全部的服务器,所以需要使用分布式锁来解决。
而redis恰好可以实现分布式锁(当多个集群公用一个redis时)。
下面是自定义分布式锁工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class SimpleRedisLock { StringRedisTemplate stringRedisTemplate; private String name; private static String uuid = UUID.randomUUID().toString(); public static String keyPrefix = "lock:" ; public SimpleRedisLock (StringRedisTemplate stringRedisTemplate, String name) { this .stringRedisTemplate = stringRedisTemplate; this .name = name; } public boolean tryLock (Long expireSecond) { long id = Thread.currentThread().getId(); Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(keyPrefix + name, uuid+id, expireSecond, TimeUnit.SECONDS); return BooleanUtil.isTrue(b); } public void unlock () { String val = stringRedisTemplate.opsForValue().get(keyPrefix + name); if (val.equals(uuid + Thread.currentThread().toString())){ stringRedisTemplate.delete(keyPrefix + name); }*/ } }
然而上面代码依然会有线程安全的问题,因为unlock() 方法获取redis锁中的值和释放锁是两步操作,极端情况下,可能获取到锁中的值是当前线程的,但在删除锁时遭遇阻塞,可能期间就已经过期,并且还被其他线程获取到锁了,当该线程最终继续执行删除时,可能就把别的线程的锁给释放了。这就是原子性 问题了。
6.Lua脚本 使用lua语言写脚本可以保证原子性(上面查询redis中的锁和释放锁不是一起进行的,仍然会有线程安全问题,应该保证查询和释放一起进行),Lua本身不具备原子性,而是因为redis执行命令是单线程的,会把lua脚本做为一个命令执行
1.无参脚本
可以在redis中直接写
1 eval "return redis.call('命令/例如get', 'name' ,'ldy')" 0
参数解释
eval: redis中执行lua脚本的命令
return: 返回结果
redis.call(‘命令’, ‘key值’, ‘value值’) 参数个数 (参数…)
2.有参脚本
这里KEYSP和ARGV数组(必须大写)用于接收后面的参数
注意 , 索引是从1开始的
示例:
1 2 3 4 127.0 .0 .1 :6379 > eval "return redis.call('set', KEYS[1], ARGV[1])" 1 name heiOK 127.0 .0 .1 :6379 > get name"hei"
编写释放锁脚本 无参,写死了
1 2 3 4 5 6 7 8 9 10 11 12 local key = "lock:order:5" local threadid = "faefjeapff-33" local id = redis.call('get' , key)if (id == threadid) then redis.call('del' , key) end
传参,通过外部传参给KEYS和ARGV
1 2 3 4 5 6 7 8 9 10 11 12 13 14 local id = redis.call('get' , KEYS[1 ])if (id == ARGV[1 ]) then redis.call('del' , KEYS[1 ]) end if (redis.call('get' , KEYS[1 ]) == ARGV[1 ]) then redis.call('del' , KEYS[1 ]) end
java调用lua脚本 1.下载插件EmmyLua
2.编写lua脚本
3.调用lua脚本
tip , ctrl + h 查看类的继承/实现关系,可以看到,我这里按了ctrl+h后,显示了RedisScript类的继承类,直接使用实现类。
执行lua脚本由RedisTemplate 的execute 方法实现
实战代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class SimpleRedisLock { StringRedisTemplate stringRedisTemplate; private String name; private static String uuid = UUID.randomUUID().toString(); public static String keyPrefix = "lock:" ; public static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript <>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); } public SimpleRedisLock (StringRedisTemplate stringRedisTemplate, String name) { this .stringRedisTemplate = stringRedisTemplate; this .name = name; } public boolean tryLock (Long expireSecond) { long id = Thread.currentThread().getId(); Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(keyPrefix + name, uuid+id, expireSecond, TimeUnit.SECONDS); return BooleanUtil.isTrue(b); } public void unlock () { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(keyPrefix + name), uuid + Thread.currentThread().getId() ); } }
7.秒杀优化 之前的秒杀机制大部分操作都直接会用到数据库,在性能方面比较差。
优化思路:
新增秒杀券的同时,将优惠券信息保存到Redi中。
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列。
开启线程任务,不断从阻塞队列中获取数据,实现异步下单。
1.基于redis进行库存和一人一单校验
库存:秒杀优惠券在新增时就将库存加入redis中。
一人一单:使用set类型存储对应优惠券订单的用户。使用sismember 命令判断用户id是否已存在于set集合中。
编写lua脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 local userId = ARGV[1 ]local voucherId = ARGV[2 ]local stockKey = "seckill:stock:" .. voucherIdlocal orderKey = "seckill:order:" .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call("sadd" , orderKey, userId) return 0
重写seckillVoucher方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static final DefaultRedisScript<Long> SECKILL_SCRIPT;static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } public Result seckillVoucher (Long voucherId) throws InterruptedException { Long execute = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), UserHolder.getUser().getId().toString(), voucherId.toString() ); if (execute != 0 ){ return execute == 1 ? Result.fail("库存不足!" ) : Result.fail("不能重复下单" ); } Long orderId = redisIdWorker.nextId(SECKILL_ORDER_KEY); return Result.ok(orderId); }
2.使用消息队列异步创建订单
使用java自带的阻塞队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 private static BlockingQueue<VoucherOrder> blockingQueue = new ArrayBlockingQueue <>(1024 );private static final ExecutorService executor = Executors.newSingleThreadExecutor();public static final DefaultRedisScript<Long> SECKILL_SCRIPT;static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } @PostConstruct private void init () { executor.submit(new VoucherHanler ()); } private class VoucherHanler implements Runnable { @Override public void run () { try { VoucherOrder order = blockingQueue.take(); handleVoucherOrder(order); } catch (InterruptedException e) { throw new RuntimeException (e); } } } public void handleVoucherOrder (VoucherOrder order) { Long userId = order.getUserId(); Long voucherId = order.getVoucherId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean b = lock.tryLock(); if (!b){ log.error("发送异常,没获取到锁" ); } try { proxy.createVoucherOrder(order); }finally { lock.unlock(); } } @Override @Transactional public void createVoucherOrder (VoucherOrder order) { Long userId = order.getUserId(); Long voucherId = order.getVoucherId(); Integer count = this .query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ){ log.error("用户已经下过单" ); } boolean success = seckillVoucherService.update().setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ).update(); if (!success){ log.error("库存不足" ); } this .save(order); } private IVoucherOrderService proxy;@Override public Result seckillVoucher (Long voucherId) throws InterruptedException { Long userId = UserHolder.getUser().getId(); Long execute = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), userId.toString(), voucherId.toString() ); if (execute != 0 ){ return execute == 1 ? Result.fail("库存不足!" ) : Result.fail("不能重复下单" ); } Long orderId = redisIdWorker.nextId(SECKILL_ORDER_KEY); proxy = (IVoucherOrderService) AopContext.currentProxy(); VoucherOrder order = new VoucherOrder (); order.setVoucherId(voucherId); order.setUserId(userId); order.setId(orderId); blockingQueue.add(order); return Result.ok(orderId); }
8.消息队列 Redis实现消息队列
1.基于List实现 2.基于PubSub实现 3.基于stream实现 单消费者
XADD
XREAD
消费者组
实战