登录 |  注册
首页 >  面试合集 >  Java面试宝典(第三部分·高级) >  RabbitMQ 持久化成功的条件

RabbitMQ 持久化成功的条件

要保证消息持久化成功的条件有哪些?

  • 声明队列必须设置持久化 durable 设置为 true.

  • 消息推送投递模式必须设置持久化,deliveryMode 设置为 2(持久)。

  • 消息已经到达持久化交换器。

  • 消息已经到达持久化队列。

以上四个条件都满足才能保证消息持久化成功。


要解决该问题, 就要用到 RabbitMQ 中持久化的概念, 所谓持久化, 就是 RabbitMQ 会将内存中的数据 (Exchange 交换器, Queue 队列, Message 消息) 固化到磁盘, 以防异常情况发生时, 数据丢失.

其中, RabblitMQ 的持久化分为三个部分:

  • 交换器 (Exchange) 的持久化

  • 队列 (Queue) 的持久化

  • 消息 (Message) 的持久化

交换器 (Exchange) 的持久化

我们声明 Exchange 的代码是这样的:

private final static String EXCHANGE_NAME = "normal-confirm-exchange";
// 创建一个 Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

这种情况下声明的 Exchange 是非持久化的, 在 RabbitMQ 出现异常情况 (重启, 宕机) 时, 该 Exchange 会丢失, 会影响后续的消息写入该 Exchange, 那么如何设置 Exchange 为持久化的呢? 答案是设置 durable 参数.

durable: 设置是否持久化. durable 设置为 true 表示持久化, 反之是非持久化.

持久化可以将交换器存盘, 在服务器重启的时候不会丢失相关信息.

设置 Exchange 持久化:

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此时调用的重载方法为:

public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
    return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}

为了能更好的理解, 我们新建个生产类如下:

package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建一个 Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 发送消息
        String message = "durable exchange test";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

示例代码中, 我们新建了 1 个非持久化的 Exchange,1 个非持久化的 Queue, 并将它们做了绑定, 此时运行代码, Exchange 和 Queue 新建成功, 消息'durable exchange test'也被正确地投递到了队列中:

RabbitMQ.png

RabbitMQ.png

此时重启下 RabbitMQ 服务, 会发现 Exchange 丢失了:

RabbitMQ.png

修改下代码, 将 durable 参数设置为 ture:

// 创建一个 Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此时运行完代码, 然后重启下 RabbitMQ 服务, 会发现 Exchange 不再丢失:

RabbitMQ.png

队列 (Queue) 的持久化

细心的网友可能会发现, 虽然现在重启 RabbitMQ 服务后, Exchange 不丢失了, 但是队列和消息丢失了, 那么如何解决队列不丢失呢? 答案也是设置 durable 参数.

durable: 设置是否持久化. 为 true 则设置队列为持久化.

持久化的队列会存盘, 在服务器重启的时候可以保证不丢失相关信息.

简单修改下上面声明 Queue 的代码, 将 durable 参数设置为 true:

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

此时调用的重载方法如下:

public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
    validateQueueNameLength(queue);
    return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}

运行代码, 然后重启 RabbitMQ 服务, 会发现队列现在不丢失了:

RabbitMQ.png

消息 (Message) 的持久化

虽然现在 RabbitMQ 重启后, Exchange 和 Queue 都不丢失了, 但是存储在 Queue 里的消息却仍然会丢失, 那么如何保证消息不丢失呢? 答案是设置消息的投递模式为 2, 即代表持久化.

修改发送消息的代码为:

// 发送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

调用的重载方法为:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}

运行代码, 然后重启 RabbitMQ 服务, 发现此时 Exchange,Queue, 消息都不丢失了:

RabbitMQ.png

RabbitMQ.png

至此, 我们完美的解决了 RabbitMQ 重启后, 消息丢失的问题.

最终的代码如下, 你也可以通过文末的源码链接下载本文用到的所有源码:

package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建一个 Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 发送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

注意事项

1)理论上可以将所有的消息都设置为持久化, 但是这样会严重影响 RabbitMQ 的性能. 因为写入磁盘的速度比写入内存的速度慢得不止一点点. 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量. 在选择是否要将消息持久化时, 需要在可靠性和吞吐量之间做一个权衡.

2)将交换器, 队列, 消息都设置了持久化之后仍然不能百分之百保证数据不丢失, 因为当持久化的消息正确存入 RabbitMQ 之后, 还需要一段时间 (虽然很短, 但是不可忽视) 才能存入磁盘之中. 如果在这段时间内 RabbitMQ 服务节点发生了宕机, 重启等异常情况, 消息还没来得及落盘, 那么这些消息将会丢失.

3)单单只设置队列持久化, 重启之后消息会丢失; 单单只设置消息的持久化, 重启之后队列消失, 继而消息也丢失. 单单设置消息持久化而不设置队列的持久化显得毫无意义.

上一篇: 解决RabbitMQ消息丢失问题
下一篇: RabbitMQ的持久化有什么缺点?
推荐文章
  • 在HTML中,如果你想让一个输入框(input元素)不可编辑,你可以通过设置其readonly属性来实现。示例如下:input type="text" value="此处内容不可编辑" readonly在上述代码中,readonly属性使得用户无法修改输入框中的内容。另外,如果你希望输入框完全不可交
  • ASP.NET教程ASP.NET又称为ASP+,基于.NETFramework的Web开发平台,是微软公司推出的新一代脚本语言。ASP.NET是一个使用HTML、CSS、JavaScript和服务器脚本创建网页和网站的开发框架。ASP.NET支持三种不一样的开发模式:WebPages(Web页面)、
  • C# 判断判断结构要求程序员指定一个或多个要评估或测试的条件,以及条件为真时要执行的语句(必需的)和条件为假时要执行的语句(可选的)。下面是大多数编程语言中典型的判断结构的通常形式:判断语句C#提供了以下类型的判断语句。点击链接查看每个语句的细节。语句描述if语句一个 if语句 由一个布尔表达式后跟
  • C#循环有的时候,可能需要多次执行同一块代码。通常情况下,语句是顺序执行的:函数中的第一个语句先执行,接着是第二个语句,依此类推。编程语言提供了允许更为复杂的执行路径的多种控制结构。循环语句允许我们多次执行一个语句或语句组,下面是大多数编程语言中循环语句的通常形式:循环类型C#提供了以下几种循环类型
  • C#数组(Array)数组是一个存储相同类型元素的固定大小的顺序集合。数组是用来存储数据的集合,一般认为数组是一个同一类型变量的集合。声明数组变量并不是声明number0、number1、...、number99一个个单独的变量,而是声明一个就像numbers这样的变量,然后使用numbers[0]
  • ASP.NET是一个由微软公司开发的用于构建Web应用程序的框架,它是.NETFramework的一部分。它提供了一种模型-视图-控制器(MVC)架构、Web表单以及最新的ASP.NETCore中的RazorPages等多种开发模式,可以用来创建动态网页和Web服务。以下是一些基础的ASP.NET编
学习大纲