前言 用来学习MQ使用的 demo场景
本文环境:CentOS7/8、JDK8、RocketMQ 5.1.4(ACL1.0稳定版,支持任意时间戳定时消息)
包含:服务器离线安装、JVM内存调优、开启ACL账号密码、启动/停止/重启命令、防火墙放行端口、SpringBoot整合带密码连接、生产者发送定时消息、消费者消费并调用业务接口,适配课程定时推送场景。
一、服务器前置准备
1. 环境要求
- JDK 1.8+ 已配置环境变量
- 服务器开放端口:
9876(NameServer)、10911(Broker)
# 防火墙放行端口firewall-cmd --add-port=9876/tcp--permanentfirewall-cmd --add-port=10911/tcp--permanentfirewall-cmd--reload# 查看端口是否放行firewall-cmd --list-ports2. 创建安装目录
mkdir-p/usr/local/rocketmqcd/usr/local/rocketmq二、下载&解压RocketMQ
1. 下载二进制包
官网下载地址:https://rocketmq.apache.org/dowloading/releases
选择rocketmq-all-5.1.4-bin-release.zip
上传到服务器/usr/local/rocketmq目录
2. 解压
unziprocketmq-all-5.1.4-bin-release.zipmvrocketmq-all-5.1.4-bin-release rocketmq5.1.4cdrocketmq5.1.4# 定义环境变量(临时生效,永久写入/etc/profile)exportROCKETMQ_HOME=/usr/local/rocketmq/rocketmq5.1.4三、修改JVM内存(低配服务器必改,否则启动OOM)
1. 修改NameServer内存
vimbin/runserver.sh# 找到JAVA_OPT,修改Xms XmxJAVA_OPT="${JAVA_OPT}-Xms256m -Xmx256m -Xmn128m"2. 修改Broker内存
vimbin/runbroker.shJAVA_OPT="${JAVA_OPT}-Xms256m -Xmx256m -Xmn128m"四、开启ACL密码鉴权(设置账号密码)
步骤1:开启Broker ACL开关
vimconf/broker.conf# 文件末尾追加aclEnable=true# 允许自动创建Topic(测试环境)autoCreateTopicEnable=true步骤2:配置账号、密码、权限plain_acl.yml
vimconf/plain_acl.yml完整配置(业务账号仅允许发送/消费课程推送Topic):
accounts:# 管理员账号,运维使用-accessKey:mq_adminsecretKey:Admin@2026#RmqwhiteRemoteAddress:127.0.0.1,服务器内网IPadmin:true# 业务应用账号(Java项目连接使用,用户名密码)-accessKey:course_push_appsecretKey:Push@666888whiteRemoteAddress:0.0.0.0# 所有IP可访问,生产填业务服务IPadmin:falsedefaultTopicPerm:DENYdefaultGroupPerm:DENY# 仅允许操作课程推送Topic:发布PUB 订阅SUBtopicPerms:-"course_push_topic=PUB|SUB"groupPerms:-"course-push-consumer-group=SUB"globalWhiteRemoteAddresses:-127.0.0.1- accessKey = 用户名
- secretKey = 密码
修改完成保存,重启Broker生效
五、RocketMQ 启动/停止/重启完整命令
进入mq根目录执行:cd /usr/local/rocketmq/rocketmq5.1.4
1. 启动顺序:先NameServer,后Broker
启动NameServer
# 后台启动,输出日志到nohup.outnohupshbin/mqnamesrv&# 查看启动日志,出现 boot success 代表成功tail-f~/logs/rocketmqlogs/namesrv.log启动Broker(绑定本机IP)
# 替换为你的服务器公网/内网IPnohupshbin/mqbroker-n127.0.0.1:9876&# 查看Broker日志tail-f~/logs/rocketmqlogs/broker.log2. 停止服务(顺序先Broker,后NameServer)
# 停止Brokershbin/mqshutdown broker# 停止NameServershbin/mqshutdown namesrv3. 重启流程
# 1.停止shbin/mqshutdown brokershbin/mqshutdown namesrv# 2.等待5秒sleep5# 3.重新启动nohupshbin/mqnamesrv&sleep3nohupshbin/mqbroker-n127.0.0.1:9876&4. 验证服务连通(内置工具测试)
exportNAMESRV_ADDR=127.0.0.1:9876# 发送测试消息(未开ACL可用,开启ACL会报错,需代码带AK/SK)shbin/tools.sh org.apache.rocketmq.example.quickstart.Producershbin/tools.sh org.apache.rocketmq.example.quickstart.Consumer六、SpringBoot Java Demo(带ACL密码,定时消息生产+消费)
1. Maven依赖 pom.xml
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/></parent><groupId>com.demo</groupId><artifactId>rocketmq-acl-demo</artifactId><version>1.0.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- SpringBoot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Starter 带ACL支持 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><!-- Redis 幂等防重复推送 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- JSON序列化 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.32</version></dependency></dependencies></project>2. application.yml 配置(核心:配置mq地址+ACL账号密码)
spring:application:name:rocketmq-course-pushredis:host:127.0.0.1port:6379# mq 配置rocketmq:# 服务器 NameServer 地址,替换为你的服务器IPname-server:127.0.0.1:9876producer:group:course-push-producer-group# ACL账号密码(plain_acl.yml配置的业务账号)access-key:course_push_appsecret-key:Push@666888send-message-timeout:5000consumer:access-key:course_push_appsecret-key:Push@6668883. 消息实体类 CoursePushMsg.java
packagecom.demo.entity;importlombok.Data;@DatapublicclassCoursePushMsg{privateLongcourseId;privateStringtitle;privateStringcontent;privateLongendTimeStamp;// 课程结束时间戳(毫秒)}4. 生产者服务:发送定时消息 CoursePushProducer.java
packagecom.demo.service;importcom.alibaba.fastjson2.JSON;importcom.demo.entity.CoursePushMsg;importorg.apache.rocketmq.client.producer.SendMessageWithTimeStampRequest;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.nio.charset.StandardCharsets;@ServicepublicclassCoursePushProducer{@AutowiredprivateRocketMQTemplaterocketMQTemplate;privatestaticfinalStringTOPIC="course_push_topic";/** * 发送课程结束定时推送消息(指定精确到期时间戳) * @param msg 课程消息实体 */publicvoidsendCourseTimedMsg(CoursePushMsgmsg){byte[]body=JSON.toJSONString(msg).getBytes(StandardCharsets.UTF_8);SendMessageWithTimeStampRequestrequest=SendMessageWithTimeStampRequest.builder().topic(TOPIC).body(body).deliveryTimeStamp(msg.getEndTimeStamp()).build();try{rocketMQTemplate.syncSendWithTimestamp(request);System.out.println("定时消息发送成功,课程ID:"+msg.getCourseId());}catch(Exceptione){e.printStackTrace();thrownewRuntimeException("发送MQ定时消息失败");}}}5. 模拟推送接口客户端 PushApiClient.java
packagecom.demo.client;importorg.springframework.stereotype.Component;importorg.springframework.web.client.RestTemplate;importjavax.annotation.Resource;@ComponentpublicclassPushApiClient{@ResourceprivateRestTemplaterestTemplate;/** * 业务推送接口:根据课程ID批量推送学员消息 */publicvoidpushNotice(LongcourseId){// 替换为你自己的推送服务接口地址Stringurl="http://127.0.0.1:8080/api/push/send?courseId="+courseId;Stringresp=restTemplate.postForObject(url,null,String.class);System.out.println("调用推送接口完成,返回:"+resp);}}6. 消费者:监听消息、校验课程、调用推送接口 CoursePushConsumer.java
packagecom.demo.consumer;importcom.alibaba.fastjson2.JSON;importcom.demo.client.PushApiClient;importcom.demo.entity.CoursePushMsg;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Component;importjava.util.List;importjava.util.concurrent.TimeUnit;@Component@RocketMQMessageListener(topic="course_push_topic",consumerGroup="course-push-consumer-group")publicclassCoursePushConsumerimplementsMessageListenerConcurrently{@AutowiredprivatePushApiClientpushApiClient;@AutowiredprivateRedisTemplate<String,String>redisTemplate;@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContextcontext){for(MessageExtmsg:msgs){try{StringbodyStr=newString(msg.getBody());CoursePushMsgmsgData=JSON.parseObject(bodyStr,CoursePushMsg.class);LongcourseId=msgData.getCourseId();longmsgScheduleTs=msg.getDeliveryTimestamp();longrealEndTs=msgData.getEndTimeStamp();// 1. 过滤旧消息(修改课程结束时间产生的过期定时消息)if(Math.abs(msgScheduleTs-realEndTs)>60*1000){System.out.println("旧定时消息,丢弃 courseId:"+courseId);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 2. Redis幂等,防止重复推送StringredisKey="push:course:"+courseId;if(Boolean.TRUE.equals(redisTemplate.hasKey(redisKey))){System.out.println("该课程已推送,跳过");returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 3. 核心逻辑:调用业务推送接口pushApiClient.pushNotice(courseId);// 4. 成功后写入幂等标记,7天过期redisTemplate.opsForValue().set(redisKey,"1",7,TimeUnit.DAYS);}catch(Exceptione){e.printStackTrace();// 消费异常,MQ自动重试returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}7. 测试接口 Controller TestMqController.java
packagecom.demo.controller;importcom.demo.entity.CoursePushMsg;importcom.demo.service.CoursePushProducer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;importjava.time.LocalDateTime;importjava.time.ZoneOffset;importjava.time.format.DateTimeFormatter;@RestControllerpublicclassTestMqController{@AutowiredprivateCoursePushProducerproducer;/** * 测试发送定时推送消息 * 访问示例:http://127.0.0.1:8080/send/push?courseId=1001&endTime=2026-06-25 18:30:00 */@GetMapping("/send/push")publicStringsendPushMsg(@RequestParamLongcourseId,@RequestParamStringendTime){DateTimeFormatterformatter=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");LocalDateTimelocalDateTime=LocalDateTime.parse(endTime,formatter);longtimeStamp=localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();CoursePushMsgmsg=newCoursePushMsg();msg.setCourseId(courseId);msg.setTitle("课程已结束提醒");msg.setContent("本节课学习完成,请完成课后作业");msg.setEndTimeStamp(timeStamp);producer.sendCourseTimedMsg(msg);return"定时消息提交成功,到期自动推送学员";}}8. 启动类 RocketMqApplication.java
packagecom.demo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.context.annotation.Bean;importorg.springframework.web.client.RestTemplate;@SpringBootApplicationpublicclassRocketMqApplication{publicstaticvoidmain(String[]args){SpringApplication.run(RocketMqApplication.class,args);}@BeanpublicRestTemplaterestTemplate(){returnnewRestTemplate();}}七、业务场景说明(课程修改结束时间解决方案)
- 新增课程:保存课程endTime到数据库,调用接口发送定时消息
- 修改课程结束时间:更新数据库endTime,重新调用发送接口生成新定时消息;旧消息到期消费时,时间不匹配直接丢弃,不会重复推送
- 取消课程推送:数据库增加pushStatus=2标记,消费时查询状态直接跳过推送逻辑
八、常见踩坑问题
- 连接报错ACL权限不足
- 检查yml的access-key、secret-key和plain_acl.yml完全一致
- 确认topicPerms配置了PUB|SUB权限
- 定时消息不生效
- RocketMQ版本必须5.x以上才支持
syncSendWithTimestamp任意时间戳定时
- RocketMQ版本必须5.x以上才支持
- 服务器外网无法连接
- 防火墙放行9876、10911端口
- broker启动时绑定公网IP:
nohup sh bin/mqbroker -n 服务器公网IP:9876 &
- 重复推送
- Redis幂等标记 + 数据库推送状态双重兜底
- 启动内存不足
- 修改runserver.sh、runbroker.sh的Xms/Xmx降低内存
九、总结
- 生产环境必须开启ACL设置账号密码,禁止裸奔无鉴权
- RocketMQ定时消息无法删除,采用「数据库存真实时间,消费过滤旧消息」方案适配课程修改时间场景
- SpringBoot starter自动携带ACL鉴权信息,无需手动编写RPC钩子,开发极简
- 启停顺序:先NameServer后Broker;停止顺序相反,重启前必须完整关闭进程