一、项目介绍

八股介绍:之后再补吧。。。。。词穷了

这个项目主要是基于redis相关内容进行展开的,主要实现了用户登录、抢票、点赞、评论、关注等类似社交软件的功能。

二、短信登录

1.简介

短信登录逻辑是用户填入手机号和短信验证码来校验用户信息。此外,该登录功能还包含了注册,即未注册过的手机号进行登录时,将会默认注册为一个新用户。

2.验证码的获取

验证码缓存

用户发起获取短信验证码请求时,后端会先随机生成一个6位数验证码,以login:phone:填入的手机号为key,将验证码存入redis中。随后调用短信相关的api,给对应的手机号发送验证码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public Result sendCode(String phone, HttpSession session) {
//1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)){
return Result.fail("非法手机号");
}
//2.生成验证码
String code = RandomUtil.randomNumbers(6);

//3.将验证码存入redis,以手机号码为key, 过期时间600秒
redisTemplate.opsForValue().set(RedisConstant.LOGIN_PHONE_KEY + phone, code, 600, TimeUnit.SECONDS);

//4.返回
log.debug("发送验证码成功{}", code);
return Result.ok();

}

3.登录校验

用户信息缓存

用户校验成功后,后端会利用UUID随机生成一个token值,并以login:user:token值为key,保存用户的相关消息到redis中。前端接收返回的token值并带入请求头中,以便访问其他页面时校验用户身份。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.根据手机号查询redis
String phone = loginForm.getPhone();
String code = redisTemplate.opsForValue().get(RedisConstant.LOGIN_PHONE_KEY + phone);
//2.校验验证码
if(code == null || !code.equals(loginForm.getCode())){
return Result.fail("验证码错误");
}
//3.判断用户是否存在
User user = query().eq("phone", phone).one();
if(user == null){
user = createUserWithPhone(phone);
}
//4.保存信息到redis
//4.1随机生成token,做为访问redis的key,同时返回前端存储该token
String token = UUID.randomUUID().toString();
String tokenKey = RedisConstant.LOGIN_USER_KEY + token;
//4.2将UserDTO对象转换为Map存储

UserDTO userDTO = new UserDTO();
BeanUtils.copyProperties(user, userDTO);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())
);

redisTemplate.opsForHash().putAll(tokenKey, userMap);
//设置过期时间
redisTemplate.expire(tokenKey, RedisConstant.LOGIN_USER_TTL, TimeUnit.MINUTES);

//5.返回
return Result.ok(token);
}

4.登录拦截

拦截器配置类如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
//登录拦截器
registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
).order(1);
//token刷新拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns(
"/**"
).order(0);
}
}

(1)token刷新拦截器

该拦截器目的是刷新用户token的有效期,并不具有拦截功能,真正实现拦截功能的是登录拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.从请求头的authorization中获取token
String token = request.getHeader("authorization");
//2.根据key 获取用户信息
String key = LOGIN_USER_KEY + token;
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(key);
if(entries.isEmpty()){
return true;
}
//3.若用户信息存在,则添加到threadlocal
UserDTO userDTO = BeanUtil.fillBeanWithMap(entries, new UserDTO(), false);
UserHolder.saveUser(userDTO);
//4.刷新用户token
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
return true;
}

(2)登录拦截器

1
2
3
4
5
6
7
8
9
10
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
UserDTO user = UserHolder.getUser();
if(user == null){
return false;
}
return true;
}
}

提问:

为什么需要有两个拦截器

**答:**因为不是每个页面都会拦截,例如首页,非登录用户也可以进行访问。若token刷新功能设置在登录拦截器中,那么用户访问非登录拦截页面时,token也就无法刷新,所以只有一个拦截器的话,token刷新是存在问题的。

解决方法:设置一个全放行的全局拦截器,并把登录拦截器的功能移植到这个全局拦截器中,若存在token,则会刷新token,并将用户信息存入ThreadLocal中。在访问登录拦截器所拦截的页面时,登录拦截器只需要判断ThreadLocal中是否存在用户信息即可。这种情况下,token刷新拦截器的优先级高于登录拦截器。

5.功能改进

(1)验证码发送功能

待扩展,基于阿里云的短信api。

(2)jwt登录校验

不使用redis缓存,而使用jwt进行用户信息校验。

三、商户查询缓存

1.缓存穿透

缓存穿透指的是客户端请求在缓存和数据库中都不存在的数据,导致请求直接打到数据库。

解决方法

(1)缓存空对象

  • 优点:实现简单,维护方便
  • 缺点:
    • 额外的内存消耗
    • 可能造成短期的不一致(空对象存在时间内,新增了该id对应的商户信息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//通用读缓存方法 -- 解决缓存穿透
public <ID, R> R queryPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> function, Long expireTime, TimeUnit unit) {

//1.先查redis缓存
String key = keyPrefix + id;
String str = stringRedisTemplate.opsForValue().get(key);
//2.若redis中存在则直接返回
if(StrUtil.isNotBlank(str)){
R r = JSONUtil.toBean(str, type);
return r;
}

//3.若redis中没有,则访问数据库
R r = function.apply(id);
if(r == null){
//3.1数据库中不存在,空值写入redis,防止缓存穿透
stringRedisTemplate.opsForValue().set(key, "", expireTime, unit);
return null;
}
//3.2.若数据库中存在,则存入redis中, 缓存时间为30分钟
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(r), expireTime, unit);
//4.返回
return r;
}

知识点详解

1.方法泛型

方法泛型的语法格式如下:

1
2
3
public <T> ReturnType methodName(T param1, T param2, ...) {
// 方法体
}

其中,泛型T类型根据方法参数而定。

1
2
3
public <ID, R> R queryPassThrough((String keyPrefix, ID id, Class<R> type, Function<ID, R> function, Long expireTime, TimeUnit unit){
//方法体
}
  • ID类型:在调用queryPassThrough方法时,第二个参数的类型决定了ID的具体类型。
  • R类型:在调用queryPassThrough方法时,第三个参数Class<R>的类型决定了R的具体类型。

上述方法中使用了泛型<ID, R>,其中ID表示传入的ID类型,R表示返回结果的类型。这使得方法可以处理不同类型的ID和返回值,增强了代码的通用性。

简单示例:将从Redis获取对象并进行类型转换的代码进行了封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public <R> R getBean(String key, Class<R> type){
String s = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isEmpty(s)){
return null;
}
return JSONUtil.toBean(s, type);
}

//使用封装方法
Shop shop = Cache.getBean(key, Shop.class);
if(shop == null){
return Result.fail();
}
//不使用封装方法
String s = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isEmpty(s)){
return Result.fail();
}
Shop shop = JSONUtil.toBean(s, Shop.class);

2.函数式接口

Function<ID, R> 是一个函数式接口,定义了一个方法 apply

1
R apply(ID id);

这个方法接收一个类型为 ID 的参数,并返回一个类型为 R 的结果。

1
Shop shop = cacheClient.queryPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, 1200L, TimeUnit.SECONDS);

上述代码的this::getById是一个方法引用,用作apply方法的实现。

3.方法引用

方法引用是 Java 8 引入的一种特性,它允许你直接引用已有的方法或构造函数,而无需显式地定义一个 Lambda 表达式。方法引用可以看作是 Lambda 表达式的语法糖,它使得代码更加简洁、易读,并且能够直接利用已有的方法逻辑。

方法引用与 Lambda 表达式的区别

  • Lambda 表达式:是一种匿名函数,可以定义新的逻辑。

    1
    Function<Integer, Integer> absFunction = x -> Math.abs(x);
  • 方法引用:直接引用已有的方法,避免重复定义逻辑。

    1
    Function<Integer, Integer> absFunction = Math::abs;

注意:

Function<S, T> 这种函数式接口可以使用方法引用,但方法引用必须满足以下条件:

  • 方法的参数类型必须与 S 匹配。
  • 方法的返回类型必须与 T 匹配。

只有当方法的签名的参数与 Function<S, T>apply 方法的签名参数一致时,才能使用方法引用。

方法的签名(Method Signature)指的是方法的名称和参数列表。

(2)布隆过滤

  • 优点:内存占用较少,没有多余key
  • 缺点:
    • 实现复杂
    • 存在误判可能

2.缓存雪崩

缓存雪崩指的是在同一时间段内大量的缓存key同时失效或者redis服务突然宕机,导致大量请求直接打到数据库,带来巨大压力。

解决方案:

  • 给不同的key添加随机的TTL
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略(限流:缓存未命中时,数据库的访问量也不会超过业务限流量,降级:缓存未命中时,返回默认值)
  • 给业务添加多级缓存

解决方案实现待补充

3.缓存击穿

缓存击穿指的是一个被高并发访问并且缓存重建业务较耗时的key突然失效了,导致大量请求直接打到数据库上。

1653328022622

逻辑描述:假设线程1查询缓存未命中的情况下,本该是查询数据库并重建缓存数据就好了,之后的线程就都可以命中缓存了。但在高并发情况下,可能会有多个线程2、3、4,虽然在线程1之后查询缓存,但他们在线程1重建缓存之前就已经查询完缓存了,此时缓存是未命中的,导致这些线程都会执行和线程1一样的逻辑,去查询数据库,导致数据库访问压力过大。

常见的解决方案有两种:

  • 互斥锁
  • 逻辑过期

(1)互斥锁

因为锁具有互斥性,所以我们可以设置一个互斥锁用于访问相应的数据库资源,只有拿到了锁的线程可以对数据库进行访问。这样可以使对同一资源的访问从并行变成了串行,从而减轻数据库压力。但这种方式的问题就是,其他没拿到锁的线程如何处理,是继续等待呢,还是直接返回呢?

继续等待的情况下,其他线程会等待锁的释放,当拿到了锁后,需要先访问缓存,判断拿到该锁之前,是否已经有线程访问过数据库并重建了缓存。如果有,则不需要再次访问数据库了,直接从缓存中拿即可。

1653328288627

提问

那么问题来了,既然选择了互斥锁方案,那么这个互斥锁是谁提供的呢?

那当然是redis来提供了!!!

因为redis是单线程的,所以不必担心锁的获取会有线程安全问题。给每个商户信息设置一个对应的互斥锁key即可。只有成功设置key-value的线程才算拿到了互斥锁。

获取互斥锁

1
2
3
4
5
6
7
//获取互斥锁
public boolean tryLock(String key){
//尝试设置redis内容,若设置成功则表明获取到了锁,若失败,则说明已经有人拿到了这个锁
//设置过期时间是怕获取到锁的线程发生问题时独占锁
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(b);
}

释放互斥锁

1
2
3
4
//释放互斥锁
public void releaseLock(String key){
stringRedisTemplate.delete(key);
}

注意,互斥锁也应当设置过期时间,以防某线程拿到锁却挂了,导致锁没释放成功的情况。当然了,不能排除这个线程后面又复活的情况,所以又引申出一个新的问题,原线程继续执行下去,但此时锁已经到期释放且被另一个线程获取了。原线程就有可能会把另一个线程获取到的锁给提前释放!!!这个问题到后面会解决,这里先埋个关子,我也是才意识到原来这里就已经出现这个问题了。

实现

商户信息访问的互斥锁解决方案如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//缓存击穿--互斥锁方法
public Shop queryWithMutex(Long id){
//1.1先查redis缓存
String key = CACHE_SHOP_KEY + id;
String str = stringRedisTemplate.opsForValue().get(key);
Shop shop = null;
//1.2判断是否str非空(非null和非空字符串)
if(StrUtil.isNotBlank(str)){
shop = JSONUtil.toBean(str, Shop.class);
return shop;
}
//1.3若str是空字符串,则说明这个缓存是用于缓存穿透的,直接返回null即可
if(str != null){
return null;
}

try{
//2.若redis中没有,则访问数据库
//3.获取互斥锁
boolean b = tryLock(LOCK_SHOP_KEY + id);
if(!b){
//休眠50ms,递归获取数据,直至命中缓存或者得到锁
Thread.sleep(50);
return queryWithMutex(id);
}
//4拿到互斥锁时需要二次判断缓存中是否已经添加
// 因为可能在拿到锁时,上一个拿到锁的已经将数据存到redis中了
String shopJson = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isNotBlank(shopJson)){
//4.1若此时缓存中已经存在了,则直接返回,不需要再查数据库了
return JSONUtil.toBean(shopJson, Shop.class);
}
//4.2 若缓存中仍然不存在,则查询数据库
shop = this.getById(id);
if(shop == null){
//空值写入redis,防止缓存穿透,注意要设置过期时间
stringRedisTemplate.opsForValue().set(key, "", 10, TimeUnit.MINUTES);
return null;
}
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
//5.释放互斥锁
releaseLock(LOCK_SHOP_KEY + id);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
//6.返回
return shop;
}
}

注意点:获取锁之后的缓存二次判断

(2)逻辑过期

我们之所以会出现缓存击穿这个问题的是因为缓存中的key过期失效了,才会导致请求打到数据库上。那么如果我们给缓存设置一个逻辑过期时间在缓存的数据中,缓存本身并不设置过期时间,那么当其他线程访问该数据时就一定会命中缓存。

我们所需要做的,就是在线程获取到缓存时加上个逻辑过期的判断语句即可。若过期了,就试着获取互斥锁,若获取互斥锁成功,则开启一个独立线程去重建缓存。但是,无论是否获取到互斥锁,当前线程都则无需等待,直接返回旧数据

这个方案优点是异步构建缓存,响应速度快。缺点就是会造成脏读

此外,对于逻辑过期也要获取互斥锁这个现象,让我感觉逻辑过期本质上就是对互斥锁方案本身的改进吧。

1653328663897

实现

逻辑过期通用类

1
2
3
4
5
6
7
8
9
/*
* 逻辑过期策略的通用类 -- 解决缓存击穿
* 对象存入data,逻辑过期时间存入expireTime
* */
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

逻辑过期主方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/*缓存击穿--逻辑过期主方法*/
public Shop queryWithLogicExpire(Long id) {
//1.先查redis缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.若未命中,则直接返回null, 因为逻辑过期场景下,redis中的数据是提前写好的,没有就不需要查数据库了
if(!StrUtil.isNotBlank(shopJson)){
return null;
}
//3.若redis命中,则判断是否逻辑过期
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
//因为redisObject是一个通用类,所以data类型是Object,这种情况下需要先强转为JSONObject,再使用JSONUtil.toBean()
JSONObject data = (JSONObject) redisData.getData();
//4.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
//4.1未过期,直接返回店铺信息
return JSONUtil.toBean(data, Shop.class);
}
//4.2已过期,需缓存重建

//5.缓存重建

//5.1获取互斥锁
boolean lock = tryLock(LOCK_SHOP_KEY+id);
//5.2判断是否获取成功
if(lock){
//5.3成功
//5.4二次判断redis当中此时是否已经更新过过期时间了
shopJson = stringRedisTemplate.opsForValue().get(key);
redisData = JSONUtil.toBean(shopJson, RedisData.class);
expireTime = JSONUtil.toBean(shopJson, RedisData.class).getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
data = (JSONObject)redisData.getData();
return JSONUtil.toBean(data, Shop.class);
}
//5.5开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->
{
//5.6若未更新,则重建缓存
saveShop2Redis(id, 20L);
//5.7释放锁
releaseLock(LOCK_SHOP_KEY+id);
});
}
//5.7返回过期的店铺信息
return JSONUtil.toBean(data, Shop.class);

}

注意点:释放锁需要交给异步线程来释放。

异步重建缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
//线程池,供逻辑过期方法使用
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/*逻辑过期信息存入redis方法*/
public void saveShop2Redis(Long id, long expireseconds){
//1.查询商品数据
Shop shop = getById(id);
//2.存入redisData
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireseconds));
//3.写入redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}

提问

1.为什么这里会把redisData.getData()强制类型转换为JSONObject,再利用JSONUtil把类型转化为Shop,而不是直接强制类型转换为Shop?

答:因为JSONObject可扩展性更强,功能强大,虽然代码中并未体现,但它可以使用get方法获取属性中的值,也可以使用getString判断是否存在某属性,这对类型不确定的情况下还是很有帮助的。具体参考Java中的JSONObject详解_java jsonobject-CSDN博客

2.开启独立线程是怎么实现的?

答:利用的是Executors获取线程池的方式开启独立线程。目前这个线程池是静态变量,在整个应用生命周期内保持存在(注意:静态变量的生命周期与类的加载和卸载相关,而不是由 Spring 容器管理),避免了在任务执行过程中动态创建线程。但由于这个线程池并非spring容器管理,所以无法动态调整线程池大小。改进方法就是可以设置一个线程池Bean交给Spring容器管理,可以在配置文件中设置线程的最大线程数量等参数配置。

方案对比

互斥锁方案:由于互斥性,所以数据一致性可以保证,且只需要在原代码上加把锁,没有复杂的逻辑,实现简单。问题就是可能会陷入死锁(当前代码只有一把锁,所以没有体现),而且数据访问是串行执行的,线程需要等待,性能肯定有影响。

逻辑过期方案:有一个额外线程去重建缓存,线程不需要等待,性能好。但在重构缓存之前,其他线程都只能返回旧数据。此外,实现较为复杂、且需要占用额外的内存来存放逻辑过期时间。

1653357522914

RedisTemplate工具类

基于上面的缓存穿透和缓存击穿实现方法,我们可以封装成一个通用的工具类。

这里我们把RedisTemplate工具类的相关方法写入CacheClient类中。

1.通用写缓存方法

1
2
3
public void set(String key, String value, Long expireTime, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, value, expireTime, unit);
}

2.通用读缓存方法–解决缓存穿透

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//通用读缓存方法 -- 解决缓存穿透
public <ID, R> R queryPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> function, Long expireTime, TimeUnit unit) {

//1.先查redis缓存
String key = keyPrefix + id;
String str = stringRedisTemplate.opsForValue().get(key);
//2.若redis中存在则直接返回
if(StrUtil.isNotBlank(str)){
R r = JSONUtil.toBean(str, type);
return r;
}

//3.若redis中没有,则访问数据库
R r = function.apply(id);
if(r == null){
//3.1数据库中不存在,空值写入redis,防止缓存穿透
stringRedisTemplate.opsForValue().set(key, "", expireTime, unit);
return null;
}
//3.2.若数据库中存在,则存入redis中, 缓存时间为30分钟
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(r), expireTime, unit);
//4.返回
return r;
}

3.通用获取对象–解决缓存击穿–基于逻辑过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//通用获取对象 -- 解决缓存击穿 -- 逻辑过期
public <ID, R> R queryWithLogicExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> function, String lockPrefix, Long prolongTime, TimeUnit unit) {
//1.先查redis缓存
String key = CACHE_SHOP_KEY + id;
String strJson = stringRedisTemplate.opsForValue().get(key);
//2.若未命中,则直接返回null, 因为逻辑过期场景下,redis中的数据是提前写好的,没有就不需要查数据库了
if(!StrUtil.isNotBlank(strJson)){
return null;
}
//3.若redis命中,则判断是否逻辑过期
RedisData redisData = JSONUtil.toBean(strJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
//因为redisObject是一个通用类,所以data类型是Object,这种情况下需要先强转为JSONObject,再使用JSONUtil.toBean()
JSONObject data = (JSONObject) redisData.getData();
//4.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
//4.1未过期,直接返回
return JSONUtil.toBean(data, type);
}
//4.2已过期,需缓存重建
//5.缓存重建

//5.1获取互斥锁
boolean lock = tryLock(lockPrefix+id);
//5.2判断是否获取成功
if(lock){
//5.3成功
//5.4二次判断redis当中此时是否已经更新过过期时间了
strJson = stringRedisTemplate.opsForValue().get(key);
redisData = JSONUtil.toBean(strJson, RedisData.class);
expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
data = (JSONObject)redisData.getData();
return JSONUtil.toBean(data, type);
}
//5.5开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->
{
//5.6查询数据库
R r = function.apply(id);
setWithExpire(key, r, prolongTime, unit);
//5.7释放锁
releaseLock(lockPrefix+id);
});
}
//5.7返回过期的店铺信息
return JSONUtil.toBean(data, type);
}

通用写入缓存方法 – 逻辑过期

1
2
3
4
5
6
7
8
//通用写入缓存方法 -- 逻辑过期
public void setWithExpire(String key, Object value, Long expireTime, TimeUnit unit){
//创建一个逻辑过期对象通用类
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expireTime)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}

四、优惠券秒杀

这个优惠券秒杀功能主要核心点:

1.全局唯一id的生成

2.乐观锁 – 一人一单

3.悲观锁 –

4.分布式锁

5.lua脚本

6.Redission

7.异步秒杀优化

8.基于redis实现的消息队列

8.Rabbitmq实现的消息队列

1.全局唯一id

当用户抢购时,就会生成订单并保存到订单表中,如果我们使用数据库自增id的话,就会存在id的规律性太明显的问题。

场景1:如果我们的id具有太明显的规则,那么用户或者说商业竞争者就很容易猜测出我们的一些敏感信息,比如一天总共卖出了多少单,这很明显不合适。

场景2:mysql的单表容量不宜超过500w,当我们数据量过大时,我们需要进行拆库拆表,但拆库拆表后,我们就需要保证id的唯一性。

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

1653363100502

同时,为了保证id的安全性,所以我们不能全靠redis的id自增来生成ID,可以用拼接其他信息。例如时间戳。

1653363172079

ID的组成部分

Long类型:占用8个字节,是64位的整数类型。

id组成:

  • 符号位:1位,永远为0
  • 时间戳:31位,以秒为单位,从起始点算起可以用69年
  • 序列号:32位,秒内计数器,支持每秒最多2^32个id

时间戳占用Long类型的第2为到第32位:先设置一个起始时间戳,代表起始点,然后我们可以用当前的时间戳-起始时间戳得到的数来填充Long类型的32位。经计算,2^32可以供我们用69年。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
public class RedisIdWorker {
@Autowired
StringRedisTemplate stringRedisTemplate;
//开始时间戳
public static int COUNT_BIT = 32;
//2025年3月8号做为起始时间点
public static long BEGIN_TIMESTAMP = 1741464196;
public Long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long dateSeconds = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = dateSeconds - BEGIN_TIMESTAMP;
//2.生成序列号
//2.1获取当前时间,精确到天,做为当天自增长key的一部分
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2自增长
Long id = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + date);
//3.拼接并返回, 时间戳(每秒都不一样)在前32位,自增长id在后32位,这样子可以支持每秒产生最多2^32次个不同的id
//当然,因为没有做redis的id超限判断,所以这一天最多只能产生2^32次个id,我们也可以自己加上超额判断
/*
if(id.equals(4294967295L)){
stringRedisTemplate.opsForValue().set("incr:"+keyPrefix+date, "0");
}
*/
return timestamp << COUNT_BIT | id;
}
}

注意:这里的 timestamp << COUNT_BIT | id,这里代表的是将时间戳左移32位并低32位由id填充,因为|运算符用来或运算,即有1填1,全0则0。当id < 2^32时,id只会在低32位存在1,不影响时间戳的加入。

2.乐观锁

乐观锁和悲观锁是用于解决超卖问题的

1653368562591

先讲乐观锁:

乐观锁的基本思路就是给数据增加一个版本号字段,每次对数据进行修改时,会先获取原数据的版本号,然后在修改时,带上这个版本号判断当前版本号是否和之前获取的版本号一致,若一致的话,则对数据进行修改,并对版本号加1。若不一致,则说明在对数据修改之前,已经被其他线程改动过了,则放弃修改。

乐观锁的核心代码实现

1
2
3
4
5
6
7
8
9
10
11
12
//6.试着获取秒杀券
boolean b = seckillVoucherService.update().setSql("stock = stock - 1").gt("stock", 0).update();
//7.若true,则表示扣减库存成功
if(b){
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return Result.ok("抢票成功,订单号:"+voucherOrder.getId());
}else{
return Result.fail("抱歉,优惠券已抢光");
}

这里我们并没有设置版本号来对数据一致性进行判断,而是用更简单的方式(判断库存是否大于0),因为我们是秒杀抢票功能,如果我们设置版本号或者直接库存来保证数据的一致性,那么会导致在有充足票的情况下,要是多个人同时抢票得到的初始版本号或者库存相同,那么肯定只有一个人能抢票成功。这是不允许的。

3.悲观锁

问题引入:

假设一个场景,未抢票的用户几乎同时发起了2次抢票请求,在乐观锁的代码为准,这可能会导致,一个用户同时抢到2张票的情况。为什么呢?因为,即使原代码有判断一人一单的操作,但没有确保判断一人一单操作和扣减库存操作之间的原子性,这就会导致,多个线程都执行完判断一人一单的操作后第一个线程才开始扣减库存,导致一个人可能抢到多张票。究其原因,还是锁的粒度太小导致的。乐观锁只在最后修改数据时起判断作用。

悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等。

尽管对于多个不同用户发起抢票的情况下,悲观锁不如乐观锁,试想下,如果给操作数据加个锁,那么即便在库存充足的情况下,多个用户发起的抢票也只能串行执行,严重影响效率。

但悲观锁可以解决乐观锁无法解决的同一个用户的多次频繁请求,即一人一单问题。因为悲观锁是对同一个用户的多次请求加锁。所以,这个业务中,悲观锁的实现是在乐观锁的基础上进行的!!!两者没有优劣之分,都很重要。

核心代码

1
2
3
4
5
6
7
//给用户id加锁,intern是指锁的对象是常量池中的字符串
//因为每次toString创建的都是新对象,加锁没有用,所以需要intern()方法
//同时,锁需要在事务提交之后才能释放,保证线程安全
synchronized(userId.toString().intern()){
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId, userId);
}

为了锁的正常释放和保证事务可以回滚,需要单独提取个方法,并给方法加上@Transactional注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Transactional
public synchronized Result createVoucherOrder(Long voucherId, Long userId) {
//5.一人一单逻辑
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0){
return Result.fail("用户已经购买过一次!");
}
//5.扣减库存,需要再次校验库存是否充足,保证线程安全
boolean success = seckillVoucherService.update().setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if(!success){
//库存不足
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder order = new VoucherOrder();
//6.1获取订单id
long orderId = redisIdWorker.nextId("order:");
order.setId(orderId);
//6.2用户id
order.setUserId(userId);
//6.3代金券id
order.setVoucherId(voucherId);
this.save(order);
return Result.ok(orderId);
}

知识点

1.synchronized

synchronized 块是 阻塞获取锁 的。当一个线程尝试进入一个 synchronized 块时,如果锁已经被其他线程持有,该线程会被阻塞(即进入等待状态),直到锁被释放。一旦锁被释放,等待的线程会竞争锁,获得锁的线程可以继续执行同步块中的代码。

2.intern()方法

intern() 方法是 Java 中 String 类的一个方法,用于将字符串对象放入字符串常量池中,并返回该字符串的引用。如果常量池中已经存在相同内容的字符串,则直接返回该字符串的引用;否则,将该字符串添加到常量池中,并返回其引用。

提问

1.为什么要在锁中使用代理对象执行创建订单的逻辑?

答:@Transactional 注解需要通过代理对象来生效。直接调用 this.createVoucherOrder(voucherId, userId) 不会触发 Spring 的事务管理,因为 this 指向的是当前对象实例,而不是代理对象。

2.为什么要创建订单逻辑代码提取出来改成一个方法呢?

答:防止锁提前释放

首先我们看看下面这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//校验操作省略
synchronized(userId.toString().intern()){
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

}

你发现了什么?@Transaction这个注解是加在整个方法上面的对吧。但问题是,并不是说事务提交了锁才会释放!!!锁可能会在事务提交前提前释放,这是完全可能的,毕竟锁并不知道事务是否成功提交了。所以,我们才会选择在锁块中调用有@Transaction的方法来解决锁提前释放的问题。

问题

目前这个悲观锁只能使用于单个服务,如果部署了多个服务(tomcat),即集群的话,每个tomcat都有一个属于自己的jvm,所以多个tomcat之间无法共用同一把锁。

1653374044740

4.分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

那么分布式锁他应该满足一些什么样的条件呢?

可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

高可用:程序不易崩溃,时时刻刻都保证较高的可用性

高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

安全性:安全也是程序中必不可少的一环

常见的分布式锁有三种

  1. mysql
  2. redis
  3. zookeeper

这里先使用redis来实现。至于mysql,基本不会考虑,zookeeper等学了之后再用

Redis分布式锁的核心思路

实现分布式锁需要满足以下两个条件

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

    利用redis的setNx方法,实现思路如下。

    1653382830810

首先,为了方便,我们可以创建一个锁的通用类SimpleRedisLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SimpleRedisLock {
StringRedisTemplate stringRedisTemplate;
private String name;
//uuid用于区分进程
private static String uuid = UUID.randomUUID().toString();
public static String keyPrefix = "lock:";
//这里把脚本当作定值预先加载了,避免每次获取锁都要重新读取脚本内容而浪费时间
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}


public boolean tryLock(Long expireSecond){
//获取线程id
long id = Thread.currentThread().getId();
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(keyPrefix + name, uuid+id, expireSecond, TimeUnit.SECONDS);
//防止b是null时拆箱出错
return BooleanUtil.isTrue(b);
}

public void unlock(){
String key = keyPrefix + name;
String val = stringRedisTemplate.opsForValue().get(keyPrefix);
if(val.equals(uuid + Thread.currentThread().toString())){
stringRedisTemplate.delete(key);
}
}

}

在抢票中使用自定义分布式锁的代码部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//悲观锁解决方案
//给用户id加锁
//分布式锁解决悲观锁的分布式问题
String lockKey = "seckill:"+voucherId+":"+userId;
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, lockKey);
if(lock.tryLock(1000L)){
try{
//乐观锁解决方案
//6.试着获取秒杀券
boolean b = seckillVoucherService.update().setSql("stock = stock - 1").gt("stock", 0).update();
//7.若true,则表示扣减库存成功
if(b){
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return Result.ok("抢票成功,订单号:"+voucherOrder.getId());
}else{
return Result.fail("抱歉,优惠券已抢光");
}
}catch(Exception e){
return Result.fail(e.toString());
}finally {
//释放锁
lock.unlock();
}
}
else{
return Result.fail("请勿频繁点击抢票");
}

总之,现在我们知道了抢票过程中锁的流程创建是一步步递进的,乐观锁–>悲观锁–>分布式锁。思路也是循序渐进的。

提问

1.uuid的作用是什么

答:uuid是为了区分不同服务之间的标识。从代码中,我们可以看到,uuid是一个静态变量,也就是服务在初始化时就已经定下了,而uuid可以保证每次生成的内容不一致,所以,每个服务启动时都会有一个不同uuid值。这样可以方便区分锁究竟是哪个服务拿到的。

2.获取当前线程的threadId有什么用

答:用于区分同一个服务下,究竟是哪个线程拿到了锁。

3.uuid和threadId存入redis中有什么用

答:为了保证锁的安全释放。假设一个场景,锁被1号服务的1号线程获取到了,但在执行业务过程中发生了阻塞,导致长时间未完成,此时锁的过期时间又到了,导致锁被提前释放。这时,2号服务的1号线程过来拿到了锁,执行业务流程。但执行过程中,1号服务的1号线程业务执行完毕,准备执行释放锁的操作。这会导致2号服务的1号线程的锁被提前释放。为了避免这种问题的发生,我们需要区分不同进程和不同线程,在释放锁的时候先判断目前锁究竟是不是自己的,是则释放锁,不是则不进行释放。所以,我们需要引入uuid+线程id来保证锁的安全释放。

图片详解

1653385920025

5.Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。注意:lua脚本本身不具有原子性,只是用在redis时,redis赋予它实现原子性。

基本使用

redis提供的调用函数基本语法如下

1
redis.call('命令名称', 'key', '其他参数', ....)

例如,我们要执行set name ldy,则脚本这么写

1
redis.call('set', 'name', 'ldy')

例如我们要先执行完set name ldh,再执行get name,则脚本如下

1
2
3
4
5
6
# 先执行set name ldh
redis.call('set', 'name', 'ldh')
# 再执行get name
local name = redis.call('get', 'name')
# 返回
return name

其中local用于声明变量值。

写好脚本后,需要使用redis命令来调用脚本,调用脚本的常见命令如下:

image-20250404184421196

在redis中也可以直接写lua脚本语句进行执行

image-20250404184518534

上述语句没有需要从外部获取的key值,所以脚本内容后面跟个0。

其中的语法规则是 Eval "脚本内容" 脚本需要的key类型的参数个数 参数值,其中参数值的规则是这样子的,首先参数个数为n, 则后面跟的n个参数都是KEYS数组的参数值,从第n+1个参数起则是ARGV数组的参数值。

image-20250404185803664

要是脚本中的key和value不想写死,可以做为参数传递。key类型的参数需要放入KEYS数组,其他参数则需要放入ARGV数组中,在脚本中可以从KEYS和ARGV数组中获取这些参数。

注意:数组的索引是从1开始的

例如下图所示

image-20250404190732159

我们再来看看,在lua脚本中怎么写语句合适。如下所示,我们用自定义变量名来接收数组中的值,然后用自定义的变量名来做为参数。方便我们更好的理解代码。

1
2
3
4
5
6
local val1 = ARGV[1]
local val2 = ARGV[2]
local val3 = ARGV[3]
local key = KEYS[1]
local res redis.call('LPUSH', key, val1, val2, val3)
return res

把上述语句写在一行里,然后执行也是可以成功的。注意:lpush命令返回列表中的元素个数。

image-20250404191438460

ok,lua脚本的基本使用就这些了。可以开始优化分布式锁了。

分布式锁的优化

首先回顾下释放锁的业务逻辑

1.获取线程标识(uuid+线程id)

2.判断锁中存放的线程标识是否和当前的一致

3.如果一致,则释放锁(删除)

4.如果不一致,则什么都不做

用lua脚本实现

1
2
3
4
5
6
7
8
9
10
11
12
local key = KEYS[1]
local val1 = ARGV[1]
local val2 = redis.call('get', key) 0
if(val1 == val2) then
return redis.call('del', key)
end
# 简化后是这样子的
-- 比较线程标识
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
redis.call('del', KEYS[1])
end

使用java代码调用lua脚本来改造分布式锁

我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图所示

image-20250404200842686

使用lua脚本释放锁代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//静态脚本常量,提前加载
public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{
UNLOCK_SCRIPT = new DefaultRedisScript<>();
//设置脚本路径
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
//设置返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
}

public void unlock(){
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(keyPrefix + name),
uuid + Thread.currentThread().getId()
);
}

代码详解

  • **DefaultRedisScript<Long>**:这是变量的类型,表示这是一个DefaultRedisScript对象,泛型<Long>表示该脚本的执行结果类型为Long
  • static静态代码块用于用于初始化类的静态变量。
  • ClassPathResource是Spring框架提供的一个类,用于表示类路径下的资源文件。
  • Collections.singletonList用于设置单值的列表

6.Redisson

首先,我们基于setnx实现的自定义分布式锁存在下面的问题:

重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

什么是Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

分布式锁-Redisson快速入门

引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置Redisson客户端,其中RedissonClien就是我们实现分布式锁的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

利用Redisson自带的分布式锁实现抢票逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//悲观锁解决方案
//给用户id加锁
String lockKey = "seckill:"+voucherId+":"+userId;
RLock lock = redissonClient.getLock(lockKey);
if(lock.tryLock(10, TimeUnit.SECONDS)){
try{
//乐观锁解决方案
//6.试着获取秒杀券
boolean b = seckillVoucherService.update().setSql("stock = stock - 1").gt("stock", 0).update();
//7.若true,则表示扣减库存成功
if(b){
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return Result.ok("抢票成功,订单号:"+voucherOrder.getId());
}else{
return Result.fail("抱歉,优惠券已抢光");
}
}catch(Exception e){
return Result.fail(e.toString());
}finally {
//释放锁
lock.unlock();
}
}
else{
return Result.fail("请勿频繁点击抢票");
}

可见,和我们之前自定义分布式锁SimpleRedisLock的逻辑完全一致,我们自定义的分布式锁就相当于一个低配版的Redisson。

重入问题

首先,我们执行如下代码

1
2
3
4
5
6
7
8
9
10
@Test
public void method(){
RLock lock = redissonClient.getLock("mylock");
boolean b = lock.tryLock();
if(b){
System.out.println("拿到锁了");
}else{
System.out.println("没有拿到锁");
}
}

去查redis,可以看见,创建的锁是Hash类型。其中value是当前锁的重入次数,第1次是00110001,第2次是00110010,至于为什么0011开头,我也不清楚哈哈。field应该是线程标识符

image-20250404202916900

重入机制测试代码,如下,可以自己打断点调试下,查看value的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void method1(){
RLock lock = redissonClient.getLock("lock:order:1");
boolean b = lock.tryLock();
if(b){
log.info("方法1获取锁成功");
method2();
lock.unlock();
log.info("方法1释放锁成功");
}
else{
log.info("获取锁失败");
}
}

public void method2(){
RLock lock = redissonClient.getLock("lock:order:1");
boolean b = lock.tryLock();
if(b){
log.info("方法2获取锁成功");
lock.unlock();
log.info("方法2释放锁成功");
}
}

重试机制

tryLock中有重载方法可以传入重试时间

image-20250404210813586

1
2
3
4
String lockKey = "seckill:"+voucherId+":"+userId;
RLock lock = redissonClient.getLock(lockKey);

lock.tryLock(10, 10, TimeUnit.SECONDS);

源码解读

晚点再解读

锁重试和看门狗机制

锁重试机制顾名思义就是可以对锁的获取进行不断的重试,这里暂不赘述。重要的是看门狗机制。

先回顾下,我们之前自定义的锁的释放是怎么样的,就分两种情况,1是过期自动释放,2是业务完成,手动释放。显然过期自动释放是有问题的,因为很可能过期自动释放后,原线程其实并未挂掉,只是因为某些原因执行时间较长,我们不能让没有挂掉的线程的锁提前给释放,所以需要不断的给过期时间进行延长。这就是看门狗机制。

那你可能会问了,那过期时间有什么用啊,既然都不断延长了?

答:就是为了让线程挂掉时可以释放掉锁,因为线程挂掉了,那么肯定不可能继续执行下去了,这个时候也就不会进行自动延迟过期时间了。

总之,看门狗机制就是为了确保线程在存活期间不会因锁的过期而失去锁的持有权,同时在线程挂掉时能够及时释放锁,避免死锁或资源泄露。

源码部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}

});
return ttlRemainingFuture;

MutiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例

此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。这就导致一种情况,明明获取到了锁,结果这突然锁没了,其他线程还能获取这把锁。

为了解决这个问题,我们获取这把锁时就不用主从关系获取锁了,而是把加锁逻辑写入到每个节点中,只有所有节点都加锁成功时,才算成功加锁。只要有一个节点拿不到锁,就不算加锁成功。保证了加锁的可靠性。

Redisson这块实在不好总结,毕竟源码也是一知半解,后面有时间再补吧。

7、异步优化

先回顾下我们之前抢秒杀券的流程

1、查询优惠卷

2、判断秒杀库存是否足够

3、校验是否是一人一单

4、扣减库存

5、创建订单

以上5步,都是直操作数据库,还是串行执行,执行速度有待提升。

执行速度提升有2种方法

1.利用redis进行数据校验

2.开启异步线程执行创建订单的操作

以上2种方法相结合可以有效提升运行速度,对于异步线程,我们可以使用阻塞队列来实现,让一个线程专门去对数据库执行扣减库存和创建订单的操作,不必担心线程池消耗殆尽的问题。

1653561657295

我们现在来看看整体思路:当用户下单之后,判断库存是否充足只需要到redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作

当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。

1.Redis完成秒杀资格判断

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

VoucherServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
//SECKILL_STOCK_KEY 这个变量定义在RedisConstans中
//private static final String SECKILL_STOCK_KEY ="seckill:stock:"
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

完整lua表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

当以上lua表达式执行完毕后,剩下的就是根据步骤3,4来执行我们接下来的任务了

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//TODO 保存阻塞队列
// 3.返回订单id
return Result.ok(orderId);
}

2.基于阻塞队列实现秒杀优化

实现代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
public static DefaultRedisScript<Long> script;

static{
script = new DefaultRedisScript<Long>();
script.setLocation(new ClassPathResource("seckill.lus"));
script.setResultType(Long.class);
}
@PostConstruct
public void init(){
EXECUTOR_SERVICE.submit(new SeckillVoucherHandler());
}

private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024);
private IVoucherOrderService proxy;
//在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
private class SeckillVoucherHandler implements Runnable {

@Override
public void run() {
//不断循环去查看阻塞队列中是否存在订单信息
while(true){
try{
//1.从阻塞队列获取订单信息
VoucherOrder order = orderTasks.take();
//2.创建订单
createVoucherOrder(order);
}catch(Exception e){
log.error(e.toString());
}
}

}
public void createVoucherOrder(VoucherOrder order){
//1.获取用户id
Long userId = order.getUserId();
//2.获取优惠券id
Long voucherId = order.getVoucherId();
//3.获取锁
RLock lock = redissonClient.getLock("lock:seckill:" + voucherId + ":" + userId);
try {
boolean b = lock.tryLock(10, TimeUnit.SECONDS);
//4.拿到锁
if(b){
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//创建订单
proxy.createVoucherOrder(order);
}
//5.没拿到锁
log.error("限一人一单");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//释放锁
lock.unlock();
}
}
}
@Transactional
@Override
public void createVoucherOrder(VoucherOrder order) {
Long userId = order.getUserId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", order.getVoucherId()).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("用户已经购买过了");
return ;
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", order.getVoucherId()).gt("stock", 0)
.update();
if (!success) {
// 扣减失败
log.error("库存不足");
return ;
}
save(order);
}
@Override
public Result seckillVoucher(Long voucherId) throws InterruptedException {

Long userId = UserHolder.getUser().getId();
//1.查询秒杀券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
if(seckillVoucher == null){
return Result.fail("秒杀券不存在");
}
//2.判断秒杀活动是否开始
LocalDateTime beginTime = seckillVoucher.getBeginTime();
if(beginTime.isAfter(LocalDateTime.now())){
return Result.fail("秒杀活动暂未开始");
}
//3.判断秒杀是否已经结束
LocalDateTime endTime = seckillVoucher.getEndTime();
if(endTime.isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
//执行lua脚本
Long execute = stringRedisTemplate.execute(script, Collections.emptyList(), voucherId, userId);
if(execute == null){
return Result.fail("lua脚本执行异常");
}
if(!execute.equals(0L)){
return execute == 1 ? Result.fail("库存不足") : Result.fail("限一人一单");
}
//生成订单号
Long orderId = redisIdWorker.nextId("order:");
//创建订单
VoucherOrder order = new VoucherOrder();
order.setUserId(userId);
order.setId(orderId);
order.setVoucherId(voucherId);
//将订单信息存入到阻塞队列中
orderTasks.add(order);
return Result.ok(orderId);
}

代码详解

1.如下代码作用是什么?

1
2
3
4
@PostConstruct
public void init(){
EXECUTOR_SERVICE.submit(new SeckillVoucherHandler());
}

答:因为类初始化之后,随时都可能要处理阻塞队列,所以在类实例初始化完成时,就开始执行new SeckillVoucherHandler()的run方法,处理阻塞队列。

8.基于redis的消息队列

什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

在redis中,一共有三种数据类型可以实现消息队列。

image-20250314175932374

基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

1653575176451

基于List的消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub结构模拟消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

1653575506373

基于PubSub的消息队列有哪些优缺点?
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream实现消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

1653577301737

例如:

1653577349691

读取消息的方式之一:XREAD

1653577445413

例如,使用XREAD读取第一个消息:

1653577643629

XREAD阻塞方式,读取最新的消息:

1653577659166

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

1653577689129

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

1653577801668

创建消费者组:
1653577984924
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常见命令:

删除指定的消费者组

1
XGROUP DESTORY key groupName

给指定的消费者组添加消费者

1
XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

1
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

对于起始消息ID,分两类情况,1.未消费 2.已消费未确认

“>”:从下一个未消费的消息开始

“0”:从第一个已消费未确认开始

“其他id”:从该id的后面开始获取已消费未确认的消息image-20250406143645095

确认消息

1
XACK key group id [id ...]

image-20250406142947663

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders

  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

    或者

    修改seckillVoucher,在执行完lua脚本,认定有抢购资格后,向stream.orders中添加消息,代码如下

    1
    2
    3
    4
    5
    6
    7
    VoucherOrder order = new VoucherOrder();
    Map<String, String> map = new HashMap<>();
    map.put("userId", userId.toString());
    map.put("id", orderId.toString());
    map.put("voucherId", voucherId.toString());
    //3.3 发送消息到stream.order队列中
    stringRedisTemplate.opsForStream().add(MapRecord.create("stream.order", map));
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

实现整体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@PostConstruct
private void init(){
executor.submit(new VoucherOrderHanler());
}
private class VoucherOrderHanler implements Runnable{
@Override
public void run() {
//循环获取消息队列中的信息
while(true){
try {
//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.order", ReadOffset.lastConsumed())
);
//2.判断消息是否获取成功
if(list == null || list.isEmpty()){
//3.没有消息,进入下一次循环
continue;
}
//3.解析订单消息
//这里的MapRecord中的第一个String是消息id,后面的Object和Object组合起来是一个map,存放键值对
MapRecord<String, Object, Object> entries =
list.get(0);
Map<Object, Object> map = entries.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
//4.获取成功,则下单
handleVoucherOrder(order);
//5.ACK确认消息
stringRedisTemplate.opsForStream().acknowledge("stream.order", "g1", entries.getId());
} catch (Exception e) {
//若消息处理发生异常,则需要处理PendingList中的消息
handlePendingList();
}
}

}
}
public void handlePendingList(){
while(true){
try {
//1.获取PendingList中的消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.order", ReadOffset.from("0"))
);
//2.判断消息是否获取成功
if(list == null || list.isEmpty()){
//3.没有消息,说明消息已经全部确认完毕
break;
}
//3.解析订单消息
//这里的MapRecord中的第一个String是消息id,后面的Object和Object组合起来是一个map,存放键值对
MapRecord<String, Object, Object> entries =
list.get(0);
Map<Object, Object> map = entries.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
//4.获取成功,则下单
handleVoucherOrder(order);
//5.ACK确认消息
stringRedisTemplate.opsForStream().acknowledge("stream.order", "g1", entries.getId());
} catch (Exception e) {
log.error("处理PendingList中发生异常");
}
}
}

public void handleVoucherOrder(VoucherOrder order) {
//1.获取用户、优惠券id
Long userId = order.getUserId();
Long voucherId = order.getVoucherId();
//2.获取锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//3.获取锁
boolean b = lock.tryLock();
//4.判断锁是否获取成功
if(!b){
//获取锁失败,输出日志
log.error("发送异常,没获取到锁");
}
try{
proxy.createVoucherOrder(order);
}finally {
//释放锁
lock.unlock();
}
}
@Override
@Transactional
public void createVoucherOrder(VoucherOrder order) {
Long userId = order.getUserId();
Long voucherId = order.getVoucherId();
//5.一人一单逻辑
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0){
log.error("用户已经下过单");
}
//5.扣减库存,需要再次校验库存是否充足,保证线程安全
boolean success = seckillVoucherService.update().setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0).update();
if(!success){
//库存不足
log.error("库存不足");
}
this.save(order);
}
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) throws InterruptedException {
//1.执行lua脚本
Long userId = UserHolder.getUser().getId();
Long execute = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
userId.toString(),
voucherId.toString()
);
//2.判断结果是否为0
if(execute != 0){
//2.1不为0返回异常信息
return execute == 1 ? Result.fail("库存不足!") : Result.fail("不能重复下单");
}
//3.创建订单,异步下单
//3.1 获取订单id
Long orderId = redisIdWorker.nextId(SECKILL_ORDER_KEY);
//3.2将用户id, 优惠券id和 订单id存入stream中
proxy = (IVoucherOrderService) AopContext.currentProxy();
VoucherOrder order = new VoucherOrder();
Map<String, String> map = new HashMap<>();
map.put("userId", userId.toString());
map.put("id", orderId.toString());
map.put("voucherId", voucherId.toString());
//3.3 发送消息到stream.order队列中
stringRedisTemplate.opsForStream().add(MapRecord.create("stream.order", map));
//3.4返回订单id
return Result.ok(orderId);
}

代码详解

因为是基于redis的Stream实现消息队列,所以涉及到的api比较多,下面简单解释下api.

1.消费组消息获取

1
2
3
4
5
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.order", ReadOffset.lastConsumed())
);

上述代码翻译成redis语句是

XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >

意思是从g1消费者组中获取一条未读取到的消息给消费者c1,若stream中有未读取的消息,则获取最早未读取的消息,若stream中没有消息,则阻塞2s。若2s内还未获取到新消息则结束。语句执行完的结果是读取到的消息。这里测试了下,如果有2个消费者c1和c2,c1读取到消息后,c2无法读取c1读取过的消息,即一条消息只能被一个消费者读取,这里就是单纯为了加快消费速度,将一个消费组交由多个消费者共同处理。不能把之前学习过的rabbitmq的广播机制代入到这里。

注意:代码中没有创建消费组这一代码的实现,因为这个消费组我们已经在redis中提前创建好了。

image-20250406140243528

2.自动类型转换

订单数据存入到消息队列

1
2
3
4
5
6
7
VoucherOrder order = new VoucherOrder();
Map<String, String> map = new HashMap<>();
map.put("userId", userId.toString());
map.put("id", orderId.toString());
map.put("voucherId", voucherId.toString());
//3.3 发送消息到stream.order队列中
stringRedisTemplate.opsForStream().add(MapRecord.create("stream.order", map));

从消息队列获取订单数据

1
2
3
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> map = entries.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);

可以看到,我们存入消息队列中的数据是Map<String, String>,最后从消息队列中读取出来的map类型是Map<Object,Object>,最后经由BeanUtil.fillBeanWithMap(),根据字段名称自动赋值和类型转换。

拿userId举例,实现了Long –> String –> Obejct –> Long的类型转换,很厉害吧。

至于BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true)这句代码中的true,则是为了忽略错误。

即当从 map 填充数据到目标对象(这里是 VoucherOrder)时,如果遇到某些字段无法匹配或转换失败的情况,ignoreError() 会告诉工具类忽略这些错误,继续完成其他字段的填充,而不是抛出异常中断整个过程。

3.handlePendingList方法的作用

pending-list中存的是获取已消费但未确认的消息。当我们从消息队列中获取到消息,并消费完消息时,需要根据消息id及时确认该消息已经被处理完成,否则改消息依然会存在于pending-list中等待确认。这种情况是为了防止获取到消息后,在消息处理时发生意外而导致的消息丢失问题。至于消息可能被重复消费(指的是消息处理完,但是在确认消息时发生意外)问题,那就看最终的消息处理有没有校验机制了,在这里,订单在创建时依然会进行一人一单和库存校验,所以不用担心消息被重复消费的问题。

因为redis的消息队列实用性不如专门的消息队列,所以点到为止。

9.基于RabbitMq的消息队列

待添加。

五、达人探店

1.发布探店笔记

关于发布探店笔记,这里主要是存储相应的探店图片。

图片上传接口如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}
private String createNewFileName(String originalFilename) {
// 获取后缀
String suffix = StrUtil.subAfter(originalFilename, ".", true);
// 生成目录
String name = UUID.randomUUID().toString();
int hash = name.hashCode();
int d1 = hash & 0xF;
int d2 = (hash >> 4) & 0xF;
// 判断目录是否存在
File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));
if (!dir.exists()) {
dir.mkdirs();
}
// 生成文件名
return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);
}

代码详解

1.获取文件后缀suffix

String suffix = StrUtil.subAfter(originalFilename, ".", true);这段代码是获取文件的后缀名,例如test.jpg,获取到的就是jpg后缀。

2.uuid,d1和d2的作用

首先根据UUID生成一个随机字符串并做为最后的文件名称,再获取这个随机字符串的hash值,根据这个hash值,先取低4位,再右移4位再取低4位。作用就是d1是文件第一层的编号(0-15),d2是文件第二层的编号(0-15)。所以d1和d2就是获取一个随机存放图片的路径。而uuid和suffix结合就是最后存储的文件名称。最后获取的到的文件名不仅包含了文件名,还包含了一部分存储文件路径。

3.transferTo方法作用

transferTo方法会将 MultipartFile 对象中的文件内容写入到目标文件路径中。

如果目标路径的目录不存在,transferTo 方法不会自动创建目录,因此在调用 transferTo 之前,需要确保目标目录已经存在(如在 createNewFileName 方法中已经创建了目录)。

2.查看探店笔记

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Result queryBlogById(Long id) {
Blog blog = this.getById(id);
if(blog == null){
return Result.fail("笔记不存在");
}
// 获取博客对应的作者信息
queryBlogUser(blog);
// 查询已被点赞
isBlogLiked(blog);
return Result.ok(blog);
}

可以看到,除了经过数据库查询外,还进行了作者信息查询和点赞信息查询

所以Blog这个实体类中还有额外的参数设置,如下所示,icon、name、和isLike不是数据库对应表中的字段,所以加上了@TableFiled(exist = false)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Blog implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

}

3.点赞功能-ZSet

对于点赞功能的实现,我们需要保证,每篇笔记每个人只能点一次赞,再点赞只能取消,一个人不能刷赞。这个功能基于数据库来实现的话,还需要存储额外的一张表来对于用户id和笔记id。比较浪费时间,空间也比较浪费。所以我们基于redis中的zset这个数据类型来存储每篇笔记的点赞用户。

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Result likeBlog(Long id) {
//1.查看用户是否点赞过该篇博客
UserDTO user = UserHolder.getUser();
if(user == null){
//若用户未登录,则无法点赞
return Result.fail("用户未登录,无法点赞");
}
Long userId = user.getId();
String key = BLOG_LIKE_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//2.若没点赞过,则点赞数加一
if(score == null){
boolean success = this.update().setSql("liked = liked + 1").eq("id", id).update();
if(success){
//2.1.保存用户到set集合
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}

}
else{
//3.若已点赞,则数据库点赞数减一
boolean success = this.update().setSql("liked = liked - 1").eq("id", id).update();
if(success){
//3.1.把用户从redis的Set集合移出
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}

可以看到,首先在redis中校验当前用户是否已经点赞过,若已点赞,则先在数据库中点赞数减一,再把用户从redis集合中移出。未点赞同理。

4.点赞排行榜

上面点赞功能的实现为什么使用zset而不使用set,主要是因为为了实现点赞排行榜这个功能。

首先,先明确这个点赞排行榜的功能指的是最近点赞的用户,即越新的赞,排行越靠前。而zset中,我们存储的是点赞的当前时间戳,所以需要使用rever

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Result queryBlogLikes(Long id) {
//1.查询近期点赞的top5用户
String key = BLOG_LIKE_KEY + id;
Set<String> top5 = stringRedisTemplate.opsForZSet().reverseRange(key, 0, 4);
//2.解析其中的用户id
if(top5 == null || top5.isEmpty()){
//若top5为空,表明没有人点赞。则直接返回空集合
return Result.ok(Collections.emptyList());
}
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
//3.根据用户id查询用户,并赋值信息到userDto,避免重要数据泄露
String str = StrUtil.join(",",ids);
List<User> users = userService.query().in("id",ids).last("order by Field(id," + str+ ")").list();
List<UserDTO> userDtos = users.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
//4.返回
return Result.ok(userDtos);
}

提问

1.Set集合不是无序吗,为什么能保证最后ids是有序的?

答:因为使用了Stream的方式。虽然 Set 是无序的,但在迭代时,Stream 会按照提取的顺序处理这些值。

  • reverseRange 方法返回的 Set<String> 的内容是按照 Redis 中有序集合的倒序排列提取的。
  • 使用 Streamcollect 方法将 Set<String> 转换为 List<Long> 时,会保留 Set 的迭代顺序。
  • 因此,ids 的顺序会和 Redis 中的有序集合的倒序一致。

2.为什么sql语句查询最后要使用order by Field(id," + str+ ")"

答:为了让最后获取到的字段按照ids的顺序排列,保证获取到的用户信息严格按照点赞顺序排列。

ORDER BY FIELD() 是一种特殊的排序方式,用于根据指定的值列表对结果进行排序。它允许你定义一个自定义的排序顺序,而不是按照默认的升序或降序排序。

ORDER BY FIELD() 的语法如下:

1
ORDER BY FIELD(column_name, value1, value2, value3, ...)
  • column_name:需要排序的列名。
  • value1, value2, value3, ...:定义的排序顺序。这些值是列中可能存在的值,它们将按照指定的顺序进行排序。
  • 如果查询到的字段的value并没有在order by field()的value中,则默认排到最后。它们之间的排序顺序是按照默认的升序或降序。

优点

在上面代码中,很好的使用了stream流的方式处理数据,先是Set转为List,然后是User转为UserDTO避免不必要字段泄露。

六、好友关注

1.关注功能

好友关注功能一共有三个接口,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Autowired
IFollowService followService;
//关注好友
@PutMapping("/{id}/{isfollow}")
public Result follow(@PathVariable("id") Long id, @PathVariable("isfollow") Boolean isfollow){
return followService.follow(id, isfollow);
}
//判断是否关注
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long id){
return followService.isFollow(id);
}
//查询共同好友
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long id){
return followService.followCommons(id);
}

对应的实现方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public Result follow(Long id, Boolean isfollow) {
//1.获取登录用户的id
Long userId = UserHolder.getUser().getId();
String key = "follow:"+userId;
//2.判断是取关还是关注
if(isfollow){
//3.1关注,新增数据
Follow follow = new Follow();
follow.setFollowUserId(id);
follow.setUserId(userId);
//3.2添加关注信息到redis
stringRedisTemplate.opsForSet().add(key, id.toString());
save(follow);
}else{
//4.1取关
remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", id));
//4.2从redis中移除信息
stringRedisTemplate.opsForSet().remove(key, id.toString());
}
return Result.ok();
}

@Override
public Result isFollow(Long id) {
//1.获取用户id
Long userId = UserHolder.getUser().getId();
//2.查询是否关注
Integer count = query().eq("follow_user_id", id).eq("user_id", userId).count();
//3.判断返回
return Result.ok(count > 0);
}

@Override
public Result followCommons(Long id) {
//1.获取用户id
Long userId = UserHolder.getUser().getId();
String key = "follow:"+userId;
String key1 = "follow:"+id;
//2.获取双方的关注set交集
Set<String> commons = stringRedisTemplate.opsForSet().intersect(key, key1);
if(CollectionUtil.isEmpty(commons)){
//若无交集,则返回空集合
return Result.ok(Collections.emptyList());
}
//3.解析id集合
List<Long> list = commons.stream().map(Long::valueOf).collect(Collectors.toList());
//4.获取用户
List<UserDTO> userDtos = userService.listByIds(list).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(userDtos);
}

其中,添加关注和查看共同关注功能利用了redis的set集合。用户的关注消息存储到set集合中可以方便获取共同好友以及获取动态推送的粉丝信息,避免读取数据库花费较多时间。

2.好友动态推送-Feed流

首先介绍下什么是Feed流

当我们关注了用户后,这个用户发了动态,那么我们应该把这些数据推送给用户,这个需求,其实我们又把他叫做Feed流,关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

对于传统的模式的内容解锁:我们是需要用户去通过搜索引擎或者是其他的方式去解锁想要看的内容

1653808641260

对于新型的Feed流的的效果:不需要我们用户再去推送信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。

1653808993693

Feed流的实现有两种模式:

Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈

  • 优点:信息全面,不会有缺失。并且实现也相对简单
  • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户

  • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
  • 缺点:如果算法不精准,可能起到反作用
    本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

我们本次针对好友的操作,采用的就是Timeline的方式,只需要拿到我们关注用户的信息,然后按照时间排序即可

,因此采用Timeline的模式。该模式的实现方案有三种:

  • 拉模式
  • 推模式
  • 推拉结合

拉模式:也叫做读扩散

该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序。

简单来说就是把消息写到自己的发件箱,粉丝都从这个发件箱里拉信息。

优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。

缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。

1653809450816

推模式:也叫做写扩散。

推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了

简单来说就是把消息写到粉丝的收件箱,粉丝读他们自己的收件箱即可。

优点:时效快,不用临时拉取

缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

1653809875208

推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。

推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。

1653812346852

需求:

  • 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
  • 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  • 查询收件箱数据时,可以实现分页查询

Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

传统了分页在feed流是不适用的,因为我们的数据会随时发生变化

假设在t1 时刻,我们去读取第一页,此时page = 1 ,size = 5 ,那么我们拿到的就是106 这几条记录,假设现在t2时候又发布了一条记录,此时t3 时刻,我们来读取第二页,读取第二页传入的参数是page=2 ,size=5 ,那么此时读取到的第二页实际上是从6 开始,然后是62 ,那么我们就读取到了重复的数据,所以feed流的分页,不能采用原始方案来做。

1653813047671

Feed流的滚动分页

我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据

举个例子:我们从t1时刻开始,拿第一页数据,拿到了10~6,然后记录下当前最后一次拿取的记录,就是6,t2时刻发布了新的记录,此时这个11放到最顶上,但是不会影响我们之前记录的6,此时t3时刻来拿第二页,第二页这个时候拿数据,还是从6后一点的5去拿,就拿到了5-1的记录。我们这个地方可以采用sortedSet来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了

1653813462834

核心的意思:就是我们在保存完探店笔记后,获得到当前笔记的粉丝,然后把数据推送到粉丝的redis中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public Result saveBlog(Blog blog) {
// 1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 2.保存探店笔记
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.fail("新增笔记失败!");
}
// 3.查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id = ?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
// 4.推送笔记id给所有粉丝
for (Follow follow : follows) {
// 4.1.获取粉丝id
Long userId = follow.getUserId();
// 4.2.推送
String key = FEED_KEY + userId;
stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
}
// 5.返回id
return Result.ok(blog.getId());
}

3.获取推送信息

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:

具体操作如下:

1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件

2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据

综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳 和偏移量这两个参数。

这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。

1653819821591

一、定义出来具体的返回值实体类

1
2
3
4
5
6
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}

BlogController

注意:RequestParam 表示接受url地址栏传参的注解,当方法上参数的名称和url地址栏不相同时,可以通过RequestParam 来进行指定

1
2
3
4
5
@GetMapping("/of/follow")
public Result queryBlogOfFollow(
@RequestParam("lastId") Long max, @RequestParam(value = "offset", defaultValue = "0") Integer offset){
return blogService.queryBlogOfFollow(max, offset);
}

BlogServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
// 1.获取当前用户
Long userId = UserHolder.getUser().getId();
// 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 2);
// 3.非空判断
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok();
}
// 4.解析数据:blogId、minTime(时间戳)、offset
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0; // 2
int os = 1; // 2
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2
// 4.1.获取id
ids.add(Long.valueOf(tuple.getValue()));
// 4.2.获取分数(时间戳)
long time = tuple.getScore().longValue();
if(time == minTime){
os++;
}else{
minTime = time;
os = 1;
}
}
os = minTime == max ? os : os + offset;
// 5.根据id查询blog
String idStr = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();

for (Blog blog : blogs) {
// 5.1.查询blog有关的用户
queryBlogUser(blog);
// 5.2.查询blog是否被点赞
isBlogLiked(blog);
}

// 6.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);

return Result.ok(r);
}

代码详解

1.下面这段代码在做什么

1
2
3
4
5
6
7
8
9
10
11
12
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2
// 4.1.获取id
ids.add(Long.valueOf(tuple.getValue()));
// 4.2.获取分数(时间戳)
long time = tuple.getScore().longValue();
if(time == minTime){
os++;
}else{
minTime = time;
os = 1;
}
}

答:这段代码用于获取最小时间戳minTime,并且获取偏移量os。最小时间戳作为下一次查询的最大时间戳,offset是下一次查询时跳过的元素个数(默认为一,因为最大时间戳是上一次遍历的最小时间戳,所以查询时起码要跳过这1条)。设置偏移量是为了防止多条动态发布的时间戳一样的情况下,记录已经获取了的相同最小时间戳的动态的条数。在接着往下查询时会跳过偏移量数量的动态。这段代码是for循环,所以从前往后遍历,所以需要在time不等于minTime的情况下更新minTime并重置os,因为越后面时间戳越小。

2.reverseRangeByScoreWithScores 方法

reverseRangeByScoreWithScores 方法用于从 Redis 的有序集合(ZSet)中,按照分数范围从高到低获取元素及其分数。
参数说明:

  • key: 指定操作的 ZSet 键。
  • min 和 max: 分别表示分数范围的下限和上限(这里是 0 和 max)。
  • offset: 表示跳过的元素个数,即从符合条件的元素中跳过前面的 offset 个元素。
  • count: 表示最多返回的元素个数(这里是 3)。

七、附近商户

1.GEO数据结构的基本用法

GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:

  • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)

    geoadd geo:1 10 10 dongyang 10.1 10.1 yiwu

  • GEODIST:计算指定的两个点之间的距离并返回

    geodist geo:1 dongyang yiwu km

    image-20250408202246048

  • GEOHASH:将指定member的坐标转为hash字符串形式并返回

    geohash geo:1 dongyang

    image-20250408202434818

  • GEOPOS:返回指定member的坐标

    image-20250408202457694

  • GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃

  • GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能

    image-20250408204702412

  • GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能

注意:GEO数据结构本质就是ZSet数据结构

image-20250408201915030

2、 导入店铺数据到GEO

具体场景说明:

1653822036941

当我们点击美食之后,会出现一系列的商家,商家中可以按照多种排序方式,我们此时关注的是距离,这个地方就需要使用到我们的GEO,向后台传入当前app收集的地址(我们此处是写死的) ,以当前坐标作为圆心,同时绑定相同的店家类型type,以及分页信息,把这几个条件传入后台,后台查询出对应的数据再返回。

1653822021827

我们要做的事情是:将数据库表中的数据导入到redis中去,redis中的GEO,GEO在redis中就一个menber和一个经纬度,我们把x和y轴传入到redis做的经纬度位置去,但我们不能把所有的数据都放入到menber中去,毕竟作为redis是一个内存级数据库,如果存海量数据,redis还是力不从心,所以我们在这个地方存储他的id即可。

但是这个时候还有一个问题,就是在redis中并没有存储type,所以我们无法根据type来对数据进行筛选,所以我们可以按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

代码

HmDianPingApplicationTests

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
void loadShopData() {
// 1.查询店铺信息
List<Shop> list = shopService.list();
// 2.把店铺分组,按照typeId分组,typeId一致的放到一个集合
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
// 3.分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
// 3.1.获取类型id
Long typeId = entry.getKey();
String key = SHOP_GEO_KEY + typeId;
// 3.2.获取同类型的店铺的集合
List<Shop> value = entry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
// 3.3.写入redis GEOADD key 经度 纬度 member
for (Shop shop : value) {
//方法1,每得到一个地理信息就添加到redis
// stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
//方法2,先存到集合里,最后再一次性添加到redis中
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(), shop.getY())
));
}
//方法2,一次性添加
stringRedisTemplate.opsForGeo().add(key, locations);
}
}

代码详解

1.stream流分组

1
Map<Long, List<Shop>> collect = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));

熟悉stream流的分组方法。

2.Geo相关api

看代码注释。

3 附近商户-实现附近商户功能

SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM

第一步:导入pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>

第二步:

ShopController

1
2
3
4
5
6
7
8
9
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x", required = false) Double x,
@RequestParam(value = "y", required = false) Double y
) {
return shopService.queryShopByType(typeId, current, x, y);
}

ShopServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
// 1.判断是否需要根据坐标查询
if (x == null || y == null) {
// 不需要坐标查询,按数据库查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}

// 2.计算分页参数
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;

// 3.查询redis、按照距离排序、分页。结果:shopId、distance
String key = SHOP_GEO_KEY + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE
.search(
key,
GeoReference.fromCoordinate(x, y), //以(x, y)为坐标点
new Distance(5000), //搜素范围,5000米
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)// limit表示返回的结果数量,includeDistance表示返回结果包含与中心点距离
);
// 4.解析出id
if (results == null) {
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
if (list.size() <= from) {
// 没有下一页了,结束
return Result.ok(Collections.emptyList());
}
// 4.1.截取 from ~ end的部分
List<Long> ids = new ArrayList<>(list.size());
Map<String, Distance> distanceMap = new HashMap<>(list.size());
list.stream().skip(from).forEach(result -> {
// 4.2.获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
// 4.3.获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr, distance);
});
// 5.根据id查询Shop
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
for (Shop shop : shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
// 6.返回
return Result.ok(shops);
}

代码详解

1.geoSearch方法看代码注释

new Distance(5000), 默认单位为米。若指定,则可以new Distance(50000, Metrics.KILOMETERS),代表50000公里。所以,多用ctrl+p查看有什么重载方法

2.stream流的过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 List<Long> ids = new ArrayList<>();
Map<String, Distance> distanceMap = new HashMap<>();
content.stream().skip(from).forEach(result -> {
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
distanceMap.put(shopIdStr, result.getDistance());
});

//5.根据id查询shop
String idStr = StrUtil.join(",", ids);
List<Shop> shopList = query().in("id", ids).last("ORDER BY FIELD( id, " + idStr + ")").list();
for (Shop shop : shopList) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
System.out.println(shop.getDistance());
}

因为是分页查询,虽然查了end个,但最后还是得跳过前面的from个数量。

同时,为了按位置顺序排列,使用forEach遍历时,要按序存储id到ids列表为sql查询作排序,此外,为了记录距离,还需要设置一个distanceMap存储对应id的离中心点的距离,方便最后查找出结果后再添加上距离。

八、用户签到

用户

1.BitMap数据结构

  • SETBIT:向指定位置(offset)存入一个0或1

    setbit sign:202504:1 0 1

  • GETBIT :获取指定位置(offset)的bit值

    getbit sign:202504:1 0

  • BITCOUNT :统计BitMap中值为1的bit位的数量

    bitcount sign:202504:1 0 4 闭包统计,即[0-4]

  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值

    bitfield的命令选项有些多

    1.get选项,假设当前bit存储内容为10001000

    bitfield sign:202504:1 get u8 0,获取偏移量0开始的8位无符号整数。得136

    bitfield sign:202504:1 get u8 4,获取偏移量4开始的8位无符号整数,末尾自动补零。得128

    image-20250409004301279

    2.set,选项

    1
    2
    # 设置位于偏移量0的8位无符号整数为10,即 00001010
    BITFIELD mykey SET u8 0 10
  • BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回

    image-20250409004911234

  • BITOP :将多个BitMap的结果做位运算(与 、或、异或)

  • BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

注意:BitMap本质上是String

2.实现签到功能

需求:实现签到接口,将当前用户当天签到信息保存到Redis中

思路:我们可以把年和月作为bitMap的key,然后保存到一个bitMap中,每次签到就到对应的位上把数字从0变成1,只要对应是1,就表明说明这一天已经签到了,反之则没有签到。

我们通过接口文档发现,此接口并没有传递任何的参数,没有参数怎么确实是哪一天签到呢?这个很容易,可以通过后台代码直接获取即可,然后到对应的地址上去修改bitMap。

1653833970361

代码

UserController

1
2
3
4
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public Result sign() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5.写入Redis SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
return Result.ok();
}

以年月为key进行存储,因为dayOfMonth是从1开始的,所以需要减一从0索引开始存储。

3. 签到统计

问题1:什么叫做连续签到天数?
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。

1653834455899

Java逻辑代码:获得当前这个月的最后一次签到数据,定义一个计数器,然后不停的向前统计,直到获得第一个非0的数字即可,每得到一个非0的数字计数器+1,直到遍历完所有的数据,就可以获得当前月的签到总天数了

问题2:如何得到本月到今天为止的所有签到数据?

BITFIELD key GET u[dayOfMonth] 0

假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可。

问题3:如何从后向前遍历每个bit位?

注意:bitMap返回的数据是10进制,哪假如说返回一个数字8,那么我哪儿知道到底哪些是0,哪些是1呢?我们只需要让得到的10进制数字和1做与运算就可以了,因为1只有遇见1 才是1,其他数字都是0 ,我们把签到结果和1进行与操作,每与一次,就把签到结果向右移动一位,依次内推,我们就能完成逐个遍历的效果了。

需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数

有用户有时间我们就可以组织出对应的key,此时就能找到这个用户截止这天的所有签到记录,再根据这套算法,就能统计出来他连续签到的次数了

1653835784444

代码

UserController

1
2
3
4
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}

UserServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public Result signCount() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (result == null || result.isEmpty()) {
// 没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num == 0) {
return Result.ok(0);
}
// 6.循环遍历
int count = 0;
while (true) {
// 6.1.让这个数字与1做与运算,得到数字的最后一个bit位 // 判断这个bit位是否为0
if ((num & 1) == 0) {
// 如果为0,说明未签到,结束
break;
}else {
// 如果不为0,说明已签到,计数器+1
count++;
}
// 把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}
return Result.ok(count);
}

代码详解

1
2
3
4
5
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);

其中的BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)用于设置读取方式为无符号的当前天数,偏移量为0。得到的是10进制的整数,从list中取第一个即可。

2.统计连续签到。

利用十进制转二进制的比较进行。每次都进行最低位&1比较,为1则结果加1并将该数往右移一位。若为0则停止统计。

九、UV统计

使用HyperLogLog数据结构进行访问量统计

1.HyperLogLog数据结构

先我们搞懂两个概念:

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

通常来说UV会比PV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素,所以我们只是单纯的把这两个值作为一个参考值

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0
Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

1653837988985

1.pfadd

pfadd myuv 1 pfadd myuv2 3

2.pfcount

pfcount myuv 结果:1

3.pfmerge

pfmerge myuv myuv2

pfcount myuv 结果:2

注意,HyperLogLog底层是String类型

测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何

1653838053608

经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小

完结

至此,黑马点评的复习结束了。总算是复习完成了,现在是2025/4/9 1:08。比预想的晚了2天才结束,从关注开始就比较快速的过一遍了,不过也算够了吧,但也还是把关键代码都再写了一遍,也算是仓促吧,后面开始复习微服务了,还要背八股/(ㄒoㄒ)/~~。加油!!!