MQ
同步调用
在微服务中,如果使用OpenFegin的同步方法调用,如下图流程所示:

- 拓展性差
目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。某些项目中,还会有积分或金币的概念。用户支付成功后,给用户以积分奖励或者返还金币,那应该如何处理?最后,项目的支付业务会越来约臃肿;而且每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

- 性能下降
由于采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和;每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms

- 级联失败
由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。这其实就是同步调用的级联失败问题。但是假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。
异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。这样,发送消息的人和接收消息的人就完全解耦了

除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。如果提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:

不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。
综上,异步调用的优势包括:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
异步调用的缺点包括:
- 完全依赖于Broker的可靠性、安全性和性能
- 架构复杂,后期维护和调试麻烦
技术选型
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ. 目比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ 追求可靠性:RabbitMQ、RocketMQ 追求吞吐能力:RocketMQ、Kafka 追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。
Docker安装RabbitMQ
- 搜索RabbitMQ镜像
docker search rabbitmq
- 下载RabbitMQ
docker pull rabbitmq:3.8.25-management
- 运行镜像的脚本
#!/bin/bash
docker run -d \
--name rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123456 \
-p 5672:5672 \
-p 15672:15672 \
-v /var/www/dockerMountPoint/rabbitmq_data_management:/var/lib/rabbitmq \
--restart unless-stopped \
rabbitmq:3.8.25-management成功运行RabbitMQ容器,无法正常登录的原因
- 访问rabbitmq主页时会出现无法访问,这是因为没有开启插件
docker exec -it 容器ID bash
rabbitmq-plugins enable rabbitmq_management
虚拟机防火墙开放15672、5672的端口

腾讯云服务器放开对应的安全组

腾讯云服务器放开对应的防火墙

RabbitMQ 的整体架构和核心概念
RabbitMQ 有几个核心概念:
- Publisher:消息发送者
- Consumer:消息的消费者
- Queue:消息队列,存储消息
- Exchange:交换机,负责路由消息
- VirtualHost:虚拟主机,用于数据隔离


RabbitMQ 快速入门
注意事项:交换机只能路由和转发消息,不能存储消息
- 新建队列: 创建一个名为 hello.queue 的队列

绑定队列与交换机:将hello.queue绑定和交换机绑定

发送消息:在RabbitMQ中的amq.fanout交换机发送一条消息,并查看队列中的消息数量,以及队列中的具体消息信息

SpringBoot项目集成RabbitMQ
新建一个 SpringBoot 项目,并创建 consumer 和 publisher 两个子模块,项目的整体结构如下

在父工程中引入 SpringAMQP 的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>- 在 consumer 和 publisher 模块的 application.yml 中分别编写与 RabbitMQ 有关的配置信息
spring:
rabbitmq:
host: 主机的IP地址
port: 5672
virtual-host: /
username: admin
password: admin123456
listener:
simple:
prefetch: 1在RabbitMQ中创建一个simple.queue的队列
在 publisher 模块中编写测试类,用户向队列发送消息
@Test
void testSendMessage() {
String queueName = "simple.queue";
String msg = "hello.amqp!!!";
rabbitTemplate.convertAndSend(queueName, msg);
}- 查看RabbitMQ中simple.queue队列中的消息数量以及消息内容

- 设置消费者接受消息
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到消息:" + msg);
}
Work Queues 模型
Work Queues 的概念
Work Queues,简单地来说,就是让多个消费者绑定到一个队列,共同消费队列中的消息,虽然有多个消费者绑定同一个队列,但是队列中的某一条消息只会被一个消费者消费

Work Queues 实测
- 在 publisher服务的 SpringAmqp 测试类中添加以下方法,该方法可以在 1 秒内产生 50 条消息
@Test
void testSendMessage1() throws InterruptedException {
String queueName = "test1";
for (int i = 0; i < 50; i++) {
String msg = "hello.amqp------------" + i;
rabbitTemplate.convertAndSend(queueName, msg);
Thread.sleep(20);
}
}- 在 consumer 服务的 RabbitMQListener 类中添加以下方法,监听 work.queue 队列;分别在两个消费者中使用Thread.sleep()模拟消费者的速度
@RabbitListener(queues = "test1")
public void listTestQueue1(String msg) throws InterruptedException {
System.out.println("消费者1收到消息---" + msg);
Thread.sleep(20);
}
@RabbitListener(queues = "test1")
public void listTestQueue2(String msg) throws InterruptedException {
System.err.println("消费者2收到消息11111---" + msg);
Thread.sleep(200);
}
注意: 当消费者性能不一致的时候,队列仍然是采用轮询的方式将任务平均的分配给对应的消费者;但是正常情况下,当消费者的消费能力更强时,应该分配更多的任务,消费能力更弱的消费者分配相对更少的任务数量。
- 配置消费数量适用消费者的消费能力策略
spring:
rabbitmq:
listener:
simple:
prefetch: 1
交换机
真正的生产环境都会经过交换机来发送消息,而不是直接发送到队列
交换机的作用:
- 接收 publisher 发送的消息
- 将消息按照规则路由到与交换机绑定的队列
交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
Fanout 交换机
Fanout 交换机会将接收到的消息广播到每一个跟其绑定的 queue ,所以也叫广播模式

Fanout 快速入门
- 在RabbitMQ中声明交换机(存在默认的fanout交换机)

- 在该交换机上绑定对应的队列

- 在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 fanout.queue1 和 fanout.queue2 队列
@RabbitListener(queues = "test1")
public void listTestQueue1(String msg) throws InterruptedException {
System.out.println("消费者1收到消息---" + msg);
Thread.sleep(20);
}
@RabbitListener(queues = "test1")
public void listTestQueue2(String msg) throws InterruptedException {
System.err.println("消费者2收到消息11111---" + msg);
Thread.sleep(200);
}- 在 publisher 服务的测试类中添加以下方法,向amq.fanout 交换机发送消息
@Test
void testSendMessage2() throws InterruptedException {
String msg = "Hello, Every One!!!";
String exchangeName = "amq.fanout";
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
Direct 交换机
Direct 交换机会将接收到的消息根据规则路由到指定的队列,被称为定向路由
- 每一个 Queue 都与 Exchange 设置一个 bindingKey
- 发布者发送消息时,指定消息的 RoutingKey
- Exchange 将消息路由到 bindingKey 与消息 routingKey 一致的队列

需要注意的是:同一个队列可以绑定多个 bindingKey ,如果有多个队列绑定了同一个 bindingKey ,就可以实现类似于 Fanout 交换机的效果。由此可以看出,Direct 交换机的功能比 Fanout 交换机更强大
快速上手
做案例来体验 Direct 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
- 在 RabbitMQ 控制台中,声明交换机 zuo.direct ,将上面创建的两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 direct.queue1 和 direct.queue2
- 在 publisher 服务中编写测试方法,利用不同的 RoutingKey 向 zuo.direct 交换机发送消息
操作流程
- 在RabbitMQ中创建direct.queue1和direct.queue2队列

- 声明zuo.direct 的Direct 类型的交换机,并且通过对应的bindingKey绑定队列的队列信息

- 创建消费者监听对应的队列信息
@RabbitListener(queues = "direct.queue1")
public void listDirectQueue1(String msg) {
System.out.println("消费者1收到消息---" + msg);
}
@RabbitListener(queues = "direct.queue2")
public void listDirectQueue2(String msg) {
System.err.println("消费者2收到消息---" + msg);
}- 创建生产者发送消息到队列
@Test
void testSendMessage3() {
String msg = "Hello, 测试Direct类型的交换机---- RoutingKey = red";
String exchangeName = "zuo.direct";
rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}
@Test
void testSendMessage4() {
String msg = "Hello, 测试Direct类型的交换机---- RoutingKey = blue";
String exchangeName = "zuo.direct";
rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}- 测试结果

Topic 交换机
Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以.分割) Queue 与 Exchange 指定 bindingKey 时可以使用通配符 #:代指 0 个或多个单词 *:代指 1 个单词

快速上手
做一个案例来体验 Topic 交换机的效果,案例要求如下:
在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
在 RabbitMQ 控制台中,声明交换机 zuo.topic ,将两个队列与其绑定
在 consumer 服务中编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
在 publisher 服务中编写测试方法,利用不同的 routingKey 向 zuo.topic 发送消息
- 创建topic.queue1 和 topic.queue2队列信息

- 创建zuo.topic的交换机,并绑定对应的队列和设置对应的Routingkey

- 在consumer模块设置消费者
@RabbitListener(queues = "topic.queue1")
public void listTopicQueue1(String msg) {
System.out.println("消费者1收到消息---" + msg);
}
@RabbitListener(queues = "topic.queue2")
public void listTopicQueue2(String msg) {
System.err.println("消费者2收到消息---" + msg);
}- 在publisher模块中设置生产者
@Test
void testSendMessage5() {
String msg = "Hello, 测试Topic类型的交换机 RoutingKey + china.news";
String exchangeName = "zuo.topic";
rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
@Test
void testSendMessage6() {
String msg = "Hello, 测试Topic类型的交换机 RoutingKey + china.queue";
String exchangeName = "zuo.topic";
rabbitTemplate.convertAndSend(exchangeName, "china.queue", msg);
}- 测试结果

代码声明队列和交换机
我们之前创建队列和交换机都是在 RabbitMQ 的控制台页面中创建的,不仅十分繁琐,还有可能打错队列和交换机的名。而且,不同的环境(开发环境、测试环境、生产环境)可能会有不同的队列和交换机,手动创建队列和交换机效率十分低下
SpringAQMP提供的创建队列和交换机的类
SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
- Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建

快速上手
- 创建一个 Fanout 类型的交换机,并且创建队列与这个交换机绑定
编程式声明
- 在 consumer 服务中编写 FanoutConfiguration 配置类
绝密文件,请勿展开
@Configuration
public class DirectConfiguration {
/**
* 创建名称为hello.direct 的交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("hello.direct").build();
}
/**
* 创建名称为hello.queue3的队列
*
* @return
*/
@Bean
public Queue directQueue3() {
return QueueBuilder.durable("hello.queue3").build();
}
/**
* 创建名称为hello.queue4的队列
*
* @return
*/
@Bean
public Queue directQueue4() {
return QueueBuilder.durable("hello.queue4").build();
}
@Bean
public Binding bindingQueue3WithBlue() {
return BindingBuilder.bind(directQueue3()).to(directExchange()).with("blue");
}
@Bean
public Binding bindingQueue3WithRed() {
return BindingBuilder.bind(directQueue3()).to(directExchange()).with("red");
}
@Bean
public Binding bindingQueue4WithRed() {
return BindingBuilder.bind(directQueue4()).to(directExchange()).with("red");
}
}注解式声明
- SpringAMOP 提供了基于@RabbitListener注解声明队列和交换机的方式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),
exchange = @Exchange(name = "you.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listDirectQueue3(String msg) {
System.out.println("消费者3收到消息----" + msg);
}消息转化器
在了解消息转换器之前,我们先来做一个小案例,案例的内容是利用 SpringAMQP 发送一条消息,消息的内容为一个 Java 对象,案例要求如下:
- 在 RabbitMQ 控制台创建一个队列,名为 object.queue
- 编写单元测试,向该队列中直接发送一条消息,消息的内容为 Map
- 在控制台查看消息
代码实现
- 向object.queue队列发送Map集合信息
@Test
void testSendMessage7() {
Map<String, Object> map = new HashMap<>();
map.put("name", "Tom");
map.put("age", 21);
rabbitTemplate.convertAndSend("object.queue", map);
}- 成功发送消息后,我们在 RabbitMQ 的控制台查看消息的具体内容

结论: 可以发现,消息的内容类型为 application/x-java-serialized-object,并且消息的内容也变成一堆乱码;本来是想发送一个简单的仅含有姓名和年龄两个字段的简短信息,但是消息却变成了一堆乱码,不仅可读性大大下降,而且占用的空间也大大地增加了,这显然不是我们想要的效果
默认的消息转换器
Spring 处理对象类型的消息是由 org.springframework.amap.support.converter.MessageConverter 接口来处理的,该接口默认实现是 SimpleMessageConverter
SimpleMessageConverter 类是基于 JDK 提供的 ObjectOutputStream 来类完成序列化的,这种序列化方式存在以下问题:
- 使用 JDK 序列化有安全风险(如果序列化后的消息被恶意篡改,在反序列化的过程中可能会执行一些高危的代码)
- 经过 JDK 序列化的消息占用空间很大
- 经过 JDK 序列化的消息可读性很差
自定义消息转换器
- 一般建议采用 JSON 序列化代替默认的 JDK 序列化;要使用 JSON 序列化,需要先引入 jackson 依赖(在项目的父工程中引入)
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>- 接着在 consumer 服务和 publisher 服务中配置 MessageConverter
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}- 再次发送对象类型的消息,可以看到消息已经成功转换成 JSON 类型的字符串
@Test
void testSendMessage7() {
Map<String, Object> map = new HashMap<>();
map.put("name", "Tom");
map.put("age", 21);
rabbitTemplate.convertAndSend("object.queue", map);
}- 在 consumer 服务的 RabbitMQListener 类中添加对 object.queue 队列的监听(用什么类型发,就用什么类型接收)
@RabbitListener(queues = "object.queue")
public void listDirectObjectQueue(Map<String, Object> map) {
System.out.println("ObjectQueue队列中的消息为" + map);
}- 消费者的消费结果
生产者的可靠性
生产者重连
- 由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制
spring:
rabbitmq:
host: 101.43.73.194
port: 5672
virtual-host: /
username: admin
password: admin123456
connection-timeout: 1s # 连接超时时间
template:
retry:
initial-interval: 1000ms # 连接失败后的初始等待时间
multiplier: 1 # 连接失败后的等待时常倍数,下次等待时常 = (initial - interval) * multiplier
max-attempts: 3 # 最大重试次数
enabled: true # 开启超时重试机制- 填写完配置信息后,我们手动停止 RabbitMQ ,模拟生产者连接 RabbitMQ 失败的情况
docker stop rabbitmq- 启动测试类
@Test
void testSendMessageToQueue() {
String queueName = "simple.queue";
String msg = "Hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, msg);
}- 可以在控制台看到,总共有三次重新连接 RabbitMQ 的记录,三次连接都失败后,就直接抛异常了

注意事项:
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
- 可以考虑使用异步线程来执行发送消息的代码
生产者确认
RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
- 其它情况都会返回 NACK,告知生产者消息投递失败

生产者确认机制代码实现
- 在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated- publisher-confirm-type 有三种模式
- none:关闭 confirm 机制
- simple:以同步阻塞等待的方式返回 MQ 的回执消息
- correlated:以异步回调方式的方式返回 MQ 的回执消息
注意: 每个 RabbitTemplate 只能配置一个 ReturnCallback
- 在 publisher 模块新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置消息没有到交换机回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
log.debug("消息发送到对应的详细信息交换机: id:{}, toString:{}, boolean:{}, s:{}" +
correlationData.getId(), correlationData, b, s);
}
});
// 配置消息没有到队列回调函数
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.debug("收到消息的return callback, exchange:{}, key:{},msg:{},code:{},text:{}",
returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
returnedMessage.getMessage(), returnedMessage.getReplyCode(),
returnedMessage.getReplyText());
}
});
}
}- 在 publisher 模块添加一个测试类,测试 ReturnCallback 的效果
@Test
void testConfirmCallBack() throws InterruptedException {
// 1、创建cd
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2、添加ConfirmCallBack
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息回调失败", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.debug("收到confirm callback回执");
if (result.isAck()) {
// 消息发送成功
log.debug("消息发送成功,收到ack");
} else {
//消息发送失败
log.debug("消息发送失败,收到nack");
}
}
});
rabbitTemplate.convertAndSend("zuo.direct1", "blue", "测试发送消息回调机制", cd);
Thread.sleep(10000);
}- 发送成功后可以看到消息发送成功的回调信息

如果发送消息不存在对应的routingKey,如下如报错信息

如果发送消息不存在对应的交换机,如下如报错信息

数据的持久化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
- 交换机持久化
- 队列持久化
- 消息持久化
交换机持久化
在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:设置为Durable就是持久化模式,Transient就是临时模式。

当使用docker命令重启容器时,会发现该属性为Transient交换机不存在
队列持久化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:可以创建一个名 test1 的队列;然后设置为 Transient ,再重启 mq 之后查看;它已经不存在,说明它是临时的,重启之后就不存在。

消息持久化
在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个Delivery mode:在 simple.queue 下发两条消息;一条是持久化的,一条是临时的消息;然后重启mq再查看队列中的消息。发现持久化的消息存在,临时的消息不存在。

消息代理(RabbitMQ)的可靠性
队列 Page Out机制
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:
- 一旦 RabbitMQ 宕机,内存中的消息会丢失
- 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息


演示一下 RabbitMQ 发生 Paged Out 现象(也就是队列的空间被消息占满了之后,将老旧消息移到磁盘,为新消息腾出空间的情况)
- 在发送消息之前,先把生产者确认机制关闭,提高消息发送的速度
spring:
rabbitmq:
publisher-returns: false
publisher-confirm-type: none- 测试发送一百万调非持久化信息
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
for (int i = 0; i < 1000000; i++) {
rabbitTemplate.convertAndSend("simple.queue", message);
}
}- 测试结果:当内存满了之后,就会将数据信息转到磁盘上,这个时间段mq 是不会接收消息,相当于处于阻塞状态

向RabbitMQ 中发送持久化的消息,当消息存满内存,mq 是否会出现阻塞
- 测试发送持久化消息
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend("simple.queue", message);
}
}- 测试结果:当内存满了之后,mq 不会阻塞,数据会存入磁盘中

LazyQueue(3.12 版本后所有队列都是 Lazy Queue 模式)
从 RabbitMQ 的 3.6.0 版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要处理消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改
开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执
创建Lazy Queue方式
- 在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

- 在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy
@Bean
public org.springframework.amqp.core.Queue lazeQueue() {
return QueueBuilder.durable("lazy.queue1")
.lazy()
.build();
}- 注解式创建
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(
name = "lazy.queue2",
durable = "true",
arguments = @Argument(
name = "x-queue-mode",
value = "lazy"
)
))
public void listenLazeQueue(String message) {
System.out.println("消费者收到了 laze.queue2的消息: " + message);
}性能测试
测试向Lazy Queue队列中发送一百万条消息,查看mq 处理消息的效率以及是否会出现阻塞状态

结论:mq 将接收的消息存入Page Out中,所以不会出现mq阻塞的现象,而且消息处理的效率高
消费者的可靠性
消费者的确认机制
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:
- none:不处理,即消息投递给消费者后立刻 ack,消息会立刻从 MQ 中删除,非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活
- auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
- 如果是业务异常,会自动返回 nack
- 如果是消息处理或校验异常,自动返回 reject
代码实现
- 开启消息确认机制,设置确认类型为none 需要在 application.yml 文件中编写相关的配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none- 测试处理模式为 none 的情况,向 simple.queue 队列发送一条消息,同时监听 simple.queue 队列的消息,监听到队列中的消息后手动抛出一个异常
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到消息:" + msg);
throw new RuntimeException("抛出异常");
}- 测试结果:消息确认机制设置的none 时,队列中的消息投递给消费者后立刻ack, 消息会立刻从MQ中删除

设置确认类型为auto, 将application.yml中acknowledge-mode中的值设置为auto,并且消费者中代码逻辑不变
处理模式为auto 类型的处理结果:消息确认机制设置为auto 时,当消费者在消费的过程中抛出异常时,队列会一直重复的投递,直到消息消费成功为止

失败重试机制
当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力;可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队。
- 在 application.yml 配置文件中开启失败重试机制
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true # 开启消息消费失败重试机制
initial-interval: 1000ms # 消息消费失败后的初始等待时间
multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false- 消费者代码
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到消息:" + msg);
throw new RuntimeException("抛出异常");
}- 测试结果,查看控制台结果:当消费者在消费的过程中出现异常,会进入重试机制,当当前的重试次数大于配置文件中设置的最大重试次数时,会抛出对应的异常

失败消息处理策略
开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:
- RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

实现使用 RepublishMessageRecoverer 类的情况
- 定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定;将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)
绝密文件,请勿展开
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}- 在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列

- 在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列

业务的幂等性
幂等性的概念
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性;在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的

保证消息的幂等性:为每个消息设置一个唯一的ID
给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:
- 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
- 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理
可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId
@Bean
public MessageConverter jacksonMessageConvertor() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
- 在rabbitmq控制台中查看队列中的消息,消息中包含message_id值

保证消息的幂等性:结合业务做判断
结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理。

兜底策略-定时任务核查
定时任务定期执行,查询那些:
- 状态为“支付中”(Pending)
- 创建时间或最后更新时间已经超过一个合理阈值(例如15分钟) 的订单。
- 然后,主动调用支付服务的查询接口,获取这些订单的最终支付状态,并更新本地数据库。
延迟消息
延迟消息的概念
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息
延迟任务:一定时间之后才会执行的任务

死信交换机
当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

死信交换机实现发送延迟消息
- 定义普通队列,死信队列,普通的交换机以及死信交换机;并且将队列和交换机进行对应的绑定
绝密文件,请勿展开
@Configuration
public class DeadExchangeConfiguration {
// 定义死信交换机
public static final String DEAD_LETTER_EXCHANGE = "dead.exchange";
public static final String DEAD_LETTER_QUEUE = "dead.queue";
// 定义主队列
public static final String MAIN_QUEUE = "simple.queue";
// 定义主队列绑定的交换机
public static final String MAIN_EXCHANGE = "simple.exchange";
@Bean
public DirectExchange mainExchange() {
return new DirectExchange(MAIN_EXCHANGE);
}
@Bean
public Queue mainQueue() {
return QueueBuilder.durable(MAIN_QUEUE)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 绑定死信交换机
.withArgument("x-dead-letter-routing-key", "dead") // 指定死信路由键
.build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding bindingMainQueue() {
return BindingBuilder.bind(mainQueue()).to(mainExchange()).with("simple");
}
@Bean
public Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead");
}
}- 生产者发送对应的信息,并设置过期时间
@Test
void testDlxQueue() {
String exchangeName = "simple.exchange";
String msg = "hello, Dead queue!!!";
MessageProperties properties = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId(UUID.randomUUID().toString())
.setExpiration("3000")
.build();
Message message = new Message(msg.getBytes(), properties);
rabbitTemplate.convertAndSend(exchangeName, "simple", message);
}- 设置死信队列的消费者(普通队列无需设置消费者)
@RabbitListener(queues = "dead.queue")
public void processDeadLetterMessage(String message) {
System.out.println("Dead letter received: " + message);
}- 测试结果:监听死信队列的消费者输出消息;说明当消息发送到普通队列中,当消息过期后会通过死信交换机转发到死信队列中

延迟消息插件
安装插件的方式
下载延迟插件安装包,下载的插件版本需要和RabbitMQ的版本兼容 下载地址:text

将插件移动到容器内部
docker cp /var/www/dockerMountPoint/rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins
- 进入容器中启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 安装插件结果

代码实现发送延迟消息
- 创建交换机和队列,并将交换机设置延迟队列的属性
@Configuration
public class DxlConfiguration {
@Bean
public DirectExchange dxlExchange() {
return ExchangeBuilder.directExchange("dxl.exchange").delayed().durable(true).build();
}
@Bean
public Queue dxlQueue() {
return QueueBuilder.durable("dxl.queue").build();
}
@Bean
public Binding bindingDxlQueue() {
return BindingBuilder.bind(dxlQueue()).to(dxlExchange()).with("dxl");
}
}- 设置消费者,监听dxl.queue队列
@RabbitListener(queues = "dxl.queue")
public void processDxlLetterMessage(String message) {
System.out.println("DXL letter received: " + message);
}- 设置生产者,发送消息到交换机
@Test
void testDlxQueue() {
String exchangeName = "dxl.exchange";
String msg = "hello, DXL queue!!!";
rabbitTemplate.convertAndSend(exchangeName, "dxl", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
return message;
}
});
}- 测试结果:过了设置10s时间,消费者消费了这条消息

取消超时订单
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
- 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
- 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源


代码实现
- 定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class MultipleDelayMessage<T> {
private T data;
private List<Long> delayMillis;
public MultipleDelayMessage() {
}
public MultipleDelayMessage(T data, Long... delayMillis) {
this.data = data;
this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));
}
public MultipleDelayMessage(T data, List<Long> delayMillis) {
this.data = data;
this.delayMillis = delayMillis;
}
public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {
return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));
}
public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {
return new MultipleDelayMessage<>(data, delayMillis);
}
public boolean hasNextDelay() {
return !delayMillis.isEmpty();
}
public Long removeNextDelay() {
return delayMillis.remove(0);
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public List<Long> getDelayMillis() {
return delayMillis;
}
public void setDelayMillis(List<Long> delayMillis) {
this.delayMillis = delayMillis;
}
@Override
public String toString() {
return "MultipleDelayMessage{" +
"data=" + data +
", delayMillis=" + delayMillis +
'}';
}
}- 定义一个发送延迟消息的消息处理器,供所有服务使用
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class DelayMessagePostProcessor implements MessagePostProcessor {
private final Integer delay;
public DelayMessagePostProcessor(Integer delay) {
this.delay = delay;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}- 创建交换机和队列
@Configuration
public class OrderConfiguration {
@Bean
public DirectExchange orderExchange() {
return ExchangeBuilder.directExchange("order.exchange").build();
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue").build();
}
@Bean
public Binding binding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");
}
}- 生产者发送消息
@Test
void testOrderQueue() {
MultiDelayMessage<String> message = MultiDelayMessage.of("hello, Order!!!", 1000L, 2000L, 4000L, 8000L, 16000L);
rabbitTemplate.convertAndSend("order.exchange",
"order",
message,
new DelayMessagePostProcessor(message.removeNextDelay().intValue()));
}- 消费者消费消息
@RabbitListener(queues = "order.queue")
public void processOrderMessage(MultiDelayMessage multiDelayMessage) {
if (multiDelayMessage.hasNextDelay()) {
Long multi = multiDelayMessage.removeNextDelay();
rabbitTemplate.convertAndSend("order.exchange",
"order",
multiDelayMessage,
new DelayMessagePostProcessor(multi.intValue()));
System.out.println("发送的延迟时间为:" + multi);
} else {
System.out.println("任务成功完成");
}
}- 测试结果


