PHP异步处理与消息队列实战
2026/6/5 20:48:30 网站建设 项目流程

PHP异步处理与消息队列实战

Web应用中有很多耗时操作不适合在请求中同步执行,比如发送邮件、生成报表、处理图片等。把这些任务放到消息队列中异步处理,可以大大提升用户体验。今天说说PHP中的异步处理方案和消息队列的使用。

先说一个简单但有效的异步处理方法——在请求结束后继续执行。

```php
/**
 * Defer a task until the script is shutting down, after the response has
 * been handed to the client where possible.
 *
 * The task is always registered as a shutdown function, so it never blocks
 * the code that follows the asyncTask() call. When running under PHP-FPM
 * (i.e. fastcgi_finish_request() exists) the response is flushed and the
 * client connection released right before the task runs; on other SAPIs the
 * task simply runs at script shutdown.
 *
 * @param callable $task Work to run after the main script has finished.
 */
function asyncTask(callable $task): void
{
    register_shutdown_function(static function () use ($task): void {
        // Release the client first so the slow work does not delay the response.
        // Calling this when the request was already finished is harmless.
        if (function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();
        }

        $task();
    });
}

print "请求开始处理...\n";

// Register the slow job through asyncTask(): it sleeps for two seconds and
// then appends a timestamped line to /tmp/async.log.
asyncTask(static function (): void {
    sleep(2);
    $timestamp = date('Y-m-d H:i:s');
    file_put_contents('/tmp/async.log', $timestamp . " 异步任务执行完成\n", FILE_APPEND);
});

print "请求响应已发送\n";

// When fastcgi_finish_request() is available, calling it sends the response
// to the client immediately.
function_exists('fastcgi_finish_request') && fastcgi_finish_request();
?>
```

用MySQL做简单的消息队列:

```php
/**
 * Minimal job queue stored in a MySQL (InnoDB) table.
 *
 * Jobs move through the states pending -> processing -> completed|failed.
 * All timestamps are computed by MySQL itself (NOW()), so the queue does not
 * depend on PHP and the database server sharing a time zone.
 *
 * The class expects the PDO handle to report errors as exceptions
 * (PDO::ERRMODE_EXCEPTION); the retry logic in worker() relies on that.
 */
class MySQLQueue
{
    private PDO $pdo;
    private string $table;

    /**
     * @param PDO    $pdo   Connection to the MySQL database holding the queue.
     * @param string $table Table name; letters, digits and underscores only.
     *
     * @throws InvalidArgumentException If the table name is not a plain identifier.
     */
    public function __construct(PDO $pdo, string $table = 'jobs')
    {
        // Identifiers cannot be bound as parameters and the name is interpolated
        // into every statement, so reject anything that is not a plain identifier.
        if (preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $table) !== 1) {
            throw new InvalidArgumentException("Invalid queue table name: {$table}");
        }

        $this->pdo = $pdo;
        $this->table = $table;
        $this->initTable();
    }

    /**
     * Create the jobs table if it does not exist yet.
     */
    private function initTable(): void
    {
        $this->pdo->exec("
            CREATE TABLE IF NOT EXISTS {$this->table} (
                id BIGINT AUTO_INCREMENT PRIMARY KEY,
                queue VARCHAR(50) NOT NULL DEFAULT 'default',
                payload TEXT NOT NULL,
                status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
                attempts INT DEFAULT 0,
                max_attempts INT DEFAULT 3,
                available_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP NULL,
                INDEX idx_queue_status (queue, status),
                INDEX idx_available (available_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ");
    }

    /**
     * Enqueue a job.
     *
     * @param string $queue Queue name.
     * @param array  $data  Job payload; stored as JSON.
     * @param int    $delay Seconds to wait before the job becomes available.
     *
     * @return int The id of the inserted job row.
     *
     * @throws JsonException If the payload cannot be encoded as JSON.
     */
    public function push(string $queue, array $data, int $delay = 0): int
    {
        // Encode first so an unencodable payload fails loudly instead of
        // inserting an empty/false payload.
        $payload = json_encode($data, JSON_THROW_ON_ERROR);

        $stmt = $this->pdo->prepare("
            INSERT INTO {$this->table} (queue, payload, available_at)
            VALUES (?, ?, DATE_ADD(NOW(), INTERVAL ? SECOND))
        ");
        $stmt->execute([$queue, $payload, $delay]);

        return (int)$this->pdo->lastInsertId();
    }

    /**
     * Reserve the oldest available job of a queue, polling every 200ms.
     *
     * The queue is always checked at least once, so a timeout of 0 acts as a
     * non-blocking poll. The returned row reflects the reservation: its status
     * is 'processing' and attempts already includes the current attempt.
     *
     * @param string $queue   Queue name.
     * @param int    $timeout Maximum number of seconds to keep polling.
     *
     * @return array|null The reserved job row, or null if none became available.
     */
    public function pop(string $queue, int $timeout = 5): ?array
    {
        $start = time();

        do {
            $this->pdo->beginTransaction();

            try {
                // FOR UPDATE locks the candidate row so two workers cannot
                // reserve the same job.
                $select = $this->pdo->prepare("
                    SELECT * FROM {$this->table}
                    WHERE queue = ? AND status = 'pending' AND available_at <= NOW()
                    ORDER BY id ASC
                    LIMIT 1
                    FOR UPDATE
                ");
                $select->execute([$queue]);
                $job = $select->fetch(PDO::FETCH_ASSOC);

                if ($job) {
                    $reserve = $this->pdo->prepare("
                        UPDATE {$this->table} SET status = 'processing', attempts = attempts + 1
                        WHERE id = ?
                    ");
                    $reserve->execute([$job['id']]);
                    $this->pdo->commit();

                    // Mirror the UPDATE in the returned row; the SELECT ran before it,
                    // and callers compare attempts against max_attempts.
                    $job['status'] = 'processing';
                    $job['attempts'] = (int)$job['attempts'] + 1;

                    return $job;
                }

                $this->pdo->commit();
            } catch (Throwable $e) {
                // Never leave a transaction open: the next beginTransaction() would fail.
                if ($this->pdo->inTransaction()) {
                    $this->pdo->rollBack();
                }
                throw $e;
            }

            usleep(200000); // 200ms
        } while (time() - $start < $timeout);

        return null;
    }

    /**
     * Mark a job as completed and record the completion time.
     */
    public function complete(int $id): void
    {
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'completed', completed_at = NOW()
            WHERE id = ?
        ");
        $stmt->execute([$id]);
    }

    /**
     * Mark a job as permanently failed.
     */
    public function fail(int $id): void
    {
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'failed' WHERE id = ?
        ");
        $stmt->execute([$id]);
    }

    /**
     * Put failed jobs that still have attempts left back into the queue.
     *
     * Note: worker() only marks a job as failed once attempts has reached
     * max_attempts, so this only matches jobs that were failed by other means
     * (e.g. a direct call to fail()).
     *
     * @return int Number of jobs returned to the pending state.
     */
    public function retryFailed(string $queue): int
    {
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'pending', available_at = NOW()
            WHERE queue = ? AND status = 'failed' AND attempts < max_attempts
        ");
        $stmt->execute([$queue]);
        return $stmt->rowCount();
    }

    /**
     * Count the pending jobs of a queue (including delayed ones).
     */
    public function size(string $queue): int
    {
        $stmt = $this->pdo->prepare("
            SELECT COUNT(*) FROM {$this->table}
            WHERE queue = ? AND status = 'pending'
        ");
        $stmt->execute([$queue]);
        return (int)$stmt->fetchColumn();
    }

    /**
     * Run a blocking worker loop for one queue.
     *
     * Each reserved job is decoded and passed to $handler. If the handler (or
     * the decoding) throws, the job is retried after 60 seconds until
     * max_attempts is reached, then marked as failed. The loop exits once the
     * process uses more than 128 MB of memory.
     *
     * @param string   $queue   Queue name.
     * @param callable $handler Receives the decoded payload array.
     */
    public function worker(string $queue, callable $handler): void
    {
        echo "Worker启动,监听队列: $queue\n";

        while (true) {
            try {
                $job = $this->pop($queue);

                if ($job === null) {
                    sleep(1);
                } else {
                    $this->runJob($job, $handler);
                }
            } catch (Exception $e) {
                echo "Worker错误: {$e->getMessage()}\n";
                sleep(5);
            }

            // Check memory on every iteration, including idle ones.
            if (memory_get_usage(true) > 128 * 1024 * 1024) {
                echo "内存超限,退出\n";
                break;
            }
        }
    }

    /**
     * Execute one reserved job and record its outcome.
     */
    private function runJob(array $job, callable $handler): void
    {
        $id = (int)$job['id'];
        echo "处理任务 #{$job['id']}\n";

        try {
            // Decoding happens inside the try so a corrupt payload counts as a
            // job failure instead of being passed to the handler as null.
            $payload = json_decode($job['payload'], true, 512, JSON_THROW_ON_ERROR);
            $handler($payload);
            $this->complete($id);
            echo "任务 #{$job['id']} 完成\n";
        } catch (Throwable $e) {
            // Throwable, not Exception: an Error raised by the handler must not
            // kill the worker and leave the job stuck in 'processing'.
            echo "任务 #{$job['id']} 失败: {$e->getMessage()}\n";
            if ((int)$job['attempts'] >= (int)$job['max_attempts']) {
                $this->fail($id);
            } else {
                $this->release($id, 60);
            }
        }
    }

    /**
     * Return a job to the pending state, available again after $delay seconds.
     */
    private function release(int $id, int $delay): void
    {
        $stmt = $this->pdo->prepare("
            UPDATE {$this->table} SET status = 'pending', available_at = DATE_ADD(NOW(), INTERVAL ? SECOND)
            WHERE id = ?
        ");
        $stmt->execute([$delay, $id]);
    }
}

// Make PDO throw on errors explicitly (it is silent by default before PHP 8)
// and use utf8mb4 on the connection to match the charset of the jobs table.
$pdo = new PDO('mysql:host=localhost;dbname=test;charset=utf8mb4', 'root', '', [
    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
]);
$queue = new MySQLQueue($pdo);

// Push a few jobs onto two different queues.
$queue->push('emails', ['to' => 'user@test.com', 'subject' => '欢迎注册', 'body' => 'Hello!']);
$queue->push('emails', ['to' => 'admin@test.com', 'subject' => '新用户注册', 'body' => '新用户已注册']);
$queue->push('reports', ['type' => 'daily', 'date' => '2024-01-15']);

echo "队列大小: emails=" . $queue->size('emails') . ", reports=" . $queue->size('reports') . "\n";
?>
```

用Redis做消息队列性能更好,下面的实现支持延迟消息(优先级可以在此基础上用多个列表或有序集合来扩展):

```php
/**
 * Job queue on top of Redis (phpredis extension).
 *
 * Ready jobs live in a list at "queue:<name>" (pushed on the left, popped
 * from the right, i.e. FIFO). Delayed jobs live in a sorted set at
 * "queue:delayed:<name>" scored by the Unix time at which they become due.
 */
class RedisQueue
{
    private Redis $redis;
    private string $prefix = 'queue:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    /**
     * Enqueue a job, optionally delayed.
     *
     * @param string $queue Queue name.
     * @param array  $data  Job data; wrapped in an envelope and stored as JSON.
     * @param int    $delay Seconds before the job becomes available (0 = now).
     *
     * @return string The generated job id.
     *
     * @throws JsonException If the job cannot be encoded as JSON.
     */
    public function push(string $queue, array $data, int $delay = 0): string
    {
        $jobId = uniqid('job_', true);
        $payload = json_encode([
            'id' => $jobId,
            'data' => $data,
            'created_at' => time(),
            'attempts' => 0,
        ], JSON_THROW_ON_ERROR);

        if ($delay > 0) {
            $this->redis->zAdd($this->prefix . "delayed:{$queue}", time() + $delay, $payload);
        } else {
            $this->redis->lPush($this->prefix . $queue, $payload);
        }

        return $jobId;
    }

    /**
     * Enqueue a job that becomes available after $delay seconds.
     */
    public function later(string $queue, array $data, int $delay): string
    {
        return $this->push($queue, $data, $delay);
    }

    /**
     * Take the next job from a queue, blocking for up to $timeout seconds.
     *
     * Delayed jobs that are due are moved onto the ready list before the
     * blocking pop; jobs that become due while the call is blocked are picked
     * up by the next pop().
     *
     * @return array|null The decoded job envelope with attempts incremented,
     *                    or null on timeout or if the entry is not a valid job.
     */
    public function pop(string $queue, int $timeout = 5): ?array
    {
        $this->migrateDelayed($queue);

        $result = $this->redis->brPop([$this->prefix . $queue], $timeout);

        // On timeout phpredis returns an empty array (or false on error), not
        // null; a hit is [key, value].
        if (!is_array($result) || !isset($result[1])) {
            return null;
        }

        $payload = json_decode($result[1], true);
        if (!is_array($payload)) {
            return null;
        }

        $payload['attempts'] = ($payload['attempts'] ?? 0) + 1;
        return $payload;
    }

    /**
     * Number of jobs that are ready to be popped (delayed jobs not included).
     */
    public function size(string $queue): int
    {
        // lLen() can return false on error; the declared return type is int.
        return (int)$this->redis->lLen($this->prefix . $queue);
    }

    /**
     * Move delayed jobs whose time has come onto the ready list.
     *
     * @return int Number of jobs moved by this call.
     */
    private function migrateDelayed(string $queue): int
    {
        $delayedKey = $this->prefix . "delayed:{$queue}";
        $due = $this->redis->zRangeByScore($delayedKey, '0', (string)time()) ?: [];

        $count = 0;
        foreach ($due as $job) {
            // Remove first and enqueue only if this process did the removal, so
            // concurrent workers cannot enqueue the same job twice.
            // NOTE: the two steps are not atomic; a crash in between drops the job.
            // A Lua script or MULTI/EXEC would close that window.
            if ($this->redis->zRem($delayedKey, $job) > 0) {
                $this->redis->lPush($this->prefix . $queue, $job);
                $count++;
            }
        }

        return $count;
    }
}
?>
```

消息队列让耗时操作异步执行,不阻塞主请求。选择合适的队列后端很重要。MySQL队列简单但性能有限,Redis队列性能好功能完善,RabbitMQ和Kafka适合大型分布式系统。对于大部分PHP项目来说,Redis队列已经足够了。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询