
一、什么是 KafkaApache Kafka 是一个开源的分布式流处理平台最初由 LinkedIn 开发2011 年开源。它不仅仅是一个消息队列更是一个集消息发布/订阅、持久化存储、实时流处理于一体的分布式系统。简单来说Kafka 就像是分布式系统的数据高速公路——各个系统通过它传输数据既能异步解耦又能保证高吞吐和可靠性。Kafka 的三大核心能力能力说明发布与订阅类似消息队列生产者发送消息消费者订阅消费持久化存储消息持久化到磁盘可回溯、可重放不会消费即删除实时流处理通过 Kafka Streams / ksqlDB 实现实时计算核心设计目标高吞吐单集群轻松支持百万级消息/秒低延迟毫秒级端到端延迟可扩展水平扩展增加 Broker 即可提升容量持久化消息可保留数天甚至永久支持重复消费容错性副本机制保证数据不丢失二、核心概念理解 Kafka首先要理解以下核心概念概念说明BrokerKafka 集群中的一个节点负责消息的存储和转发Topic消息的逻辑分类生产者按 Topic 发布消息消费者按 Topic 订阅PartitionTopic 的物理分区是分布式存储和并行处理的基本单元ReplicaPartition 的副本分为 Leader处理读写和 Follower同步数据Offset消息在 Partition 中的唯一偏移量标识消费进度Consumer Group消费者组组内消费者共同消费一个 Topic 的不同 PartitionISRIn-Sync Replicas与 Leader 保持同步的副本集合关键理解一个 Topic 可以有多个 Partition分布在不同的 Broker 上每个 Partition 内部消息有序跨 Partition无序一个 Partition 只能被同一个 Consumer Group 内的一个 Consumer消费不同的 Consumer Group 之间互不影响可以独立消费同一个 Topic三、架构原理3.1 数据流转过程Kafka 的数据流转非常清晰Producer → Broker 集群 → Consumer Group生产者发送消息Producer 将消息发送到指定 TopicKafka 根据分区策略如 Key Hash将消息路由到具体的 PartitionBroker 存储消息消息被追加写入 Partition 的日志文件顺序写入同时 Follower 副本从 Leader 同步数据消费者拉取消息Consumer 从 Partition 的 Leader 副本拉取消息处理完成后提交 Offset3.2 分区机制Partition 是 Kafka 实现水平扩展和并行处理的核心Topic: order-topic ├── Partition-0 (Broker-1) ├── Partition-1 (Broker-2) └── Partition-2 (Broker-3)分区的关键特性特性说明分区内有序同一 Partition 的消息按 Offset 严格有序分区间无序不同 Partition 之间没有顺序保证水平扩展增加 Partition 数量可线性提升吞吐量并行消费多个 Consumer 可并行消费不同 Partition分区数与消费者数的关系消费者数 ≤ Partition 数部分消费者消费多个 Partition消费者数 Partition 数最佳并行度消费者数 Partition 数部分消费者空闲浪费资源3.3 副本机制与 ISR每个 Partition 可以有多个副本Replica分布在不同的 Broker 上Partition-0 ├── Leader (Broker-1) ← 处理所有读写请求 ├── Follower-1 (Broker-2) ← 同步 Leader 数据 └── Follower-2 (Broker-3) ← 同步 Leader 数据ISRIn-Sync ReplicasISR 是与 Leader 保持同步的副本集合同步判定标准在replica.lag.time.max.ms默认 30s内未落后 Leader只有 ISR 中的副本才有资格在 Leader 故障时被选举为新 Leader数据写入流程1. Producer 发送消息到 Leader 2. Leader 写入本地日志 3. Follower 从 Leader 拉取并写入本地 4. Follower 向 Leader 返回 ACK 5. Leader 收到 ISR 中过半副本的 ACK 后向 Producer 返回成功3.4 Leader 选举当 Leader 副本所在 Broker 宕机时Kafka 会从 ISR 中选举新的 LeaderController 检测到 Leader 失效从 ISR 中选择一个新的副本作为 Leader通知所有相关 Broker 更新元数据Producer 和 Consumer 自动重定向到新 Leader四、为什么 Kafka 这么快Kafka 能够实现百万级 TPS依赖以下核心设计4.1 顺序写入Kafka 将消息追加写入磁盘Append-Only避免了随机 I/O磁盘顺序写入速度 ≈ 内存随机写入速度 磁盘随机写入速度4.2 零拷贝Zero-Copy传统数据传输磁盘 → 内核缓冲区 → 用户空间 → Socket 缓冲区 → 网卡4次拷贝Kafka 使用sendfile()系统调用磁盘 → 内核缓冲区 → 网卡2次拷贝4.3 页缓存Page CacheKafka 不自己管理内存而是利用操作系统的 Page Cache消息写入时直接进入 Page Cache由 OS 异步刷盘消息读取时优先从 Page Cache 读取命中率高4.4 批量与压缩批量发送Producer 可以攒一批消息后一次性发送批量压缩支持 Snappy、LZ4、Zstd 等压缩算法减少网络传输4.5 分段存储每个 Partition 由多个 Log Segment 组成partition-0/ ├── 00000000000000000000.log ← 第 0 ~ 1000 条消息 ├── 00000000000000000000.index ← 偏移量索引 ├── 00000000000000000000.timeindex← 时间戳索引 ├── 00000000000000368769.log ← 第 1001 ~ 2000 条消息 └── ...每个 Segment 默认 1GB便于管理和清理通过稀疏索引快速定位消息五、消费者组与 Rebalance5.1 消费者组机制Consumer Group 是 Kafka 实现并行消费和消息广播的核心Topic: order-topic (3 Partitions) Consumer Group A (订单处理): ├── Consumer-A1 → Partition-0 ├── Consumer-A2 → Partition-1 └── Consumer-A3 → Partition-2 Consumer Group B (数据分析): ├── Consumer-B1 → Partition-0 ├── Consumer-B2 → Partition-1 └── Consumer-B3 → Partition-2同一个 Group 内一个 Partition 只能被一个 Consumer 消费负载均衡不同 Group 之间每个 Group 都能消费完整的 Topic消息广播5.2 Rebalance再平衡当 Consumer Group 中的成员发生变化时会触发 Rebalance触发条件新 Consumer 加入 GroupConsumer 宕机或主动退出Consumer 心跳超时session.timeout.ms消费处理超时max.poll.interval.msTopic 分区数变化分区分配策略策略说明Range按分区序号范围分配默认RoundRobin轮询分配所有分区Sticky粘性分配最小化重平衡时的分区移动CooperativeSticky协作式粘性分配避免全量停止注意Rebalance 期间 Consumer 无法消费消息频繁 Rebalance 会影响性能。六、常见应用场景6.1 日志收集Kafka 最初就是为日志收集而生的。将各个服务的日志统一发送到 Kafka再由下游系统处理App Logs → Kafka (log-topic) → Elasticsearch → Kibana ↓ Hive (离线分析)优势解耦日志产生与存储高吞吐应对突发流量日志可重放修复 ETL 逻辑后重新导入6.2 异步解耦与削峰填谷将耗时操作异步化主流程无需等待用户下单 → 订单服务 → Kafka (order-topic) ├── 库存服务扣减库存 ├── 积分服务发放积分 └── 通知服务发送短信/邮件优势主流程响应时间缩短下游系统按需消费避免被流量冲垮系统间完全解耦独立扩展6.3 实时数据管道ETLKafka 作为数据管道的中心连接各个数据源和数据目的地MySQL (CDC) → Debezium → Kafka → Kafka Streams → ClickHouse / S3 ↓ Flink (实时计算)优势统一的数据入口和出口支持多种数据源和数据目的地数据可回溯、可重放6.4 流式处理结合 Kafka Streams 或 Flink实现实时数据处理// 实时统计每小时订单金额KStreamString,OrderEventordersbuilder.stream(db.orders);KTableWindowedString,DoublehourlyRevenueorders.filter((k,v)-v.getStatus().equals(PAID)).groupBy((k,v)-v.getStoreId()).windowedBy(TimeWindows.of(Duration.ofHours(1))).aggregate(()-0.0,(storeId,order,total)-totalorder.getAmount());典型应用实时排行榜实时风控实时监控告警实时用户画像6.5 事件驱动架构在微服务架构中服务间通过事件通信订单服务 → [订单创建事件] → Kafka ├── 库存服务 → [库存扣减事件] ├── 支付服务 → [支付完成事件] └── 物流服务 → [发货事件]优势服务间完全解耦支持事件溯源Event Sourcing最终一致性保证七、分布式安装部署1. 集群规划在已有的hadoop1、hadoop2、hadoop3上部署 kafka_2.12-3.5.12. 下载安装下载地址https://kafka.apache.org/community/downloads/#351下载完后上传到/opt/software目录再 tar 解压到/opt/modulecd /opt/software tar -zxvf kafka_2.12-3.5.1.tgz -C /opt/module/3. 配置和分发进入解压目录修改配置文件server.propertiescd/opt/module/kafka_2.12-3.5.1/config/vimserver.properties输入下面内容注意先注释掉已有的同名配置#broker的全局唯一编号不能重复只能是数字。broker.id1#broker对外暴露的IP和端口 每个节点单独配置advertised.listenersPLAINTEXT://hadoop1:9092#处理网络请求的线程数量num.network.threads3#用来处理磁盘IO的线程数量num.io.threads8#发送套接字的缓冲区大小socket.send.buffer.bytes102400#接收套接字的缓冲区大小socket.receive.buffer.bytes102400#请求套接字的缓冲区大小socket.request.max.bytes104857600#kafka运行日志(数据)存放的路径路径不需要提前创建kafka自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔log.dirs/opt/module/kafka_2.12-3.5.1/datas#topic在当前broker上的分区个数num.partitions1#用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir1# 每个topic创建时的副本数默认时1个副本offsets.topic.replication.factor1#segment文件保留的最长时间超时将被删除log.retention.hours168#每个segment文件的大小默认最大1Glog.segment.bytes1073741824# 检查过期数据的时间默认5分钟检查一次是否数据过期log.retention.check.interval.ms300000#配置连接Zookeeper集群地址在zk根目录下创建/kafka方便管理zookeeper.connecthadoop1:2181,hadoop2:2181,hadoop3:2181/kafka分发安装包cd/opt/module xsync kafka_2.12-3.5.1分别在其它hadoop2、hadoop3修改配置文件server.properties中的broker.id和advertised.listeners不能重复6. 配置环境变量sudovim/etc/profile.d/my_env.sh在文件末尾增加如下内容#KAFKA_HOMEexportKAFKA_HOME/opt/module/kafka_2.12-3.5.1exportPATH$PATH:$KAFKA_HOME/bin分发环境变量文件再刷新环境变量sudo/home/hadoop/bin/xsync /etc/profile.d/my_env.shsource/etc/profile4. 集群启停脚本在/home/hadoop/bin目录下创建脚本cd~/binvimkf.sh输入如下内容#! /bin/bashcase$1instart){foriinhadoop1 hadoop2 hadoop3doecho --------启动$iKafka-------ssh$i/opt/module/kafka_2.12-3.5.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.12-3.5.1/config/server.propertiesdone};;stop){foriinhadoop1 hadoop2 hadoop3doecho --------停止$iKafka-------ssh$i/opt/module/kafka_2.12-3.5.1/bin/kafka-server-stop.sh done};;esac增加执行权限起停集群kafka依赖zookeeper服务必须先启动zookeeperchmodx kf.sh zk.sh start kf.sh start kf.sh stop zk.sh stop八、 常用终端命令1. 服务启停命令脚本名称完整示例命令功能作用kafka-server-start.shbin/kafka-server-start.sh config/server.properties前台启动 Kafka Broker 服务kafka-server-stop.shbin/kafka-server-stop.sh优雅关闭 Kafka 服务zookeeper-server-start.shbin/zookeeper-server-start.sh config/zookeeper.properties启动内置 ZookeeperKRaft 版本无需zookeeper-server-stop.shbin/zookeeper-server-stop.sh关闭内置 Zookeeper2. Topic 主题管理脚本名称完整示例命令功能作用kafka-topics.sh创建 Topicbin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test_topic --partitions 3 --replication-factor 2创建主题指定分区、副本数kafka-topics.sh查看所有 Topicbin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092列出集群全部主题kafka-topics.sh查看 Topic 详情bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test_topic查看分区 Leader、副本、ISR 同步列表kafka-topics.sh扩容分区bin/kafka-topics.sh --alter --bootstrap-server 127.0.0.1:9092 --topic test_topic --partitions 5增加分区不支持减少分区kafka-topics.sh删除 Topicbin/kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic test_topic删除主题需开启delete.topic.enabletrue3. 控制台生产者发送消息脚本名称完整示例命令功能作用kafka-console-producer.shbin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test_topic简易控制台发送普通消息kafka-console-producer.sh带 Key 发送bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test_topic --property parse.keytrue --property key.separator,发送带消息 Key 的数据逗号分隔 key/value4. 控制台消费者消费消息脚本名称完整示例命令功能作用kafka-console-consumer.shbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_topic实时消费新产生消息kafka-console-consumer.shbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_topic --from-beginning从头消费该 Topic 所有历史消息kafka-console-consumer.sh指定消费组bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_topic --group test_group使用自定义消费组持久化 offsetkafka-console-consumer.sh打印 KeyValuebin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_topic --from-beginning --property print.keytrue --property key.separator:消费时同时输出消息 Key 与 Value5. 消费组 Offset 偏移量管理脚本名称完整示例命令功能作用kafka-consumer-groups.sh查看全部消费组bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list列出集群内所有消费组名称kafka-consumer-groups.sh查看消费组详情bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test_group --describe查看分区 offset、消息堆积 LAG、分配节点kafka-consumer-groups.sh重置 offset 到最早bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test_group --topic test_topic --reset-offsets --to-earliest --execute偏移量重置至分区第一条消息需停消费程序kafka-consumer-groups.sh重置 offset 到最新bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test_group --topic test_topic --reset-offsets --to-latest --execute偏移量重置至最新消息跳过历史数据kafka-consumer-groups.sh删除消费组bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test_group --delete删除无活跃消费者、无 offset 的空消费组九、Kafka 与其他消息队列对比对比维度KafkaRabbitMQRocketMQ定位分布式流平台传统消息队列金融级消息队列吞吐量百万级/秒万级/秒十万级/秒延迟毫秒级微秒级毫秒级消息留存支持可配置保留时间消费即删除支持顺序保证Partition 内有序单队列有序队列内有序协议自定义二进制AMQP自定义适用场景日志、事件流、流处理任务队列、RPC金融、电商选型建议要高吞吐、可重放、流处理 →Kafka要低延迟、复杂路由、AMQP →RabbitMQ要金融级可靠、事务消息 →RocketMQ十、总结Kafka 的核心价值可以用一句话概括高吞吐 持久化 流处理。核心能力支撑的应用场景高吞吐、低延迟日志收集、实时数据管道持久化存储消息重放、事件溯源分区并行水平扩展、并行消费副本机制数据可靠性、高可用消费者组负载均衡、消息广播Kafka Streams实时流处理Kafka 已经成为现代分布式系统的标配基础设施尤其在大数据、实时计算、微服务架构中扮演着核心角色。参考资源Apache Kafka 官方文档Kafka 设计原理