本笔记系统梳理Kafka核心知识及Flume整合方案。
重点包含:
1)Kafka核心组件(Topic/Partition/Consumer Group)及消息队列特性;
2)环境搭建与基础命令操作;
3)Flume-Kafka集成配置详解,强调Topic一致性等关键参数;
4)三窗口实战演示日志采集流程;
5)典型问题排查指南,如连接拒绝、Topic不匹配等。
笔记突出"配置文件逐项检查"、"保持Topic一致"等实战经验,涵盖从理论到排错的完整知识链,配有结构化命令示例和配置模板,适合作为分布式消息中间件开发的速查手册。
基于我们之前的全部问答和实际操作经验,我为你重新整理一份完整的 Kafka 课堂笔记,融入实战中遇到的问题和解决方案,方便日后复习。
Kafka 课堂笔记(完整实战版)
一、Kafka 概述
1.1 什么是 Kafka?
Kafka 是一个分布式的、基于发布/订阅模式的消息队列系统。
数据流转过程:
text
生产者 → 消息队列(MQ) → 消费者
1.2 消息队列的作用
| 作用 | 说明 |
|---|---|
| 解耦 | 生产者和消费者可独立变化,只需遵循相同接口 |
| 缓冲 | 生产速率和消费速率可以不一致 |
| 峰值处理 | 高并发时数据先入队列,后期慢慢消费 |
| 异步通信 | 任务拆分为多个步骤并行执行 |
| 可恢复性 | 部分组件失效不影响整体 |
1.3 消息队列的两种模式
| 模式 | 特点 | 消费后数据 |
|---|---|---|
| 点对点 | 一个生产者 → 一个消费者 | 通常移除 |
| 发布/订阅 | 一个生产者 → 多个消费者 | 保留在队列中 |
二、Kafka 核心角色
| 角色 | 说明 |
|---|---|
| Producer | 消息生产者,发送数据到 Kafka |
| Consumer | 消息消费者,从 Kafka 读取数据 |
| Consumer Group (CG) | 多个消费者组成一个组,共同消费 Topic |
| Broker | 一台 Kafka 服务器就是一个 Broker |
| Topic | 消息队列(逻辑概念) |
| Partition | Topic 的分区,用于水平扩展 |
| Replication | 副本,保证数据可靠性 |
| Leader | 分区的主节点,负责读写 |
| Follower | 分区的从节点,同步 Leader 数据 |
三、环境搭建
3.1 启动前置服务
bash
# 启动 ZooKeeper(Kafka 依赖) cd $ZOOKEEPER_HOME bin/zkServer.sh start # 启动 Kafka cd $KAFKA_HOME bin/kafka-server-start.sh -daemon config/server.properties # 验证启动 jps | grep -E "Kafka|QuorumPeerMain"3.2 Kafka 配置说明
bash
# 查看监听配置 cat $KAFKA_HOME/config/server.properties | grep listenerslisteners被注释时,默认监听0.0.0.0:9092(所有网络接口)localhost:9092和node100:9092都可以连接(指向同一台机器)
四、Kafka 基本命令
4.1 Topic 操作命令
bash
# 创建 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic 主题名 --partitions 1 --replication-factor 1 # 查看所有 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 查看 Topic 详情 bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --describe --topic 主题名 # 删除 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --delete --topic 主题名 # 修改 Topic(如增加分区) bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --alter --topic 主题名 --partitions 34.2 生产与消费消息
bash
# 生产者(发送消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 主题名 # 消费者(接收消息) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名 # 消费者(从头开始消费历史消息) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic 主题名 --from-beginning五、Flume + Kafka 整合
5.1 为什么要用 Flume?
| 场景 | Kafka 直接通信 | Flume + Kafka |
|---|---|---|
| 数据来源 | 手动输入 | 自动监控文件/目录 |
| 人工介入 | 每次都需要 | 配置后自动运行 |
| 适用场景 | 测试、调试 | 生产环境日志采集 |
5.2 环境准备
bash
# 创建日志文件目录 mkdir -p /opt/module/flume/data/ # 创建日志文件 touch /opt/module/flume/data/flume.log # 创建 Flume 配置目录 mkdir -p /opt/module/flume/job/5.3 Flume 配置文件详解
文件位置:/opt/module/flume/job/flume-kafka.conf
properties
# ========== 1. 定义组件名称 ========== a1.sources = r1 # Source 名称 a1.sinks = k1 # Sink 名称 a1.channels = c1 # Channel 名称 # ========== 2. 配置 Source(数据来源) ========== a1.sources.r1.type = exec # 类型:执行Linux命令 a1.sources.r1.command = tail -f /opt/module/flume/data/flume.log # 监控文件 # ========== 3. 配置 Sink(数据去向) ========== a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # 类型:Kafka a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # Kafka地址 a1.sinks.k1.kafka.topic = a2608 # ⚠️ Topic名称(关键!) a1.sinks.k1.kafka.flumeBatchSize = 20 # 批量发送大小 a1.sinks.k1.kafka.producer.acks = 1 # 确认级别 a1.sinks.k1.kafka.producer.linger.ms = 1 # 延迟时间 # ========== 4. 配置 Channel(缓冲区) ========== a1.channels.c1.type = memory # 类型:内存 a1.channels.c1.capacity = 1000 # 最大存储量 a1.channels.c1.transactionCapacity = 100 # 事务处理量 # ========== 5. 绑定组件 ========== a1.sources.r1.channels = c1 # Source → Channel a1.sinks.k1.channel = c1 # Sink ← Channel5.4 配置文件关键点(常见错误)
| 配置项 | 常见错误 | 正确做法 |
|---|---|---|
kafka.topic | 与其他地方不一致 | 必须与消费者监听的 Topic 相同 |
bootstrap.servers | 地址写错 | 确保 Kafka 能访问到 |
command路径 | 文件路径错误 | 确保日志文件存在 |
5.5 启动 Flume
bash
cd /opt/module/flume bin/flume-ng agent -c conf -n a1 -f job/flume-kafka.conf -Dflume.root.logger=INFO,console六、完整实战流程(三个窗口)
6.1 窗口布局
| 窗口 | 角色 | 作用 |
|---|---|---|
| 窗口1 | Flume | 监控日志文件,发送到 Kafka |
| 窗口2 | Kafka 消费者 | 显示接收到的消息 |
| 窗口3 | 数据生产者 | 向日志文件写入测试数据 |
6.2 操作顺序
第一步:创建 Topic
bash
# 任意窗口 cd $KAFKA_HOME bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic a2608 --partitions 1 --replication-factor 1第二步:窗口2 - 启动消费者
bash
cd $KAFKA_HOME bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic a2608 # 光标停住不动 = 正常等待数据第三步:窗口1 - 启动 Flume
bash
cd /opt/module/flume bin/flume-ng agent -c conf -n a1 -f job/flume-kafka.conf -Dflume.root.logger=INFO,console # 看到 "Started" 日志 = 启动成功第四步:窗口3 - 写入测试数据
bash
echo "测试消息" >> /opt/module/flume/data/flume.log第五步:观察结果
窗口2 应显示
测试消息✅
七、常见问题与排错
7.1 问题速查表
| 现象 | 可能原因 | 解决方法 |
|---|---|---|
消费者启动报Connection refused | Kafka 未启动 | 启动 Kafka |
消费者启动报No such file or directory | 不在 Kafka 目录 | cd $KAFKA_HOME |
| 消费者光标停住,但收不到消息 | Topic 名称不一致 | 检查 Flume 配置中的 topic |
| Flume 启动后很快退出 | 配置文件语法错误 | 检查配置文件 |
Flume 启动但没有tail -f日志 | Source 配置错误 | 检查type和command |
| 写入文件后消费者无反应 | Flume 未运行 | 检查 Flume 进程 |
7.2 排错命令
bash
# 1. 查看 Kafka 中的 Topic 列表 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 2. 测试 Kafka 直接通信(绕过 Flume) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic a2608 # 输入消息,看消费者能否收到 # 3. 查看 Flume 进程 ps aux | grep flume # 4. 查看日志文件内容 cat /opt/module/flume/data/flume.log # 5. 检查配置文件关键项 cat /opt/module/flume/job/flume-kafka.conf | grep -E "topic|command|bootstrap"7.3 核心原则:保持 Topic 一致
text
Flume 配置中的 topic = Kafka 消费者监听的 topic
不一致时的后果:
Flume 把数据发到 Topic A
消费者从 Topic B 读取
消费者永远收不到消息 ❌
八、概念补充
8.1 本机环回地址(Loopback)
| 地址 | 说明 |
|---|---|
127.0.0.1 | IPv4 环回地址,指向本机 |
localhost | 域名,解析为127.0.0.1 |
| 特点 | 数据不经过网卡,内部直接传递,速度快 |
8.2 常用命令的幂等性
| 命令 | 多次执行结果 |
|---|---|
mkdir -p | 目录存在时不报错,静默跳过 |
touch | 文件存在时只更新时间戳,不覆盖内容 |
echo >> | 追加内容,不覆盖原有内容 |
九、学习要点总结
9.1 你已掌握的技能
✅ Kafka 核心概念(Topic、Producer、Consumer、Broker)
✅ Kafka 基本命令(创建、查看、删除 Topic,生产、消费消息)
✅ Flume 配置结构(Source、Channel、Sink 及绑定关系)
✅ Flume + Kafka 整合实现日志自动采集
✅ 常见问题的排查方法
9.2 关键经验
配置文件中的每一行都有意义,不要忽视
排错时逐项检查每个参数是否正确
Topic 名称必须一致是最常见的错误
理解后再修改,不要盲目复制粘贴
十、笔记说明
本笔记基于课堂内容和实际操作中的问题总结而成,记录了从零开始的完整学习过程,包含:
理论概念
命令操作
配置详解
实战流程
常见错误及解决方案
适合作为日常复习和实验参考。