博客 / 詳情

返回

PHP 多任務協程處理

本文首發於 PHP 多任務協程處理,轉載請註明出處!

上週 有幸和同事一起在 SilverStripe 分享最近的工作事宜。今天我計劃分享 PHP 異步編程,不過由於上週我聊過 ReactPHP;我決定討論一些不一樣的內容。所以本文將探討多任務協程這方面的內容。

另外我還計劃把這個主題加入到我正在籌備的一本 PHP 異步編程的圖書中。雖然這本書相比本文來説會涉及更多細節,但我覺得本文依然具有實際意義!

那麼,開始吧!
new MyIterator(
generators make code asynchronous without extensions

這就是本文我們要討論的問題。不過我們會從更簡單更熟悉的示例開始。

一切從數組開始

我們可以通過簡單的遍歷來使用數組:

$array = ["foo", "bar", "baz"];
 
foreach ($array as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}
 
for ($i = 0; $i < count($array); $i++) {
    print "item: " . $i . "|" . $array[$i] . "\n";
}

這是我們日常編碼所依賴的基本實現。可以通過遍歷數組獲取每個元素的鍵名和鍵值。

當然,如果我們希望能夠知道在何時可以使用數組。PHP 提供了一個方便的內置函數:

print is_array($array) ? "yes" : "no"; // yes

類數組處理

有時,我們需要對一些數據使用相同的方式進行遍歷處理,但它們並非數組類型。比如對 DOMDocument 類進行處理:

$document = new DOMDocument();
$document->loadXML("<div></div>");

$elements = $document->getElementsByTagName("div");
print_r($elements); // DOMNodeList Object ( [length] => 1 )

這顯然不是一個數組,但是它有一個 length 屬性。我們能像遍歷數組一樣,對其進行遍歷麼?我們可以判斷它是否實現了下面這個特殊的接口:

print ($elements instanceof Traversable) ? "yes" : "no"; // yes

這真的太有用了。它不會導致我們在遍歷非可遍歷數據時觸發錯誤。我們僅需在處理前進行檢測即可。

不過,這會引發另外一個問題:我們能否讓自定義類也擁有這個功能呢?回答是肯定的!第一個實現方法類似如下:

class MyTraversable implements Traversable
{
    //  在這裏編碼...
}

如果我們執行這個類,我們將看到一個錯誤信息:

PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate

Iterator(迭代器)

我們無法直接實現 Traversable,但是我們可以嘗試第二種方案:

class MyTraversable implements Iterator
{
    //  在這裏編碼...
}

這個接口需要我們實現 5 個方法。讓我們完善我們的迭代器:

class MyTraversable implements Iterator
{
    protected $data;

    protected $index = 0;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function current()
    {
        return $this->data[$this->index];
    }

    public function next()
    {
        return $this->data[$this->index++];
    }

    public function key()
    {
        return $this->index;
    }

    public function rewind()
    {
        $this->index = 0;
    }

    public function valid()
    {
        return $this->index < count($this->data);
    }
}

這邊我們需要注意幾個事項:

  1. 我們需要存儲構造器方法傳入的 $data 數組,以便後續我們可以從中獲取它的元素。
  2. 還需要一個內部索引(或指針)來跟蹤 currentnext 元素。
  3. rewind() 僅僅重置 index 屬性,這樣 current()next() 才能正常工作。
  4. 鍵名並非只能是數字類型!這裏使用數組索引是為了保證示例足夠簡單。

我們可以向下面這樣運行這段代碼:

$iterator = new MyTraversable(["foo", "bar", "baz"]);
 
foreach ($iterator as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}

這看起來需要處理太多工作,但是這是能夠像數組一樣使用 foreach/for 功能的一個簡潔實現。

IteratorAggregate(聚合迭代器)

還記得第二個接口拋出的 Traversable 異常麼?下面看一個比實現 Iterator 接口更快的實現吧:

class MyIteratorAggregate implements IteratorAggregate
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getIterator()
    {
        return new ArrayIterator($this->data);
    }
}

這裏我們作弊了。相比於實現一個完整的 Iterator,我們通過 ArrayIterator() 裝飾。不過,這相比於通過實現完整的 Iterator 簡化了不少代碼。

so what does this have to do with generators?

兄弟莫急!先讓我們比較一些代碼。首先,我們在不使用生成器的情況下從文件中讀取每一行數據:

$content = file_get_contents(__FILE__);

$lines = explode("\n", $content);

foreach ($lines as $i => $line) {
    print $i . ". " . $line . "\n";
}

這段代碼讀取文件自身,然後會打印出每行的行號和代碼。那麼為什麼我們不使用生成器呢!

function lines($file) {
    $handle = fopen($file, 'r');

    while (!feof($handle)) {
        yield trim(fgets($handle));
    }

    fclose($handle);
}

foreach (lines(__FILE__) as $i => $line) {
    print $i . ". " . $line . "\n";
}

我知道這看起來更加複雜。不錯,不過這是因為我們沒有使用 file_get_contents() 函數。一個生成器看起來就像是一個函數,但是它會在每次獲取到 yield 關鍵詞是停止運行。

生成器看起來有點像迭代器:

print_r(lines(__FILE__)); // Generator Object ( )

儘管它不是迭代器,它是一個 Generator。它的內部定義了什麼方法呢?

print_r(get_class_methods(lines(__FILE__)));
 
// Array
// (
//     [0] => rewind
//     [1] => valid
//     [2] => current
//     [3] => key
//     [4] => next
//     [5] => send
//     [6] => throw
//     [7] => __wakeup
// )
如果你讀取一個大文件,然後使用 memory_get_peak_usage(),你會注意到生成器的代碼會使用固定的內存,無論這個文件有多大。它每次進度去一行。而是用 file_get_contents() 函數讀取整個文件,會使用更大的內存。這就是在迭代處理這類事物時,生成器的能給我們帶來的優勢!

Send(發送數據)

可以將數據發送到生成器中。看下下面這個生成器:

<?php
$generator = call_user_func(function() {
    yield "foo";
});

print $generator->current() . "\n"; // foo
注意這裏我們如何在 call_user_func() 函數中封裝生成器函數的?這裏僅僅是一個簡單的函數定義,然後立即調用它獲取一個新的生成器實例...

我們已經見過 yield 的用法。我們可以通過擴展這個生成器來接收數據:

$generator = call_user_func(function() {
    $input = (yield "foo");

    print "inside: " . $input . "\n";
});

print $generator->current() . "\n";

$generator->send("bar");

數據通過 yield 關鍵字傳入和返回。首先,執行 current() 代碼直到遇到 yield,返回 foosend() 將輸出傳入到生成器打印輸入的位置。你需要習慣這種用法。

拋出異常(Throw)

由於我們需要同這些函數進行交互,可能希望將異常推送到生成器中。這樣這些函數就可以自行處理異常。

看看下面這個示例:

$multiply = function($x, $y) {
    yield $x * $y;
};

print $multiply(5, 6)->current(); // 30

現在讓我們將它封裝到另一個函數中:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === 'multiply') {
        $generator = $multiply($x, $y);

        return $generator->current();
    }
};

print $calculate("multiply", 5, 6); // 30

這裏我們通過一個普通閉包將乘法生成器封裝起來。現在讓我們驗證無效參數:

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            throw new InvalidArgumentException();
        }

        return $generator->current();
    }
};

print $calculate('multiply', 5, 'foo'); // PHP Fatal error...

如果我們希望能夠通過生成器處理異常?我們怎樣才能將異常傳入生成器呢!

$multiply = function ($x, $y) {
    try {
        yield $x * $y;
    } catch (InvalidArgumentException $exception) {
        print "ERRORS!";
    }
};

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            $generator->throw(new InvalidArgumentException());
        }

        return $generator->current();
    }
};
print $calculate('multiply', 5, 'foo'); // PHP Fatal error...

棒呆了!我們不僅可以像迭代器一樣使用生成器。還可以通過它們發送數據並拋出異常。它們是可中斷和可恢復的函數。有些語言把這些函數叫做……

coroutines

我們可以使用協程(coroutines)來構建異步代碼。讓我們來創建一個簡單的任務調度程序。首先我們需要一個 Task 類:

class Task
{
    protected $generator;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $this->generator->next();
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Task 是普通生成器的裝飾器。我們將生成器賦值給它的成員變量以供後續使用,然後實現一個簡單的 run()finished() 方法。run() 方法用於執行任務,finished() 方法用於讓調度程序知道何時終止運行。

然後我們需要一個 Scheduler 類:

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    pulic function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}

Scheduler 用於維護一個待執行的任務隊列。run() 會彈出隊列中的所有任務並執行它,直到運行完整個隊列任務。如果某個任務沒有執行完畢,當這個任務本次運行完成後,我們將再次入列。

SplQueue 對於這個示例來講再合適不過了。它是一種 FIFO(先進先出:fist in first out) 數據結構,能夠確保每個任務都能夠獲取足夠的處理時間。

我們可以像這樣運行這段代碼:

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task1: " . $i . "\n";
        yield;
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task2: " . $i . "\n";
        yield;
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);

$scheduler->run();

運行時,我們將看到如下執行結果:

task 1: 0
task 1: 1
task 2: 0
task 2: 1
task 1: 2
task 2: 2
task 2: 3
task 2: 4
task 2: 5

這幾乎就是我們想要的執行結果。不過有個問題發生在首次運行每個任務時,它們都執行了兩次。我們可以對 Task 類稍作修改來修復這個問題:

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        if ($this->run) {
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

我們需要調整首次 run() 方法調用,從生成器當前有效的指針讀取運行。後續調用可以從下一個指針讀取運行...

so simple!

有些人基於這個思路實現了一些超讚的類庫。我們來看看其中的兩個...

RecoilPHP

RecoilPHP 是一套基於協程的類庫,它最令人印象深刻的是用於 ReactPHP 內核。可以將事件循環在 RecoilPHP 和 RecoilPHP 之間進行交換,而你的程序無需架構上的調整。

我們來看一下 ReactPHP 異步 DNS 解決方案:

function resolve($domain, $resolver) {
    $resolver
        ->resolve($domain)
        ->then(function ($ip) use ($domain) {
            print "domain: " . $domain . "\n";
            print "ip: " . $ip . "\n";
        }, function ($error) {            
            print $error . "\n";
        })
}

function run()
{
    $loop = React\EventLoop\Factory::create();
 
    $factory = new React\Dns\Resolver\Factory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    resolve("silverstripe.org", $resolver);
    resolve("wordpress.org", $resolver);
    resolve("wardrobecms.com", $resolver);
    resolve("pagekit.com", $resolver);
 
    $loop->run();
}
 
run();

resolve() 接收域名和 DNS 解析器,並使用 ReactPHP 執行標準的 DNS 查找。不用太過糾結與 resolve() 函數內部。重要的是這個函數不是生成器,而是一個函數!

run() 創建一個 ReactPHP 事件循環,DNS 解析器(這裏是個工廠實例)解析若干域名。同樣,這個也不是一個生成器。

想知道 RecoilPHP 到底有何不同?還希望掌握更多細節!

use Recoil\Recoil;
 
function resolve($domain, $resolver)
{
    try {
        $ip = (yield $resolver->resolve($domain));
 
        print "domain: " . $domain . "\n";
        print "ip: " . $ip . "\n";
    } catch (Exception $exception) {
        print $exception->getMessage() . "\n";
    }
}
 
function run()
{
    $loop = (yield Recoil::eventLoop());
 
    $factory = new React\Dns\Resolver\Factory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    yield [
        resolve("silverstripe.org", $resolver),
        resolve("wordpress.org", $resolver),
        resolve("wardrobecms.com", $resolver),
        resolve("pagekit.com", $resolver),
    ];
}
 
Recoil::run("run");

通過將它集成到 ReactPHP 來完成一些令人稱奇的工作。每次運行 resolve() 時,RecoilPHP 會管理由 $resoler->resolve() 返回的 promise 對象,然後將數據發送給生成器。此時我們就像在編寫同步代碼一樣。與我們在其他一步模型中使用回調代碼不同,這裏只有一個指令列表。

RecoilPHP 知道它應該管理一個有執行 run() 函數時返回的 yield 數組。RoceilPHP 還支持基於協程的數據庫(PDO)和日誌庫。

IcicleIO

IcicleIO 為了一全新的方案實現 ReactPHP 一樣的目標,而僅僅使用協程功能。相比 ReactPHP 它僅包含極少的組件。但是,核心的異步流、服務器、Socket、事件循環特性一個不落。

讓我們看一個 socket 服務器示例:

use Icicle\Coroutine\Coroutine;
use Icicle\Loop\Loop;
use Icicle\Socket\Client\ClientInterface;
use Icicle\Socket\Server\ServerInterface;
use Icicle\Socket\Server\ServerFactory;
 
$factory = new ServerFactory();
 
$coroutine = Coroutine::call(function (ServerInterface $server) {
    $clients = new SplObjectStorage();
     
    $handler = Coroutine::async(
        function (ClientInterface $client) use (&$clients) {
            $clients->attach($client);
             
            $host = $client->getRemoteAddress();
            $port = $client->getRemotePort();
             
            $name = $host . ":" . $port;
             
            try {
                foreach ($clients as $stream) {
                    if ($client !== $stream) {
                        $stream->write($name . "connected.\n");
                    }
                }
 
                yield $client->write("Welcome " . $name . "!\n");
                 
                while ($client->isReadable()) {
                    $data = trim(yield $client->read());
                     
                    if ("/exit" === $data) {
                        yield $client->end("Goodbye!\n");
                    } else {
                        $message = $name . ":" . $data . "\n";
                        
                        foreach ($clients as $stream) {
                            if ($client !== $stream) {
                                $stream->write($message);
                            }
                        }
                    }
                }
            } catch (Exception $exception) {
                $client->close($exception);
            } finally {
                $clients->detach($client);
                foreach ($clients as $stream) {
                    $stream->write($name . "disconnected.\n");
                }
            }
        }
    );
     
    while ($server->isOpen()) {
        $handler(yield $server->accept());
    }
}, $factory->create("127.0.0.1", 6000));
 
Loop::run();

據我所知,這段代碼所做的事情如下:

  1. 在 127.0.0.1 和 6000 端口創建一個服務器實例,然後將其傳入外部生成器.
  2. 外部生成器運行,同時服務器等待新連接。當服務器接收一個連接它將其傳入內部生成器。
  3. 內部生成器寫入消息到 socket。當 socket 可讀時運行。
  4. 每次 socket 向服務器發送消息時,內部生成器檢測消息是否是退出標識。如果是,通知其他 socket。否則,其它 socket 發送這個相同的消息。

打開命令行終端輸入 nc localhost 6000 查看執行結果!

該示例使用 SplObjectStorage 跟蹤 socket 連接。這樣我們就可以向所有 socket 發送消息。

mathematical!

這個話題可以包含很多內容。希望您能看到生成器是如何創建的,以及它們如何幫助編寫迭代程序和異步代碼。

如果你有問題,可以隨時問我。

感謝 Nikita Popov(還有它的啓蒙教程 Cooperative multitasking using coroutines (in PHP!) ),Anthony Ferrara 和 Joe Watkins。這些研究工作澤被蒼生,給我以寫作此篇文章的靈感。關注他們吧,好麼?

原文

Co-operative PHP Multitasking

user avatar chenxiaokai 頭像 mafa1993 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.