WinUtil终极指南:一站式Windows系统优化与软件管理工具
2026/6/6 5:00:50
与Kafka类似,RocketMQ也支持基于队列(分区)的顺序消费机制。具体表现为:同一队列内的消息保证有序,而不同队列间的消息则是无序的。
实现顺序消息发送时,生产者需在send方法中传入MessageQueueSelector。该接口的select方法用于确定消息投递的目标队列,常见实现方式是采用取模路由策略:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);注意必须使用同步发送方式确保顺序性。
消费者端通过MessageListenerOrderly模式实现顺序消费:
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody())); return ConsumeOrderlyStatus.SUCCESS; } });顺序消费通过三级加锁机制保障:
扩展说明: 第三把锁主要应对消费者集群重平衡场景。当队列需要重新分配时,该锁确保正在处理的消息能完成消费并提交位点,避免新消费者重复消费。若不加此锁,可能导致位点未提交的消息被重复处理。
需注意:顺序消费会降低系统吞吐量,且存在消息阻塞传递效应,应谨慎使用。