Linux 服务器 RocketMQ 5.1.4 安装 + ACL 密码鉴权 + 启停脚本 + SpringBoot Java 生产消费 Demo
2026/6/26 8:25:16 网站建设 项目流程

前言 用来学习MQ使用的 demo场景

本文环境:CentOS7/8、JDK8、RocketMQ 5.1.4(ACL1.0稳定版,支持任意时间戳定时消息)
包含:服务器离线安装、JVM内存调优、开启ACL账号密码、启动/停止/重启命令、防火墙放行端口、SpringBoot整合带密码连接、生产者发送定时消息、消费者消费并调用业务接口,适配课程定时推送场景。

一、服务器前置准备

1. 环境要求

  1. JDK 1.8+ 已配置环境变量
  2. 服务器开放端口:9876(NameServer)10911(Broker)
# 防火墙放行端口firewall-cmd --add-port=9876/tcp--permanentfirewall-cmd --add-port=10911/tcp--permanentfirewall-cmd--reload# 查看端口是否放行firewall-cmd --list-ports

2. 创建安装目录

mkdir-p/usr/local/rocketmqcd/usr/local/rocketmq

二、下载&解压RocketMQ

1. 下载二进制包

官网下载地址:https://rocketmq.apache.org/dowloading/releases
选择rocketmq-all-5.1.4-bin-release.zip
上传到服务器/usr/local/rocketmq目录

2. 解压

unziprocketmq-all-5.1.4-bin-release.zipmvrocketmq-all-5.1.4-bin-release rocketmq5.1.4cdrocketmq5.1.4# 定义环境变量(临时生效,永久写入/etc/profile)exportROCKETMQ_HOME=/usr/local/rocketmq/rocketmq5.1.4

三、修改JVM内存(低配服务器必改,否则启动OOM)

1. 修改NameServer内存

vimbin/runserver.sh# 找到JAVA_OPT,修改Xms XmxJAVA_OPT="${JAVA_OPT}-Xms256m -Xmx256m -Xmn128m"

2. 修改Broker内存

vimbin/runbroker.shJAVA_OPT="${JAVA_OPT}-Xms256m -Xmx256m -Xmn128m"

四、开启ACL密码鉴权(设置账号密码)

步骤1:开启Broker ACL开关

vimconf/broker.conf# 文件末尾追加aclEnable=true# 允许自动创建Topic(测试环境)autoCreateTopicEnable=true

步骤2:配置账号、密码、权限plain_acl.yml

vimconf/plain_acl.yml

完整配置(业务账号仅允许发送/消费课程推送Topic):

accounts:# 管理员账号,运维使用-accessKey:mq_adminsecretKey:Admin@2026#RmqwhiteRemoteAddress:127.0.0.1,服务器内网IPadmin:true# 业务应用账号(Java项目连接使用,用户名密码)-accessKey:course_push_appsecretKey:Push@666888whiteRemoteAddress:0.0.0.0# 所有IP可访问,生产填业务服务IPadmin:falsedefaultTopicPerm:DENYdefaultGroupPerm:DENY# 仅允许操作课程推送Topic:发布PUB 订阅SUBtopicPerms:-"course_push_topic=PUB|SUB"groupPerms:-"course-push-consumer-group=SUB"globalWhiteRemoteAddresses:-127.0.0.1
  • accessKey = 用户名
  • secretKey = 密码
    修改完成保存,重启Broker生效

五、RocketMQ 启动/停止/重启完整命令

进入mq根目录执行:cd /usr/local/rocketmq/rocketmq5.1.4

1. 启动顺序:先NameServer,后Broker

启动NameServer
# 后台启动,输出日志到nohup.outnohupshbin/mqnamesrv&# 查看启动日志,出现 boot success 代表成功tail-f~/logs/rocketmqlogs/namesrv.log
启动Broker(绑定本机IP)
# 替换为你的服务器公网/内网IPnohupshbin/mqbroker-n127.0.0.1:9876&# 查看Broker日志tail-f~/logs/rocketmqlogs/broker.log

2. 停止服务(顺序先Broker,后NameServer)

# 停止Brokershbin/mqshutdown broker# 停止NameServershbin/mqshutdown namesrv

3. 重启流程

# 1.停止shbin/mqshutdown brokershbin/mqshutdown namesrv# 2.等待5秒sleep5# 3.重新启动nohupshbin/mqnamesrv&sleep3nohupshbin/mqbroker-n127.0.0.1:9876&

4. 验证服务连通(内置工具测试)

exportNAMESRV_ADDR=127.0.0.1:9876# 发送测试消息(未开ACL可用,开启ACL会报错,需代码带AK/SK)shbin/tools.sh org.apache.rocketmq.example.quickstart.Producershbin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

六、SpringBoot Java Demo(带ACL密码,定时消息生产+消费)

1. Maven依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/></parent><groupId>com.demo</groupId><artifactId>rocketmq-acl-demo</artifactId><version>1.0.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- SpringBoot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Starter 带ACL支持 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><!-- Redis 幂等防重复推送 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- JSON序列化 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.32</version></dependency></dependencies></project>

2. application.yml 配置(核心:配置mq地址+ACL账号密码)

spring:application:name:rocketmq-course-pushredis:host:127.0.0.1port:6379# mq 配置rocketmq:# 服务器 NameServer 地址,替换为你的服务器IPname-server:127.0.0.1:9876producer:group:course-push-producer-group# ACL账号密码(plain_acl.yml配置的业务账号)access-key:course_push_appsecret-key:Push@666888send-message-timeout:5000consumer:access-key:course_push_appsecret-key:Push@666888

3. 消息实体类 CoursePushMsg.java

packagecom.demo.entity;importlombok.Data;@DatapublicclassCoursePushMsg{privateLongcourseId;privateStringtitle;privateStringcontent;privateLongendTimeStamp;// 课程结束时间戳(毫秒)}

4. 生产者服务:发送定时消息 CoursePushProducer.java

packagecom.demo.service;importcom.alibaba.fastjson2.JSON;importcom.demo.entity.CoursePushMsg;importorg.apache.rocketmq.client.producer.SendMessageWithTimeStampRequest;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.nio.charset.StandardCharsets;@ServicepublicclassCoursePushProducer{@AutowiredprivateRocketMQTemplaterocketMQTemplate;privatestaticfinalStringTOPIC="course_push_topic";/** * 发送课程结束定时推送消息(指定精确到期时间戳) * @param msg 课程消息实体 */publicvoidsendCourseTimedMsg(CoursePushMsgmsg){byte[]body=JSON.toJSONString(msg).getBytes(StandardCharsets.UTF_8);SendMessageWithTimeStampRequestrequest=SendMessageWithTimeStampRequest.builder().topic(TOPIC).body(body).deliveryTimeStamp(msg.getEndTimeStamp()).build();try{rocketMQTemplate.syncSendWithTimestamp(request);System.out.println("定时消息发送成功,课程ID:"+msg.getCourseId());}catch(Exceptione){e.printStackTrace();thrownewRuntimeException("发送MQ定时消息失败");}}}

5. 模拟推送接口客户端 PushApiClient.java

packagecom.demo.client;importorg.springframework.stereotype.Component;importorg.springframework.web.client.RestTemplate;importjavax.annotation.Resource;@ComponentpublicclassPushApiClient{@ResourceprivateRestTemplaterestTemplate;/** * 业务推送接口:根据课程ID批量推送学员消息 */publicvoidpushNotice(LongcourseId){// 替换为你自己的推送服务接口地址Stringurl="http://127.0.0.1:8080/api/push/send?courseId="+courseId;Stringresp=restTemplate.postForObject(url,null,String.class);System.out.println("调用推送接口完成,返回:"+resp);}}

6. 消费者:监听消息、校验课程、调用推送接口 CoursePushConsumer.java

packagecom.demo.consumer;importcom.alibaba.fastjson2.JSON;importcom.demo.client.PushApiClient;importcom.demo.entity.CoursePushMsg;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Component;importjava.util.List;importjava.util.concurrent.TimeUnit;@Component@RocketMQMessageListener(topic="course_push_topic",consumerGroup="course-push-consumer-group")publicclassCoursePushConsumerimplementsMessageListenerConcurrently{@AutowiredprivatePushApiClientpushApiClient;@AutowiredprivateRedisTemplate<String,String>redisTemplate;@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContextcontext){for(MessageExtmsg:msgs){try{StringbodyStr=newString(msg.getBody());CoursePushMsgmsgData=JSON.parseObject(bodyStr,CoursePushMsg.class);LongcourseId=msgData.getCourseId();longmsgScheduleTs=msg.getDeliveryTimestamp();longrealEndTs=msgData.getEndTimeStamp();// 1. 过滤旧消息(修改课程结束时间产生的过期定时消息)if(Math.abs(msgScheduleTs-realEndTs)>60*1000){System.out.println("旧定时消息,丢弃 courseId:"+courseId);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 2. Redis幂等,防止重复推送StringredisKey="push:course:"+courseId;if(Boolean.TRUE.equals(redisTemplate.hasKey(redisKey))){System.out.println("该课程已推送,跳过");returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 3. 核心逻辑:调用业务推送接口pushApiClient.pushNotice(courseId);// 4. 成功后写入幂等标记,7天过期redisTemplate.opsForValue().set(redisKey,"1",7,TimeUnit.DAYS);}catch(Exceptione){e.printStackTrace();// 消费异常,MQ自动重试returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

7. 测试接口 Controller TestMqController.java

packagecom.demo.controller;importcom.demo.entity.CoursePushMsg;importcom.demo.service.CoursePushProducer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;importjava.time.LocalDateTime;importjava.time.ZoneOffset;importjava.time.format.DateTimeFormatter;@RestControllerpublicclassTestMqController{@AutowiredprivateCoursePushProducerproducer;/** * 测试发送定时推送消息 * 访问示例:http://127.0.0.1:8080/send/push?courseId=1001&endTime=2026-06-25 18:30:00 */@GetMapping("/send/push")publicStringsendPushMsg(@RequestParamLongcourseId,@RequestParamStringendTime){DateTimeFormatterformatter=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");LocalDateTimelocalDateTime=LocalDateTime.parse(endTime,formatter);longtimeStamp=localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();CoursePushMsgmsg=newCoursePushMsg();msg.setCourseId(courseId);msg.setTitle("课程已结束提醒");msg.setContent("本节课学习完成,请完成课后作业");msg.setEndTimeStamp(timeStamp);producer.sendCourseTimedMsg(msg);return"定时消息提交成功,到期自动推送学员";}}

8. 启动类 RocketMqApplication.java

packagecom.demo;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.context.annotation.Bean;importorg.springframework.web.client.RestTemplate;@SpringBootApplicationpublicclassRocketMqApplication{publicstaticvoidmain(String[]args){SpringApplication.run(RocketMqApplication.class,args);}@BeanpublicRestTemplaterestTemplate(){returnnewRestTemplate();}}

七、业务场景说明(课程修改结束时间解决方案)

  1. 新增课程:保存课程endTime到数据库,调用接口发送定时消息
  2. 修改课程结束时间:更新数据库endTime,重新调用发送接口生成新定时消息;旧消息到期消费时,时间不匹配直接丢弃,不会重复推送
  3. 取消课程推送:数据库增加pushStatus=2标记,消费时查询状态直接跳过推送逻辑

八、常见踩坑问题

  1. 连接报错ACL权限不足
    • 检查yml的access-key、secret-key和plain_acl.yml完全一致
    • 确认topicPerms配置了PUB|SUB权限
  2. 定时消息不生效
    • RocketMQ版本必须5.x以上才支持syncSendWithTimestamp任意时间戳定时
  3. 服务器外网无法连接
    • 防火墙放行9876、10911端口
    • broker启动时绑定公网IP:nohup sh bin/mqbroker -n 服务器公网IP:9876 &
  4. 重复推送
    • Redis幂等标记 + 数据库推送状态双重兜底
  5. 启动内存不足
    • 修改runserver.sh、runbroker.sh的Xms/Xmx降低内存

九、总结

  1. 生产环境必须开启ACL设置账号密码,禁止裸奔无鉴权
  2. RocketMQ定时消息无法删除,采用「数据库存真实时间,消费过滤旧消息」方案适配课程修改时间场景
  3. SpringBoot starter自动携带ACL鉴权信息,无需手动编写RPC钩子,开发极简
  4. 启停顺序:先NameServer后Broker;停止顺序相反,重启前必须完整关闭进程

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询