(课堂笔记)Kafka + Flume 完整实战
2026/6/1 19:02:08 网站建设 项目流程

本笔记系统梳理Kafka核心知识及Flume整合方案。


重点包含:

1)Kafka核心组件(Topic/Partition/Consumer Group)及消息队列特性;

2)环境搭建与基础命令操作;

3)Flume-Kafka集成配置详解,强调Topic一致性等关键参数;

4)三窗口实战演示日志采集流程;

5)典型问题排查指南,如连接拒绝、Topic不匹配等。


笔记突出"配置文件逐项检查"、"保持Topic一致"等实战经验,涵盖从理论到排错的完整知识链,配有结构化命令示例和配置模板,适合作为分布式消息中间件开发的速查手册。

基于我们之前的全部问答和实际操作经验,我为你重新整理一份完整的 Kafka 课堂笔记,融入实战中遇到的问题和解决方案,方便日后复习。


Kafka 课堂笔记(完整实战版)

一、Kafka 概述

1.1 什么是 Kafka?

Kafka 是一个分布式的、基于发布/订阅模式的消息队列系统。

数据流转过程

text

生产者 → 消息队列(MQ) → 消费者

1.2 消息队列的作用

作用说明
解耦生产者和消费者可独立变化,只需遵循相同接口
缓冲生产速率和消费速率可以不一致
峰值处理高并发时数据先入队列,后期慢慢消费
异步通信任务拆分为多个步骤并行执行
可恢复性部分组件失效不影响整体

1.3 消息队列的两种模式

模式特点消费后数据
点对点一个生产者 → 一个消费者通常移除
发布/订阅一个生产者 → 多个消费者保留在队列中

二、Kafka 核心角色

角色说明
Producer消息生产者,发送数据到 Kafka
Consumer消息消费者,从 Kafka 读取数据
Consumer Group (CG)多个消费者组成一个组,共同消费 Topic
Broker一台 Kafka 服务器就是一个 Broker
Topic消息队列(逻辑概念)
PartitionTopic 的分区,用于水平扩展
Replication副本,保证数据可靠性
Leader分区的主节点,负责读写
Follower分区的从节点,同步 Leader 数据

三、环境搭建

3.1 启动前置服务

bash

# 启动 ZooKeeper(Kafka 依赖) cd $ZOOKEEPER_HOME bin/zkServer.sh start # 启动 Kafka cd $KAFKA_HOME bin/kafka-server-start.sh -daemon config/server.properties # 验证启动 jps | grep -E "Kafka|QuorumPeerMain"

3.2 Kafka 配置说明

bash

# 查看监听配置 cat $KAFKA_HOME/config/server.properties | grep listeners
  • listeners被注释时,默认监听0.0.0.0:9092(所有网络接口)

  • localhost:9092node100:9092都可以连接(指向同一台机器)


四、Kafka 基本命令

4.1 Topic 操作命令

bash

# 创建 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic 主题名 --partitions 1 --replication-factor 1 # 查看所有 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 查看 Topic 详情 bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --describe --topic 主题名 # 删除 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --delete --topic 主题名 # 修改 Topic(如增加分区) bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --alter --topic 主题名 --partitions 3

4.2 生产与消费消息

bash

# 生产者(发送消息) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 主题名 # 消费者(接收消息) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名 # 消费者(从头开始消费历史消息) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic 主题名 --from-beginning

五、Flume + Kafka 整合

5.1 为什么要用 Flume?

场景Kafka 直接通信Flume + Kafka
数据来源手动输入自动监控文件/目录
人工介入每次都需要配置后自动运行
适用场景测试、调试生产环境日志采集

5.2 环境准备

bash

# 创建日志文件目录 mkdir -p /opt/module/flume/data/ # 创建日志文件 touch /opt/module/flume/data/flume.log # 创建 Flume 配置目录 mkdir -p /opt/module/flume/job/

5.3 Flume 配置文件详解

文件位置/opt/module/flume/job/flume-kafka.conf

properties

# ========== 1. 定义组件名称 ========== a1.sources = r1 # Source 名称 a1.sinks = k1 # Sink 名称 a1.channels = c1 # Channel 名称 # ========== 2. 配置 Source(数据来源) ========== a1.sources.r1.type = exec # 类型:执行Linux命令 a1.sources.r1.command = tail -f /opt/module/flume/data/flume.log # 监控文件 # ========== 3. 配置 Sink(数据去向) ========== a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # 类型:Kafka a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # Kafka地址 a1.sinks.k1.kafka.topic = a2608 # ⚠️ Topic名称(关键!) a1.sinks.k1.kafka.flumeBatchSize = 20 # 批量发送大小 a1.sinks.k1.kafka.producer.acks = 1 # 确认级别 a1.sinks.k1.kafka.producer.linger.ms = 1 # 延迟时间 # ========== 4. 配置 Channel(缓冲区) ========== a1.channels.c1.type = memory # 类型:内存 a1.channels.c1.capacity = 1000 # 最大存储量 a1.channels.c1.transactionCapacity = 100 # 事务处理量 # ========== 5. 绑定组件 ========== a1.sources.r1.channels = c1 # Source → Channel a1.sinks.k1.channel = c1 # Sink ← Channel

5.4 配置文件关键点(常见错误)

配置项常见错误正确做法
kafka.topic与其他地方不一致必须与消费者监听的 Topic 相同
bootstrap.servers地址写错确保 Kafka 能访问到
command路径文件路径错误确保日志文件存在

5.5 启动 Flume

bash

cd /opt/module/flume bin/flume-ng agent -c conf -n a1 -f job/flume-kafka.conf -Dflume.root.logger=INFO,console

六、完整实战流程(三个窗口)

6.1 窗口布局

窗口角色作用
窗口1Flume监控日志文件,发送到 Kafka
窗口2Kafka 消费者显示接收到的消息
窗口3数据生产者向日志文件写入测试数据

6.2 操作顺序

第一步:创建 Topic

bash

# 任意窗口 cd $KAFKA_HOME bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic a2608 --partitions 1 --replication-factor 1
第二步:窗口2 - 启动消费者

bash

cd $KAFKA_HOME bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic a2608 # 光标停住不动 = 正常等待数据
第三步:窗口1 - 启动 Flume

bash

cd /opt/module/flume bin/flume-ng agent -c conf -n a1 -f job/flume-kafka.conf -Dflume.root.logger=INFO,console # 看到 "Started" 日志 = 启动成功
第四步:窗口3 - 写入测试数据

bash

echo "测试消息" >> /opt/module/flume/data/flume.log
第五步:观察结果
  • 窗口2 应显示测试消息


七、常见问题与排错

7.1 问题速查表

现象可能原因解决方法
消费者启动报Connection refusedKafka 未启动启动 Kafka
消费者启动报No such file or directory不在 Kafka 目录cd $KAFKA_HOME
消费者光标停住,但收不到消息Topic 名称不一致检查 Flume 配置中的 topic
Flume 启动后很快退出配置文件语法错误检查配置文件
Flume 启动但没有tail -f日志Source 配置错误检查typecommand
写入文件后消费者无反应Flume 未运行检查 Flume 进程

7.2 排错命令

bash

# 1. 查看 Kafka 中的 Topic 列表 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 2. 测试 Kafka 直接通信(绕过 Flume) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic a2608 # 输入消息,看消费者能否收到 # 3. 查看 Flume 进程 ps aux | grep flume # 4. 查看日志文件内容 cat /opt/module/flume/data/flume.log # 5. 检查配置文件关键项 cat /opt/module/flume/job/flume-kafka.conf | grep -E "topic|command|bootstrap"

7.3 核心原则:保持 Topic 一致

text

Flume 配置中的 topic = Kafka 消费者监听的 topic

不一致时的后果

  • Flume 把数据发到 Topic A

  • 消费者从 Topic B 读取

  • 消费者永远收不到消息 ❌


八、概念补充

8.1 本机环回地址(Loopback)

地址说明
127.0.0.1IPv4 环回地址,指向本机
localhost域名,解析为127.0.0.1
特点数据不经过网卡,内部直接传递,速度快

8.2 常用命令的幂等性

命令多次执行结果
mkdir -p目录存在时不报错,静默跳过
touch文件存在时只更新时间戳,不覆盖内容
echo >>追加内容,不覆盖原有内容

九、学习要点总结

9.1 你已掌握的技能

✅ Kafka 核心概念(Topic、Producer、Consumer、Broker)
✅ Kafka 基本命令(创建、查看、删除 Topic,生产、消费消息)
✅ Flume 配置结构(Source、Channel、Sink 及绑定关系)
✅ Flume + Kafka 整合实现日志自动采集
✅ 常见问题的排查方法

9.2 关键经验

  1. 配置文件中的每一行都有意义,不要忽视

  2. 排错时逐项检查每个参数是否正确

  3. Topic 名称必须一致是最常见的错误

  4. 理解后再修改,不要盲目复制粘贴


十、笔记说明

本笔记基于课堂内容和实际操作中的问题总结而成,记录了从零开始的完整学习过程,包含:

  • 理论概念

  • 命令操作

  • 配置详解

  • 实战流程

  • 常见错误及解决方案

适合作为日常复习和实验参考。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询