PHP资源调度与任务编排系统
任务编排是把多个任务组织成工作流，并按依赖关系依次执行。用PHP可以实现简单的任务调度与编排系统，本文就来介绍PHP中任务编排的实现方式。
任务编排的核心是DAG（有向无环图）：任务之间可以存在依赖关系，只有当前任务所依赖的全部任务都完成之后，才能执行当前任务。
```php
/**
 * A single unit of work that a workflow can execute.
 *
 * The handler is stored as a \Closure because `callable` is not a valid
 * property type in PHP (declaring `public callable $handler` is a fatal
 * error). Any callable accepted by the constructor is converted, so callers
 * can still pass closures, function names or [object, method] arrays.
 */
class Task
{
    /** Unique identifier generated at construction (prefix "task_"). */
    public string $id;
    public string $name;
    /** Work to perform; invoked with no arguments by the executor. */
    public \Closure $handler;
    /** @var array<string, Task> prerequisite tasks keyed by their id */
    public array $dependencies = [];
    /** Lifecycle state; starts as 'pending' and is updated by the executor. */
    public string $status = 'pending';
    /** Value returned by the handler once it has run. */
    public mixed $result = null;
    /** Failure message recorded by the executor if the handler threw. */
    public ?string $error = null;

    public function __construct(string $name, callable $handler)
    {
        $this->id = uniqid('task_', true);
        $this->name = $name;
        $this->handler = \Closure::fromCallable($handler);
    }

    /**
     * Declares that this task may only start after $task has completed.
     *
     * @throws \InvalidArgumentException if a task is made to depend on itself,
     *                                   which could never be satisfied
     */
    public function dependsOn(Task $task): void
    {
        if ($task === $this) {
            throw new \InvalidArgumentException("Task '{$this->name}' cannot depend on itself");
        }
        $this->dependencies[$task->id] = $task;
    }
}
/**
 * Executes a set of Task objects in dependency order, one "round" at a time.
 *
 * Each round runs every pending task whose dependencies have all completed.
 * When no further task can become ready, every task still pending is marked
 * 'skipped'. This covers tasks whose dependency failed or was skipped, tasks
 * depending on a task never added to this workflow, and tasks in a cycle.
 * Skipped tasks are reported so that `success` is never true while work
 * silently did not run.
 */
class Workflow
{
    private string $name;
    /** @var array<string, Task> tasks keyed by Task::$id, in insertion order */
    private array $tasks = [];
    /** @var list<string> ids of tasks whose handler returned normally */
    private array $executed = [];
    /** @var list<string> ids of tasks whose handler threw */
    private array $failed = [];
    /** @var list<string> ids of tasks that could never become ready */
    private array $skipped = [];

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function addTask(Task $task): void
    {
        $this->tasks[$task->id] = $task;
    }

    /**
     * Runs rounds until no task is ready, then reports the outcome.
     *
     * @return array{success: bool, executed: list<string>, failed: list<string>, skipped: list<string>}
     *         `success` is true only if nothing failed and nothing was skipped.
     */
    public function run(): array
    {
        echo "开始执行工作流: {$this->name}\n\n";
        $round = 0;
        while (($ready = $this->getReadyTasks()) !== []) {
            $round++;
            echo "=== 第 {$round} 轮 ===\n";
            foreach ($ready as $task) {
                $this->executeTask($task);
            }
        }
        $this->skipUnreachableTasks();
        echo "\n工作流执行完成\n";
        echo "成功: " . count($this->executed) . ", 失败: " . count($this->failed) . "\n";
        if ($this->skipped !== []) {
            echo "跳过: " . count($this->skipped) . "\n";
        }
        return [
            'success' => $this->failed === [] && $this->skipped === [],
            'executed' => $this->executed,
            'failed' => $this->failed,
            'skipped' => $this->skipped,
        ];
    }

    /**
     * Returns the pending tasks whose dependencies are all 'completed'.
     *
     * @return list<Task>
     */
    private function getReadyTasks(): array
    {
        $ready = [];
        foreach ($this->tasks as $task) {
            if ($task->status !== 'pending') {
                continue;
            }
            $depsMet = true;
            foreach ($task->dependencies as $dep) {
                if ($dep->status !== 'completed') {
                    $depsMet = false;
                    break;
                }
            }
            if ($depsMet) {
                $ready[] = $task;
            }
        }
        return $ready;
    }

    /** Marks every task still pending after scheduling stalls as 'skipped'. */
    private function skipUnreachableTasks(): void
    {
        foreach ($this->tasks as $task) {
            if ($task->status === 'pending') {
                $task->status = 'skipped';
                $task->error = 'dependencies not satisfied';
                $this->skipped[] = $task->id;
                echo " - 跳过任务: {$task->name}\n";
            }
        }
    }

    /**
     * Invokes a task's handler and records the outcome on the task.
     *
     * Any \Throwable is caught, so that an \Error raised by one handler (e.g. a
     * TypeError) is recorded as that task's failure instead of aborting
     * the whole workflow.
     */
    private function executeTask(Task $task): void
    {
        echo " 执行任务: {$task->name}\n";
        $task->status = 'running';
        try {
            $task->result = ($task->handler)();
            $task->status = 'completed';
            $this->executed[] = $task->id;
            echo " ✓ 完成\n";
        } catch (\Throwable $e) {
            $task->status = 'failed';
            $task->error = $e->getMessage();
            $this->failed[] = $task->id;
            echo " ✗ 失败: {$e->getMessage()}\n";
        }
    }

    /**
     * Snapshot of every task's name, status and dependency names.
     *
     * @return list<array{name: string, status: string, dependencies: array<string, string>}>
     */
    public function getStatus(): array
    {
        $status = [];
        foreach ($this->tasks as $task) {
            $status[] = [
                'name' => $task->name,
                'status' => $task->status,
                'dependencies' => array_map(fn ($d) => $d->name, $task->dependencies),
            ];
        }
        return $status;
    }
}
// Demo: build and run a small ETL-style data-processing workflow.
$workflow = new Workflow('数据处理');

// Every stage only simulates work: it prints its message, sleeps for one
// second, and hands back a fixed payload.
$makeStage = static fn (string $name, string $message, array $payload): Task => new Task(
    $name,
    static function () use ($message, $payload): array {
        echo $message;
        sleep(1);
        return $payload;
    },
);

$extract = $makeStage('数据抽取', " 从数据库抽取数据...\n", ['raw_data' => [1, 2, 3, 4, 5]]);
$validate = $makeStage('数据验证', " 验证数据格式...\n", ['valid' => true]);
$transform = $makeStage('数据转换', " 转换数据格式...\n", ['transformed' => true]);
$load = $makeStage('数据加载', " 加载到数据仓库...\n", ['loaded' => true]);
$report = $makeStage('生成报告', " 生成处理报告...\n", ['report_url' => '/reports/001']);

// Dependency graph: transform waits for both extract and validate;
// load and report then follow as a simple chain.
$transform->dependsOn($extract);
$transform->dependsOn($validate);
$load->dependsOn($transform);
$report->dependsOn($load);

// Registration order decides execution order among tasks ready in the same round.
foreach ([$extract, $validate, $transform, $load, $report] as $stage) {
    $workflow->addTask($stage);
}

$result = $workflow->run();
```
下面的DAG调度器基于拓扑排序（Kahn算法），可以为任意依赖关系计算出合法的执行顺序，并检测循环依赖：
```php
/**
 * Minimal DAG scheduler: computes a dependency-respecting order of node IDs
 * and detects cycles, using Kahn's algorithm (repeatedly emitting nodes
 * whose in-degree has dropped to zero).
 *
 * Ordering constraints come from two sources, which are merged:
 *  - the `$deps` list passed to addNode() (each dep must precede the node);
 *  - explicit addEdge($from, $to) calls ($from must precede $to).
 * Each node's `task` callable and `status` are stored, but this class
 * does not execute tasks itself.
 */
class DagScheduler
{
    /** @var array<string, array{id: string, task: callable, deps: list<string>, status: string}> */
    private array $nodes = [];
    /** @var list<array{from: string, to: string}> */
    private array $edges = [];

    /**
     * Registers (or replaces) a node.
     *
     * @param list<string> $deps IDs of nodes that must come before this one
     */
    public function addNode(string $id, callable $task, array $deps = []): void
    {
        $this->nodes[$id] = ['id' => $id, 'task' => $task, 'deps' => $deps, 'status' => 'pending'];
    }

    /** Declares that node $from must come before node $to. */
    public function addEdge(string $from, string $to): void
    {
        $this->edges[] = ['from' => $from, 'to' => $to];
    }

    /**
     * Returns node IDs in an order where every node follows all of its
     * prerequisites.
     *
     * Nodes on a cycle, or downstream of one, never reach in-degree zero
     * and are omitted. A result shorter than the node count therefore
     * signals a cycle. Numeric-string IDs come back as ints, because
     * PHP normalises such array keys.
     *
     * @return list<int|string>
     * @throws \InvalidArgumentException if an edge or dependency names an unregistered node
     */
    public function getTopologicalOrder(): array
    {
        $ids = array_keys($this->nodes);
        $inDegree = array_fill_keys($ids, 0);
        $graph = array_fill_keys($ids, []);
        foreach ($this->collectEdges() as [$from, $to]) {
            $graph[$from][] = $to;
            $inDegree[$to]++;
        }

        $queue = [];
        foreach ($inDegree as $id => $degree) {
            if ($degree === 0) {
                $queue[] = $id;
            }
        }

        // Advance a read cursor instead of calling array_shift(), which
        // re-indexes the whole array on every call (O(n^2) overall).
        $order = [];
        for ($head = 0; $head < count($queue); $head++) {
            $node = $queue[$head];
            $order[] = $node;
            foreach ($graph[$node] as $neighbor) {
                if (--$inDegree[$neighbor] === 0) {
                    $queue[] = $neighbor;
                }
            }
        }
        return $order;
    }

    /** True when the constraints contain a cycle, i.e. not every node can be ordered. */
    public function hasCycle(): bool
    {
        return count($this->getTopologicalOrder()) !== count($this->nodes);
    }

    /**
     * Merges explicit edges with those implied by each node's `deps`, and
     * checks that every endpoint is a registered node.
     *
     * @return list<array{0: int|string, 1: int|string}> [from, to] pairs
     * @throws \InvalidArgumentException on a reference to an unknown node
     */
    private function collectEdges(): array
    {
        $pairs = [];
        foreach ($this->edges as $edge) {
            $pairs[] = [$edge['from'], $edge['to']];
        }
        foreach ($this->nodes as $id => $node) {
            foreach ($node['deps'] as $dep) {
                $pairs[] = [$dep, $id];
            }
        }
        foreach ($pairs as [$from, $to]) {
            foreach ([$from, $to] as $endpoint) {
                if (!isset($this->nodes[$endpoint])) {
                    throw new \InvalidArgumentException("Unknown DAG node '{$endpoint}'");
                }
            }
        }
        return $pairs;
    }
}
任务编排是数据处理和流程自动化的基础。工作流引擎按依赖顺序执行任务，确保前序任务完成后才执行后续任务。DAG调度器通过拓扑排序给出合法的执行顺序，并能检测出循环依赖，从而保证任务能够有序执行。对于简单的编排需求，用PHP实现的调度器已经够用；复杂的编排场景建议使用Apache Airflow或Temporal等专业工具。
PHP资源调度与任务编排系统