一、如何保证生产者消息发送出去了?
先说一个关键点:
生产者能保证的是:消息成功发送到 RocketMQ Broker。
生产者不能直接保证:消费者一定已经处理完业务。
也就是说,RocketMQ 是异步模型:
生产者 -> Broker -> 消费者生产者发完消息后,通常只能知道:
消息有没有成功写入 Broker而不是:
消费者有没有最终处理完在之前讲的概念里,Broker 是真正存消息的地方,Producer 发消息实际是发给 Broker,Consumer 再从 Broker 拉消息处理。
1. 生产者怎么确认“消息发出去了”?
RocketMQ 发送消息一般有三种方式:
同步发送 异步发送 单向发送1)同步发送:最常用,可靠性较高
生产者发送消息后,会等待 RocketMQ 返回结果。
SendResultsendResult=rocketMQTemplate.syncSend("order_topic:create","订单创建成功,订单ID:10001");System.out.println(sendResult.getSendStatus());如果返回:
SEND_OK说明消息已经成功发送到 Broker。
你可以理解为:
我把快递交给快递站了,并且快递站给我回执了。2)异步发送:适合高并发
生产者发出去后不阻塞当前线程,RocketMQ 后面通过回调告诉你成功还是失败。
rocketMQTemplate.asyncSend("order_topic:create","订单创建成功,订单ID:10001",newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){System.out.println("发送成功:"+sendResult);}@OverridepublicvoidonException(Throwablethrowable){System.out.println("发送失败:"+throwable.getMessage());}});3)单向发送:最快,但不可靠
rocketMQTemplate.sendOneWay("log_topic","用户访问日志");它不等结果,也不关心成功失败。
适合日志、埋点这种允许丢一点的场景。
不适合订单、支付、优惠券这种重要业务。
二、如何保证消费者已经完全消费消息?
这里要非常注意:
RocketMQ 自己只能保证“消费者成功返回后,认为消息消费成功”。
但你的业务是否真的处理成功,要靠你的代码设计。
消费者代码大概是这样:
@Component@RocketMQMessageListener(topic="order_topic",consumerGroup="stock_consumer_group")publicclassStockConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){// 1. 执行业务逻辑,比如扣库存System.out.println("扣库存:"+message);// 2. 如果这里没有抛异常,RocketMQ 就认为消费成功}}如果onMessage()正常执行结束,RocketMQ 就认为:
这条消息消费成功如果你抛异常:
@OverridepublicvoidonMessage(Stringmessage){thrownewRuntimeException("扣库存失败");}RocketMQ 就认为:
这条消息消费失败,需要重试三、但是“完全消费”到底是什么意思?
这句话要拆开看。
假设订单服务发了一条消息:
用户下单成功然后有三个消费者组:
stock_group:扣库存 sms_group:发短信 point_group:加积分那么“完全消费”可能有两种意思。
情况 1:只关心某一个消费者组消费成功
比如你只关心库存系统有没有扣库存成功。
那你可以看:
stock_group 是否消费成功如果库存消费者正常执行完,没有抛异常,RocketMQ 就认为这个消费者组消费成功。
情况 2:关心所有业务系统都消费成功
比如你想知道:
库存扣了 短信发了 积分加了这就不是 RocketMQ 自动帮你完成的了。
你需要自己设计一张业务状态表。
比如:
CREATETABLEorder_message_consume_status(idBIGINTPRIMARYKEYAUTO_INCREMENT,order_idBIGINT,message_keyVARCHAR(100),stock_statusTINYINT,sms_statusTINYINT,point_statusTINYINT,create_timeDATETIME,update_timeDATETIME);每个消费者处理完以后更新自己的状态:
库存系统消费成功 -> stock_status = 1 短信系统消费成功 -> sms_status = 1 积分系统消费成功 -> point_status = 1最后你判断:
stock_status = 1 sms_status = 1 point_status = 1才能说:
这条订单消息相关业务都处理完成了。四、RocketMQ 本身不是用来做“强同步确认”的
你不要把 MQ 理解成这种模式:
生产者发送消息 等待消费者全部处理完成 然后生产者再继续执行这就失去了 MQ 的意义。
MQ 的核心价值是:
异步 解耦 削峰 最终一致性所以正常设计是:
生产者只保证消息可靠投递到 Broker 消费者保证自己最终能处理成功 业务通过状态表、重试、幂等来保证最终一致也就是:
MQ 系统里通常不追求“立刻一致”,而是追求“最终一致”。
五、如何让整个链路更可靠?
一般要做这几件事。
1. 生产者使用同步发送,并检查 SEND_OK
SendResultsendResult=rocketMQTemplate.syncSend("coupon_topic:expire",couponMessage);if(sendResult.getSendStatus()!=SendStatus.SEND_OK){thrownewRuntimeException("优惠券过期消息发送失败");}这样可以保证生产者知道消息有没有成功交给 Broker。
2. 重要业务使用事务消息
比如:
创建订单成功 必须发送订单创建消息如果你先插入订单,再发消息,可能出现:
订单插入成功 消息发送失败这就不一致了。
RocketMQ 的事务消息就是为了解决这种问题。
大概流程是:
1. 先发送半消息 2. 执行本地事务,比如创建订单 3. 本地事务成功,提交消息 4. 本地事务失败,回滚消息你可以理解为:
订单创建成功,消息才真正对消费者可见。 订单创建失败,消息不会被消费者看到。3. 消费者处理成功才正常返回
消费者里不要随便吞异常。
错误写法:
@OverridepublicvoidonMessage(Stringmessage){try{// 扣库存stockService.deduct(message);}catch(Exceptione){// 只打印日志,不抛异常e.printStackTrace();}}这个写法很危险。
因为虽然扣库存失败了,但是方法最后正常结束了。
RocketMQ 会认为:
消费成功正确写法:
@OverridepublicvoidonMessage(Stringmessage){try{stockService.deduct(message);}catch(Exceptione){thrownewRuntimeException("扣库存失败,等待 RocketMQ 重试",e);}}这样 RocketMQ 才会重试。
4. 消费者必须做幂等
因为 RocketMQ 可能重复投递消息。
比如同一个订单消息消费了两次:
订单10001扣库存 订单10001扣库存如果不做幂等,就会多扣库存。
可以用数据库唯一索引防重。
比如:
CREATETABLEconsume_record(idBIGINTPRIMARYKEYAUTO_INCREMENT,message_keyVARCHAR(100)UNIQUE,consumer_groupVARCHAR(100),create_timeDATETIME);消费前先判断:
@OverridepublicvoidonMessage(OrderMessagemessage){// 1. 判断是否消费过booleanconsumed=consumeRecordService.exists(message.getMessageKey());if(consumed){return;}// 2. 执行业务逻辑stockService.deduct(message.getOrderId());// 3. 记录已消费consumeRecordService.save(message.getMessageKey(),"stock_group");}这样即使消息重复来了,也不会重复扣库存。
六、死信队列是什么?
死信队列,英文是:
Dead Letter Queue DLQ它的意思是:
一条消息反复消费失败,超过最大重试次数后,就不会继续正常投递了,而是进入死信队列。
比如:
订单消息 -> 库存消费者库存消费者一直失败:
第1次失败 第2次失败 第3次失败 ... 第16次失败超过重试次数后,这条消息就会进入死信队列。
七、为什么需要死信队列?
因为有些消息可能一直处理不了。
例如:
订单ID不存在 数据库字段异常 业务数据格式错误 库存系统代码有 bug 外部接口一直失败如果 RocketMQ 一直无限重试,会浪费资源,还可能阻塞其他消息。
所以 RocketMQ 会说:
这条消息我已经重试很多次了,还是失败。 我先把它放到死信队列里,后面人工排查。八、死信队列可以理解成什么?
你可以把它理解成:
问题快递暂存区普通消息流程:
消息 -> 消费者 -> 消费成功失败重试流程:
消息 -> 消费者 -> 失败 消息 -> 消费者 -> 失败 消息 -> 消费者 -> 失败最终:
消息 -> 死信队列也就是:
正常队列处理不了的异常消息,最后会被隔离到死信队列。九、死信队列的名字一般长什么样?
RocketMQ 的死信 Topic 一般类似:
%DLQ%consumer_group比如你的消费者组是:
stock_consumer_group那么死信队列可能是:
%DLQ%stock_consumer_group也就是说:
死信队列是按 Consumer Group 维度产生的。
为什么?
因为同一条消息,对不同消费者组来说,消费结果可能不一样。
例如:
order_topic 中有一条订单消息库存系统失败:
stock_group 消费失败短信系统成功:
sms_group 消费成功那么这条消息只会对stock_group进入死信队列,不影响sms_group。
十、举个完整例子
订单服务发送消息:
Topic: order_topic Tag: create Body: {"orderId":10001}库存消费者:
@Component@RocketMQMessageListener(topic="order_topic",consumerGroup="stock_group",selectorExpression="create")publicclassStockConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){// 模拟一直失败thrownewRuntimeException("扣库存失败");}}RocketMQ 会不断重试。
如果一直失败,最终消息进入:
%DLQ%stock_group然后你可以:
查看死信消息 分析失败原因 修复数据或代码 重新投递消息十一、你真正应该记住的结论
生产者发送可靠性
同步发送 + 检查 SEND_OK能保证消息成功到达 Broker。
消费者消费可靠性
业务处理成功 -> 正常返回 业务处理失败 -> 抛异常,让 RocketMQ 重试防止重复消费
消费者必须做幂等因为消息可能被重复投递。
死信队列
消息多次消费失败后,会进入死信队列,等待人工排查或重新投递。十二、一句话总结
RocketMQ 里,生产者只能确认消息是否成功发送到 Broker,不能直接确认所有消费者都处理完成;消费者是否成功,取决于消费方法是否正常返回。为了保证最终可靠,需要同步发送、消费失败抛异常、自动重试、幂等处理、状态表追踪,最后失败的消息会进入死信队列,也就是专门存放“多次消费失败消息”的队列。