Kafka Streams in Practice: From Beginner to Expert
2026/5/27 4:58:59

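Introduction

Kafka Streams is the core library in the Apache Kafka ecosystem for building real-time stream processing applications. It is lightweight and efficient, and it lets developers write complex stream processing logic the same way they write an ordinary application, with no separate processing cluster to deploy. This article covers the Kafka Streams architecture, its API, state management, and practical use cases.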

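Kafka Streams Basics

1.1 What Is Kafka Streams

Kafka Streams is a client library for building real-time stream processing applications. Its main characteristics are:

  • Lightweight: no separate cluster is needed; the library is embedded directly in your application
  • Scalable: load balancing and failover are handled automatically
  • Low latency: records are processed one at a time, with millisecond-level latency
  • Exactly-once semantics: when enabled, results stay consistent end to end for Kafka-to-Kafka pipelines
  • Simple to develop with: it builds on Kafka's native APIs and is easy to learn and use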

<!-- Maven dependency -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version>
</dependency>

1.2 Core Concepts

A few core concepts are needed to work with Kafka Streams: the configuration, the StreamsBuilder, KStream, KTable, and the topology. The word-count application below shows them together:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Arrays;
import java.util.Properties;

public class KafkaStreamsConcepts {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from the input topic
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        // Transformation: split each line into lowercase words
        KStream<String, String> words = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));

        // Aggregation: re-key by word and count occurrences
        KTable<String, Long> wordCounts = words
            .groupBy((key, value) -> value)
            .count(Materialized.as("counts"));

        // Write to the output topic. The counts are Long values, so the
        // default String value serde has to be overridden here.
        wordCounts.toStream().to("streams-wordcount-output",
            Produced.with(Serdes.String(), Serdes.Long()));

        // Build the topology and start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
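
As a rough mental model of what this topology computes, the following plain-Java sketch (no Kafka involved) applies the same steps to an in-memory list of lines: split into lowercase words, group by word, count. The class name `WordCountModel` and the sample input are hypothetical. A real `KTable` differs in that it keeps emitting an updated count for each change instead of returning one final map.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the word-count steps; illustrative only, not Kafka Streams code.
final class WordCountModel {

    // Mirrors flatMapValues(split) -> groupBy(word) -> count()
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                counts.merge(word, 1L, Long::sum); // add 1 to the running count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {hello=2, kafka=1, streams=1}
        System.out.println(count(Arrays.asList("Hello Kafka", "hello streams")));
    }
}
```

The map plays the role of the `counts` state store: one entry per word, updated in place as new lines arrive.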

Kafka Streams Architecture

2.1 Topology

A Kafka Streams application is built from a processor topology, which defines how data flows through the processing steps:

┌─────────────────────────────────────────────────────┐
│                 Processor Topology                  │
│                                                     │
│   Source Processor                                  │
│        │                                            │
│        ▼                                            │
│   Processor 1 ────────────────────► Processor 2     │
│        │                                 │          │
│        ▼                                 ▼          │
│   State Store                       Sink Processor  │
│                                                     │
└─────────────────────────────────────────────────────┘

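import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

import java.util.Arrays;

public class TopologyExample {
    public static Topology createWordCountTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source processor: read the input topic
        KStream<String, String> source = builder.stream("word-count-input",
            Consumed.with(Serdes.String(), Serdes.String()));

        // Processor 1: lowercase each line and split it into words
        KStream<String, String> lowerCased = source
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")));

        // Processor 2: drop empty strings
        KStream<String, String> filtered = lowerCased
            .filter((key, word) -> !word.isEmpty());

        // Stateful processor: re-key by word and count.
        // KStream#groupBy takes a mapper that returns the new key, not a KeyValue pair.
        KTable<String, Long> wordCounts = filtered
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .count(Materialized.as("word-counts-store"));

        // Processor 3: format the output
        KStream<String, String> output = wordCounts.toStream()
            .map((word, count) -> KeyValue.pair(word, word + " -> " + count));

        // Sink processor: write to the output topic
        output.to("word-count-output", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}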
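
The empty-string filter in this topology is not cosmetic. A small plain-Java check, using only `String.split`, suggests why: a line that starts with a non-word character yields an empty first token, which would otherwise be counted as a word. `SplitCheck` and the sample line are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Shows the tokens produced by the same regex the topology uses.
final class SplitCheck {

    // Same expression as in the flatMapValues step
    static List<String> tokens(String line) {
        return Arrays.asList(line.toLowerCase().split("\\W+"));
    }

    // Same predicate as in the filter step
    static List<String> nonEmptyTokens(String line) {
        return tokens(line).stream()
            .filter(word -> !word.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(tokens("  Hello, World!"));         // prints [, hello, world]
        System.out.println(nonEmptyTokens("  Hello, World!")); // prints [hello, world]
    }
}
```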

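2.2 Processing Guarantees

Kafka Streams supports two processing guarantees, at-least-once (the default) and exactly-once:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class ProcessingModes {

    // At-least-once (the default): after a failure, some records may be processed again
    public static void startAtLeastOnce() {
        start(StreamsConfig.AT_LEAST_ONCE);
    }

    // Exactly-once v2: output and offsets are committed in one transaction
    public static void startExactlyOnce() {
        start(StreamsConfig.EXACTLY_ONCE_V2);
    }

    private static void start(String guarantee) {
        Properties props = new Properties();
        // Placeholder application ID and broker address
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processing-modes-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, guarantee);

        StreamsBuilder builder = new StreamsBuilder();
        // Processing logic goes here; the topology needs at least one source topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}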
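
To make the difference between the two guarantees concrete, here is a toy simulation in plain Java. It does not use Kafka; it only models a processor that crashes once between two commits and then restarts from the last committed offset. The class `GuaranteeModel`, the "multiply by 10" processing step, and the commit interval are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of at-least-once vs. exactly-once processing; not Kafka Streams code.
final class GuaranteeModel {

    // Processes `input`, committing every `commitEvery` records, and crashes once
    // right after processing the record at index `crashAt`, before any commit.
    // Returns the output that a downstream reader ends up seeing.
    static List<Integer> run(List<Integer> input, int commitEvery, int crashAt,
                             boolean transactional) {
        List<Integer> visible = new ArrayList<>(); // output visible downstream
        List<Integer> pending = new ArrayList<>(); // output of the open transaction
        int committedOffset = 0;
        boolean crashed = false;
        int offset = 0;
        while (offset < input.size()) {
            int out = input.get(offset) * 10;      // the "processing" step
            if (transactional) {
                pending.add(out);                  // hidden until the transaction commits
            } else {
                visible.add(out);                  // visible immediately
            }
            offset++;
            if (!crashed && offset - 1 == crashAt) {
                crashed = true;
                pending.clear();                   // the open transaction is aborted
                offset = committedOffset;          // restart from the last committed offset
                continue;
            }
            if (offset % commitEvery == 0 || offset == input.size()) {
                visible.addAll(pending);           // output and offset are committed together
                pending.clear();
                committedOffset = offset;
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        // prints at-least-once: [10, 20, 30, 30, 40, 50]
        System.out.println("at-least-once: " + run(input, 2, 2, false));
        // prints exactly-once:  [10, 20, 30, 40, 50]
        System.out.println("exactly-once:  " + run(input, 2, 2, true));
    }
}
```

In the at-least-once run the result for the third record appears twice, because it was written before the crash and again after the restart. In the transactional run the uncommitted output is discarded with the aborted transaction, so each result appears once.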

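KStream and KTable

3.1 KStream: Event Streams

A KStream represents an unbounded, continuous stream of events, where every record is an independent event:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.util.Map;
import java.util.stream.Collectors;

// Order, OrderConfirm and OrderItem are application classes that are not shown here
public class KStreamOperations {
    public static void demonstrateKStream() {
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);

        // Create a KStream
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // filter: keep VIP orders only
        KStream<String, Order> vipOrders = orders
            .filter((key, order) -> order.isVip());

        // map: turn each record into a new key-value pair
        KStream<String, OrderConfirm> confirmations = orders
            .map((key, order) -> KeyValue.pair(key, new OrderConfirm(order)));

        // flatMap: emit one record per order item
        KStream<String, OrderItem> items = orders
            .flatMap((key, order) -> order.getItems().stream()
                .map(item -> KeyValue.pair(item.getId(), item))
                .collect(Collectors.toList()));

        // split: route each record to the first matching branch
        // (branch() is deprecated; split() is its replacement)
        Map<String, KStream<String, Order>> branches = orders
            .split(Named.as("orders-"))
            .branch((key, order) -> "PENDING".equals(order.getStatus()), Branched.as("pending"))
            .branch((key, order) -> "PROCESSING".equals(order.getStatus()), Branched.as("processing"))
            .branch((key, order) -> "COMPLETED".equals(order.getStatus()), Branched.as("completed"))
            .noDefaultBranch();

        KStream<String, Order> pendingOrders = branches.get("orders-pending");
        KStream<String, Order> processingOrders = branches.get("orders-processing");
        KStream<String, Order> completedOrders = branches.get("orders-completed");

        // merge: combine two streams into one
        KStream<String, Order> allOrders = pendingOrders.merge(processingOrders);

        // Write the result
        allOrders.to("processed-orders", Produced.with(Serdes.String(), orderSerde));
    }
}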

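3.2 KTable: State Tables

A KTable represents a view of the latest state for each key:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// User, Order and UserOrderSummary are application classes that are not shown here
public class KTableOperations {
    public static void demonstrateKTable() {
        StreamsBuilder builder = new StreamsBuilder();

        // Create a KTable
        KTable<String, User> users = builder.table("users",
            Consumed.with(Serdes.String(), new JsonSerde<>(User.class)));

        // filter
        KTable<String, User> activeUsers = users
            .filter((key, user) -> user.isActive());

        // mapValues
        KTable<String, String> userNames = users
            .mapValues(user -> user.getName());

        // groupBy + reduce: user names per city.
        // A grouped table needs an adder and a subtractor, because an updated
        // or deleted row must first be removed from its previous group.
        KTable<String, String> userByCity = users
            .groupBy((key, user) -> KeyValue.pair(user.getCity(), user.getName()),
                Grouped.with(Serdes.String(), Serdes.String()))
            .reduce(
                (names, added) -> names.isEmpty() ? added : names + ", " + added,
                (names, removed) -> removeName(names, removed));

        // Aggregation: number of orders per user.
        // KStream#groupBy returns the new key, not a KeyValue pair.
        KTable<String, Long> orderCounts = builder.stream("orders",
                Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .groupBy((key, order) -> order.getUserId(),
                Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .count(Materialized.as("order-counts"));

        // Join two tables on their key
        KTable<String, UserOrderSummary> userSummaries = users
            .join(orderCounts, (user, count) -> new UserOrderSummary(user, count));

        userSummaries.toStream().to("user-summaries",
            Produced.with(Serdes.String(), new JsonSerde<>(UserOrderSummary.class)));
    }

    // Simplistic subtractor: removes one occurrence of the name from the joined list
    private static String removeName(String names, String removed) {
        List<String> list = new ArrayList<>(Arrays.asList(names.split(", ")));
        list.remove(removed);
        return String.join(", ", list);
    }
}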
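
The difference between the two abstractions can be sketched without Kafka. In the plain-Java model below, the same list of records is read once as a stream (every record counts) and once as a table (only the latest value per key survives, and a null value deletes the key). `StreamTableDuality`, `Rec`, and the sample data are hypothetical names chosen for this illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the stream view vs. the table view of the same records.
final class StreamTableDuality {

    // A record is a key plus a value; on the table side a null value is a delete marker.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Table view: replay the records, keeping only the latest value per key.
    static Map<String, String> toTable(List<Rec> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : records) {
            if (r.value == null) {
                table.remove(r.key);        // tombstone: the key is deleted
            } else {
                table.put(r.key, r.value);  // upsert: the newer value replaces the older one
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(
            new Rec("alice", "Berlin"),
            new Rec("bob", "Paris"),
            new Rec("alice", "Rome"),
            new Rec("bob", null));
        System.out.println("stream view: " + records.size() + " events"); // prints stream view: 4 events
        System.out.println("table view:  " + toTable(records));           // prints table view:  {alice=Rome}
    }
}
```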

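3.3 Combining KStream and KTable

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

// User, Order, EnrichedOrder, UserProfile and UserFullInfo are application classes not shown here
public class KStreamKTableInteraction {
    public static void demonstrateInteraction() {
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<User> userSerde = new JsonSerde<>(User.class);
        JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);

        // KTable: user information
        KTable<String, User> users = builder.table("users",
            Consumed.with(Serdes.String(), userSerde));

        // KStream: order events
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // KStream-KTable join: each order is enriched with the current user row.
        // Both topics must be keyed by user ID and have the same number of partitions.
        KStream<String, EnrichedOrder> enrichedOrders = orders
            .join(users,
                (order, user) -> new EnrichedOrder(order, user),
                Joined.with(Serdes.String(), orderSerde, userSerde));

        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), new JsonSerde<>(EnrichedOrder.class)));

        // KTable-KTable join
        KTable<String, UserProfile> profiles = builder.table("profiles",
            Consumed.with(Serdes.String(), new JsonSerde<>(UserProfile.class)));

        KTable<String, UserFullInfo> userFullInfo = users
            .join(profiles, (user, profile) -> new UserFullInfo(user, profile));

        userFullInfo.toStream().to("user-full-info",
            Produced.with(Serdes.String(), new JsonSerde<>(UserFullInfo.class)));
    }
}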
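
The behavior of an inner stream-table join can be approximated in a few lines of plain Java. In this sketch, which is a simplification and not the library's implementation, only stream records trigger output, each one is matched against the table row that is current at that moment, and records without a matching row are dropped. `StreamTableJoinModel` and the event encoding are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of an inner stream-table join; not Kafka Streams code.
final class StreamTableJoinModel {

    // Each event is {kind, key, value}: kind "T" is a table update, "S" is a stream record.
    static List<String> innerJoin(List<String[]> events) {
        Map<String, String> table = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String[] e : events) {
            String kind = e[0];
            String key = e[1];
            String value = e[2];
            if (kind.equals("T")) {
                table.put(key, value);               // table updates never produce output
            } else {
                String tableValue = table.get(key);  // look up the row as of now
                if (tableValue != null) {
                    output.add(value + " by " + tableValue);
                }                                    // no matching row: the record is dropped
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
            new String[] {"S", "u1", "order-1"},  // arrives before the user row exists
            new String[] {"T", "u1", "Alice"},
            new String[] {"S", "u1", "order-2"},
            new String[] {"T", "u1", "Alicia"},   // later orders see the new name
            new String[] {"S", "u1", "order-3"});
        // prints [order-2 by Alice, order-3 by Alicia]
        System.out.println(innerJoin(events));
    }
}
```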

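State Management

4.1 State Stores

Kafka Streams uses state stores to hold intermediate processing results:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.math.BigDecimal;

// Transaction is an application class that is not shown here
public class StateStoreExample {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<Transaction> txSerde = new JsonSerde<>(Transaction.class);

        // Source KStream
        KStream<String, Transaction> transactions = builder.stream("transactions",
            Consumed.with(Serdes.String(), txSerde));

        // Aggregate into a named state store, keyed by account ID
        KTable<String, AccountBalance> balances = transactions
            .groupBy((key, tx) -> tx.getAccountId(), Grouped.with(Serdes.String(), txSerde))
            .aggregate(
                () -> new AccountBalance(),
                (accountId, tx, balance) -> balance.update(tx),
                Materialized.<String, AccountBalance, KeyValueStore<Bytes, byte[]>>as("account-balances")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(AccountBalance.class))
                    // Changelog logging stays enabled (the default) so the store
                    // can be restored after a failure
                    .withCachingEnabled());

        return builder.build();
    }

    // Query the store through a running KafkaStreams instance
    public static AccountBalance queryBalance(KafkaStreams streams, String accountId) {
        ReadOnlyKeyValueStore<String, AccountBalance> store = streams.store(
            StoreQueryParameters.fromNameAndType("account-balances",
                QueryableStoreTypes.keyValueStore()));
        return store.get(accountId);
    }

    public static class AccountBalance {
        private String accountId;
        // Start from zero so the first update does not hit a null
        private BigDecimal totalCredit = BigDecimal.ZERO;
        private BigDecimal totalDebit = BigDecimal.ZERO;

        public AccountBalance update(Transaction tx) {
            if (tx.isCredit()) {
                totalCredit = totalCredit.add(tx.getAmount());
            } else {
                totalDebit = totalDebit.add(tx.getAmount());
            }
            return this;
        }

        public BigDecimal getBalance() {
            return totalCredit.subtract(totalDebit);
        }
    }
}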
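
By default a state store is backed by a changelog topic, which is what makes local state recoverable. The plain-Java sketch below models that idea under simplifying assumptions: every write to the local map is also appended to a list standing in for the changelog, and a fresh instance rebuilds its map by replaying that list. `ChangelogStoreModel` and the use of whole-number amounts are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a state store that is restored from its changelog.
final class ChangelogStoreModel {
    private final Map<String, Long> store = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog; // stands in for the changelog topic

    // A new instance restores its local state by replaying the changelog.
    ChangelogStoreModel(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        for (Map.Entry<String, Long> entry : changelog) {
            store.put(entry.getKey(), entry.getValue());
        }
    }

    // Applies a delta to an account and logs the new value.
    void apply(String account, long delta) {
        long updated = store.getOrDefault(account, 0L) + delta;
        store.put(account, updated);
        changelog.add(new AbstractMap.SimpleImmutableEntry<>(account, updated));
    }

    Long get(String account) {
        return store.get(account);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = new ArrayList<>();
        ChangelogStoreModel first = new ChangelogStoreModel(log);
        first.apply("account-123", 100);
        first.apply("account-123", -30);

        // Simulate losing the instance: a replacement rebuilds its state from the log
        ChangelogStoreModel replacement = new ChangelogStoreModel(log);
        System.out.println(replacement.get("account-123")); // prints 70
    }
}
```

Disabling logging on a store removes exactly this replay path, so it is usually reserved for state that can be rebuilt some other way.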

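4.2 Windowed Computations

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Duration;

// Event and MetricAggregate are application classes that are not shown here
public class WindowOperations {
    public static void demonstrateWindows() {
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<Event> eventSerde = new JsonSerde<>(Event.class);
        Grouped<String, Event> grouped = Grouped.with(Serdes.String(), eventSerde);

        KStream<String, Event> events = builder.stream("events",
            Consumed.with(Serdes.String(), eventSerde));

        // Tumbling window: fixed 5-minute windows that do not overlap
        KTable<Windowed<String>, Long> countByTumblingWindow = events
            .groupBy((key, event) -> event.getUserId(), grouped)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count();

        // Hopping window: 5-minute windows that advance every minute
        KTable<Windowed<String>, Long> countByHoppingWindow = events
            .groupBy((key, event) -> event.getUserId(), grouped)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
            .count();

        // Session window: 10-minute inactivity gap, 2-minute grace period
        KTable<Windowed<String>, Long> sessionCounts = events
            .groupBy((key, event) -> event.getUserId(), grouped)
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(
                Duration.ofMinutes(10), Duration.ofMinutes(2)))
            .count();

        // Sliding window: records at most 5 minutes apart share a window.
        // Time-based windows take an initializer and an aggregator; only
        // session windows need an additional merger.
        KTable<Windowed<String>, MetricAggregate> slidingAggregate = events
            .groupBy((key, event) -> event.getMetricName(), grouped)
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> new MetricAggregate(),
                (key, event, agg) -> agg.add(event),
                Materialized.<String, MetricAggregate, WindowStore<Bytes, byte[]>>as("sliding-metrics")
                    .withValueSerde(new JsonSerde<>(MetricAggregate.class)));
    }
}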
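
How a timestamp maps to tumbling and hopping windows comes down to simple arithmetic, sketched here in plain Java. Windows are assumed to be aligned to the epoch and to cover the half-open range [start, start + size); timestamps are assumed to be non-negative milliseconds. `WindowMath` is a hypothetical helper and is not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

// Window assignment arithmetic for epoch-aligned windows; illustrative only.
final class WindowMath {

    // Start of the single tumbling window that contains ts.
    static long tumblingStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Starts of all hopping windows [start, start + sizeMs) that contain ts, in ascending order.
    static List<Long> hoppingStarts(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - (ts % advanceMs); // latest window start that is <= ts
        for (long start = latest; start > ts - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(0, start);            // prepend to keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 450_000L;                  // 7 minutes 30 seconds after the epoch
        long fiveMinutes = 300_000L;
        long oneMinute = 60_000L;
        System.out.println(tumblingStart(ts, fiveMinutes));            // prints 300000
        // prints [180000, 240000, 300000, 360000, 420000]
        System.out.println(hoppingStarts(ts, fiveMinutes, oneMinute));
    }
}
```

A record therefore lands in exactly one tumbling window but in size / advance hopping windows (five in this example), which is why hopping aggregations cost proportionally more state and updates.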

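Time Semantics

5.1 Event-Time Processing

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Duration;
import java.util.Properties;

// Event is an application class that is not shown here
public class EventTimeProcessing {
    public static void demonstrateEventTime() {
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Default extractor: the timestamp embedded in each Kafka record.
        // WallclockTimestampExtractor would switch to processing time instead.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            FailOnInvalidTimestamp.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<Event> eventSerde = new JsonSerde<>(Event.class);

        // Read the topic with a custom extractor that takes the event time from the payload
        KStream<String, Event> events = builder.stream("events",
            Consumed.with(Serdes.String(), eventSerde)
                .withTimestampExtractor((record, partitionTime) -> {
                    Object value = record.value();
                    // Fall back to the partition time for unexpected payloads
                    return value instanceof Event
                        ? ((Event) value).getEventTimestamp()
                        : partitionTime;
                }));

        // Windows are assigned by event time
        KTable<Windowed<String>, Long> counts = events
            .groupBy((key, event) -> event.getType(), Grouped.with(Serdes.String(), eventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count();

        counts.toStream()
            .foreach((windowedKey, count) ->
                System.out.println(windowedKey.key() + " ["
                    + windowedKey.window().start() + "-"
                    + windowedKey.window().end() + "] = " + count));
    }
}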

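5.2 Handling Late Data

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Duration;

// Event is an application class that is not shown here
public class LateDataHandling {
    public static void demonstrateLateDataHandling() {
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<Event> eventSerde = new JsonSerde<>(Event.class);

        KStream<String, Event> events = builder.stream("events",
            Consumed.with(Serdes.String(), eventSerde));

        // 5-minute windows that still accept records arriving up to 1 minute late
        TimeWindows windowSpec = TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(5), Duration.ofMinutes(1));

        KTable<Windowed<String>, Long> counts = events
            .groupBy((key, event) -> event.getUserId(), Grouped.with(Serdes.String(), eventSerde))
            .windowedBy(windowSpec)
            .count()
            // Emit one final result per window, after the window has closed
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        // Log records that are probably too late. This is a wall-clock approximation:
        // Kafka Streams itself judges lateness against stream time, not system time.
        events.filter((key, event) -> {
            long now = System.currentTimeMillis();
            long eventTime = event.getEventTimestamp();
            return now - eventTime > Duration.ofMinutes(6).toMillis();
        }).foreach((key, event) -> {
            System.out.println("Dropping late record: " + event);
        });
    }
}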
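
Whether a late record still counts depends on stream time, the highest timestamp seen so far, and not on the wall clock. The plain-Java sketch below models that rule under simplifying assumptions: a single partition, epoch-aligned tumbling windows, and a window that stops accepting records once stream time reaches its end plus the grace period. `GracePeriodModel` is a hypothetical name and is not the library's implementation.

```java
// Plain-Java model of the grace-period rule for tumbling windows.
final class GracePeriodModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = -1L; // highest timestamp observed so far

    GracePeriodModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record still updates its window, false if it is dropped as too late.
    boolean accept(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowEnd = timestampMs - (timestampMs % sizeMs) + sizeMs;
        return windowEnd > streamTime - graceMs;
    }

    public static void main(String[] args) {
        // 5-minute windows with a 1-minute grace period
        GracePeriodModel model = new GracePeriodModel(300_000L, 60_000L);
        System.out.println(model.accept(100_000L)); // prints true  (on time)
        System.out.println(model.accept(310_000L)); // prints true  (stream time moves to 310 s)
        System.out.println(model.accept(250_000L)); // prints true  (late, but within the grace period)
        System.out.println(model.accept(370_000L)); // prints true  (stream time moves to 370 s)
        System.out.println(model.accept(290_000L)); // prints false (window [0 s, 300 s) has closed)
    }
}
```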

Practical Use Cases

6.1 Real-Time Statistics

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Duration;
import java.util.*;

public class RealTimeStatistics {
    public static void main(String[] args) {
        Properties props = createStreamsConfig();
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<MetricEvent> metricSerde = new JsonSerde<>(MetricEvent.class);
        Grouped<String, MetricEvent> byService = Grouped.with(Serdes.String(), metricSerde);

        // Read the raw metrics
        KStream<String, MetricEvent> metrics = builder.stream("metrics",
            Consumed.with(Serdes.String(), metricSerde));

        // Requests per service per minute
        KTable<Windowed<String>, Long> requestsPerMinute = metrics
            .filter((key, metric) -> "request".equals(metric.getType()))
            .groupBy((key, metric) -> metric.getServiceName(), byService)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count();

        // Average response time per service per minute
        KTable<Windowed<String>, Double> avgResponseTime = metrics
            .filter((key, metric) -> metric.getResponseTime() > 0)
            .groupBy((key, metric) -> metric.getServiceName(), byService)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                () -> new ResponseTimeAccumulator(),
                (key, metric, acc) -> acc.add(metric.getResponseTime()),
                Materialized.<String, ResponseTimeAccumulator, WindowStore<Bytes, byte[]>>as("avg-response-time")
                    .withValueSerde(new JsonSerde<>(ResponseTimeAccumulator.class)))
            .mapValues(acc -> acc.getAverage());

        // Percentiles per service over 5-minute windows
        KTable<Windowed<String>, Percentiles> percentiles = metrics
            .groupBy((key, metric) -> metric.getServiceName(), byService)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> new PercentileAccumulator(),
                (key, metric, acc) -> acc.add(metric.getResponseTime()),
                Materialized.<String, PercentileAccumulator, WindowStore<Bytes, byte[]>>as("percentiles")
                    .withValueSerde(new JsonSerde<>(PercentileAccumulator.class)))
            .mapValues(acc -> acc.calculate());

        // Write the request counts as "service,windowStart,windowEnd,count"
        requestsPerMinute.toStream()
            .map((key, value) -> KeyValue.pair(key.key(),
                String.format("%s,%d,%d,%d",
                    key.key(), key.window().start(), key.window().end(), value)))
            .to("stats-requests-per-minute", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    // Minimal configuration with placeholder values
    private static Properties createStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "real-time-statistics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    static class MetricEvent {
        private String serviceName;
        private String type;
        private long responseTime;
        private long eventTimestamp;

        public String getServiceName() { return serviceName; }
        public String getType() { return type; }
        public long getResponseTime() { return responseTime; }
        public long getEventTimestamp() { return eventTimestamp; }
    }

    // Note: for JSON serialization the accumulator fields must be visible to the
    // JSON mapper (for example through getters and setters, omitted here)
    static class ResponseTimeAccumulator {
        private long sum;
        private long count;

        public ResponseTimeAccumulator add(long value) {
            sum += value;
            count++;
            return this;
        }

        public double getAverage() {
            return count > 0 ? (double) sum / count : 0;
        }
    }

    // Keeps every value of the window in memory; fine for a demo, costly at scale
    static class PercentileAccumulator {
        private List<Long> values = new ArrayList<>();

        public PercentileAccumulator add(long value) {
            values.add(value);
            return this;
        }

        public Percentiles calculate() {
            // Sort once per calculation instead of on every add
            List<Long> sorted = new ArrayList<>(values);
            Collections.sort(sorted);
            return new Percentiles(
                percentile(sorted, 50), percentile(sorted, 90),
                percentile(sorted, 95), percentile(sorted, 99));
        }

        // Nearest-rank percentile on a sorted list
        private static long percentile(List<Long> sorted, double p) {
            if (sorted.isEmpty()) return 0;
            int index = (int) Math.ceil(p / 100.0 * sorted.size()) - 1;
            return sorted.get(Math.max(0, index));
        }
    }

    static class Percentiles {
        private long p50, p90, p95, p99;

        public Percentiles(long p50, long p90, long p95, long p99) {
            this.p50 = p50;
            this.p90 = p90;
            this.p95 = p95;
            this.p99 = p99;
        }
    }
}
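
The percentile formula used by the accumulator is the nearest-rank method: sort the values, then take the element at position ceil(p / 100 × n), counted from 1. A short standalone example makes the index arithmetic easier to check. `NearestRank` and the sample response times are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Nearest-rank percentile on a copy of the input; illustrative only.
final class NearestRank {

    static long percentile(List<Long> values, double p) {
        if (values.isEmpty()) {
            return 0;
        }
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        // Rank is 1-based, list indexes are 0-based, hence the "- 1"
        int index = (int) Math.ceil(p / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(0, index));
    }

    public static void main(String[] args) {
        // Ten response times in milliseconds, deliberately unsorted
        List<Long> times = Arrays.asList(70L, 10L, 100L, 30L, 50L, 20L, 90L, 40L, 60L, 80L);
        System.out.println(percentile(times, 50));  // prints 50  (rank 5 of 10)
        System.out.println(percentile(times, 95));  // prints 100 (rank 10 of 10)
    }
}
```

With only ten samples the 95th and 99th percentiles both resolve to the maximum, so high percentiles are only meaningful for windows that hold enough data points.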

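6.2 Real-Time Alerting

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// Assumption: JsonSerde is the class from Spring for Apache Kafka
import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Duration;

public class RealTimeAlerting {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<SensorReading> readingSerde = new JsonSerde<>(SensorReading.class);

        KStream<String, SensorReading> readings = builder.stream("sensors",
            Consumed.with(Serdes.String(), readingSerde));

        // Count anomalies per sensor in 5-minute windows.
        // A windowed count is keyed by Windowed<String>, not by String.
        KTable<Windowed<String>, Long> anomalyCount = readings
            .filter((key, reading) -> isAnomaly(reading))
            .groupBy((key, reading) -> reading.getSensorId(),
                Grouped.with(Serdes.String(), readingSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count();

        // Raise an alert when a window holds 3 or more anomalies
        anomalyCount.toStream()
            .filter((key, count) -> count >= 3)
            .map((key, count) -> KeyValue.pair(key.key(),
                new Alert(key.key(),
                    "Detected " + count + " anomalies within 5 minutes",
                    key.window().start())))
            .to("alerts", Produced.with(Serdes.String(), new JsonSerde<>(Alert.class)));

        // To run the topology, pass builder.build() and a configuration to KafkaStreams
    }

    private static boolean isAnomaly(SensorReading reading) {
        // Simple anomaly rule: values outside the range [0, 100]
        return reading.getValue() > 100 || reading.getValue() < 0;
    }

    static class SensorReading {
        private String sensorId;
        private double value;
        private long timestamp;

        public String getSensorId() { return sensorId; }
        public double getValue() { return value; }
        public long getTimestamp() { return timestamp; }
    }

    static class Alert {
        private String sensorId;
        private String message;
        private long timestamp;

        public Alert(String sensorId, String message, long timestamp) {
            this.sensorId = sensorId;
            this.message = message;
            this.timestamp = timestamp;
        }
    }
}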
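
One thing to watch with a filter such as `count >= 3`: a table converted to a stream forwards updates as the count grows, so a single window can produce several alerts (at 3, 4, 5, and so on). A possible remedy, sketched here in plain Java, is to alert only on the update that crosses the threshold. `ThresholdAlerts` and the window-key format are hypothetical; inside Kafka Streams the remembered counts would need to live in a state store.

```java
import java.util.HashMap;
import java.util.Map;

// Fires once per window, on the update that crosses the threshold; illustrative only.
final class ThresholdAlerts {
    private final long threshold;
    private final Map<String, Long> lastCount = new HashMap<>();

    ThresholdAlerts(long threshold) {
        this.threshold = threshold;
    }

    // Returns true only when the count moves from below the threshold to at or above it.
    boolean shouldAlert(String windowKey, long count) {
        long previous = lastCount.getOrDefault(windowKey, 0L);
        lastCount.put(windowKey, count);
        return previous < threshold && count >= threshold;
    }

    public static void main(String[] args) {
        ThresholdAlerts alerts = new ThresholdAlerts(3);
        for (long count = 1; count <= 5; count++) {
            // Only the update with count = 3 reports true
            System.out.println(count + " -> " + alerts.shouldAlert("sensor-1@0", count));
        }
    }
}
```

Comparing against the previous count, and not testing for equality with the threshold, keeps the check correct when intermediate updates are skipped, for example a jump from 2 straight to 4.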

Performance Tuning

7.1 Configuration Tuning

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class PerformanceOptimization {
    public static Properties createOptimizedConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");

        // Serialization
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Parallelism: stream threads per instance.
        // The usable parallelism is bounded by the number of input partitions.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

        // Record cache: 100 MB, shared by all threads of the instance
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 104857600L);

        // Commit interval and standby replicas
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

        // State directory; prefer a durable path in production, since /tmp may be wiped
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        // Processing guarantee
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        return props;
    }
}
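
When choosing a thread count, it helps to remember that work is distributed in units of tasks, and that for a single sub-topology the number of tasks equals the largest partition count among its input topics. The arithmetic below is a sketch under that assumption; `ParallelismMath` is a hypothetical helper, and topologies with several sub-topologies have more tasks than this estimate.

```java
// Back-of-the-envelope parallelism estimate for a single sub-topology.
final class ParallelismMath {

    // One task per partition of the input topic with the most partitions.
    static int taskCount(int... inputTopicPartitions) {
        int max = 0;
        for (int partitions : inputTopicPartitions) {
            max = Math.max(max, partitions);
        }
        return max;
    }

    // Threads that can receive at least one task; any beyond this number stay idle.
    static int busyThreads(int tasks, int instances, int threadsPerInstance) {
        return Math.min(tasks, instances * threadsPerInstance);
    }

    public static void main(String[] args) {
        int tasks = taskCount(6, 12);          // two input topics with 6 and 12 partitions
        int busy = busyThreads(tasks, 4, 4);   // 4 instances with 4 stream threads each
        System.out.println(tasks);             // prints 12
        System.out.println(busy);              // prints 12, so 4 of the 16 threads are idle
    }
}
```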

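7.2 State Store Tuning

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;

import java.util.Map;
import java.util.Properties;

public class StateStoreOptimization {
    public static void optimizeStateStores(Properties props) {
        StreamsBuilder builder = new StreamsBuilder();

        // Enable the record cache for the count store.
        // Retention settings apply to window stores only, so none is set here.
        KTable<String, Long> counts = builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts")
                .withCachingEnabled());

        KTable<String, String> userProfiles = builder.table("profiles",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("profiles-store"));

        // RocksDB is tuned through a RocksDBConfigSetter class registered in the
        // configuration; it applies to every RocksDB store of the application
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
    }

    public static class CustomRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
            options.setWriteBufferSize(2 * 1024 * 1024L);
            options.setMaxWriteBufferNumber(3);
            options.setMaxTotalWalSize(64 * 1024 * 1024L);
        }

        @Override
        public void close(String storeName, Options options) {
            // Nothing to release: setConfig creates no RocksDB objects
        }
    }
}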

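Summary

Kafka Streams is a capable stream processing library for building high-performance, reliable real-time applications. This article covered its core concepts, architecture, KStream and KTable operations, state management, time semantics, and practical use cases. With these in hand, developers can apply Kafka Streams to build stream processing systems that meet production requirements.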
