AI 大模型 API 稳定性实战:超时、重试和降级到底该怎么设计
2026/6/27 3:02:55
假设你是某智能家电公司的大数据工程师,负责处理100万台智能空调的实时数据。每台空调每秒发送5条数据(温度、湿度、耗电量、运行状态),每天产生43.2亿条数据。此时你面临三个致命问题:
这些问题不是某家公司的特例,而是物联网(IoT)数据处理的共性痛点:高并发、乱序、多源异构、实时+离线混合需求。而Kafka,正是解决这些问题的“神器”。
本文将结合智能空调的真实场景,讲解Kafka在物联网数据处理中的全流程应用:
dockerrun -d --name kafka -p9092:9092 -eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -eKAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -eKAFKA_CREATE_TOPICS="iot_temperature:10:1"wurstmeister/kafka(注:iot_temperature是预创建的温度数据主题,10个分区,1个副本)dockerrun -d --name emqx -p1883:1883 -p8083:8083 -p8084:8084 -p8883:8883 -p18083:18083 emqx/emqx:5.0.24目标:将智能空调的温度数据从MQTT broker转发到Kafka主题。
为什么?:MQTT是设备通信的轻量级协议,但不适合高并发数据传输;Kafka是高吞吐量的消息中间件,能承接100万台设备的并发数据。
智能空调通过MQTT协议向EMQ X发送温度数据,数据格式如下:
{"device_id":"ac_12345",// 设备ID"temperature":25.6,// 温度(℃)"humidity":50.2,// 湿度(%)"timestamp":1689012345// 数据生成时间戳(秒)}第一步:创建Kafka主题(用于存储温度数据):
dockerexec-it kafka kafka-topics.sh --create --topic iot_temperature --partitions10--replication-factor1--bootstrap-server localhost:9092(注:10个分区是为了支持高并发,每个分区可被不同消费者处理)
第二步:编写MQTT→Kafka转发程序(用Python):
依赖库:paho-mqtt(MQTT客户端)、kafka-python(Kafka生产者)
pipinstallpaho-mqtt kafka-python代码:
importjsonfrompaho.mqttimportclientasmqtt_clientfromkafkaimportKafkaProducer# MQTT配置MQTT_BROKER="localhost"MQTT_PORT=1883MQTT_TOPIC="ac/temperature"# 空调发送数据的MQTT主题# Kafka配置KAFKA_BROKER="localhost:9092"KAFKA_TOPIC="iot_temperature"# 目标Kafka主题# 初始化Kafka生产者(序列化JSON数据)producer=KafkaProducer(bootstrap_servers=KAFKA_BROKER,value_serializer=lambdav:json.dumps(v).encode("utf-8"))# MQTT连接回调defon_connect(client,userdata,flags,rc):print(f"MQTT连接成功,返回码:{rc}")client.subscribe(MQTT_TOPIC)# 订阅MQTT主题# MQTT消息回调(接收设备数据并转发到Kafka)defon_message(client,userdata,msg):try:# 解析MQTT消息(JSON格式)data=json.loads(msg.payload.decode())# 转发到Kafka(key用device_id,保证同一设备的数据进入同一分区)producer.send(topic=KAFKA_TOPIC,key=data["device_id"].encode("utf-8"),value=data)print(f"转发数据到Kafka成功:{data}")exceptExceptionase:print(f"转发失败:{e}")# 启动MQTT客户端defrun_mqtt_client():client=mqtt_client.Client()client.on_connect=on_connect client.on_me