Home > ブログ > PHPのストリームフィルタ

ブログ

PHPのストリームフィルタ

phpにはストリームフィルタという機能があります。ストリーム(*1)にフィルタを登録しておくと、ストリームへread/writeした際に自動でフィルタによるデータ加工が行われるようになります。

以下はフィルタを使って文字コードの変換を行う例です。fwrite()で書き込みをするとフィルタによって文字コードがSJISに変換されてテキスト出力されます。

Stream Filterの使用例

<?php
$fp = fopen('php://output', 'w'); 
stream_filter_prepend($fp,'convert.iconv.utf-8/cp932//TRANSLIT');
fwrite($fp, 'こんにちは');
fclose($fp);

※エラー処理は省略

あまり使いどころがないかもしれませんが、fputcsv()でCSV出力する時に便利だったりします。 fputcsv()で文字列に出力できると、その後に文字コードを変換したりできるのですが、fputcsv()は指定したストリームに出力することしかできないため、文字コードを変換するタイミングがありません。出力ストリームにフィルタを設定しておけば出力時に文字コードを変換することができます。また、改行コードを変換するフィルタを作成しておけば、fputcsv()で出力時にLF → CRLFに変換することもできます。

fputcsv()とStream Filterの組み合わせ

<?php
$fp = fopen('php://output', 'w'); 
stream_filter_prepend($fp,'convert.iconv.utf-8/cp932//TRANSLIT');
fputcsv($fp, ['りんご', 'みかん', 'いちご']);
fclose($fp);

上の例はPHPの組み込みのフィルタですが、自分でフィルタを作成することもできます。ただし、ストリームフィルタ周りのDocumentはあまりないため、苦労することになるはずです。今回はフィルタの基本的な作成方法を解説します。

基本的な例

例として入力にstrtoupper()を適用するstrtoupper_filterフィルタクラスを説明します。これはPHPのソースコードに含まれている ext/standard/tests/filters/read.phpt のテストコードを簡略化したものです。

strtoupper_filterフィルタ

<?php
// PHPのソースコード内のテストコードより(若干修正)
class strtoupper_filter extends php_user_filter {
    function filter($in, $out, &$consumed, $closing)
    {
        $output = 0;
        while ($bucket = stream_bucket_make_writeable($in)) {
            $bucket->data = strtoupper($bucket->data);
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
            $output = 1;
        }
        if ($closing && !$output) {
            $output = 1;
        }
        return $output ? PSFS_PASS_ON : PSFS_FEED_ME;
    }
}

function process()
{
    $fp = fopen('php://output', 'w'); 
    stream_filter_register('strtoupper_filter', strtoupper_filter::class);
    stream_filter_append($fp, 'strtoupper_filter');

    while (!feof(STDIN)) {
        $data = fread(STDIN, 1000);
        if ($data === false) {
            break;
        }
        fwrite($fp, $data);
    }
    fclose($fp);
}

process();

標準入力から受け取ったテキストを大文字化して出力します。

実行例

$ echo "Hello" | php strtoupper.php
HELLO

このフィルタクラスの作成方法を順番に説明していきます。

まずフィルタは php_user_filter を継承したクラスとして実装します。

フィルタクラスのメインは変換処理を行うfilter()メソッドです。このメソッドはストリームにread/writeするたびに呼び出されます。filter()メソッドのシグネチャは以下のようになっています。

function filter($in, $out, &$consumed, $closing)

$in と $out は"userfilter.bucket brigade"というタイプのリソースです。(以下では Bucket Brigade もしくは単に Brigade と呼びます)。 filter()でやることは $in から受け取ったデータを加工して $out に渡すことです。

filter()内の処理を説明する前に、Bucket BrigadeとBucketについて説明しておきます。

Bucket Brigade は文字通り Bucket の集まりで、単なる Bucket の双方向リストです。Bucket はバッファーを管理するオブジェクトです。Bucket Brigade と Bucket の構造の概要を図1に示します。これは、PHPの内部実装(Cで実装されているもの)での構造なので、PHPスクリプト側からの見え方は少し異なります。

Bucket Brigade
図1 Bucket Brigadeの構造

次にfwrite()でストリームにデータ書き込んだ際のデータの流れを図2に示します。

Stream Filter
図2 データの流れ

fwrite() を呼び出すと入力Brigadeと出力Brigadeが作成され、入力Brigadeにfwrite()で書き込んだデータを格納した Bucket を一つ登録します。

そして、入力Brigadeと空の出力Brigadeをフィルタの filter() メソッドに渡してフィルタにデータの加工を行わせます。フィルタの出力は出力Brigadeに登録されるので、これを次の入力Brigadeとして次のフィルタに渡します。最後のフィルタまで処理すると出力Brigadeのデータが出力されます。

filter()メソッドの説明に戻りますが、$in, $out 引数がこの入力Brigadeと出力Brigadeになります。

filter() 内では stream_bucket_make_writeable($in) で入力Brigadeから先頭のBucketを取得しています。取り出した Bucket のデータを strtoupper() で加工して stream_bucket_append($out, $bucket) で出力Brigadeに登録しています。

ところで stream_bucket_make_writeable() の make_writeable(書込み可にする)とはどういうことでしょうか?PHPの内部実装の話になりますが、Bucket は内部に参照カウントを持ち、PHP内の各所から Bucket を共有できるようになっています。 この共有した Bucket のデータを勝手に書き換えるのはまずいので、stream_bucket_make_writeable() は参照カウントが2以上(共有されている)時は Bucket を複製して、自分専用の自由に書込みできる Bucket を作成します。これが "make_writeable" の由来です。 内部関数の php_stream_bucket_make_writeable() の名前から来ているのですが、PHPスクリプト側から使う視点ではちょっと分かりにくい名前ですね。

話が戻りますが、データ出力時は $consume を書き換えます。$in から読み取ったデータ量を設定するのですが、これはfwrite()の返り値(書込みサイズ)になります(*2)。

$closing 引数はストリームがclose中の場合にtrueになります。trueだともうこれ以上データは続きません。ストリームが EOF に到達した場合に $close = true で filter() が呼び出されます。

filter() の返り値は PSFS_PASS_ON, PSFS_FEED_ME, PSFS_ERR_FATAL の何れかになります。

stream_bucket_append() で $out にデータを出力した場合は PSFS_PASS_ON を返します。PSFS_PASS_ON が返されると、$out は次のフィルタに渡されます(図2)。 入力Brigade が空で何も出力しなかった場合は PSFS_FEED_ME を返します。 PSFS_FEED_ME が返された場合は、そこでフィルタ処理は中断されます(同じく図2)。この例ではEOFに到達した時や、ストリームが空の状態でfflush()を呼び出した時などに、空の入力Brigadeでfilter()が呼び出されます。

返り値については、例外的に以下の処理で、$closing == true なら送信データがなくても PSFS_PASS_ON を返しているのに気付いたでしょうか?

        if ($closing && !$output) {
            $output = 1;
        }

先の説明のとおり、PSFS_FEED_ME を返してしまうとフィルタ処理は中断されます。 このため、後続のフィルタの filter($closing = true) が呼び出されません。 後続のフィルタでフィルタ内にデータをバッファリングしていたような場合は、その出力機会を失い、データが消えることになります(*3)。このため $closing == true 時は必ず PSFS_PASS_ON を返して、後続の filter($closing = true) も呼び出されるようにしています。

ちなみに、PSFS_PASS_ON / PSFS_FEED_ME の返り値は、単に次のフィルタにデータを渡すか中断するかを制御しているだけです。出力データがない(出力Brigadeが空の)状況で PSFS_PASS_ON を返した場合は、次のフィルタに空の入力Brigade($in) が渡されます。動作に問題はありません。

これで単純なフィルタは実装できるようになりました。

作成したフィルタを使うには stream_filter_register() で名前登録して、stream_filter_prepend() や stream_filter_append() でフィルタチェーンに登録します。

少し複雑な例

PSFS_FEED_ME のもっと詳細な説明するために、もう少し複雑な例として改行コードを変換するフィルタを書いてみます。このフィルタは入力データ中の CR(\x0d), LF(\x0a) を CRLF(\x0d\x0a) に変換して改行コードを統一します。

まずは不完全な例から。

CRLFフィルタ(不完全な例)

<?php
// 改行コードをCRLFに統一するフィルタ(不完全な例)
class BadLf2CrLfFilter extends php_user_filter
{
    public function filter($in, $out, &$consumed, $closing)
    {
        $output = 0;
        while ($bucket = stream_bucket_make_writeable($in)) {
            // \x0d \x0aのシーケンスのうち \x0d までのデータを
            // 受け取った場合、
            // \x0d \x0a => \x0d \x0a \x0d \x0a
            // のように2つのCRLFに変換されてしまう。
            $consumed += $bucket->datalen;
            $bucket->data = preg_replace(
                "/(\x0d\x0a|\x0d|\x0a)/", "\x0d\x0a",
                $bucket->data
            );
            // $bucket->datalenは更新しなくても問題ない
            stream_bucket_append($out, $bucket);
            $output = 1;
        }
        if ($closing && !$output) {
            $output = 1;
        }
        return $output ? PSFS_PASS_ON : PSFS_FEED_ME;
    }
}

function process()
{
    $fp = fopen('php://output', 'w'); 
    stream_filter_register('bad_nlconv', BadLf2CrLfFilter::class);
    stream_filter_append($fp, 'bad_nlconv');

    $total = 0;
    while (!feof(STDIN)) {
        $data = fread(STDIN, 1000);
        if ($data === false) {
            break;
        }

        fwrite($fp, $data);
    }
    fclose($fp);
}

process();

strtoupper_filter の strtoupper() を preg_replace() での改行コード変換に変えただけです。

上記の例では標準入力から fread() でデータを受け取りながら php://output に出力しています。 扱うのはテキストデータなので fgets() でデータを読み込んで行単位で書き込めば、BadLf2CrLfFilter でも正しく動作します。 ただし、一般にストリームのデータ処理においては、受信データがどこで区切られるかはわかりません(*4)。 例のように fread() で読み込んだデータをそのまま fwrite() した場合、データ中に CR LF のシーケンスがあったとして、CRまでのデータが送られてきて、次の受信で LF 以降のデータが送られてくることもあります。 フィルタもこれを考慮する必要があります。 上記の作りでは CR(\x0d) LF(\x0a) のシーケンスのうち、CR までのデータを受け取った場合に、最終的に CR LF が二つに増えてしまいます(図3)。

Incorrect Conversion
図3 誤った変換動作

これを修正したのが以下のクラスになります。

修正したCRLFフィルタ(ただしく変換できるがまだ問題点はある)

class Lf2CrLfFilter extends php_user_filter
{
    private $data = '';
    private $datalen = 0;

    public function filter($in, $out, &$consumed, $closing)
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $this->data .= $bucket->data;
            $this->datalen += $bucket->datalen;
            $consumed += $bucket->datalen;
        }

        // 行末の改行コード判定
        // preg_match("/\x0d$/", $this->data)だと"foo\x0d\x0a"も
        // マッチするので注意
        $lastByte = ord(substr($this->data, strlen($this->data) - 1));
        if (!$closing && $lastByte == 0x0d) {
            // CRLFのうちCRまでしか受け取っていない可能性があるので
            // 続きを受け取る。
            return PSFS_FEED_ME;
        }

        // streamがなくなっている。fclose()をしていなかった場合など。
        // $this->dataに残っているデータがあった場合は破棄される。
        if (get_resource_type($this->stream) == 'Unknown') {
            return PSFS_ERR_FATAL;
        }

        $this->data = preg_replace(
            "/(\x0d\x0a|\x0d|\x0a)/", "\x0d\x0a",
            $this->data
        );

        $bucket = stream_bucket_new($this->stream, $this->data);

        $this->data = '';
        $this->datalen = 0;

        stream_bucket_append($out, $bucket);

        return PSFS_PASS_ON;
    }
}

このクラスでは受け取ったデータの終端がCRだった場合は、続くコードがLFである可能性を考えて、変換処理は行わず $this->data にデータを溜めておきます。 この時に、PSFS_FEED_ME を return して続きのデータを待ちます。 PHP Manualにある「PSFS_FEED_ME: フィルタの処理は成功しましたが、返すデータはありません。 ストリームあるいは一つ前のフィルタから、さらにデータが必要です。」とはこのような使い方を説明したものです。

続きのデータを受け取り、改行コードが CR LF, CR, LF のどれなのかが確定したら、preg_replace()で変換して出力Brigadeに出力しています。

とりあえずこれでデータがどこで途切れようと改行コードを正しく変換できるようになりましたが、PSFS_FEED_ME の説明のために処理を簡略化しているため、一点問題があります。それは、CR終端の行を fwrite() され続けると、$this->data にデータが溜まっていくだけで close するまで変換および出力がまったく行われないことです。 これを修正したものが、以下のクラスになります。

さらに修正したCRLFフィルタ

class Lf2CrLfFilter extends php_user_filter
{
    private $data = '';
    private $datalen = 0;

    public function filter($in, $out, &$consumed, $closing)
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $this->data .= $bucket->data;
            $this->datalen += $bucket->datalen;
            $consumed += $bucket->datalen;
        }

        // 行末の改行コード判定
        $lastByte = ord(substr($this->data, strlen($this->data) - 1));
        if (!$closing && $lastByte == 0x0d) {
            // CRLFのうちCRまでしか受け取っていない可能性がある
            if (strlen($this->data) > 2000) {
                // データがある程度溜まっていれば、CRの前までは処理できるので処理する。
                // > 1で判定して毎回処理してもいいが、ここではある程度データが
                // 溜まってから処理する。
                $data = substr($this->data, 0, strlen($this->data) - 1);
                $datalen = strlen($data);
                $this->data = substr($this->data, strlen($this->data) - 1);
                $this->datalen = strlen($this->data);
            } else {
                // 何も処理せずCRの続きを待つ。
                return PSFS_FEED_ME;
            }
        } else {
            // 受信データを全て処理
            $data = $this->data;
            $datalen = $this->datalen;
            $this->data = '';
            $this->datalen = 0;
        }

        /*
         * Input:
         * $data     加工対象データ
         * $datalen  strlen($data)
         */

        if (get_resource_type($this->stream) == 'Unknown') {
            return PSFS_ERR_FATAL;
        }

        $data = preg_replace(
            "/(\x0d\x0a|\x0d|\x0a)/", "\x0d\x0a",
            $data
        );

        $bucket = stream_bucket_new($this->stream, $data);

        stream_bucket_append($out, $bucket);

        return PSFS_PASS_ON;
    }
}

このクラスでは、データの終端がCRだった場合でも、バッファ内の変換できる部分(CRの手前まで)は変換してしまい、少しずつ出力するようにしています。

まとめ

DocumentがあまりないPHPのストリームフィルタの作成方法を説明しました。また、あまり情報のない PSFS_FEED_ME の扱いについても説明しました。

PHPのストリームフィルタに限りませんが、ストリームのデータを処理する上で重要なのは、受け取ったデータがまだ途中までしか来ていない可能性を考えて処理することです。 受信データのうち完全な部分までを処理して、不完全な部分があれば、その後のデータの到着を待ってから処理を継続する必要があります。

ストリームデータに対するフィルタなら1byteずつfwrite()されても、全データを一気にfwrite()されても最終的に同じ結果になる必要があります。どのようにfwriteしてくるかを勝手な想定をしないよう気をつける必要があります。

ちなみに組み込みの convert.iconv フィルタも同様の対応になっています。 マルチバイトシーケンスの途中までしかデータがなかった場合は、データが揃っているところまで変換して、残りは続きのデータが来るのを待ってから変換を行っています(*5)。

最初の例の strtoupper_filter のように単純に置換するだけのフィルタなら実装も簡単なのですが、今回の改行コード変換のように、バイトシーケンスを意識する必要があるものの作り方も理解できたと思います。 マルチバイト文字に何らかの変換を行うフィルタを書く場合も、マルチバイト文字の途中のバイトまでしか届いていない可能性に注意して実装する必要があります。

追記

Lf2CrLfFilter はPHP7.4以降で動作します。7.2(おそらく7.3も)ではfilter()メソッドの呼び出しで $closing が true になることがないため、最後のデータ受信時にバッファ($this->data)内にデータが残っていた場合、出力されることなく失われます。

$closing が true にならないのは、おそらく以下のバグだと思われます。

stream filter loses final block of data
https://bugs.php.net/bug.php?id=77069

(*1) ストリームとはfopen()やfsockopen()などで返されるリソースのこと。

(*2) フィルタチェーンの先頭のフィルタが返すconsumeがfwrite()の返り値になります。

(*3) フィルタ内にデータをバッファリングする例はこの後で出てきます。

(*4) 必ず行単位に送ってくるとは限りませんし、1行のサイズが大きい場合は、fgets()で行をまるごと読めずにサイズ指定しないといけない場合もあります。また、bzip2.decompress フィルタとチェーンして使った場合なども、bzip2.decompress は展開データを完了した部分から順次送ってくるので、データの区切りがどこに来るかは想定できません(マルチバイト文字の途中で区切られたりとか)。

(*5) 当初、convert.iconv は incomplete multibyte sequence に対応していないと誤った記述をしていたので修正しました(2022.2.25)。ただし、convert.iconv フィルタは出力データがない場合でも PSFS_FEED_ME を返さず、常に PSFS_PASS_ON を返して後続のフィルタに空のBrigadeを流しているようです。PSFS_FEED_ME を返すケースがなかったので勘違いしてしまいました。

投稿日:2022/02/18 11:28(最終更新:2022/02/25 16:49)

タグ: PHP プログラミング

Top

アーカイブ

タグ

Server (17) プログラミング (15) 作業実績 (15) PHP (10) ネットワーク (9) C (7) C++ (7) Nginx (5) OpenSSL (5) Webアプリ (5) Linux (4) laravel (4) EC-CUBE (4) 書籍 (4) AWS (3) JavaScript (3) Rust (3) Golang (2) Vue.js (2) デモ (1) Apache (1) お知らせ (1) Symfony (1) MySQL (1) CreateJS (1) OSS (1)