Unity打包Linux服务器应用踩坑记:从发布到后台稳定运行(含Systemd服务配置)
2026/5/25 8:37:42
前置条件:ZooKeeper 集群 + Kafka 集群已启动(3个ZK节点 + 3个Broker) Broker 地址:172.17.0.7:9092, 172.17.0.7:9093, 172.17.0.7:9094
目的:理解底层原理,Spring Boot 只是对这层的封装
<dependencies> <!-- Kafka 客户端(原生Java API) --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.9.0</version> </dependency> <!-- 日志(可选) --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> </dependency> </dependencies>import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) throws Exception { // 1. 配置生产者 Properties props = new Properties(); props.put("bootstrap.servers", "172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建生产者 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 3. 发送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "hello kafka " + i); producer.send(record); System.out.println("发送消息: hello kafka " + i); } // 4. 关闭(会自动flush) producer.close(); } }// ① 发后不管(Fire and Forget)—— 性能最高,可能丢消息 producer.send(record); // ② 同步发送 —— 等待结果,性能最低,最可靠 try { RecordMetadata metadata = producer.send(record).get(); // .get() 阻塞等待 System.out.printf("发送成功 → Topic:%s Partition:%d Offset:%d%n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (Exception e) { System.out.println("发送失败: " + e.getMessage()); } // ③ 异步发送(推荐)—— 性能高,有回调处理结果 producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.printf("发送成功 → Partition:%d Offset:%d%n", metadata.partition(), metadata.offset()); } else { System.out.println("发送失败: " + exception.getMessage()); } });Properties props = new Properties(); props.put("bootstrap.servers", "172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ★ acks:消息可靠性核心配置 // "0" → 发完不管,性能最高,可能丢消息 // "1" → Leader写成功就返回,Leader挂了可能丢 // "all"→ 所有ISR副本写成功才返回,最可靠(推荐生产环境) props.put("acks", "all"); // 重试次数(遇到可重试错误时) props.put("retries", 3); // 批量发送:消息积累到16KB一起发(提升吞吐量) props.put("batch.size", 16384); // 等待时间:即使不到16KB,等1ms也发出去(减少延迟) props.put("linger.ms", 1); // 缓冲区大小:生产者本地缓存32MB,满了send()会阻塞 props.put("buffer.memory", 33554432); // 幂等性:开启后自动去重,防止重试导致的消息重复 props.put("enable.idempotence", true);// 情况1:指定分区 → 直接发到该分区 new ProducerRecord<>("test-topic", 0, "key", "value"); // 发到分区0 // 情况2:指定key → 对key做hash,相同key一定进同一分区(保证有序) new ProducerRecord<>("test-topic", "user-001", "登录消息"); new ProducerRecord<>("test-topic", "user-001", "下单消息"); // 上面两条消息一定在同一分区,保证user-001的消息有序 // 情况3:不指定key → 轮询分配到各分区(负载均衡) new ProducerRecord<>("test-topic", "hello");import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.*; public class SimpleConsumer { public static void main(String[] args) { // 1. 配置消费者 Properties props = new Properties(); props.put("bootstrap.servers", "172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094"); props.put("group.id", "my-consumer-group"); // 消费者组ID(重要!) props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2. 创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 3. 订阅 Topic consumer.subscribe(Collections.singletonList("test-topic")); // 4. 持续拉取消息 while (true) { // poll():向Broker拉取消息,最多等待1秒 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息 → Topic:%s Partition:%d Offset:%d Key:%s Value:%s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } }Topic: test-topic(3个分区) 消费者组A(3个消费者): 消费者组B(1个消费者): Consumer1 → Partition0 Consumer1 → Partition0 Consumer2 → Partition1 → Partition1 Consumer3 → Partition2 → Partition2 结论: - 同一组内:每条消息只被一个消费者消费(负载均衡) - 不同组间:每条消息都会被每个组各消费一次(广播) - 消费者数 > 分区数:多余的消费者空闲,等待接管// ① 自动提交(默认)—— 简单,但可能重复消费或丢失 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "5000"); // 每5秒自动提交一次 // ② 手动同步提交 —— 处理完消息再提交,更可靠 props.put("enable.auto.commit", "false"); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { // 处理消息... System.out.println("处理消息: " + record.value()); } consumer.commitSync(); // 处理完一批再提交(阻塞等待) } // ③ 手动异步提交 —— 性能更好(推荐) consumer.commitAsync((offsets, exception) -> { if (exception != null) { System.out.println("提交失败: " + exception.getMessage()); } });// 从哪里开始消费(当消费者组第一次启动,没有记录offset时) // "earliest" → 从最早的消息开始(--from-beginning) // "latest" → 只消费启动后的新消息(默认) props.put("auto.offset.reset", "earliest"); // 一次poll最多拉取多少条 props.put("max.poll.records", 500); // poll间隔超过这个时间,broker认为消费者挂了,触发Rebalance props.put("max.poll.interval.ms", 300000); // 5分钟 // 心跳间隔(消费者定期向broker报活) props.put("heartbeat.interval.ms", 3000); // 超过这个时间没心跳,认为消费者挂了 props.put("session.timeout.ms", 30000);什么是Rebalance: 消费者组内成员发生变化时,Kafka重新分配分区给消费者的过程 触发时机: 1. 新消费者加入组 2. 消费者离开组(正常关闭或崩溃) 3. Topic分区数变化 Rebalance过程(STW): 所有消费者停止消费 → GroupCoordinator重新分配 → 消费者恢复消费 ↑ 这段时间不能消费!所以Rebalance要尽量避免频繁发生 避免不必要Rebalance的方法: - 合理设置 session.timeout.ms 和 heartbeat.interval.ms - 处理消息不要太慢,避免超过 max.poll.interval.ms<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring Kafka(包含了kafka-clients) --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 用于对象序列化 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies>spring: kafka: # Broker 地址列表 bootstrap-servers: 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094 # 生产者配置 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all # 最高可靠性 retries: 3 batch-size: 16384 linger-ms: 1 buffer-memory: 33554432 # 如果发送的是对象,改用 JsonSerializer: # value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消费者配置 consumer: group-id: my-spring-group # 消费者组ID auto-offset-reset: earliest # 第一次启动从最早消息开始 enable-auto-commit: false # 关闭自动提交(让Spring管理) key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 # 如果消费的是对象,改用 JsonDeserializer: # value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # properties: # spring.json.trusted.packages: "*" # 监听器配置 listener: ack-mode: manual_immediate # 手动提交offset concurrency: 3 # 3个消费者线程(建议 = 分区数) type: batch # batch=批量消费,single=逐条消费import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; @Service public class KafkaProducerService { // Spring 自动注入,无需手动创建 private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ① 最简单:只发消息内容 public void sendMessage(String message) { kafkaTemplate.send("test-topic", message); } // ② 带 key(相同key进同一分区,保证有序) public void sendWithKey(String key, String message) { kafkaTemplate.send("test-topic", key, message); } // ③ 指定分区 public void sendToPartition(int partition, String key, String message) { kafkaTemplate.send("test-topic", partition, key, message); } // ④ 异步回调(推荐) public void sendWithCallback(String message) { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic", message); future.whenComplete((result, ex) -> { if (ex == null) { System.out.printf("发送成功 → Partition:%d Offset:%d%n", result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } else { System.out.println("发送失败: " + ex.getMessage()); } }); } // ⑤ 同步发送(等待结果) public void sendSync(String message) throws Exception { SendResult<String, String> result = kafkaTemplate.send("test-topic", message).get(); System.out.printf("同步发送成功 → Partition:%d Offset:%d%n", result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }// 实体类 public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // getter/setter/构造方法省略 }# application.yml 中改为 JsonSerializer spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer// 注入对象类型的 KafkaTemplate @Service public class OrderProducerService { private final KafkaTemplate<String, OrderEvent> kafkaTemplate; public OrderProducerService(KafkaTemplate<String, OrderEvent> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendOrder(OrderEvent order) { // 用 orderId 作为 key,保证同一订单的消息有序 kafkaTemplate.send("order-topic", order.getOrderId(), order); System.out.println("发送订单事件: " + order.getOrderId()); } }import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class KafkaConsumerService { // 最简单的监听 @KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void listen(String message) { System.out.println("收到消息: " + message); } // 获取消息元数据(partition、offset等) @KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void listenWithMeta(ConsumerRecord<String, String> record) { System.out.printf("收到消息 → Partition:%d Offset:%d Key:%s Value:%s%n", record.partition(), record.offset(), record.key(), record.value()); } // 手动提交 offset(需要 yml 中配置 ack-mode: manual_immediate) @KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void listenWithAck(ConsumerRecord<String, String> record, Acknowledgment ack) { try { System.out.println("处理消息: " + record.value()); // 处理成功后手动提交 ack.acknowledge(); } catch (Exception e) { // 处理失败不提交,下次重新消费 System.out.println("处理失败,不提交offset: " + e.getMessage()); } } }# yml 中配置 spring: kafka: listener: type: batch # 开启批量模式@KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { System.out.println("批量收到 " + records.size() + " 条消息"); for (ConsumerRecord<String, String> record : records) { System.out.printf("Partition:%d Offset:%d Value:%s%n", record.partition(), record.offset(), record.value()); // 处理每条消息... } // 整批处理完后提交一次(比逐条提交性能好很多) ack.acknowledge(); }spring: kafka: consumer: value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.yourpackage.dto" # 信任的包名 spring.json.value.default.type: "com.yourpackage.dto.OrderEvent"@KafkaListener(topics = "order-topic", groupId = "order-group") public void handleOrder(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) { OrderEvent order = record.value(); System.out.printf("收到订单 → ID:%s 用户:%s 金额:%.2f%n", order.getOrderId(), order.getUserId(), order.getAmount()); // 处理订单业务逻辑... ack.acknowledge(); }// 监听多个 Topic @KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group") public void listenMultipleTopics(ConsumerRecord<String, String> record) { System.out.println("来自 " + record.topic() + ": " + record.value()); } // 指定消费某个分区(从offset=0开始) @KafkaListener( groupId = "my-group", topicPartitions = { @TopicPartition(topic = "test-topic", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "1", initialOffset = "0") }) } ) public void listenSpecificPartition(ConsumerRecord<String, String> record) { System.out.println("分区" + record.partition() + ": " + record.value()); }// 生产者:发送时携带 Header import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; public void sendWithHeader(String message) { Message<String> msg = MessageBuilder .withPayload(message) .setHeader("source", "order-service") .setHeader("version", "v1") .setHeader(KafkaHeaders.TOPIC, "test-topic") .build(); kafkaTemplate.send(msg); } // 消费者:读取 Header @KafkaListener(topics = "test-topic") public void listenWithHeader( ConsumerRecord<String, String> record, @Header(value = "source", required = false) String source) { System.out.println("来源: " + source + " 消息: " + record.value()); }import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; @Configuration public class KafkaConfig { @Bean public DefaultErrorHandler errorHandler() { // 重试2次,每次间隔1秒 FixedBackOff backOff = new FixedBackOff(1000L, 2); DefaultErrorHandler handler = new DefaultErrorHandler(backOff); // 某些异常不重试,直接跳过 handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); return factory; } }// 重试多次后仍然失败的消息,自动发到死信Topic(原Topic名+".DLT") import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; @Bean public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) { // 失败消息发送到死信队列 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); // 重试3次后进死信队列 FixedBackOff backOff = new FixedBackOff(1000L, 3); return new DefaultErrorHandler(recoverer, backOff); } // 消费死信队列 @KafkaListener(topics = "test-topic.DLT", groupId = "dlt-group") public void handleDeadLetter(ConsumerRecord<String, String> record) { System.out.println("死信消息(需人工处理): " + record.value()); // 记录日志、告警、人工干预... }spring: kafka: producer: transaction-id-prefix: tx- # 开启事务@Service public class TransactionalProducerService { private final KafkaTemplate<String, String> kafkaTemplate; // @Transactional 注解:方法内所有消息要么全发成功,要么全回滚 @Transactional public void sendTransactional() { kafkaTemplate.send("topic1", "消息1"); kafkaTemplate.send("topic2", "消息2"); // 如果这里抛异常,上面两条消息都不会发出去 if (Math.random() > 0.5) { throw new RuntimeException("模拟业务异常"); } kafkaTemplate.send("topic3", "消息3"); } }import org.apache.kafka.clients.admin.NewTopic; import org.springframework.kafka.config.TopicBuilder; @Configuration public class KafkaTopicConfig { // Spring Boot 启动时自动创建(如果不存在) @Bean public NewTopic orderTopic() { return TopicBuilder.name("order-topic") .partitions(3) .replicas(3) .build(); } @Bean public NewTopic userTopic() { return TopicBuilder.name("user-topic") .partitions(3) .replicas(2) .build(); } }src/main/java/com/example/kafka/ ├── KafkaApplication.java ├── config/ │ ├── KafkaTopicConfig.java ← Topic 配置 │ └── KafkaConsumerConfig.java ← 消费者工厂配置 ├── dto/ │ └── OrderEvent.java ← 消息实体 ├── producer/ │ └── OrderProducer.java ← 生产者 └── consumer/ ├── OrderConsumer.java ← 正常消费者 └── OrderDltConsumer.java ← 死信队列消费者package com.example.kafka.dto; public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // CREATED / PAID / SHIPPED / COMPLETED public OrderEvent() {} public OrderEvent(String orderId, String userId, Double amount, String status) { this.orderId = orderId; this.userId = userId; this.amount = amount; this.status = status; } // getter & setter public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount = amount; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } @Override public String toString() { return String.format("OrderEvent{orderId='%s', userId='%s', amount=%.2f, status='%s'}", orderId, userId, amount, status); } }package com.example.kafka.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.TopicBuilder; @Configuration public class KafkaTopicConfig { @Bean public NewTopic orderTopic() { return TopicBuilder.name("order-topic") .partitions(3) .replicas(3) .build(); } }package com.example.kafka.producer; import com.example.kafka.dto.OrderEvent; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class OrderProducer { private final KafkaTemplate<String, OrderEvent> kafkaTemplate; public OrderProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendOrder(OrderEvent order) { kafkaTemplate.send("order-topic", order.getOrderId(), order) .whenComplete((result, ex) -> { if (ex == null) { System.out.printf("[Producer] 订单发送成功 → %s Partition:%d%n", order.getOrderId(), result.getRecordMetadata().partition()); } else { System.out.println("[Producer] 发送失败: " + ex.getMessage()); } }); } }package com.example.kafka.consumer; import com.example.kafka.dto.OrderEvent; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class OrderConsumer { @KafkaListener( topics = "order-topic", groupId = "order-service-group", containerFactory = "kafkaListenerContainerFactory" ) public void handleOrder(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) { OrderEvent order = record.value(); System.out.printf("[Consumer] 收到订单 Partition:%d Offset:%d → %s%n", record.partition(), record.offset(), order); try { // 模拟业务处理 processOrder(order); ack.acknowledge(); // 处理成功,提交offset } catch (Exception e) { // 不提交offset,交给重试机制处理 System.out.println("[Consumer] 处理失败: " + e.getMessage()); throw e; // 抛出让 ErrorHandler 处理重试 } } private void processOrder(OrderEvent order) { System.out.println("[Consumer] 处理订单业务逻辑: " + order.getOrderId()); // 实际业务:更新数据库、调用其他服务等 } }package com.example.kafka; import com.example.kafka.dto.OrderEvent; import com.example.kafka.producer.OrderProducer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } // 启动后自动发送测试消息 @Bean public CommandLineRunner runner(OrderProducer producer) { return args -> { Thread.sleep(2000); // 等消费者就绪 for (int i = 1; i <= 5; i++) { OrderEvent order = new OrderEvent( "ORDER-" + i, "USER-" + (i % 3 + 1), 100.0 * i, "CREATED" ); producer.sendOrder(order); Thread.sleep(500); } }; } }| 功能 | 原生 Java API | Spring Boot |
|---|---|---|
| 配置 | 手动写 Properties | application.yml 声明式配置 |
| 生产者 | KafkaProducer+ 手动管理 | KafkaTemplate自动注入 |
| 消费者 | while(true)手动轮询 | @KafkaListener注解驱动 |
| offset提交 | 完全手动 | 自动/手动可选(ack-mode) |
| 错误处理 | 自己写try/catch+重试逻辑 | DefaultErrorHandler开箱即用 |
| 事务 | 手动beginTransaction | @Transactional注解 |
| Topic创建 | 调用AdminClient | @Bean NewTopic自动创建 |
| 多线程消费 | 自己管理线程池 | concurrency参数一行搞定 |
第1天:原生Java API ├── SimpleProducer 发送10条消息 ├── SimpleConsumer 消费并打印 └── 理解 Partition/Offset/ConsumerGroup 第2天:Spring Boot 基础 ├── 搭项目,配 yml ├── KafkaTemplate 发消息 └── @KafkaListener 收消息 第3天:Spring Boot 进阶 ├── 发送 JSON 对象 ├── 手动提交 offset └── 批量消费 第4天:高级特性 ├── 错误处理 + 重试 ├── 死信队列 └── 事务消息 第5天:实战演练 ├── 完成订单消息系统 ├── 模拟Broker宕机观察行为 └── 监控消费者Lag原因:消费者处理完但offset提交前崩溃,重启后重新消费 解决:业务层做幂等处理 - 数据库唯一键约束 - Redis 记录已处理的 messageId - 数据库乐观锁(版本号)Kafka 只保证同一Partition内有序 做法: - 需要有序的消息使用相同的 key → 进入同一Partition - 如:同一订单的所有事件用 orderId 作为 key# 查看某个消费者组的消费进度 kafka-consumer-groups.sh \ --bootstrap-server 172.17.0.7:9092 \ --describe \ --group order-service-group # 输出: # TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG # order-topic 0 100 105 5 ← 落后5条生产者端: - acks=all + retries=3 保证发送可靠性 - enable.idempotence=true 防止重复发送 消费者端: - 手动提交offset,处理失败不提交 - 配置重试 + 死信队列兜底最后更新:2026年5月