如何保证MQ消息成功发送,成功消费?死信队列是干嘛的?
2026/6/3 1:15:00 网站建设 项目流程

一、如何保证生产者消息发送出去了?

先说一个关键点:

生产者能保证的是:消息成功发送到 RocketMQ Broker。
生产者不能直接保证:消费者一定已经处理完业务。

也就是说,RocketMQ 是异步模型:

生产者 -> Broker -> 消费者

生产者发完消息后,通常只能知道:

消息有没有成功写入 Broker

而不是:

消费者有没有最终处理完

在之前讲的概念里,Broker 是真正存消息的地方,Producer 发消息实际是发给 Broker,Consumer 再从 Broker 拉消息处理。


1. 生产者怎么确认“消息发出去了”?

RocketMQ 发送消息一般有三种方式:

同步发送 异步发送 单向发送

1)同步发送:最常用,可靠性较高

生产者发送消息后,会等待 RocketMQ 返回结果。

SendResultsendResult=rocketMQTemplate.syncSend("order_topic:create","订单创建成功,订单ID:10001");System.out.println(sendResult.getSendStatus());

如果返回:

SEND_OK

说明消息已经成功发送到 Broker。

你可以理解为:

我把快递交给快递站了,并且快递站给我回执了。

2)异步发送:适合高并发

生产者发出去后不阻塞当前线程,RocketMQ 后面通过回调告诉你成功还是失败。

rocketMQTemplate.asyncSend("order_topic:create","订单创建成功,订单ID:10001",newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){System.out.println("发送成功:"+sendResult);}@OverridepublicvoidonException(Throwablethrowable){System.out.println("发送失败:"+throwable.getMessage());}});

3)单向发送:最快,但不可靠

rocketMQTemplate.sendOneWay("log_topic","用户访问日志");

它不等结果,也不关心成功失败。

适合日志、埋点这种允许丢一点的场景。

不适合订单、支付、优惠券这种重要业务。


二、如何保证消费者已经完全消费消息?

这里要非常注意:

RocketMQ 自己只能保证“消费者成功返回后,认为消息消费成功”。
但你的业务是否真的处理成功,要靠你的代码设计。

消费者代码大概是这样:

@Component@RocketMQMessageListener(topic="order_topic",consumerGroup="stock_consumer_group")publicclassStockConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){// 1. 执行业务逻辑,比如扣库存System.out.println("扣库存:"+message);// 2. 如果这里没有抛异常,RocketMQ 就认为消费成功}}

如果onMessage()正常执行结束,RocketMQ 就认为:

这条消息消费成功

如果你抛异常:

@OverridepublicvoidonMessage(Stringmessage){thrownewRuntimeException("扣库存失败");}

RocketMQ 就认为:

这条消息消费失败,需要重试

三、但是“完全消费”到底是什么意思?

这句话要拆开看。

假设订单服务发了一条消息:

用户下单成功

然后有三个消费者组:

stock_group:扣库存 sms_group:发短信 point_group:加积分

那么“完全消费”可能有两种意思。


情况 1:只关心某一个消费者组消费成功

比如你只关心库存系统有没有扣库存成功。

那你可以看:

stock_group 是否消费成功

如果库存消费者正常执行完,没有抛异常,RocketMQ 就认为这个消费者组消费成功。


情况 2:关心所有业务系统都消费成功

比如你想知道:

库存扣了 短信发了 积分加了

这就不是 RocketMQ 自动帮你完成的了。

你需要自己设计一张业务状态表。

比如:

CREATETABLEorder_message_consume_status(idBIGINTPRIMARYKEYAUTO_INCREMENT,order_idBIGINT,message_keyVARCHAR(100),stock_statusTINYINT,sms_statusTINYINT,point_statusTINYINT,create_timeDATETIME,update_timeDATETIME);

每个消费者处理完以后更新自己的状态:

库存系统消费成功 -> stock_status = 1 短信系统消费成功 -> sms_status = 1 积分系统消费成功 -> point_status = 1

最后你判断:

stock_status = 1 sms_status = 1 point_status = 1

才能说:

这条订单消息相关业务都处理完成了。

四、RocketMQ 本身不是用来做“强同步确认”的

你不要把 MQ 理解成这种模式:

生产者发送消息 等待消费者全部处理完成 然后生产者再继续执行

这就失去了 MQ 的意义。

MQ 的核心价值是:

异步 解耦 削峰 最终一致性

所以正常设计是:

生产者只保证消息可靠投递到 Broker 消费者保证自己最终能处理成功 业务通过状态表、重试、幂等来保证最终一致

也就是:

MQ 系统里通常不追求“立刻一致”,而是追求“最终一致”。


五、如何让整个链路更可靠?

一般要做这几件事。


1. 生产者使用同步发送,并检查 SEND_OK

SendResultsendResult=rocketMQTemplate.syncSend("coupon_topic:expire",couponMessage);if(sendResult.getSendStatus()!=SendStatus.SEND_OK){thrownewRuntimeException("优惠券过期消息发送失败");}

这样可以保证生产者知道消息有没有成功交给 Broker。


2. 重要业务使用事务消息

比如:

创建订单成功 必须发送订单创建消息

如果你先插入订单,再发消息,可能出现:

订单插入成功 消息发送失败

这就不一致了。

RocketMQ 的事务消息就是为了解决这种问题。

大概流程是:

1. 先发送半消息 2. 执行本地事务,比如创建订单 3. 本地事务成功,提交消息 4. 本地事务失败,回滚消息

你可以理解为:

订单创建成功,消息才真正对消费者可见。 订单创建失败,消息不会被消费者看到。

3. 消费者处理成功才正常返回

消费者里不要随便吞异常。

错误写法:

@OverridepublicvoidonMessage(Stringmessage){try{// 扣库存stockService.deduct(message);}catch(Exceptione){// 只打印日志,不抛异常e.printStackTrace();}}

这个写法很危险。

因为虽然扣库存失败了,但是方法最后正常结束了。

RocketMQ 会认为:

消费成功

正确写法:

@OverridepublicvoidonMessage(Stringmessage){try{stockService.deduct(message);}catch(Exceptione){thrownewRuntimeException("扣库存失败,等待 RocketMQ 重试",e);}}

这样 RocketMQ 才会重试。


4. 消费者必须做幂等

因为 RocketMQ 可能重复投递消息。

比如同一个订单消息消费了两次:

订单10001扣库存 订单10001扣库存

如果不做幂等,就会多扣库存。

可以用数据库唯一索引防重。

比如:

CREATETABLEconsume_record(idBIGINTPRIMARYKEYAUTO_INCREMENT,message_keyVARCHAR(100)UNIQUE,consumer_groupVARCHAR(100),create_timeDATETIME);

消费前先判断:

@OverridepublicvoidonMessage(OrderMessagemessage){// 1. 判断是否消费过booleanconsumed=consumeRecordService.exists(message.getMessageKey());if(consumed){return;}// 2. 执行业务逻辑stockService.deduct(message.getOrderId());// 3. 记录已消费consumeRecordService.save(message.getMessageKey(),"stock_group");}

这样即使消息重复来了,也不会重复扣库存。


六、死信队列是什么?

死信队列,英文是:

Dead Letter Queue DLQ

它的意思是:

一条消息反复消费失败,超过最大重试次数后,就不会继续正常投递了,而是进入死信队列。

比如:

订单消息 -> 库存消费者

库存消费者一直失败:

第1次失败 第2次失败 第3次失败 ... 第16次失败

超过重试次数后,这条消息就会进入死信队列。


七、为什么需要死信队列?

因为有些消息可能一直处理不了。

例如:

订单ID不存在 数据库字段异常 业务数据格式错误 库存系统代码有 bug 外部接口一直失败

如果 RocketMQ 一直无限重试,会浪费资源,还可能阻塞其他消息。

所以 RocketMQ 会说:

这条消息我已经重试很多次了,还是失败。 我先把它放到死信队列里,后面人工排查。

八、死信队列可以理解成什么?

你可以把它理解成:

问题快递暂存区

普通消息流程:

消息 -> 消费者 -> 消费成功

失败重试流程:

消息 -> 消费者 -> 失败 消息 -> 消费者 -> 失败 消息 -> 消费者 -> 失败

最终:

消息 -> 死信队列

也就是:

正常队列处理不了的异常消息,最后会被隔离到死信队列。

九、死信队列的名字一般长什么样?

RocketMQ 的死信 Topic 一般类似:

%DLQ%consumer_group

比如你的消费者组是:

stock_consumer_group

那么死信队列可能是:

%DLQ%stock_consumer_group

也就是说:

死信队列是按 Consumer Group 维度产生的。

为什么?

因为同一条消息,对不同消费者组来说,消费结果可能不一样。

例如:

order_topic 中有一条订单消息

库存系统失败:

stock_group 消费失败

短信系统成功:

sms_group 消费成功

那么这条消息只会对stock_group进入死信队列,不影响sms_group


十、举个完整例子

订单服务发送消息:

Topic: order_topic Tag: create Body: {"orderId":10001}

库存消费者:

@Component@RocketMQMessageListener(topic="order_topic",consumerGroup="stock_group",selectorExpression="create")publicclassStockConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){// 模拟一直失败thrownewRuntimeException("扣库存失败");}}

RocketMQ 会不断重试。

如果一直失败,最终消息进入:

%DLQ%stock_group

然后你可以:

查看死信消息 分析失败原因 修复数据或代码 重新投递消息

十一、你真正应该记住的结论

生产者发送可靠性

同步发送 + 检查 SEND_OK

能保证消息成功到达 Broker。


消费者消费可靠性

业务处理成功 -> 正常返回 业务处理失败 -> 抛异常,让 RocketMQ 重试

防止重复消费

消费者必须做幂等

因为消息可能被重复投递。


死信队列

消息多次消费失败后,会进入死信队列,等待人工排查或重新投递。

十二、一句话总结

RocketMQ 里,生产者只能确认消息是否成功发送到 Broker,不能直接确认所有消费者都处理完成;消费者是否成功,取决于消费方法是否正常返回。为了保证最终可靠,需要同步发送、消费失败抛异常、自动重试、幂等处理、状态表追踪,最后失败的消息会进入死信队列,也就是专门存放“多次消费失败消息”的队列。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询