当有一个逻辑,每次调用这个文件时,该文件要做3 件事:
<?php function task1(){ for ($i=0;$i<=300;$i++){ //写入文件,大概要3000微秒 usleep(3000); echo "写入文件{$i}\n"; } } function task2(){ for ($i=0;$i<=500;$i++){ //发送邮件给500名会员,大概3000微秒 usleep(3000); echo "发送邮件{$i}\n"; } } function task3(){ for ($i=0;$i<=100;$i++){ //模拟插入100条数据,大概3000微秒 usleep(3000); echo "插入数据{$i}\n"; } } task1(); task2(); task3();
这样,就实现了这3个功能了。然而,技术 TeamLeader 又说:
能不能改成交替运行呢?就是说:写入文件一次之后,马上去发送一次邮件,然后再去插入一条数据,于是改成下面这样:
<?php function task1($i) { //使用$i标识 写入文件,大概要3000微秒 if ($i > 300) { return false;//超过300不用写了 } echo "写入文件{$i}\n"; usleep(3000); return true; } function task2($i) { //使用$i标识 发送邮件,大概要3000微秒 if ($i > 500) { return false;//超过500不用发送了 } echo "发送邮件{$i}\n"; usleep(3000); return true; } function task3($i) { //使用$i标识 插入数据,大概要3000微秒 if ($i > 100) { return false;//超过100不用插入 } echo "插入数据{$i}\n"; usleep(3000); return true; } $i = 0; $task1Result = true; $task2Result = true; $task3Result = true; while (true) { $task1Result && $task1Result = task1($i); $task2Result && $task2Result = task2($i); $task3Result && $task3Result = task3($i); if($task1Result===false&&$task2Result===false&&$task3Result===false){ break;//全部任务完成,退出循环 } $i++; }
确实是实现了任务交替执行,但是代码2明显让代码变的非常的难读,扩展性也很差,那么, TeamLeader 说有没有更好的方式实现这个功能呢?这时候我们就必须借助 yield
了。
/** * 首先,我们得封装一个任务类: 任务对象 * Class Task */ class Task { protected $taskId;//任务id protected $coroutine;//生成器 protected $sendValue = null;//生成器send值 protected $beforeFirstYield = true;//迭代指针是否是第一个 public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } /** * 设置插入数据 * @param $sendValue */ public function setSendValue($sendValue) { $this->sendValue = $sendValue; } /** * send数据进行迭代 * @return mixed */ public function run() { //如果是 if ($this->beforeFirstYield) { $this->beforeFirstYield = false; var_dump($this->coroutine->current()); return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } /** * 是否完成 * @return bool */ public function isFinished() { return !$this->coroutine->valid(); } }
这个封装类,可以更好的去调用运行生成器函数,但只有这个也是不够的,我们还需要一个调度任务类,来代替前面的 while
:
/** * 任务调度 * Class Scheduler */ class Scheduler { protected $maxTaskId = 0;//任务id protected $taskMap = []; // taskId => task protected $taskQueue;//任务队列 public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; //新增任务 $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } /** * 任务入列 * @param Task $task */ public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { //任务出列进行遍历生成器数据 $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { //完成则删除该任务 unset($this->taskMap[$task->getTaskId()]); } else { //继续入列 $this->schedule($task); } } } }
我们已经有了一个调度类,还有了一个任务类,可以继续实现上面的功能了:
function task1() { for ($i = 0; $i <= 300; $i++) { //写入文件,大概要3000微秒 usleep(3000); echo "写入文件{$i}\n"; yield $i; } } function task2() { for ($i = 0; $i <= 500; $i++) { //发送邮件给500名会员,大概3000微秒 usleep(3000); echo "发送邮件{$i}\n"; yield $i; } } function task3() { for ($i = 0; $i <= 100; $i++) { //模拟插入100条数据,大概3000微秒 usleep(3000); echo "插入数据{$i}\n"; yield $i; } } $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->newTask(task3()); $scheduler->run();
我们已经实现了可以调度任务,进行任务交叉运行的功能了,这就是"协程"。协程可以将多个不同的任务交叉运行
我们在上面已经实现了一个协程封装了,但是任务和调度器缺少了通信。我们可以重新封装下:使协程当中能够获取当前的任务id,新增任务,以及杀死任务…
先封装一下调用的封装:
class YieldCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } /** * 调用时将返回结果 * @param Task $task * @param Scheduler $scheduler * @return mixed */ public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; return $callback($task, $scheduler); } }
同时我们需要改动下调度器的 run
方法:
public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); //如果返回的是YieldCall实例,则先执行 if ($retval instanceof YieldCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } }
新增 getTaskId
函数去返回 task_id
:
function getTaskId() { //返回一个YieldCall的实例 return new YieldCall( //该匿名函数会先获取任务id,然后send给生成器,并且由YieldCall将task_id返回给生成器函数 function (Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); } ); }
然后,我们再修改下 task1
、task2
、task3
函数:
function task1() { $task_id = (yield getTaskId()); for ($i = 0; $i <= 300; $i++) { //写入文件,大概要3000微秒 usleep(3000); echo "任务{$task_id}写入文件{$i}\n"; yield $i; } } function task2() { $task_id = (yield getTaskId()); for ($i = 0; $i <= 500; $i++) { //发送邮件给500名会员,大概3000微秒 usleep(3000); echo "任务{$task_id}发送邮件{$i}\n"; yield $i; } } function task3() { $task_id = (yield getTaskId()); for ($i = 0; $i <= 100; $i++) { //模拟插入100条数据,大概3000微秒 usleep(3000); echo "任务{$task_id}插入数据{$i}\n"; yield $i; } } $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->newTask(task3()); $scheduler->run();
这样的话,当第一次执行的时候,会先调用 getTaskId
将 task_id
返回,然后将任务继续执行。这样,我们就获取到了调度器分配给任务的 task_id
,是不是很神奇?
现在新增了一个需求:当发送邮件给会员时,需要新增一个发送短信的子任务,当会员 id 大于 200 时则停止 (别问我为什么要这样做,我自己都不知道)
同时,我们可以利用 YieldCall
去新增任务和杀死任务:
/** * 传入一个生成器函数用于新增任务给调度器调用 * @param Generator $coroutine * @return YieldCall */ function newTask(Generator $coroutine) { return new YieldCall( //该匿名函数,会在调度器中新增一个任务 function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } ); } /** * 杀死一个任务 * @param $tid * @return YieldCall */ function killTask($taskId) { return new YieldCall( //该匿名函数,传入一个任务id,然后让调度器去杀死该任务 function(Task $task, Scheduler $scheduler) use ($taskId) { $task->setSendValue($scheduler->killTask($taskId)); $scheduler->schedule($task); } ); }
同时,调度器也得有 killTask
的方法:
/** * 杀死一个任务 * @param $taskId * @return bool */ public function killTask($taskId) { if (!isset($this->taskMap[$taskId])) { return false; } unset($this->taskMap[$taskId]); /** * 遍历队列,找出id相同的则删除 */ foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $taskId) { unset($this->taskQueue[$i]); break; } } return true; }
有了新增和删除,我们就可以重新写一下 task2
以及新增 task4
:
function task4(){ $task_id = (yield getTaskId()); while (true) { echo "任务{$task_id}发送短信\n"; yield; } } function task2() { $task_id = (yield getTaskId()); $child_task_id = (yield newTask(task4())); for ($i = 0; $i <= 500; $i++) { //发送邮件给500名会员,大概3000微秒 usleep(3000); echo "任务{$task_id}发送邮件{$i}\n"; yield $i; if($i==200){ yield killTask($child_task_id); } } }
这样我们就完美的实现了新增任务,以及杀死任务了。