Apache Airflow 系列教程 | 第22课:Kubernetes 集成深度实践
2026/5/10 7:07:59 网站建设 项目流程

导读(Introduction)

欢迎来到 Apache Airflow 源码深度解析系列的第二十二课。

在前一课中,我们学习了如何通过云服务 Provider 集成 AWS、GCP、Azure 等外部服务。本课将聚焦于 Airflow 与 Kubernetes 的深度集成——这是现代数据平台中最重要的基础设施融合之一。

Kubernetes 为 Airflow 带来了真正的弹性伸缩能力。传统的 CeleryExecutor 需要预先分配固定数量的 Worker 节点,而 KubernetesExecutor 则采用"Pod 即 Worker"的设计哲学——每个任务在独立的 Pod 中执行,用完即销毁。这意味着你可以实现零闲置资源、完全隔离的任务执行,以及按需伸缩的集群容量。

本课将深入providers/cncf/kubernetes/源码,从 KubernetesExecutor 的调度循环、KubernetesPodOperator 的生命周期管理、到 Pod 模板的三层合并机制,全面揭示 Airflow 如何在 Kubernetes 之上构建弹性工作流执行引擎。


学习目标(Learning Objectives)

完成本课学习后,你将能够:

  1. 理解 KubernetesExecutor 的工作原理——掌握"Pod 即 Worker"的核心设计,以及任务从入队到 Pod 创建再到结果回收的完整生命周期
  2. 深入 KubernetesPodOperator 的实现——理解在独立 Pod 中执行任务的同步/异步模式、Pod 重连、XCom 传递机制
  3. 掌握 Pod 模板与三层合并机制——理解 pod_template_file、动态 Pod、executor_config 三者的优先级与合并逻辑
  4. 分析 KubernetesHook 的连接管理——理解 in_cluster、kubeconfig 等多种认证方式的优先级链
  5. 理解 Pod 生命周期监控——KubernetesJobWatcher 的 Watch 机制、资源版本跟踪、失败检测
  6. 掌握 Kubernetes Secret 处理——环境变量注入与 Volume 挂载两种密钥分发方式
  7. 了解生产环境最佳实践——资源限制、命名空间隔离、日志收集、Pod 清理策略

正文内容(Main Content)

1. KubernetesExecutor:Pod 即 Worker 的设计哲学

1.1 传统 Executor 的局限

在理解 KubernetesExecutor 之前,让我们回顾传统执行器的局限性:

Executor 类型局限性
LocalExecutor单机运行,无法水平扩展
CeleryExecutor需要预部署 Worker 节点,资源利用率低
SequentialExecutor串行执行,仅适合测试

KubernetesExecutor的核心理念是将 Kubernetes 集群本身作为弹性计算资源池:

  • 按需创建:每个 Task Instance 独立在一个 Pod 中执行
  • 完全隔离:不同任务之间没有进程级别的相互影响
  • 弹性伸缩:Pod 数量随任务负载自动伸缩,无需预分配 Worker
  • 异构资源:不同任务可请求不同的 CPU/内存/GPU 资源
1.2 整体架构概览

KubernetesExecutor 的架构包含以下核心组件:

┌───────────────────────────────────────────────────────┐ │ Scheduler │ │ ┌──────────────────────────────────────────────────┐ │ │ │ KubernetesExecutor │ │ │ │ ┌────────────┐ ┌────────────┐ ┌───────────┐ │ │ │ │ │ task_queue │ │result_queue│ │ running │ │ │ │ │ └─────┬──────┘ └─────▲──────┘ └───────────┘ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ ┌──────────────────────┴───────────────────────┐ │ │ │ │ │ AirflowKubernetesScheduler │ │ │ │ │ │ ┌──────────────┐ ┌────────────────────┐ │ │ │ │ │ │ │ run_next() │ │ KubernetesJobWatcher│ │ │ │ │ │ │ │ (Pod创建) │ │ (Pod事件监听) │ │ │ │ │ │ │ └──────┬───────┘ └──────────▲─────────┘ │ │ │ │ │ └─────────┼───────────────────────┼────────────┘ │ │ │ └────────────┼───────────────────────┼──────────────┘ │ └───────────────┼───────────────────────┼────────────────┘ │ │ ▼ │ ┌───────────────────────────────────────────────────────┐ │ Kubernetes API Server │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │Worker Pod│ │Worker Pod│ │Worker Pod│ │Worker Pod│ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ └───────────────────────────────────────────────────────┘
1.3 源码解析:KubernetesExecutor 类

KubernetesExecutor 继承自BaseExecutor,是整个 Kubernetes 集成的核心调度器。让我们看其初始化逻辑:

# providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.pyclassKubernetesExecutor(BaseExecutor):"""Executor for Kubernetes."""RUNNING_POD_LOG_LINES=100supports_ad_hoc_ti_run:bool=Truesupports_multi_team:bool=Truedef__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)# 团队感知配置支持(AIP-67 Multi-Team 特性)ifnothasattr(self,"conf")ornothasattr(self.conf,"getint"):self.conf=conf self.kube_config=KubeConfig(executor_conf=self.conf)self.parallelism=self.kube_config.parallelism# 核心队列:基于 multiprocessing.Manager 的进程安全队列self._manager=multiprocessing.Manager()self.task_queue:Queue[KubernetesJob]=self._manager.JoinableQueue()self.result_queue:Queue[KubernetesResults]=self._manager.JoinableQueue()self.kube_scheduler:AirflowKubernetesScheduler|None=Noneself.kube_client:client.CoreV1Api|None=Noneself.task_publish_retries:Counter[TaskInstanceKey]=Counter()self.task_publish_max_retries=self.conf.getint("kubernetes_executor","task_publish_max_retries",fallback=0)

关键设计要点:

  • 双队列架构task_queue接收待执行任务,result_queue接收执行结果
  • 进程安全:使用multiprocessing.Manager().JoinableQueue()保证多进程安全
  • Multi-Team 支持supports_multi_team = True表示支持 AIP-67 多团队配置隔离
  • 发布重试task_publish_retries跟踪 Pod 创建失败的重试次数
1.4 启动流程:start() 方法
defstart(self)->None:"""Start the executor."""self.log.info("Start Kubernetes executor")self.scheduler_job_id=str(self.job_id)fromairflow.providers.cncf.kubernetes.executors.kubernetes_executor_utilsimport(AirflowKubernetesScheduler,)fromairflow.providers.cncf.kubernetes.kube_clientimportget_kube_client self.kube_client=get_kube_client()self.kube_scheduler=AirflowKubernetesScheduler(kube_config=self.kube_config,result_queue=self.result_queue,kube_client=self.kube_client,scheduler_job_id=self.scheduler_job_id,)

启动时创建两个关键组件:

  1. kube_client:通过get_kube_client()获取 Kubernetes API 客户端
  2. kube_schedulerAirflowKubernetesScheduler负责实际的 Pod 创建和监控
1.5 核心调度循环:sync() 方法

sync()是 KubernetesExecutor 的心跳方法,由 Scheduler 定期调用:

defsync(self)->None:"""Synchronize task state."""# 1. 定期检查是否需要收养已完成的孤儿 Podadoption_interval=conf.getfloat("scheduler","orphaned_tasks_check_interval",fallback=300.0)now=time.monotonic()ifnow-self._last_completed_pod_adoption>=adoption_interval:self._last_completed_pod_adoption=now self._adopt_completed_pods(self.kube_client)# 2. 同步 kube_scheduler 状态(处理 Watcher 事件)self.kube_scheduler.sync()# 3. 从 result_queue 中获取执行结果,更新任务状态last_resource_version:dict[str,str]=defaultdict(lambda:"0")withcontextlib.suppress(Empty):whileTrue:results=self.result_queue.get_nowait()try:last_resource_version[results.namespace]=results.resource_version self._change_state(results)exceptExceptionase:self.result_queue.put(results)# 重新入队finally:self.result_queue.task_done()# 4. 批量从 task_queue 创建 Podwithcontextlib.suppress(Empty):for_inrange(self.kube_config.worker_pods_creation_batch_size):task=self.task_queue.get_nowait()try:self.kube_scheduler.run_next(task)exceptApiExceptionase:# 处理配额超限、限流等情况,支持重试ifcan_retry_publish:self.task_queue.put(task)self.task_publish_retries[key]=retries+1else:self.fail(key,e)

sync()的工作流程可以概括为四个步骤:

  1. 孤儿 Pod 收养:处理前一个 Scheduler 遗留的未完成 Pod
  2. 事件同步:从 KubernetesJobWatcher 获取 Pod 状态变更事件
  3. 结果处理:从 result_queue 中读取已完成任务的结果
  4. Pod 创建:批量从 task_queue 中取出任务并创建对应的 Worker Pod

注意限流保护:当收到 HTTP 429 响应时,设置create_pods_after时间戳,暂停 Pod 创建直到Retry-After时间过后。

1.6 任务入队:execute_async() 与 queue_workload()

当 Scheduler 决定执行某个任务时,调用execute_async()将任务放入队列:

defexecute_async(self,key,command,queue=None,executor_config=None):"""Execute task asynchronously."""# 从 executor_config 中提取 Pod 覆盖配置kube_executor_config=PodGenerator.from_obj(executor_config)pod_template_file=executor_config.get("pod_template_file",None)ifexecutor_configelseNone# 标记任务为 QUEUED 状态self.event_buffer[key]=(TaskInstanceState.QUEUED,self.scheduler_job_id)# 放入 task_queue,等待 sync() 方法消费self.task_queue.put(KubernetesJob(key,command,kube_executor_config,pod_template_file))

在 Airflow 3.x 中,新增了queue_workload()方法支持 workload 模式:

defqueue_workload(self,workload:workloads.All,session:Session|None)->None:ifnotisinstance(workload,workloads.ExecuteTask):raiseRuntimeError(f"{type(self)}cannot handle workloads of type{type(workload)}")ti=workload.ti self.queued_tasks[ti.key]=workload

2. AirflowKubernetesScheduler:Pod 创建与监控的核心

2.1 职责与初始化

AirflowKubernetesScheduler是 KubernetesExecutor 的"内部调度器",负责实际的 Pod 操作:

# providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.pyclassAirflowKubernetesScheduler(LoggingMixin):"""Airflow Scheduler for Kubernetes."""def__init__(self,kube_config,result_queue,kube_client,scheduler_job_id):self.kube_config=kube_config self.result_queue=result_queue self.namespace=self.kube_config.kube_namespace self.kube_client=kube_client self._manager=multiprocessing.Manager()self.watcher_queue=self._manager.Queue()self.scheduler_job_id=scheduler_job_id# 创建 KubernetesJobWatcher 进程self.kube_watchers=self._make_kube_watchers()
2.2 Pod 创建:run_next() 方法

run_next()是将任务转化为 Kubernetes Pod 的核心方法:

defrun_next(self,next_job:KubernetesJob)->None:"""Receives the next job to run, builds the pod, and creates it."""key=next_job.key command=next_job.command kube_executor_config=next_job.kube_executor_config pod_template_file=next_job.pod_template_file dag_id,task_id,run_id,try_number,map_index=key# Airflow 3.x: 将 workload 序列化为 Task SDK 命令iflen(command)==1andisinstance(command[0],ExecuteTask):workload=command[0]command=workload_to_command_args(workload)# 获取基础 Pod 模板base_worker_pod=get_base_pod_from_template(pod_template_file,self.kube_config)# 构建完整 Pod 规格(三层合并)pod=PodGenerator.construct_pod(namespace=self.namespace,scheduler_job_id=self.scheduler_job_id,pod_id=create_unique_id(dag_id,task_id),dag_id=dag_id,task_id=task_id,kube_image=self.kube_config.kube_image,try_number=try_number,map_index=map_index,date=None,run_id=run_id,args=list(command),pod_override_object=kube_executor_config,base_worker_pod=base_worker_pod,with_mutation_hook=True,)# 异步创建 Pod(不阻塞)self.run_pod_async(pod,**self.kube_config.kube_client_request_args)

workload_to_command_args()函数将 workload 转化为 Task SDK 命令:

defworkload_to_command_args(workload:workloads.ExecuteTask)->list[str

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询