Skip to content

MQ

同步调用

在微服务中,如果使用OpenFegin的同步方法调用,如下图流程所示:

alt text

  • 拓展性差

目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。某些项目中,还会有积分或金币的概念。用户支付成功后,给用户以积分奖励或者返还金币,那应该如何处理?最后,项目的支付业务会越来约臃肿;而且每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

alt text

  • 性能下降

由于采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和;每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms

alt text

  • 级联失败

由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。这其实就是同步调用的级联失败问题。但是假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

alt text

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。这样,发送消息的人和接收消息的人就完全解耦了

alt text

除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。如果提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:

alt text

不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。

综上,异步调用的优势包括:

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败

异步调用的缺点包括:

  • 完全依赖于Broker的可靠性、安全性和性能
  • 架构复杂,后期维护和调试麻烦

技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ. 目比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ 追求可靠性:RabbitMQ、RocketMQ 追求吞吐能力:RocketMQ、Kafka 追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。

Docker安装RabbitMQ

  • 搜索RabbitMQ镜像

docker search rabbitmq

  • 下载RabbitMQ

docker pull rabbitmq:3.8.25-management alt text

  • 运行镜像的脚本
yaml
#!/bin/bash

docker run -d \
--name rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123456 \
-p 5672:5672 \
-p 15672:15672 \
-v /var/www/dockerMountPoint/rabbitmq_data_management:/var/lib/rabbitmq \
--restart unless-stopped \
rabbitmq:3.8.25-management

成功运行RabbitMQ容器,无法正常登录的原因

  • 访问rabbitmq主页时会出现无法访问,这是因为没有开启插件
yaml
docker exec -it 容器ID bash
rabbitmq-plugins enable rabbitmq_management

alt text

  • 虚拟机防火墙开放15672、5672的端口 alt text

  • 腾讯云服务器放开对应的安全组 alt text

  • 腾讯云服务器放开对应的防火墙 alt text

RabbitMQ 的整体架构和核心概念

RabbitMQ 有几个核心概念:

  • Publisher:消息发送者
  • Consumer:消息的消费者
  • Queue:消息队列,存储消息
  • Exchange:交换机,负责路由消息
  • VirtualHost:虚拟主机,用于数据隔离

alt text

alt text

RabbitMQ 快速入门

注意事项:交换机只能路由和转发消息,不能存储消息

  • 新建队列: 创建一个名为 hello.queue 的队列

alt text

  • 绑定队列与交换机:将hello.queue绑定和交换机绑定 alt text

  • 发送消息:在RabbitMQ中的amq.fanout交换机发送一条消息,并查看队列中的消息数量,以及队列中的具体消息信息 alt text

SpringBoot项目集成RabbitMQ

  • 新建一个 SpringBoot 项目,并创建 consumer 和 publisher 两个子模块,项目的整体结构如下 alt text

  • 在父工程中引入 SpringAMQP 的依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 在 consumer 和 publisher 模块的 application.yml 中分别编写与 RabbitMQ 有关的配置信息
yaml
spring:
  rabbitmq:
    host: 主机的IP地址
    port: 5672
    virtual-host: /
    username: admin
    password: admin123456
    listener:
      simple:
        prefetch: 1
  • 在RabbitMQ中创建一个simple.queue的队列

  • 在 publisher 模块中编写测试类,用户向队列发送消息

java
@Test
void testSendMessage() {
    String queueName = "simple.queue";
    String msg = "hello.amqp!!!";
    rabbitTemplate.convertAndSend(queueName, msg);
}
  • 查看RabbitMQ中simple.queue队列中的消息数量以及消息内容

alt text

  • 设置消费者接受消息
java
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    System.out.println("消费者收到消息:" + msg);
}

alt text

Work Queues 模型

Work Queues 的概念

Work Queues,简单地来说,就是让多个消费者绑定到一个队列,共同消费队列中的消息,虽然有多个消费者绑定同一个队列,但是队列中的某一条消息只会被一个消费者消费

alt text

Work Queues 实测

  • 在 publisher服务的 SpringAmqp 测试类中添加以下方法,该方法可以在 1 秒内产生 50 条消息
java
@Test
void testSendMessage1() throws InterruptedException {
    String queueName = "test1";
    for (int i = 0; i < 50; i++) {
        String msg = "hello.amqp------------" + i;
        rabbitTemplate.convertAndSend(queueName, msg);
        Thread.sleep(20);
    }
}
  • 在 consumer 服务的 RabbitMQListener 类中添加以下方法,监听 work.queue 队列;分别在两个消费者中使用Thread.sleep()模拟消费者的速度
java
@RabbitListener(queues = "test1")
public void listTestQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1收到消息---" + msg);
    Thread.sleep(20);
}

@RabbitListener(queues = "test1")
public void listTestQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2收到消息11111---" + msg);
    Thread.sleep(200);
}

alt text

注意: 当消费者性能不一致的时候,队列仍然是采用轮询的方式将任务平均的分配给对应的消费者;但是正常情况下,当消费者的消费能力更强时,应该分配更多的任务,消费能力更弱的消费者分配相对更少的任务数量。

  • 配置消费数量适用消费者的消费能力策略
yaml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

alt text

交换机

真正的生产环境都会经过交换机来发送消息,而不是直接发送到队列

交换机的作用:

  • 接收 publisher 发送的消息
  • 将消息按照规则路由到与交换机绑定的队列

交换机的类型有以下三种:

  • Fanout:广播
  • Direct:定向
  • Topic:话题

Fanout 交换机

Fanout 交换机会将接收到的消息广播到每一个跟其绑定的 queue ,所以也叫广播模式

alt text

Fanout 快速入门

  • 在RabbitMQ中声明交换机(存在默认的fanout交换机)

alt text

  • 在该交换机上绑定对应的队列

alt text

  • 在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 fanout.queue1 和 fanout.queue2 队列
java
@RabbitListener(queues = "test1")
public void listTestQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1收到消息---" + msg);
    Thread.sleep(20);
}

@RabbitListener(queues = "test1")
public void listTestQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2收到消息11111---" + msg);
    Thread.sleep(200);
}
  • 在 publisher 服务的测试类中添加以下方法,向amq.fanout 交换机发送消息
java
@Test
void testSendMessage2() throws InterruptedException {
    String msg = "Hello, Every One!!!";
    String exchangeName = "amq.fanout";
    rabbitTemplate.convertAndSend(exchangeName, null, msg);
}

alt text

Direct 交换机

Direct 交换机会将接收到的消息根据规则路由到指定的队列,被称为定向路由

  • 每一个 Queue 都与 Exchange 设置一个 bindingKey
  • 发布者发送消息时,指定消息的 RoutingKey
  • Exchange 将消息路由到 bindingKey 与消息 routingKey 一致的队列

alt text

需要注意的是:同一个队列可以绑定多个 bindingKey ,如果有多个队列绑定了同一个 bindingKey ,就可以实现类似于 Fanout 交换机的效果。由此可以看出,Direct 交换机的功能比 Fanout 交换机更强大

快速上手

做案例来体验 Direct 交换机的效果,案例要求如下:

  • 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
  • 在 RabbitMQ 控制台中,声明交换机 zuo.direct ,将上面创建的两个队列与其绑定
  • 在 consumer 服务中,编写两个消费者方法,分别监听 direct.queue1 和 direct.queue2
  • 在 publisher 服务中编写测试方法,利用不同的 RoutingKey 向 zuo.direct 交换机发送消息

操作流程

  • 在RabbitMQ中创建direct.queue1和direct.queue2队列

alt text

  • 声明zuo.direct 的Direct 类型的交换机,并且通过对应的bindingKey绑定队列的队列信息

alt text

  • 创建消费者监听对应的队列信息
java
@RabbitListener(queues = "direct.queue1")
public void listDirectQueue1(String msg) {
    System.out.println("消费者1收到消息---" + msg);
}

@RabbitListener(queues = "direct.queue2")
public void listDirectQueue2(String msg) {
    System.err.println("消费者2收到消息---" + msg);
}
  • 创建生产者发送消息到队列
java
@Test
void testSendMessage3() {
    String msg = "Hello, 测试Direct类型的交换机---- RoutingKey = red";
    String exchangeName = "zuo.direct";
    rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}

@Test
void testSendMessage4() {
    String msg = "Hello, 测试Direct类型的交换机---- RoutingKey = blue";
    String exchangeName = "zuo.direct";
    rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
  • 测试结果

alt text

Topic 交换机

Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以.分割) Queue 与 Exchange 指定 bindingKey 时可以使用通配符 #:代指 0 个或多个单词 *:代指 1 个单词

alt text

快速上手

做一个案例来体验 Topic 交换机的效果,案例要求如下:

在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
在 RabbitMQ 控制台中,声明交换机 zuo.topic ,将两个队列与其绑定
在 consumer 服务中编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
在 publisher 服务中编写测试方法,利用不同的 routingKey 向 zuo.topic 发送消息
  • 创建topic.queue1 和 topic.queue2队列信息

alt text

  • 创建zuo.topic的交换机,并绑定对应的队列和设置对应的Routingkey

alt text

  • 在consumer模块设置消费者
java
@RabbitListener(queues = "topic.queue1")
public void listTopicQueue1(String msg) {
    System.out.println("消费者1收到消息---" + msg);
}

@RabbitListener(queues = "topic.queue2")
public void listTopicQueue2(String msg) {
    System.err.println("消费者2收到消息---" + msg);
}
  • 在publisher模块中设置生产者
java
@Test
void testSendMessage5() {
    String msg = "Hello, 测试Topic类型的交换机 RoutingKey + china.news";
    String exchangeName = "zuo.topic";
    rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}

@Test
void testSendMessage6() {
    String msg = "Hello, 测试Topic类型的交换机 RoutingKey + china.queue";
    String exchangeName = "zuo.topic";
    rabbitTemplate.convertAndSend(exchangeName, "china.queue", msg);
}
  • 测试结果

alt text

代码声明队列和交换机

我们之前创建队列和交换机都是在 RabbitMQ 的控制台页面中创建的,不仅十分繁琐,还有可能打错队列和交换机的名。而且,不同的环境(开发环境、测试环境、生产环境)可能会有不同的队列和交换机,手动创建队列和交换机效率十分低下

SpringAQMP提供的创建队列和交换机的类

SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
  • Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建

alt text

快速上手

  • 创建一个 Fanout 类型的交换机,并且创建队列与这个交换机绑定

编程式声明

  • 在 consumer 服务中编写 FanoutConfiguration 配置类
绝密文件,请勿展开
java
@Configuration
public class DirectConfiguration {
    /**
     * 创建名称为hello.direct 的交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("hello.direct").build();
    }

    /**
     * 创建名称为hello.queue3的队列
     *
     * @return
     */
    @Bean
    public Queue directQueue3() {
        return QueueBuilder.durable("hello.queue3").build();
    }

    /**
     * 创建名称为hello.queue4的队列
     *
     * @return
     */
    @Bean
    public Queue directQueue4() {
        return QueueBuilder.durable("hello.queue4").build();
    }

    @Bean
    public Binding bindingQueue3WithBlue() {
        return BindingBuilder.bind(directQueue3()).to(directExchange()).with("blue");
    }

    @Bean
    public Binding bindingQueue3WithRed() {
        return BindingBuilder.bind(directQueue3()).to(directExchange()).with("red");
    }

    @Bean
    public Binding bindingQueue4WithRed() {
        return BindingBuilder.bind(directQueue4()).to(directExchange()).with("red");
    }
}

注解式声明

  • SpringAMOP 提供了基于@RabbitListener注解声明队列和交换机的方式
java
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue3"),
        exchange = @Exchange(name = "you.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listDirectQueue3(String msg) {
    System.out.println("消费者3收到消息----" + msg);
}

消息转化器

在了解消息转换器之前,我们先来做一个小案例,案例的内容是利用 SpringAMQP 发送一条消息,消息的内容为一个 Java 对象,案例要求如下:

  • 在 RabbitMQ 控制台创建一个队列,名为 object.queue
  • 编写单元测试,向该队列中直接发送一条消息,消息的内容为 Map
  • 在控制台查看消息

代码实现

  • 向object.queue队列发送Map集合信息
java
@Test
void testSendMessage7() {
    Map<String, Object> map = new HashMap<>();
    map.put("name", "Tom");
    map.put("age", 21);
    rabbitTemplate.convertAndSend("object.queue", map);
}
  • 成功发送消息后,我们在 RabbitMQ 的控制台查看消息的具体内容

alt text

结论: 可以发现,消息的内容类型为 application/x-java-serialized-object,并且消息的内容也变成一堆乱码;本来是想发送一个简单的仅含有姓名和年龄两个字段的简短信息,但是消息却变成了一堆乱码,不仅可读性大大下降,而且占用的空间也大大地增加了,这显然不是我们想要的效果

默认的消息转换器

Spring 处理对象类型的消息是由 org.springframework.amap.support.converter.MessageConverter 接口来处理的,该接口默认实现是 SimpleMessageConverter

SimpleMessageConverter 类是基于 JDK 提供的 ObjectOutputStream 来类完成序列化的,这种序列化方式存在以下问题:

  • 使用 JDK 序列化有安全风险(如果序列化后的消息被恶意篡改,在反序列化的过程中可能会执行一些高危的代码)
  • 经过 JDK 序列化的消息占用空间很大
  • 经过 JDK 序列化的消息可读性很差

自定义消息转换器

  • 一般建议采用 JSON 序列化代替默认的 JDK 序列化;要使用 JSON 序列化,需要先引入 jackson 依赖(在项目的父工程中引入)
xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>
  • 接着在 consumer 服务和 publisher 服务中配置 MessageConverter
java
@Bean
public MessageConverter jacksonMessageConvertor(){
    return new Jackson2JsonMessageConverter();
}
  • 再次发送对象类型的消息,可以看到消息已经成功转换成 JSON 类型的字符串
java
 @Test
void testSendMessage7() {
    Map<String, Object> map = new HashMap<>();
    map.put("name", "Tom");
    map.put("age", 21);
    rabbitTemplate.convertAndSend("object.queue", map);
    }
  • 在 consumer 服务的 RabbitMQListener 类中添加对 object.queue 队列的监听(用什么类型发,就用什么类型接收)
java
@RabbitListener(queues = "object.queue")
public void listDirectObjectQueue(Map<String, Object> map) {
    System.out.println("ObjectQueue队列中的消息为" + map);
}
  • 消费者的消费结果

alt text

生产者的可靠性

生产者重连

  • 由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制
yaml
spring:
  rabbitmq:
    host: 101.43.73.194
    port: 5672
    virtual-host: /
    username: admin
    password: admin123456
    connection-timeout: 1s # 连接超时时间
    template:
      retry:
        initial-interval: 1000ms # 连接失败后的初始等待时间
        multiplier: 1 # 连接失败后的等待时常倍数,下次等待时常 = (initial - interval) * multiplier
        max-attempts: 3 # 最大重试次数
        enabled: true # 开启超时重试机制
  • 填写完配置信息后,我们手动停止 RabbitMQ ,模拟生产者连接 RabbitMQ 失败的情况
yaml
docker stop rabbitmq
  • 启动测试类
java
@Test
void testSendMessageToQueue() {
    String queueName = "simple.queue";
    String msg = "Hello, SpringAMQP!";
    rabbitTemplate.convertAndSend(queueName, msg);
}
  • 可以在控制台看到,总共有三次重新连接 RabbitMQ 的记录,三次连接都失败后,就直接抛异常了

alt text

注意事项:

  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
  • 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
  • 可以考虑使用异步线程来执行发送消息的代码

生产者确认

RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:

  • 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
  • 其它情况都会返回 NACK,告知生产者消息投递失败

alt text

生产者确认机制代码实现

  • 在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)
yaml
spring:
  rabbitmq:
    publisher-returns: true
    publisher-confirm-type: correlated
  • publisher-confirm-type 有三种模式
    • none:关闭 confirm 机制
    • simple:以同步阻塞等待的方式返回 MQ 的回执消息
    • correlated:以异步回调方式的方式返回 MQ 的回执消息

注意: 每个 RabbitTemplate 只能配置一个 ReturnCallback

  • 在 publisher 模块新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口
java
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置消息没有到交换机回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                log.debug("消息发送到对应的详细信息交换机: id:{}, toString:{}, boolean:{}, s:{}" +
                        correlationData.getId(), correlationData, b, s);
            }
        });       
        // 配置消息没有到队列回调函数
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息的return callback, exchange:{}, key:{},msg:{},code:{},text:{}",
                        returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
                        returnedMessage.getMessage(), returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText());
            }
        });
    }
}
  • 在 publisher 模块添加一个测试类,测试 ReturnCallback 的效果
java
 @Test
void testConfirmCallBack() throws InterruptedException {
    // 1、创建cd
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    // 2、添加ConfirmCallBack
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("消息回调失败", ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            log.debug("收到confirm callback回执");
            if (result.isAck()) {
                // 消息发送成功
                log.debug("消息发送成功,收到ack");
            } else {
                //消息发送失败
                log.debug("消息发送失败,收到nack");
            }
        }
    });
    rabbitTemplate.convertAndSend("zuo.direct1", "blue", "测试发送消息回调机制", cd);
    Thread.sleep(10000);
}
  • 发送成功后可以看到消息发送成功的回调信息

alt text

  • 如果发送消息不存在对应的routingKey,如下如报错信息 alt text

  • 如果发送消息不存在对应的交换机,如下如报错信息 alt text

数据的持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:设置为Durable就是持久化模式,Transient就是临时模式。

alt text

当使用docker命令重启容器时,会发现该属性为Transient交换机不存在

队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:可以创建一个名 test1 的队列;然后设置为 Transient ,再重启 mq 之后查看;它已经不存在,说明它是临时的,重启之后就不存在。

alt text

消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个Delivery mode:在 simple.queue 下发两条消息;一条是持久化的,一条是临时的消息;然后重启mq再查看队列中的消息。发现持久化的消息存在,临时的消息不存在。

alt text

消息代理(RabbitMQ)的可靠性

队列 Page Out机制

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

  • 一旦 RabbitMQ 宕机,内存中的消息会丢失
  • 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)

怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息

alt text

alt text

演示一下 RabbitMQ 发生 Paged Out 现象(也就是队列的空间被消息占满了之后,将老旧消息移到磁盘,为新消息腾出空间的情况)

  • 在发送消息之前,先把生产者确认机制关闭,提高消息发送的速度
java
spring:
  rabbitmq:
    publisher-returns: false
    publisher-confirm-type: none
  • 测试发送一百万调非持久化信息
java
@Test
void testPagedOut() {
    Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
            .build();

    for (int i = 0; i < 1000000; i++) {
        rabbitTemplate.convertAndSend("simple.queue", message);
    }
}
  • 测试结果:当内存满了之后,就会将数据信息转到磁盘上,这个时间段mq 是不会接收消息,相当于处于阻塞状态

alt text

向RabbitMQ 中发送持久化的消息,当消息存满内存,mq 是否会出现阻塞

  • 测试发送持久化消息
java
@Test
void testPagedOut() {
    Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();

    for (int i = 0; i < 1; i++) {
        rabbitTemplate.convertAndSend("simple.queue", message);
    }
}
  • 测试结果:当内存满了之后,mq 不会阻塞,数据会存入磁盘中

alt text

LazyQueue(3.12 版本后所有队列都是 Lazy Queue 模式)

从 RabbitMQ 的 3.6.0 版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要处理消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改

开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执

创建Lazy Queue方式

  • 在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

alt text

  • 在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy
java
@Bean
public org.springframework.amqp.core.Queue lazeQueue() {
    return QueueBuilder.durable("lazy.queue1")
            .lazy()
            .build();
}
  • 注解式创建
java
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(
        name = "lazy.queue2",
        durable = "true",
        arguments = @Argument(
                name = "x-queue-mode",
                value = "lazy"
        )
))
public void listenLazeQueue(String message) {
    System.out.println("消费者收到了 laze.queue2的消息: " + message);
}

性能测试

测试向Lazy Queue队列中发送一百万条消息,查看mq 处理消息的效率以及是否会出现阻塞状态

alt text

结论:mq 将接收的消息存入Page Out中,所以不会出现mq阻塞的现象,而且消息处理的效率高

消费者的可靠性

消费者的确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:

  • ack:成功处理消息,RabbitMQ 从队列中删除该消息
  • nack:消息处理失败,RabbitMQ 需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

alt text

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:

  • none:不处理,即消息投递给消费者后立刻 ack,消息会立刻从 MQ 中删除,非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活
  • auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject

代码实现

  • 开启消息确认机制,设置确认类型为none 需要在 application.yml 文件中编写相关的配置
yaml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none
  • 测试处理模式为 none 的情况,向 simple.queue 队列发送一条消息,同时监听 simple.queue 队列的消息,监听到队列中的消息后手动抛出一个异常
java
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    System.out.println("消费者收到消息:" + msg);
    throw new RuntimeException("抛出异常");
}
  • 测试结果:消息确认机制设置的none 时,队列中的消息投递给消费者后立刻ack, 消息会立刻从MQ中删除

alt text

  • 设置确认类型为auto, 将application.yml中acknowledge-mode中的值设置为auto,并且消费者中代码逻辑不变

  • 处理模式为auto 类型的处理结果:消息确认机制设置为auto 时,当消费者在消费的过程中抛出异常时,队列会一直重复的投递,直到消息消费成功为止

alt text

失败重试机制

当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力;可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队。

  • 在 application.yml 配置文件中开启失败重试机制
yaml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true # 开启消息消费失败重试机制
          initial-interval: 1000ms # 消息消费失败后的初始等待时间
          multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
          max-attempts: 3 # 最大重试次数
          stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false
  • 消费者代码
java
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    System.out.println("消费者收到消息:" + msg);
    throw new RuntimeException("抛出异常");
}
  • 测试结果,查看控制台结果:当消费者在消费的过程中出现异常,会进入重试机制,当当前的重试次数大于配置文件中设置的最大重试次数时,会抛出对应的异常

alt text

失败消息处理策略

开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

alt text

实现使用 RepublishMessageRecoverer 类的情况

  • 定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定;将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)
绝密文件,请勿展开
java
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
  • 在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列

alt text

  • 在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列

alt text

业务的幂等性

幂等性的概念

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性;在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的

alt text

保证消息的幂等性:为每个消息设置一个唯一的ID

给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:

  • 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
  • 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理

可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId

java
@Bean
public MessageConverter jacksonMessageConvertor() {
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

alt text

  • 在rabbitmq控制台中查看队列中的消息,消息中包含message_id值 alt text

保证消息的幂等性:结合业务做判断

结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理。

alt text

兜底策略-定时任务核查

定时任务定期执行,查询那些:

  • 状态为“支付中”(Pending)
  • 创建时间或最后更新时间已经超过一个合理阈值(例如15分钟) 的订单。
  • 然后,主动调用支付服务的查询接口,获取这些订单的最终支付状态,并更新本地数据库。

延迟消息

延迟消息的概念

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

延迟任务:一定时间之后才会执行的任务

alt text

死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter)

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

alt text

死信交换机实现发送延迟消息

  • 定义普通队列,死信队列,普通的交换机以及死信交换机;并且将队列和交换机进行对应的绑定
绝密文件,请勿展开
java
@Configuration
public class DeadExchangeConfiguration {
    // 定义死信交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead.exchange";
    public static final String DEAD_LETTER_QUEUE = "dead.queue";

    // 定义主队列
    public static final String MAIN_QUEUE = "simple.queue";

    // 定义主队列绑定的交换机
    public static final String MAIN_EXCHANGE = "simple.exchange";

    @Bean
    public DirectExchange mainExchange() {
        return new DirectExchange(MAIN_EXCHANGE);
    }

    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable(MAIN_QUEUE)
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 绑定死信交换机
                .withArgument("x-dead-letter-routing-key", "dead") // 指定死信路由键
                .build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    public Binding bindingMainQueue() {
        return BindingBuilder.bind(mainQueue()).to(mainExchange()).with("simple");
    }

    @Bean
    public Binding bindingDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead");
    }
}
  • 生产者发送对应的信息,并设置过期时间
java
@Test
void testDlxQueue() {
    String exchangeName = "simple.exchange";
    String msg = "hello, Dead queue!!!";
    MessageProperties properties = MessagePropertiesBuilder.newInstance()
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString())
            .setExpiration("3000")
            .build();
    Message message = new Message(msg.getBytes(), properties);
    rabbitTemplate.convertAndSend(exchangeName, "simple", message);
}
  • 设置死信队列的消费者(普通队列无需设置消费者)
java
@RabbitListener(queues = "dead.queue")
public void processDeadLetterMessage(String message) {
    System.out.println("Dead letter received: " + message);
}
  • 测试结果:监听死信队列的消费者输出消息;说明当消息发送到普通队列中,当消息过期后会通过死信交换机转发到死信队列中

alt text

延迟消息插件

安装插件的方式

  • 下载延迟插件安装包,下载的插件版本需要和RabbitMQ的版本兼容 下载地址:textalt text

  • 将插件移动到容器内部

docker cp /var/www/dockerMountPoint/rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins

  • 进入容器中启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • 安装插件结果 alt text

代码实现发送延迟消息

  • 创建交换机和队列,并将交换机设置延迟队列的属性
java
@Configuration
public class DxlConfiguration {
    @Bean
    public DirectExchange dxlExchange() {
        return ExchangeBuilder.directExchange("dxl.exchange").delayed().durable(true).build();
    }
    @Bean
    public Queue dxlQueue() {
        return QueueBuilder.durable("dxl.queue").build();
    }
    @Bean
    public Binding bindingDxlQueue() {
        return BindingBuilder.bind(dxlQueue()).to(dxlExchange()).with("dxl");
    }
}
  • 设置消费者,监听dxl.queue队列
java
@RabbitListener(queues = "dxl.queue")
public void processDxlLetterMessage(String message) {
    System.out.println("DXL letter received: " + message);
}
  • 设置生产者,发送消息到交换机
java
 @Test
void testDlxQueue() {
    String exchangeName = "dxl.exchange";
    String msg = "hello, DXL queue!!!";
    rabbitTemplate.convertAndSend(exchangeName, "dxl", msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setDelay(10000);
            return message;
        }
    });
}
  • 测试结果:过了设置10s时间,消费者消费了这条消息 alt text

取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

  • 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
  • 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

alt text

alt text

代码实现

  • 定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)
java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultipleDelayMessage<T> {

    private T data;

    private List<Long> delayMillis;

    public MultipleDelayMessage() {

    }

    public MultipleDelayMessage(T data, Long... delayMillis) {
        this.data = data;
        this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));
    }

    public MultipleDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }

    public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {
        return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));
    }

    public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {
        return new MultipleDelayMessage<>(data, delayMillis);
    }

    public boolean hasNextDelay() {
        return !delayMillis.isEmpty();
    }

    public Long removeNextDelay() {
        return delayMillis.remove(0);
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public List<Long> getDelayMillis() {
        return delayMillis;
    }

    public void setDelayMillis(List<Long> delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public String toString() {
        return "MultipleDelayMessage{" +
                "data=" + data +
                ", delayMillis=" + delayMillis +
                '}';
    }
}
  • 定义一个发送延迟消息的消息处理器,供所有服务使用
java
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

public class DelayMessagePostProcessor implements MessagePostProcessor {

    private final Integer delay;

    public DelayMessagePostProcessor(Integer delay) {
        this.delay = delay;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }

}
  • 创建交换机和队列
java
@Configuration
public class OrderConfiguration {
    @Bean
    public DirectExchange orderExchange() {
        return ExchangeBuilder.directExchange("order.exchange").build();
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue").build();
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");
    }
}
  • 生产者发送消息
java
@Test
void testOrderQueue() {
    MultiDelayMessage<String> message = MultiDelayMessage.of("hello, Order!!!", 1000L, 2000L, 4000L, 8000L, 16000L);
    rabbitTemplate.convertAndSend("order.exchange",
            "order",
            message,
            new DelayMessagePostProcessor(message.removeNextDelay().intValue()));
}
  • 消费者消费消息
java
@RabbitListener(queues = "order.queue")
public void processOrderMessage(MultiDelayMessage multiDelayMessage) {
    if (multiDelayMessage.hasNextDelay()) {
        Long multi = multiDelayMessage.removeNextDelay();
        rabbitTemplate.convertAndSend("order.exchange",
                "order",
                multiDelayMessage,
                new DelayMessagePostProcessor(multi.intValue()));
        System.out.println("发送的延迟时间为:" + multi);
    } else {
        System.out.println("任务成功完成");
    }
}
  • 测试结果

alt text

Last updated: