PHP 实现个简单的协程

| 选择喜欢的代码风格  

先举个协程场景需求的例子:


当有一个逻辑,每次调用这个文件时,该文件要做3 件事:

  1. 写入 300 个文件
  2. 发送邮件给 500 个会员
  3. 插入 100 条数据

<?php
function task1(){
    for ($i=0;$i<=300;$i++){
        //写入文件,大概要3000微秒
        usleep(3000);
        echo "写入文件{$i}\n";
    }
}
function task2(){
    for ($i=0;$i<=500;$i++){
        //发送邮件给500名会员,大概3000微秒
        usleep(3000);
        echo "发送邮件{$i}\n";
    }
}
function task3(){
    for ($i=0;$i<=100;$i++){
        //模拟插入100条数据,大概3000微秒
        usleep(3000);
        echo "插入数据{$i}\n";
    }
}
task1();
task2();
task3();

这样,就实现了这3个功能了。然而,技术 TeamLeader 又说:

能不能改成交替运行呢?就是说:写入文件一次之后,马上去发送一次邮件,然后再去插入一条数据,于是改成下面这样:

<?php
function task1($i)
{
    //使用$i标识 写入文件,大概要3000微秒
    if ($i > 300) {
        return false;//超过300不用写了
    }
    echo "写入文件{$i}\n";
    usleep(3000);
    return true;
}
 
function task2($i)
{
    //使用$i标识 发送邮件,大概要3000微秒
    if ($i > 500) {
        return false;//超过500不用发送了
    }
    echo "发送邮件{$i}\n";
    usleep(3000);
    return true;
}
 
function task3($i)
{
    //使用$i标识 插入数据,大概要3000微秒
    if ($i > 100) {
        return false;//超过100不用插入
    }
    echo "插入数据{$i}\n";
    usleep(3000);
    return true;
}
 
$i           = 0;
$task1Result = true;
$task2Result = true;
$task3Result = true;
while (true) {
    $task1Result && $task1Result = task1($i);
    $task2Result && $task2Result = task2($i);
    $task3Result && $task3Result = task3($i);
    if($task1Result===false&&$task2Result===false&&$task3Result===false){
        break;//全部任务完成,退出循环
    }
    $i++;
}

确实是实现了任务交替执行,但是代码2明显让代码变的非常的难读,扩展性也很差,那么, TeamLeader 说有没有更好的方式实现这个功能呢?这时候我们就必须借助 yield 了。

/**
 * 首先,我们得封装一个任务类: 任务对象
 * Class Task
 */
class Task {
    protected $taskId;//任务id
    protected $coroutine;//生成器
    protected $sendValue = null;//生成器send值
    protected $beforeFirstYield = true;//迭代指针是否是第一个
 
    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }
 
    public function getTaskId() {
        return $this->taskId;
    }
 
    /**
     * 设置插入数据
     * @param $sendValue
     */
    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }
 
    /**
     * send数据进行迭代
     * @return mixed
     */
    public function run() {
        //如果是
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            var_dump($this->coroutine->current());
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
 
    /**
     * 是否完成
     * @return bool
     */
    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

这个封装类,可以更好的去调用运行生成器函数,但只有这个也是不够的,我们还需要一个调度任务类,来代替前面的 while:

/**
 * 任务调度
 * Class Scheduler
 */
class Scheduler {
    protected $maxTaskId = 0;//任务id
    protected $taskMap = []; // taskId => task
    protected $taskQueue;//任务队列
 
    public function __construct() {
        $this->taskQueue = new SplQueue();
    }
 
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        //新增任务
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }
 
    /**
     * 任务入列
     * @param Task $task
     */
    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }
 
    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            //任务出列进行遍历生成器数据
            $task = $this->taskQueue->dequeue();
            $task->run();
 
            if ($task->isFinished()) {
                //完成则删除该任务
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                //继续入列
                $this->schedule($task);
            }
        }
    }
} 

我们已经有了一个调度类,还有了一个任务类,可以继续实现上面的功能了:

function task1()
{
    for ($i = 0; $i <= 300; $i++) {
        //写入文件,大概要3000微秒
        usleep(3000);
        echo "写入文件{$i}\n";
        yield $i;
    }
}
 
function task2()
{
    for ($i = 0; $i <= 500; $i++) {
        //发送邮件给500名会员,大概3000微秒
        usleep(3000);
        echo "发送邮件{$i}\n";
        yield $i;
    }
}
 
function task3()
{
    for ($i = 0; $i <= 100; $i++) {
        //模拟插入100条数据,大概3000微秒
        usleep(3000);
        echo "插入数据{$i}\n";
        yield $i;
    }
}

$scheduler = new Scheduler;
 
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->newTask(task3());
 
$scheduler->run();

我们已经实现了可以调度任务,进行任务交叉运行的功能了,这就是"协程"。协程可以将多个不同的任务交叉运行

协程与调度器的通信


我们在上面已经实现了一个协程封装了,但是任务和调度器缺少了通信。我们可以重新封装下:使协程当中能够获取当前的任务id,新增任务,以及杀死任务…

先封装一下调用的封装:

class YieldCall
{
    protected $callback;
 
    public function __construct(callable $callback)
    {
        $this->callback = $callback;
    }
 
    /**
     * 调用时将返回结果
     * @param Task $task
     * @param Scheduler $scheduler
     * @return mixed
     */
    public function __invoke(Task $task, Scheduler $scheduler)
    {
        $callback = $this->callback;
        return $callback($task, $scheduler);
    }
}

同时我们需要改动下调度器的 run 方法:

public function run()
{
    while (!$this->taskQueue->isEmpty()) {
        $task   = $this->taskQueue->dequeue();
        $retval = $task->run();
 
        //如果返回的是YieldCall实例,则先执行
        if ($retval instanceof YieldCall) {
            $retval($task, $this);
            continue;
        }
        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

新增 getTaskId 函数去返回 task_id

function getTaskId()
{
    //返回一个YieldCall的实例
    return new YieldCall(
        //该匿名函数会先获取任务id,然后send给生成器,并且由YieldCall将task_id返回给生成器函数
        function (Task $task, Scheduler $scheduler) {
            $task->setSendValue($task->getTaskId());
            $scheduler->schedule($task);
        }
    );
}

然后,我们再修改下 task1task2task3 函数:

function task1()
{
    $task_id = (yield getTaskId());
    for ($i = 0; $i <= 300; $i++) {
        //写入文件,大概要3000微秒
        usleep(3000);
        echo "任务{$task_id}写入文件{$i}\n";
        yield $i;
    }
}
 
function task2()
{
    $task_id = (yield getTaskId());
    for ($i = 0; $i <= 500; $i++) {
        //发送邮件给500名会员,大概3000微秒
        usleep(3000);
        echo "任务{$task_id}发送邮件{$i}\n";
        yield $i;
    }
}
 
function task3()
{
    $task_id = (yield getTaskId());
    for ($i = 0; $i <= 100; $i++) {
        //模拟插入100条数据,大概3000微秒
        usleep(3000);
        echo "任务{$task_id}插入数据{$i}\n";
        yield $i;
    }
}
 
$scheduler = new Scheduler;
 
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->newTask(task3());
 
$scheduler->run();

这样的话,当第一次执行的时候,会先调用 getTaskIdtask_id 返回,然后将任务继续执行。这样,我们就获取到了调度器分配给任务的 task_id,是不是很神奇?

生成新任务以及杀死任务


现在新增了一个需求:当发送邮件给会员时,需要新增一个发送短信的子任务,当会员 id 大于 200 时则停止 (别问我为什么要这样做,我自己都不知道)

同时,我们可以利用 YieldCall 去新增任务和杀死任务:

/**
 * 传入一个生成器函数用于新增任务给调度器调用
 * @param Generator $coroutine
 * @return YieldCall
 */
function newTask(Generator $coroutine) {
    return new YieldCall(
        //该匿名函数,会在调度器中新增一个任务
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}
 
/**
 * 杀死一个任务
 * @param $tid
 * @return YieldCall
 */
function killTask($taskId) {
    return new YieldCall(
        //该匿名函数,传入一个任务id,然后让调度器去杀死该任务
        function(Task $task, Scheduler $scheduler) use ($taskId) {
            $task->setSendValue($scheduler->killTask($taskId));
            $scheduler->schedule($task);
        }
    );
}

同时,调度器也得有 killTask 的方法:

/**
 * 杀死一个任务
 * @param $taskId
 * @return bool
 */
public function killTask($taskId)
{
    if (!isset($this->taskMap[$taskId])) {
        return false;
    }
 
    unset($this->taskMap[$taskId]);
 
    /**
     * 遍历队列,找出id相同的则删除
     */
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $taskId) {
            unset($this->taskQueue[$i]);
            break;
        }
    }
 
    return true;
}

有了新增和删除,我们就可以重新写一下 task2 以及新增 task4:

function task4(){
    $task_id = (yield getTaskId());
    while (true) {
        echo "任务{$task_id}发送短信\n";
        yield;
    }
}
function task2()
{
    $task_id = (yield getTaskId());
    $child_task_id = (yield newTask(task4()));
    for ($i = 0; $i <= 500; $i++) {
        //发送邮件给500名会员,大概3000微秒
        usleep(3000);
        echo "任务{$task_id}发送邮件{$i}\n";
        yield $i;
        if($i==200){
           yield killTask($child_task_id);
        }
    }
}

这样我们就完美的实现了新增任务,以及杀死任务了。

PHP yield 协程扩展阅读:




发表评论