RabbitMq
RabbitMq消息队列
总结参考黑马教程day06-MQ基础 - 飞书云文档
1.概述
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
使用背景:
同步调用
在微服务项目中,我们使用的OpenFeign都是同步调用,可以理解为阻塞,只有当前任务执行完毕才能往下执行,同步调用存在以下问题:
第一,拓展性差
在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?
某些电商项目中,还会有积分或金币的概念。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、返还金币业务?
由此可见,服务会越来越臃肿。
第二,性能下降
由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和。
第三,级联失败
由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
这其实就是同步调用的级联失败问题。
比方说一个订单支付业务,用户支付成功后还会获得积分,但积分服务出现了问题,导致整个事务回滚,用户的钱又退回去了,这怎么行呢,毕竟拿来的钱哪有还回去的道理。因此,不能因为个别业务问题而导致整个事务的回滚。
异步调用
至于如何解决上述问题,这就用到了消息队列的异步调用了。
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者(Publisher):投递消息的人,就是原来的调用方
- 消息Broker(Exchange、Queue):管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者(Consumer):接收和处理消息的人,就是原来的服务提供方
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。
这样,发送消息的人和接收消息的人就完全解耦了。发送消息的人只需要发消息,至于接收者怎么接收、是否接收成功都与发送者无关了。
通过消息队列,可以改变服务的架构,如下支付业务所示
除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。
并且不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。
另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。
综上,异步调用的优势包括:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
2.基本使用
1.基础用法
1.安装
基于Docker来安装RabbitMQ,使用下面的命令即可:
1 | docker run \ |
安装完成后,我们访问 http://192.168.10.101:15672(注意根据自己的虚拟机访问ip调整)即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。
2.基本概念
RabbitMq的基本架构如图所示。
其中包含几个概念:
- **
publisher
**:生产者,也就是发送消息的一方 - **
consumer
**:消费者,也就是消费消息的一方 - **
queue
**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理 - **
exchange
**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。 - **
virtual host
**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
RabbitMQ管理控制台看黑马教程day06-MQ基础 - 飞书云文档
交换机exchange根据routekey路由消息到指定的队列queue。
3.SpringAMQP
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:
https://spring.io/projects/spring-amqp
1 | <!--AMQP依赖,包含RabbitMQ--> |
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系 (在配置类中使用@Bean声明或者在监听器注解@RabbitListener中声明)
- 基于注解的监听器模式,异步接收消息 (使用@RabbitListerner注解)
- 封装了RabbitTemplate工具,用于发送消息 (使用时直接注入即可)
队列、交换机、绑定关系的声明方法有两种,如下所示
1.基于配置类,以Direct交换机为例
1 |
|
2.基于监听器注解@RabbitListener的bindings属性
3.WorkQueue(工作队列)
简单来说就是多个消费者绑定到一个队列,共同消费队列里的消息。
- 默认情况下,消息是轮询(Round-Robin)分配给各个消费者的。也就是说,消息会依次分配给每个消费者,而不是提前平均分配。每个消费者依次从队列中获取消息进行处理。
- 能者多劳:在某些情况下,你可能希望消费者根据自己的处理能力来消费消息,而不是简单地轮询分配。这时可以通过配置来实现“能者多劳”的效果。在 RabbitMQ 中,可以通过设置
prefetch
参数来控制每个消费者一次最多可以获取多少条消息。如果某个消费者处理速度较快,它可以更快地处理完当前的消息并获取新的消息,从而实现“能者多劳”的效果。配置消息如下
1 | spring: |
总结
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
4.交换机
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列中
注意,交换机不能存储消息,若路由失败,则消息丢失
1.Fanout交换机
作用:广播,将消息路由给所有与该交换机绑定的队列中。
2.Direct交换机
作用:定向,根据消息的RoutingKey判断路由给哪个(些)队列。
消息发送时需指定routingkey(路由key)。direct交换机会将信息路由到具有相同的路由key的队列中。一个队列可以设置多个路由key,每个队列路由key可以相同,若相同则接收时都会接收到消息。
描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则效果和Fanout类似
3.Topic交换机
作用:加强版定向,可以让队列在绑定routingkey时使用通配符。
通配符规则:
#
:匹配0个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
假如此时publisher发送的消息使用的RoutingKey
共有四种:
china.news
代表有中国的新闻消息;china.weather
代表中国的天气消息;japan.news
则代表日本新闻japan.weather
代表日本的天气消息;
那么:
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括:china.news
china.weather
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括:china.news
japan.news
描述下Direct交换机与Topic交换机的差异?
- Direct交换机是路由完全匹配规则,Topic交换机则是模式匹配(支持通配符)
- Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
消息队列高级篇
1.发送者可靠性
1.生产者重试机制
2生产者确认机制
2.MQ的可靠性
1.数据持久化
2.Lazy-Queue惰性队列
3.消费者可靠性
1.消费者确认机制
2.失败重试机制
3.失败处理策略
消息处理失败后重新投递到处理消息失败的交换机当中,专门处理
4.业务幂等
解决消息重复处理问题
3.延迟消息
1.死信交换机和延迟消息
1.死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
通过给队列添加dead-letter-exchange
属性指定的交换机就是死信交换机。当该队列的消息成为死信后,即会把消息投递到这个指定的交换机当中。死信交换机配置如下所示:
1 |
|
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
2.延迟消息
延迟消息是在publisher发送消息到交换机时给消息设置有效期expiration。则当正常队列里的消息没有被消费且过期时,会投递到死信交换机当中。使用场景比如订单限时支付,订单创建时就把消息(附带上过期时间)投递给正常交换机,同时不给正常队列设置消费者,到达过期时间则会投递到死信交换机,交给删除订单的listener处理。
2.DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
插件下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
1 | docker volume inspect mq-plugins |
结果如下:
1 | [ |
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。若不是,则根据自己定义的挂载目录上传。
接下来进入插件目录后执行命令,安装插件:
1 | docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
1.声明延迟交换机
基于注解方式:
1 |
|
基于@Bean
的方式:
1 | package com.itheima.consumer.config; |
2.发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
1 |
|