从零开始:Java原生连接RabbitMQ完整流程(个人学习笔记001)
2026/5/21 20:02:15 网站建设 项目流程

@TOC)

该文章仅用于个人复习与记录,如有错误,烦请指出,非常感谢

RabbitMQ 是一款开源的消息中间件(也称为消息队列),其核心作用是让不同的系统、服务或组件之间能够异步地传递数据。使用该中间件可以很好地实现业务异步处理、削峰填谷、服务解耦以及数据同步。

本文主要讲解如何使用原生 Java 集成 RabbitMQ。由于主播目前仍处于学习阶段,本文不涉及深层原理,仅说明基本的使用方法。

在进行学习和测试之前,需要先通过 Maven 导入 RabbitMQ 的依赖,并使用 Docker 拉取 RabbitMQ 的镜像,以确保 Java 程序能够与 RabbitMQ 正常连接。

Maven 依赖

xml

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.30.0</version> </dependency>

Docker 命令

bash

# 拉取镜像 docker pull rabbitmq:management # 创建并运行容器 docker run -d --name rabbitmq -p 5672:5672 rabbitmq:management

接下来,就可以愉快地学习 RabbitMQ 了。


一、基本消息队列(点对点模式)

RabbitMQ 中的组件并不多,在本模式中只讲解以下几种:生产者、消费者、队列。

  • 生产者:消息的源头,负责发送消息。
  • 消费者:消息的终点,负责接收并消费(处理)消息。
  • 队列:消息实际存储的容器,等待消费者将其取走。

下面先给出一段最简单的生产者和消费者代码,然后分段解释其作用。

生产者代码

java

public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); // 指定队列类型 Map<String, Object> map = Map.of("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, map); String message = "Hello,World!"; // 发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息成功:'" + message + "'"); } } }

代码分段解释

java

ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
  • ConnectionFactory是用于创建 Java 程序与 RabbitMQ 之间连接的工厂类。
  • setHost()方法用于指定 RabbitMQ 服务器的地址,使 Java 程序能够连接到 RabbitMQ。此外还可以设置用户名、密码等其他属性。

java

try (Connection connection = factory.newConnection()) { Channel channel = connection.createChannel(); // 指定队列类型 Map<String, Object> map = Map.of("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, map); String message = "Hello,World!"; // 发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息成功:'" + message + "'"); }
  • 这里使用的是try-with-resources语法,而非传统的try-catch。当退出try代码块时(无论以何种方式),Java 程序与 RabbitMQ 的连接会自动关闭。
  • Connection代表 Java 程序与 RabbitMQ 之间的连接,由factory直接创建。
  • Channel是在Connection中开辟的通道,一个连接中可以创建多个通道,每个通道中也可以创建多个队列。
  • Map.of()是一个静态工厂方法,用于创建包含指定键值对的Map。这里用于指定队列的类型。队列类型分为classicquorum,RabbitMQ 官方推荐使用quorum。如果创建队列时不指定类型,则默认创建classic队列。
  • queueDeclare()方法用于在通道中声明一个队列,其参数含义如下:
参数顺序含义说明
1队列名称队列的唯一标识
2是否持久化开启后,RabbitMQ 重启后队列不会消失
3是否独占开启后,队列只能在当前连接中使用
4是否自动删除开启后,当队列中的最后一个消费者取消订阅时,队列会被自动删除
5扩展参数用于配置队列的高级特性

queueDeclare()的幂等性:

  • 如果队列已存在且参数一致,则什么也不做。
  • 如果队列已存在但参数不一致,则会抛出异常。
  • channel.basicPublish()方法用于将消息发布到指定的队列中,其参数含义如下:
参数顺序含义说明
1交换机名称空字符串代表使用默认的无名交换机
2队列名称或路由键消息要传递的目的地,此处传入队列名称
3消息属性例如持久化、优先级、过期时间等
4消息体消息内容,通常以字节数组形式传递

生产者端的流程总结:

创建连接工厂 → 建立连接 → 创建通道 → 声明队列 → 发布消息


消费者代码

java

public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 指定队列类型 Map<String, Object> map = Map.of("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, map); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {}); } }

消费者代码解释

  • 第 14 行之前的代码与生产者部分相同。

  • DeliverCallback是一个回调函数,当消费者收到消息时会自动调用该函数,执行 Lambda 表达式中的逻辑:

    • consumerTag:RabbitMQ 为当前消费者生成的唯一标识符。
    • delivery:封装了消息所有信息的对象,包含消息体,但它本身不是消息体。
    • getBody():从delivery对象中取出消息体(字节数组)。
  • basicConsume()方法用于消费队列中的消息,其参数含义如下:

参数顺序含义说明
1队列名称从哪个队列消费消息
2是否自动确认autoAcktrue时,RabbitMQ 在发送消息后立即将其标记为已确认,无论消费者是否处理成功。生产环境不建议开启,否则消息可能丢失
3消息回调收到消息后执行的回调函数
4取消回调消费者被取消时执行的回调

消费者端的流程总结:

等待消息 → 从队列中取出消息 → 自动确认(如果启用)→ 消费消息

至此,最简单的点对点(生产者-队列-消费者)模式就介绍完了。


二、工作消息队列(点对多模式)

问题:在生产环境中,点对点的单生产者-单消费者模式性能过低,需要改为单生产者对应多个消费者以提升性能。

解决方案:代码与点对点模式相同,只需同时启动多个消费者即可。RabbitMQ 会以轮询的方式平均向各个消费者发送消息,从而提高消息的消费效率。

消息持久化

要实现消息持久化,需要同时开启队列持久化消息持久化。由于涉及磁盘 I/O 操作,持久化模式的性能相对较低。

队列持久化:

java

boolean durable = true; Map<String, Object> args = Map.of("x-queue-type", "quorum"); channel.queueDeclare("hello", durable, false, false, args);

消息持久化:

java

channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化标志 message.getBytes());

公平分发

问题:轮询分发不够公平,正在忙碌的消费者仍然会收到新消息。

解决方案:使用basicQos方法,并将prefetchCount设置为 1。

java

int prefetchCount = 1; channel.basicQos(prefetchCount);

这行代码的作用是:告知 RabbitMQ 每次只向一个消费者发送不超过一条消息。在消费者处理并确认前一条消息之前,RabbitMQ 不会向其发送新消息,而是将新消息派发给下一个空闲的消费者。

简单理解:“我一次只处理 1 条消息,没处理完别给我发新的。”

公平分发模式相比轮询模式效率更高。


三、交换机

在实际生产环境中,简单的点对点和点对多模式无法满足多样化的业务需求。交换机可以根据不同的类型或匹配规则,将消息发送到对应的队列,再由队列转发给消费者,从而适应复杂的业务逻辑。

交换机有以下几种可用类型:directtopicheadersfanout,以及默认的无名交换机

无名交换机(默认交换机)

java

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

在前面的示例中,我们并没有显式使用交换机,但仍然能够向队列发送消息,这正是无名交换机发力了。basicPublish()方法的第一个参数(空字符串)就代表无名交换机。当使用无名交换机时,消息会被转发到与第二个参数名称匹配的队列。

fanout 交换机

首先创建一个fanout类型的交换机,命名为ex

由于生产者不关心消息具体发往哪个队列,因此在生产者代码中只需要声明交换机即可,无需声明队列。

java

channel.exchangeDeclare("ex", "fanout");

fanout交换机的功能非常简单:它会将接收到的消息广播给所有与之绑定的队列。这种模式常用于搭建日志系统。

有了交换机之后,如何让交换机把消息转发到指定的队列呢?这就需要用到绑定(Binding)

java

channel.queueBind(queueName, "ex", "");

通过绑定,ex交换机会将消息转发到queueName队列中。第三个参数(空字符串)是路由键,将在其他类型的交换机中详细讲解。

使用交换机发送消息

java

channel.basicPublish("ex", "", null, message.getBytes());

参数说明:

  1. 交换机名称:ex
  2. 路由规则:空字符串(fanout类型忽略路由键)
  3. 消息属性:null
  4. 消息体:消息内容

可以看到,与仅使用队列时相比,发送消息的代码有以下变化:

  • 参数 1 从无名交换机(空字符串)变成了我们自己声明的fanout交换机。
  • 参数 2 从队列名变成了路由规则。
使用交换机接收并消费消息

java

// 创建交换机(fanout类型) channel.exchangeDeclare("ex", "fanout"); // 让 RabbitMQ 自动生成一个随机的队列,并获取这个队列的名字 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机与队列(fanout交换机,routingKey为空字符串) channel.queueBind(queueName, "ex", ""); int prefetchCount = 1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = (String consumerTag, Delivery delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); }; boolean autoAck = true; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});

消费者代码的变动主要体现在:

  • 创建交换机
  • 绑定交换机与队列

需要注意的是,消费者仍然需要声明队列(这里使用的是 RabbitMQ 自动生成的随机队列),因为消费者只能从队列中取出消息进行消费。

Direct 交换机

在实际生产环境中,可能需要根据消息的重要程度进行不同的转发处理,这时就需要用到direct交换机。

direct交换机会将消息转发给路由键完全匹配的队列,然后由对应的消费者取出消息进行消费。

java

channel.queueBind(queueName, "ex", "user"); channel.basicPublish("ex", "user", null, message.getBytes());

在这段代码中:

  • 队列以"user"作为路由键绑定到交换机。
  • 生产者发布消息时指定的路由键也是"user"
  • 交换机将消息的路由键与队列的路由键进行匹配,匹配成功后将消息转发给对应的队列。
  • 如果多个队列绑定了同一个路由键,交换机会将消息转发给所有这些队列。

Topic 交换机

问题:在生产环境中,有时希望一个队列能够同时监听不同路由的消息。

解决方案:使用topic交换机。topic交换机支持通配符,可以实现模糊匹配的路由规则。

通配符说明:

  • *(星号):可恰好替换一个单词。
  • #(井号):可替代零个或多个单词。

示例场景:假设单词按顺序表示"速度.颜色.动物物种",则上图中的绑定关系可以总结为:

  • Q1:监听所有橙色动物的消息。
  • Q2:监听所有兔子的消息,以及所有懒动物的消息。

通过topic交换机,一个队列可以监听多个不同路由的消息,从而应对复杂的业务场景。

headers 交换机

headers交换机依靠哈希表和字典匹配的方式进行路由,性能比directtopic交换机低很多,因此本文不作展开。


四、其他功能(待补充)

发布者确认

问题:生产者发送消息后,无法确认消息是否成功到达 RabbitMQ。

解决方案:使用发布者确认机制。当消息到达 RabbitMQ 时,RabbitMQ 会向生产者返回一条确认消息;如果未到达,则抛出异常。

单条消息确认:

java

// 发送消息 String message = "hello"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 等待确认(5秒超时) // 如果没有抛出异常,说明消息已到达 RabbitMQ channel.waitForConfirmsOrDie(5_000);

批量消息确认:

java

// 开启确认模式 channel.confirmSelect(); // 批量确认参数 int batchSize = 100; int count = 0; // 发送 1000 条消息,每 100 条确认一次 for (int i = 0; i < 1000; i++) { String message = "msg" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); count++; if (count == batchSize) { // 等待这一批消息确认(5秒超时) channel.waitForConfirmsOrDie(5_000); System.out.println("Confirmed " + batchSize + " messages (up to msg" + i + ")"); count = 0; } } // 确认最后一批不足 batchSize 的消息 if (count > 0) { channel.waitForConfirmsOrDie(5_000); System.out.println("Confirmed last " + count + " messages"); }

waitForConfirmsOrDie()方法会等待从上一次确认点到当前确认点之间的所有消息被确认,通过这种方式实现批量消息确认。

手动确认机制

前面介绍basicConsume()方法时提到过autoAck(自动确认)参数。在生产环境中不建议开启自动确认,而是使用手动确认机制。

手动确认只需一行代码:

java

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

这行代码的作用是告诉 RabbitMQ 服务器:“这条消息我已经处理完了,你可以将它从队列中删除了。”

使用手动确认机制后,如果消费者在处理消息时发生错误,队列中的消息依然存在,消费者可以重新尝试消费,从而避免消息丢失。


感谢浏览!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询