RabbitMq消息队列

总结参考黑马教程day06-MQ基础 - 飞书云文档

1.概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:

https://www.rabbitmq.com/

使用背景:

同步调用

在微服务项目中,我们使用的OpenFeign都是同步调用,可以理解为阻塞,只有当前任务执行完毕才能往下执行,同步调用存在以下问题:

第一拓展性差

在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?

某些电商项目中,还会有积分或金币的概念。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、返还金币业务?

image-20250226170410101

由此可见,服务会越来越臃肿。

第二性能下降

由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和。

image-20250226170342094

第三,级联失败

由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

这其实就是同步调用的级联失败问题。

比方说一个订单支付业务,用户支付成功后还会获得积分,但积分服务出现了问题,导致整个事务回滚,用户的钱又退回去了,这怎么行呢,毕竟拿来的钱哪有还回去的道理。因此,不能因为个别业务问题而导致整个事务的回滚。

异步调用

至于如何解决上述问题,这就用到了消息队列的异步调用了。

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者(Publisher):投递消息的人,就是原来的调用方
  • 消息Broker(Exchange、Queue):管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者(Consumer):接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

image-20250226171254619

这样,发送消息的人和接收消息的人就完全解耦了。发送消息的人只需要发消息,至于接收者怎么接收、是否接收成功都与发送者无关了。

通过消息队列,可以改变服务的架构,如下支付业务所示

image-20250226171542545

除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。

并且不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。

另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。

综上,异步调用的优势包括:

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败

2.基本使用

1.基础用法

1.安装

基于Docker来安装RabbitMQ,使用下面的命令即可:

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management

安装完成后,我们访问 http://192.168.10.101:15672(注意根据自己的虚拟机访问ip调整)即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

2.基本概念

RabbitMq的基本架构如图所示。

image-20250226172318386

其中包含几个概念:

  • **publisher**:生产者,也就是发送消息的一方
  • **consumer**:消费者,也就是消费消息的一方
  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

RabbitMQ管理控制台看黑马教程day06-MQ基础 - 飞书云文档

交换机exchange根据routekey路由消息到指定的队列queue。

direct

3.SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:

https://spring.io/projects/spring-amqp

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系 (在配置类中使用@Bean声明或者在监听器注解@RabbitListener中声明)
  • 基于注解的监听器模式,异步接收消息 (使用@RabbitListerner注解)
  • 封装了RabbitTemplate工具,用于发送消息 (使用时直接注入即可)

队列、交换机、绑定关系的声明方法有两种,如下所示

1.基于配置类,以Direct交换机为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
@Bean
public Queue directQueue(){
return QueueBuilder.durable("direct.queue").build();
}
@Bean
public Binding bindingQueueToDirect1(DirectExchange directExchange, Queue directQueue ){
return BindingBuilder.bind(directQueue).to(directExchange).with("red");
}
}

2.基于监听器注解@RabbitListener的bindings属性

image-20250226173122373

3.WorkQueue(工作队列)

简单来说就是多个消费者绑定到一个队列,共同消费队列里的消息。

  • 默认情况下,消息是轮询(Round-Robin)分配给各个消费者的。也就是说,消息会依次分配给每个消费者,而不是提前平均分配。每个消费者依次从队列中获取消息进行处理。
  • 能者多劳:在某些情况下,你可能希望消费者根据自己的处理能力来消费消息,而不是简单地轮询分配。这时可以通过配置来实现“能者多劳”的效果。在 RabbitMQ 中,可以通过设置 prefetch 参数来控制每个消费者一次最多可以获取多少条消息。如果某个消费者处理速度较快,它可以更快地处理完当前的消息并获取新的消息,从而实现“能者多劳”的效果。配置消息如下
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

总结

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

4.交换机

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列中

注意,交换机不能存储消息,若路由失败,则消息丢失

1.Fanout交换机

作用:广播,将消息路由给所有与该交换机绑定的队列中。

fanout

2.Direct交换机

作用:定向,根据消息的RoutingKey判断路由给哪个(些)队列。

消息发送时需指定routingkey(路由key)。direct交换机会将信息路由到具有相同的路由key的队列中。一个队列可以设置多个路由key,每个队列路由key可以相同,若相同则接收时都会接收到消息。

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则效果和Fanout类似

    direct

3.Topic交换机

作用:加强版定向,可以让队列在绑定routingkey时使用通配符。

通配符规则:

  • #:匹配0个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

那么:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

描述下Direct交换机与Topic交换机的差异?

  • Direct交换机是路由完全匹配规则,Topic交换机则是模式匹配(支持通配符)
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

消息队列高级篇

1.发送者可靠性

1.生产者重试机制

2生产者确认机制

2.MQ的可靠性

1.数据持久化

2.Lazy-Queue惰性队列

3.消费者可靠性

1.消费者确认机制

2.失败重试机制

3.失败处理策略

消息处理失败后重新投递到处理消息失败的交换机当中,专门处理

4.业务幂等

解决消息重复处理问题

3.延迟消息

1.死信交换机和延迟消息

1.死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

通过给队列添加dead-letter-exchange属性指定的交换机就是死信交换机。当该队列的消息成为死信后,即会把消息投递到这个指定的交换机当中。死信交换机配置如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class DeadLetterConfig {
@Bean
public DirectExchange dlxExchange(){
return ExchangeBuilder.directExchange("dlx.direct").build();
}

@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.direct");
}
@Bean
public Queue dlxqueue(){
return QueueBuilder.durable("dlx.queue").build();
}

@Bean
public Queue normalQueue(){
return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
}
@Bean
public Binding dlxBinding(Queue dlxqueue, DirectExchange dlxExchange){
return BindingBuilder.bind(dlxqueue).to(dlxExchange).with("dlx");
}
@Bean
public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange){

return BindingBuilder.bind(normalQueue).to(normalExchange).with("dlx");
}

}

image-20250226185117798

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

2.延迟消息

延迟消息是在publisher发送消息到交换机时给消息设置有效期expiration。则当正常队列里的消息没有被消费且过期时,会投递到死信交换机当中。使用场景比如订单限时支付,订单创建时就把消息(附带上过期时间)投递给正常交换机,同时不给正常队列设置消费者,到达过期时间则会投递到死信交换机,交给删除订单的listener处理。

2.DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

官方文档说明:

https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

1
docker volume inspect mq-plugins

结果如下:

1
2
3
4
5
6
7
8
9
10
11
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。若不是,则根据自己定义的挂载目录上传。

接下来进入插件目录后执行命令,安装插件:

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

1.声明延迟交换机

基于注解方式:

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}

@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}

@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}

2.发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}