PHP数据流与管道处理详解
2026/6/3 2:56:11 网站建设 项目流程

PHP数据流与管道处理详解

数据流是PHP中处理数据的重要抽象。流式处理可以处理任意大小的数据,内存占用固定。今天说说PHP的流体系和各种用法。

PHP的流使用统一的协议格式,比如file://、http://、php://等。所有的文件操作都可以用流的方式处理。

```php
// 基础流操作
$handle = fopen('php://temp', 'r+');
fwrite($handle, "Hello Stream!\n");
fwrite($handle, "第二行数据\n");

rewind($handle);
while (($line = fgets($handle)) !== false) {
echo "读取: " . trim($line) . "\n";
}
fclose($handle);

// php://input 读取请求体
$rawBody = file_get_contents('php://input');
echo "请求体: $rawBody\n";

// php://output 直接输出
$output = fopen('php://output', 'w');
fwrite($output, "直接输出到浏览器\n");
fclose($output);

// php://memory 内存流
$memory = fopen('php://memory', 'r+');
fwrite($memory, str_repeat("data", 1000));
echo "内存流位置: " . ftell($memory) . "\n";
rewind($memory);
$content = stream_get_contents($memory);
echo "内存流长度: " . strlen($content) . "\n";
fclose($memory);
?>
```

流过滤器可以对流中的数据做实时转换。

```php
// 注册自定义流过滤器
class UpperCaseFilter extends php_user_filter
{
public function filter($in, $out, &$consumed, $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$bucket->data = strtoupper($bucket->data);
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}

class Base64EncodeFilter extends php_user_filter
{
public function filter($in, $out, &$consumed, $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$bucket->data = base64_encode($bucket->data);
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}

stream_filter_register('uppercase', UpperCaseFilter::class);
stream_filter_register('base64encode', Base64EncodeFilter::class);

// 使用过滤器
$handle = fopen('php://memory', 'r+');
stream_filter_append($handle, 'uppercase');
fwrite($handle, "hello world\nthis is php\n");
rewind($handle);

while (($line = fgets($handle)) !== false) {
echo $line;
}
fclose($handle);
?>
```

流封装器可以自定义数据的来源和去向。

```php
class ConfigStreamWrapper
{
private $position = 0;
private $data = '';
private static array $configs = [];

public static function setConfig(string $key, mixed $value): void
{
self::$configs[$key] = $value;
}

public function stream_open(string $path, string $mode, int $options, ?string &$opened_path): bool
{
$key = str_replace('config://', '', $path);
$value = self::$configs[$key] ?? null;
$this->data = json_encode($value);
$this->position = 0;
return true;
}

public function stream_read(int $count): string
{
$data = substr($this->data, $this->position, $count);
$this->position += strlen($data);
return $data;
}

public function stream_write(string $data): int
{
$this->data .= $data;
return strlen($data);
}

public function stream_tell(): int
{
return $this->position;
}

public function stream_eof(): bool
{
return $this->position >= strlen($this->data);
}

public function stream_seek(int $offset, int $whence): bool
{
switch ($whence) {
case SEEK_SET: $this->position = $offset; break;
case SEEK_CUR: $this->position += $offset; break;
case SEEK_END: $this->position = strlen($this->data) + $offset; break;
}
return true;
}
}

stream_wrapper_register('config', ConfigStreamWrapper::class);

ConfigStreamWrapper::setConfig('database', [
'host' => 'localhost',
'port' => 3306,
'name' => 'test',
]);

$config = file_get_contents('config://database');
echo "配置: $config\n";
?>
```

管道模式的流式处理,适合处理大文件。

```php
function createReadStream(string $path): callable
{
return function () use ($path) {
$handle = fopen($path, 'r');
while (($line = fgets($handle)) !== false) {
yield trim($line);
}
fclose($handle);
};
}

function createTransformStream(callable $transform): callable
{
return function (iterable $input) use ($transform): Generator {
foreach ($input as $item) {
yield $transform($item);
}
};
}

function createFilterStream(callable $predicate): callable
{
return function (iterable $input) use ($predicate): Generator {
foreach ($input as $item) {
if ($predicate($item)) {
yield $item;
}
}
};
}

function createWriteStream(callable $writeFn): callable
{
return function (iterable $input) use ($writeFn): void {
foreach ($input as $item) {
$writeFn($item);
}
};
}

function pipe(iterable $source, callable ...$transforms): Generator
{
$result = $source;
foreach ($transforms as $transform) {
$result = $transform($result);
}
return $result;
}

// 使用管道处理数据
$source = function () {
foreach (range(1, 100) as $i) {
yield "数据_{$i}";
}
};

$pipeline = pipe(
$source(),
createTransformStream(fn($item) => strtoupper($item)),
createFilterStream(fn($item) => str_contains($item, '5') || str_contains($item, '0')),
createTransformStream(fn($item) => "[{$item}]"),
);

foreach ($pipeline as $item) {
echo "$item ";
}
echo "\n";
?>
```

PHP的流系统是一个强大但被低估的功能。理解流的原理后,可以用统一的接口处理文件、网络、内存等各种数据源。自定义流封装器和过滤器可以扩展PHP的数据处理能力,在处理大数据时特别有用。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询