Java + Spring Boot 操作 Kafka 完整学习指南
2026/5/25 7:55:02 网站建设 项目流程

前置条件:ZooKeeper 集群 + Kafka 集群已启动(3个ZK节点 + 3个Broker) Broker 地址:172.17.0.7:9092, 172.17.0.7:9093, 172.17.0.7:9094


第一阶段:原生 Java API 操作 Kafka

目的:理解底层原理,Spring Boot 只是对这层的封装

一、Maven 依赖

<dependencies> <!-- Kafka 客户端(原生Java API) --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.9.0</version> </dependency> <!-- 日志(可选) --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> </dependency> </dependencies>

二、生产者(Producer)

2.1 最简单的生产者

import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) throws Exception { // 1. 配置生产者 Properties props = new Properties(); props.put("bootstrap.servers", "172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建生产者 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 3. 发送消息 for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "hello kafka " + i); producer.send(record); System.out.println("发送消息: hello kafka " + i); } // 4. 关闭(会自动flush) producer.close(); } }

2.2 三种发送方式

// ① 发后不管(Fire and Forget)—— 性能最高,可能丢消息 producer.send(record); // ② 同步发送 —— 等待结果,性能最低,最可靠 try { RecordMetadata metadata = producer.send(record).get(); // .get() 阻塞等待 System.out.printf("发送成功 → Topic:%s Partition:%d Offset:%d%n", metadata.topic(), metadata.partition(), metadata.offset()); } catch (Exception e) { System.out.println("发送失败: " + e.getMessage()); } // ③ 异步发送(推荐)—— 性能高,有回调处理结果 producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.printf("发送成功 → Partition:%d Offset:%d%n", metadata.partition(), metadata.offset()); } else { System.out.println("发送失败: " + exception.getMessage()); } });

2.3 生产者重要配置详解

Properties props = new Properties(); props.put("bootstrap.servers", "172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ★ acks:消息可靠性核心配置 // "0" → 发完不管,性能最高,可能丢消息 // "1" → Leader写成功就返回,Leader挂了可能丢 // "all"→ 所有ISR副本写成功才返回,最可靠(推荐生产环境) props.put("acks", "all"); // 重试次数(遇到可重试错误时) props.put("retries", 3); // 批量发送:消息积累到16KB一起发(提升吞吐量) props.put("batch.size", 16384); // 等待时间:即使不到16KB,等1ms也发出去(减少延迟) props.put("linger.ms", 1); // 缓冲区大小:生产者本地缓存32MB,满了send()会阻塞 props.put("buffer.memory", 33554432); // 幂等性:开启后自动去重,防止重试导致的消息重复 props.put("enable.idempotence", true);

2.4 消息分区策略

// 情况1:指定分区 → 直接发到该分区 new ProducerRecord<>("test-topic", 0, "key", "value"); // 发到分区0 // 情况2:指定key → 对key做hash,相同key一定进同一分区(保证有序) new ProducerRecord<>("test-topic", "user-001", "登录消息"); new ProducerRecord<>("test-topic", "user-001", "下单消息"); // 上面两条消息一定在同一分区,保证user-001的消息有序 // 情况3:不指定key → 轮询分配到各分区(负载均衡) new ProducerRecord<>("test-topic", "hello");

三、消费者(Consumer)

3.1 最简单的消费者

import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.*; public class SimpleConsumer { public static void main(String[] args) { // 1. 配置消费者 Properties props = new Properties(); props.put("bootstrap.servers", "172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094"); props.put("group.id", "my-consumer-group"); // 消费者组ID(重要!) props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2. 创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 3. 订阅 Topic consumer.subscribe(Collections.singletonList("test-topic")); // 4. 持续拉取消息 while (true) { // poll():向Broker拉取消息,最多等待1秒 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息 → Topic:%s Partition:%d Offset:%d Key:%s Value:%s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } }

3.2 消费者组(Consumer Group)核心概念

Topic: test-topic(3个分区) 消费者组A(3个消费者): 消费者组B(1个消费者): Consumer1 → Partition0 Consumer1 → Partition0 Consumer2 → Partition1 → Partition1 Consumer3 → Partition2 → Partition2 结论: - 同一组内:每条消息只被一个消费者消费(负载均衡) - 不同组间:每条消息都会被每个组各消费一次(广播) - 消费者数 > 分区数:多余的消费者空闲,等待接管

3.3 offset 提交方式

// ① 自动提交(默认)—— 简单,但可能重复消费或丢失 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "5000"); // 每5秒自动提交一次 // ② 手动同步提交 —— 处理完消息再提交,更可靠 props.put("enable.auto.commit", "false"); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { // 处理消息... System.out.println("处理消息: " + record.value()); } consumer.commitSync(); // 处理完一批再提交(阻塞等待) } // ③ 手动异步提交 —— 性能更好(推荐) consumer.commitAsync((offsets, exception) -> { if (exception != null) { System.out.println("提交失败: " + exception.getMessage()); } });

3.4 消费者重要配置详解

// 从哪里开始消费(当消费者组第一次启动,没有记录offset时) // "earliest" → 从最早的消息开始(--from-beginning) // "latest" → 只消费启动后的新消息(默认) props.put("auto.offset.reset", "earliest"); // 一次poll最多拉取多少条 props.put("max.poll.records", 500); // poll间隔超过这个时间,broker认为消费者挂了,触发Rebalance props.put("max.poll.interval.ms", 300000); // 5分钟 // 心跳间隔(消费者定期向broker报活) props.put("heartbeat.interval.ms", 3000); // 超过这个时间没心跳,认为消费者挂了 props.put("session.timeout.ms", 30000);

四、Rebalance(再均衡)机制

什么是Rebalance: 消费者组内成员发生变化时,Kafka重新分配分区给消费者的过程 触发时机: 1. 新消费者加入组 2. 消费者离开组(正常关闭或崩溃) 3. Topic分区数变化 Rebalance过程(STW): 所有消费者停止消费 → GroupCoordinator重新分配 → 消费者恢复消费 ↑ 这段时间不能消费!所以Rebalance要尽量避免频繁发生 避免不必要Rebalance的方法: - 合理设置 session.timeout.ms 和 heartbeat.interval.ms - 处理消息不要太慢,避免超过 max.poll.interval.ms

第二阶段:Spring Boot 操作 Kafka

一、项目搭建

1.1 Maven 依赖

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring Kafka(包含了kafka-clients) --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 用于对象序列化 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies>

1.2 application.yml 核心配置

spring: kafka: # Broker 地址列表 bootstrap-servers: 172.17.0.7:9092,172.17.0.7:9093,172.17.0.7:9094 # 生产者配置 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all # 最高可靠性 retries: 3 batch-size: 16384 linger-ms: 1 buffer-memory: 33554432 # 如果发送的是对象,改用 JsonSerializer: # value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消费者配置 consumer: group-id: my-spring-group # 消费者组ID auto-offset-reset: earliest # 第一次启动从最早消息开始 enable-auto-commit: false # 关闭自动提交(让Spring管理) key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 # 如果消费的是对象,改用 JsonDeserializer: # value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # properties: # spring.json.trusted.packages: "*" # 监听器配置 listener: ack-mode: manual_immediate # 手动提交offset concurrency: 3 # 3个消费者线程(建议 = 分区数) type: batch # batch=批量消费,single=逐条消费

二、生产者

2.1 发送字符串消息

import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; @Service public class KafkaProducerService { // Spring 自动注入,无需手动创建 private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ① 最简单:只发消息内容 public void sendMessage(String message) { kafkaTemplate.send("test-topic", message); } // ② 带 key(相同key进同一分区,保证有序) public void sendWithKey(String key, String message) { kafkaTemplate.send("test-topic", key, message); } // ③ 指定分区 public void sendToPartition(int partition, String key, String message) { kafkaTemplate.send("test-topic", partition, key, message); } // ④ 异步回调(推荐) public void sendWithCallback(String message) { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-topic", message); future.whenComplete((result, ex) -> { if (ex == null) { System.out.printf("发送成功 → Partition:%d Offset:%d%n", result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } else { System.out.println("发送失败: " + ex.getMessage()); } }); } // ⑤ 同步发送(等待结果) public void sendSync(String message) throws Exception { SendResult<String, String> result = kafkaTemplate.send("test-topic", message).get(); System.out.printf("同步发送成功 → Partition:%d Offset:%d%n", result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }

2.2 发送对象(JSON)

// 实体类 public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // getter/setter/构造方法省略 }
# application.yml 中改为 JsonSerializer spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// 注入对象类型的 KafkaTemplate @Service public class OrderProducerService { private final KafkaTemplate<String, OrderEvent> kafkaTemplate; public OrderProducerService(KafkaTemplate<String, OrderEvent> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendOrder(OrderEvent order) { // 用 orderId 作为 key,保证同一订单的消息有序 kafkaTemplate.send("order-topic", order.getOrderId(), order); System.out.println("发送订单事件: " + order.getOrderId()); } }

三、消费者

3.1 基础消费(逐条)

import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class KafkaConsumerService { // 最简单的监听 @KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void listen(String message) { System.out.println("收到消息: " + message); } // 获取消息元数据(partition、offset等) @KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void listenWithMeta(ConsumerRecord<String, String> record) { System.out.printf("收到消息 → Partition:%d Offset:%d Key:%s Value:%s%n", record.partition(), record.offset(), record.key(), record.value()); } // 手动提交 offset(需要 yml 中配置 ack-mode: manual_immediate) @KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void listenWithAck(ConsumerRecord<String, String> record, Acknowledgment ack) { try { System.out.println("处理消息: " + record.value()); // 处理成功后手动提交 ack.acknowledge(); } catch (Exception e) { // 处理失败不提交,下次重新消费 System.out.println("处理失败,不提交offset: " + e.getMessage()); } } }

3.2 批量消费(推荐,性能更好)

# yml 中配置 spring: kafka: listener: type: batch # 开启批量模式
@KafkaListener(topics = "test-topic", groupId = "my-spring-group") public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { System.out.println("批量收到 " + records.size() + " 条消息"); for (ConsumerRecord<String, String> record : records) { System.out.printf("Partition:%d Offset:%d Value:%s%n", record.partition(), record.offset(), record.value()); // 处理每条消息... } // 整批处理完后提交一次(比逐条提交性能好很多) ack.acknowledge(); }

3.3 消费对象(JSON)

spring: kafka: consumer: value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.yourpackage.dto" # 信任的包名 spring.json.value.default.type: "com.yourpackage.dto.OrderEvent"
@KafkaListener(topics = "order-topic", groupId = "order-group") public void handleOrder(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) { OrderEvent order = record.value(); System.out.printf("收到订单 → ID:%s 用户:%s 金额:%.2f%n", order.getOrderId(), order.getUserId(), order.getAmount()); // 处理订单业务逻辑... ack.acknowledge(); }

3.4 监听多个 Topic / 指定分区

// 监听多个 Topic @KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group") public void listenMultipleTopics(ConsumerRecord<String, String> record) { System.out.println("来自 " + record.topic() + ": " + record.value()); } // 指定消费某个分区(从offset=0开始) @KafkaListener( groupId = "my-group", topicPartitions = { @TopicPartition(topic = "test-topic", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "1", initialOffset = "0") }) } ) public void listenSpecificPartition(ConsumerRecord<String, String> record) { System.out.println("分区" + record.partition() + ": " + record.value()); }

四、高级特性

4.1 消息头(Header)传递额外信息

// 生产者:发送时携带 Header import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; public void sendWithHeader(String message) { Message<String> msg = MessageBuilder .withPayload(message) .setHeader("source", "order-service") .setHeader("version", "v1") .setHeader(KafkaHeaders.TOPIC, "test-topic") .build(); kafkaTemplate.send(msg); } // 消费者:读取 Header @KafkaListener(topics = "test-topic") public void listenWithHeader( ConsumerRecord<String, String> record, @Header(value = "source", required = false) String source) { System.out.println("来源: " + source + " 消息: " + record.value()); }

4.2 错误处理与重试

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; @Configuration public class KafkaConfig { @Bean public DefaultErrorHandler errorHandler() { // 重试2次,每次间隔1秒 FixedBackOff backOff = new FixedBackOff(1000L, 2); DefaultErrorHandler handler = new DefaultErrorHandler(backOff); // 某些异常不重试,直接跳过 handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); return factory; } }

4.3 死信队列(DLT)

// 重试多次后仍然失败的消息,自动发到死信Topic(原Topic名+".DLT") import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; @Bean public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) { // 失败消息发送到死信队列 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); // 重试3次后进死信队列 FixedBackOff backOff = new FixedBackOff(1000L, 3); return new DefaultErrorHandler(recoverer, backOff); } // 消费死信队列 @KafkaListener(topics = "test-topic.DLT", groupId = "dlt-group") public void handleDeadLetter(ConsumerRecord<String, String> record) { System.out.println("死信消息(需人工处理): " + record.value()); // 记录日志、告警、人工干预... }

4.4 事务(保证消息发送原子性)

spring: kafka: producer: transaction-id-prefix: tx- # 开启事务
@Service public class TransactionalProducerService { private final KafkaTemplate<String, String> kafkaTemplate; // @Transactional 注解:方法内所有消息要么全发成功,要么全回滚 @Transactional public void sendTransactional() { kafkaTemplate.send("topic1", "消息1"); kafkaTemplate.send("topic2", "消息2"); // 如果这里抛异常,上面两条消息都不会发出去 if (Math.random() > 0.5) { throw new RuntimeException("模拟业务异常"); } kafkaTemplate.send("topic3", "消息3"); } }

4.5 动态创建 Topic

import org.apache.kafka.clients.admin.NewTopic; import org.springframework.kafka.config.TopicBuilder; @Configuration public class KafkaTopicConfig { // Spring Boot 启动时自动创建(如果不存在) @Bean public NewTopic orderTopic() { return TopicBuilder.name("order-topic") .partitions(3) .replicas(3) .build(); } @Bean public NewTopic userTopic() { return TopicBuilder.name("user-topic") .partitions(3) .replicas(2) .build(); } }

五、完整实战案例:订单消息系统

项目结构

src/main/java/com/example/kafka/ ├── KafkaApplication.java ├── config/ │ ├── KafkaTopicConfig.java ← Topic 配置 │ └── KafkaConsumerConfig.java ← 消费者工厂配置 ├── dto/ │ └── OrderEvent.java ← 消息实体 ├── producer/ │ └── OrderProducer.java ← 生产者 └── consumer/ ├── OrderConsumer.java ← 正常消费者 └── OrderDltConsumer.java ← 死信队列消费者

OrderEvent.java

package com.example.kafka.dto; public class OrderEvent { private String orderId; private String userId; private Double amount; private String status; // CREATED / PAID / SHIPPED / COMPLETED public OrderEvent() {} public OrderEvent(String orderId, String userId, Double amount, String status) { this.orderId = orderId; this.userId = userId; this.amount = amount; this.status = status; } // getter & setter public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount = amount; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } @Override public String toString() { return String.format("OrderEvent{orderId='%s', userId='%s', amount=%.2f, status='%s'}", orderId, userId, amount, status); } }

KafkaTopicConfig.java

package com.example.kafka.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.TopicBuilder; @Configuration public class KafkaTopicConfig { @Bean public NewTopic orderTopic() { return TopicBuilder.name("order-topic") .partitions(3) .replicas(3) .build(); } }

OrderProducer.java

package com.example.kafka.producer; import com.example.kafka.dto.OrderEvent; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class OrderProducer { private final KafkaTemplate<String, OrderEvent> kafkaTemplate; public OrderProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendOrder(OrderEvent order) { kafkaTemplate.send("order-topic", order.getOrderId(), order) .whenComplete((result, ex) -> { if (ex == null) { System.out.printf("[Producer] 订单发送成功 → %s Partition:%d%n", order.getOrderId(), result.getRecordMetadata().partition()); } else { System.out.println("[Producer] 发送失败: " + ex.getMessage()); } }); } }

OrderConsumer.java

package com.example.kafka.consumer; import com.example.kafka.dto.OrderEvent; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class OrderConsumer { @KafkaListener( topics = "order-topic", groupId = "order-service-group", containerFactory = "kafkaListenerContainerFactory" ) public void handleOrder(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) { OrderEvent order = record.value(); System.out.printf("[Consumer] 收到订单 Partition:%d Offset:%d → %s%n", record.partition(), record.offset(), order); try { // 模拟业务处理 processOrder(order); ack.acknowledge(); // 处理成功,提交offset } catch (Exception e) { // 不提交offset,交给重试机制处理 System.out.println("[Consumer] 处理失败: " + e.getMessage()); throw e; // 抛出让 ErrorHandler 处理重试 } } private void processOrder(OrderEvent order) { System.out.println("[Consumer] 处理订单业务逻辑: " + order.getOrderId()); // 实际业务:更新数据库、调用其他服务等 } }

KafkaApplication.java(启动类 + 测试)

package com.example.kafka; import com.example.kafka.dto.OrderEvent; import com.example.kafka.producer.OrderProducer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } // 启动后自动发送测试消息 @Bean public CommandLineRunner runner(OrderProducer producer) { return args -> { Thread.sleep(2000); // 等消费者就绪 for (int i = 1; i <= 5; i++) { OrderEvent order = new OrderEvent( "ORDER-" + i, "USER-" + (i % 3 + 1), 100.0 * i, "CREATED" ); producer.sendOrder(order); Thread.sleep(500); } }; } }

六、原生 API vs Spring Boot 对比

功能原生 Java APISpring Boot
配置手动写 Propertiesapplication.yml 声明式配置
生产者KafkaProducer+ 手动管理KafkaTemplate自动注入
消费者while(true)手动轮询@KafkaListener注解驱动
offset提交完全手动自动/手动可选(ack-mode)
错误处理自己写try/catch+重试逻辑DefaultErrorHandler开箱即用
事务手动beginTransaction@Transactional注解
Topic创建调用AdminClient@Bean NewTopic自动创建
多线程消费自己管理线程池concurrency参数一行搞定

七、学习路线图

第1天:原生Java API ├── SimpleProducer 发送10条消息 ├── SimpleConsumer 消费并打印 └── 理解 Partition/Offset/ConsumerGroup 第2天:Spring Boot 基础 ├── 搭项目,配 yml ├── KafkaTemplate 发消息 └── @KafkaListener 收消息 第3天:Spring Boot 进阶 ├── 发送 JSON 对象 ├── 手动提交 offset └── 批量消费 第4天:高级特性 ├── 错误处理 + 重试 ├── 死信队列 └── 事务消息 第5天:实战演练 ├── 完成订单消息系统 ├── 模拟Broker宕机观察行为 └── 监控消费者Lag

八、常见问题速查

Q1: 消息重复消费怎么处理?

原因:消费者处理完但offset提交前崩溃,重启后重新消费 解决:业务层做幂等处理 - 数据库唯一键约束 - Redis 记录已处理的 messageId - 数据库乐观锁(版本号)

Q2: 消息顺序性怎么保证?

Kafka 只保证同一Partition内有序 做法: - 需要有序的消息使用相同的 key → 进入同一Partition - 如:同一订单的所有事件用 orderId 作为 key

Q3: 如何监控消费进度(Lag)?

# 查看某个消费者组的消费进度 kafka-consumer-groups.sh \ --bootstrap-server 172.17.0.7:9092 \ --describe \ --group order-service-group # 输出: # TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG # order-topic 0 100 105 5 ← 落后5条

Q4: 发送失败了怎么办?

生产者端: - acks=all + retries=3 保证发送可靠性 - enable.idempotence=true 防止重复发送 消费者端: - 手动提交offset,处理失败不提交 - 配置重试 + 死信队列兜底

最后更新:2026年5月

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询