Kafka Streams 与生态集成 —— 实时流处理实战
2026/6/16 6:32:48 网站建设 项目流程

一、Kafka Streams 概述

1.1 什么是 Kafka Streams

Kafka Streams是 Apache Kafka 开源项目的组成部分,是一个用于在 Kafka 上构建高可扩展、容错流处理应用的客户端类库

1.2 Kafka Streams 核心特点

特性说明
功能强大高扩展性、弹性、容错
轻量级无需专门集群,一个 Jar 包即可运行
完全集成100% 兼容 Kafka,易于集成现有应用
实时性毫秒级延迟,非微批处理
窗口支持允许乱序数据、迟到数据

1.3 为什么选择 Kafka Streams

对比维度: Spark Streaming Storm Kafka Streams ───────────────────────────────────────────────────────────────────── 部署复杂度: 高 高 低(嵌入应用) 资源占用: 高(预留内存) 高 低(不额外占用) 与 Kafka 集成: 需专门模块 需 spout 原生支持 状态管理: 外部系统 外部系统 内置(Kafka 持久化) 延迟: 秒级(微批) 毫秒级 毫秒级 重新计算: 困难 困难 滚动部署,自动重算

选择 Kafka Streams 的 6 大理由

  1. 类库而非框架:开发者控制运行方式,方便调试
  2. 部署简单:嵌入应用,无额外集群要求
  3. Kafka 生态已有:大部分流式系统已部署 Kafka,接入成本低
  4. 不占用系统资源:无 supervisor/node manager 等额外进程
  5. 数据持久化:Kafka 本身持久化,支持滚动部署和重新计算
  6. 动态调整并行度:利用 Consumer Rebalance 在线调整

二、Kafka Streams 核心概念

2.1 Topology(拓扑)

组件作用类比
Source从 Kafka Topic 读取数据数据源
Processor处理/转换数据算子
Sink将结果写入 Kafka Topic数据汇

2.2 Stream 与 Table

Stream(流): Table(表): ┌─────────────────────────┐ ┌─────────────────────────┐ │ 时间 → 事件序列 │ │ Key → 最新 Value │ │ │ │ │ │ t1: K1->V1 │ │ K1 -> V3 (最新) │ │ t2: K2->V2 │ ──▶ │ K2 -> V4 (最新) │ │ t3: K1->V3 │ 聚合 │ K3 -> V5 (最新) │ │ t4: K2->V4 │ │ │ │ t5: K3->V5 │ │ 有状态的视图 │ └─────────────────────────┘ └─────────────────────────┘ KStream: 每个事件独立处理(插入) KTable: 相同 Key 更新(Upsert)

三、Kafka Streams 数据清洗实战

3.1 需求分析

需求:实时处理单词带有>>>前缀的内容,过滤前缀后输出。

输入 Topic "first": 输出 Topic "second": ┌─────────────────┐ ┌─────────────────┐ │ hello>>>world │ │ world │ │ h>>>atguigu │ ──清洗──▶ │ atguigu │ │ hahaha │ │ hahaha │ └─────────────────┘ └─────────────────┘

3.2 完整代码实现

① 主程序:构建 Topology
packagecom.atguigu.kafka.stream;importorg.apache.kafka.streams.KafkaStreams;importorg.apache.kafka.streams.StreamsConfig;importorg.apache.kafka.streams.processor.Processor;importorg.apache.kafka.streams.processor.ProcessorSupplier;importorg.apache.kafka.streams.processor.TopologyBuilder;importjava.util.Properties;publicclassApplication{publicstaticvoidmain(String[]args){// 定义输入/输出 TopicStringfrom="first";Stringto="second";// 设置参数Propertiessettings=newProperties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG,"logFilter");settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");StreamsConfigconfig=newStreamsConfig(settings);// 构建拓扑TopologyBuilderbuilder=newTopologyBuilder();builder.addSource("SOURCE",from)// 添加数据源.addProcessor("PROCESS",newProcessorSupplier<byte[],byte[]>(){@OverridepublicProcessor<byte[],byte[]>get(){// 返回自定义处理器returnnewLogProcessor();}},"SOURCE")// 依赖 SOURCE.addSink("SINK",to,"PROCESS");// 添加数据汇,依赖 PROCESS// 创建并启动 Kafka StreamsKafkaStreamsstreams=newKafkaStreams(builder,config);streams.start();}}
② 业务处理器:LogProcessor
packagecom.atguigu.kafka.stream;importorg.apache.kafka.streams.processor.Processor;importorg.apache.kafka.streams.processor.ProcessorContext;publicclassLogProcessorimplementsProcessor<byte[],byte[]>{privateProcessorContextcontext;@Overridepublicvoidinit(ProcessorContextcontext){this.context=context;}@Overridepublicvoidprocess(byte[]key,byte[]value){Stringinput=newString(value);// 如果包含 ">>>" 则只保留该标记后面的内容if(input.contains(">>>")){input=input.split(">>>")[1].trim();}// 输出到下一个 Topiccontext.forward("logProcessor".getBytes(),input.getBytes());}@Overridepublicvoidpunctuate(longtimestamp){// 周期性操作(如定时刷新状态)}@Overridepublicvoidclose(){// 资源清理}}

3.3 测试验证

Step 1:启动 Kafka Streams 程序

Step 2:在 hadoop104 上启动生产者

bin/kafka-console-producer.sh\--broker-list hadoop102:9092\--topicfirst>hello>>>world>h>>>atguigu>hahaha

Step 3:在 hadoop103 上启动消费者

bin/kafka-console-consumer.sh\--zookeeperhadoop102:2181\--from-beginning\--topicsecond

输出结果

world atguigu hahaha

四、Kafka 与 Flume 集成

4.1 定位对比

维度FlumeKafka
开发公司ClouderaLinkedIn
适用场景多个生产者,下游消费者少下游消费者众多
数据安全一般高(支持 Replication)
Hadoop 生态原生对接需连接器
吞吐量中等极高

4.2 经典数据采集架构

4.3 Flume 配置:写入 Kafka

配置文件flume-kafka.conf

# Agent 定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # ========== Source 配置 ========== a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log a1.sources.r1.shell = /bin/bash -c # ========== Sink 配置(Kafka) ========== a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # Kafka 集群地址 a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 # 目标 Topic a1.sinks.k1.kafka.topic = first # 批次大小(条) a1.sinks.k1.kafka.flumeBatchSize = 20 # ACK 级别 a1.sinks.k1.kafka.producer.acks = 1 # 发送延迟 a1.sinks.k1.kafka.producer.linger.ms = 1 # ========== Channel 配置 ========== a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # ========== 绑定 ========== a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

4.4 启动与验证

Step 1:启动 Kafka 消费者(IDEA 或命令行)

bin/kafka-console-consumer.sh\--zookeeperhadoop102:2181\--from-beginning\--topicfirst

Step 2:启动 Flume Agent

cd/opt/module/flume bin/flume-ng agent\-cconf/\-na1\-fjobs/flume-kafka.conf

Step 3:向日志文件追加数据

echo"hello kafka from flume">>/opt/module/datas/flume.log

Step 4:观察 Kafka 消费者输出

hello kafka from flume

五、Kafka 配置参数大全

5.1 Broker 核心配置

参数默认值描述
broker.id-Broker 唯一标识
log.dirs/tmp/kafka-logs数据存储目录
num.partitions1默认分区数
default.replication.factor1默认副本数
log.retention.hours168日志保留时间(小时)
log.segment.bytes1073741824Segment 文件大小
num.network.threads3网络处理线程数
num.io.threads8IO 处理线程数
socket.send.buffer.bytes102400发送缓冲区
socket.receive.buffer.bytes102400接收缓冲区
zookeeper.connect-ZK 连接地址
delete.topic.enablefalse允许删除 Topic
auto.create.topics.enabletrue自动创建 Topic
min.insync.replicas1最小 ISR 副本数
unclean.leader.election.enabletrue允许非 ISR 选举 Leader

5.2 Producer 核心配置

参数默认值描述
bootstrap.servers-Kafka 集群地址
acks1ACK 级别(0/1/all)
retries0发送失败重试次数
batch.size16384批次大小(字节)
linger.ms0发送延迟(毫秒)
buffer.memory33554432缓冲区大小
compression.typenone压缩类型
key.serializer-Key 序列化器
value.serializer-Value 序列化器

5.3 Consumer 核心配置

参数默认值描述
bootstrap.servers-Kafka 集群地址
group.id-消费者组 ID
enable.auto.committrue自动提交 Offset
auto.commit.interval.ms5000自动提交间隔
auto.offset.resetlatest无 Offset 时起始位置
key.deserializer-Key 反序列化器
value.deserializer-Value 反序列化器
max.poll.records500单次最大拉取数
session.timeout.ms10000会话超时时间

如果本专栏对你有帮助,欢迎点赞 👍 + 收藏 ⭐ + 关注 🔖,你的支持是我持续创作的动力!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询