Loading... # [PHP中的协程](https://blog.p2hp.com/archives/4269) 之前学习Lua的时候第一次接触到了协程(coroutine)的概念。而PHP5.5版本中也加入了协程的概念,从此PHP编程又有了新的思路和玩法。这里学习一下PHP中协程的相关概念的使用方法。 分成上下两篇文章吧,这篇主要讲一下基础概念。 协程是什么? 在以前的Lua学习笔记三中可以看到,协程与多线程的比较,有自己的堆栈、局部变量、指令指针等,但是协程本身与其他协程共享全局变量。主要不同在于,多处理器下,多线程可以真实的同时运行多个线程。而协程任意时刻只能有一个在真实运行,并且只有在明确要求被挂起时才会挂起。 PHP中协程如何理解? 这里引用知乎赵老师的答案,说的比较好理解。具体来说,一个包含yeild的php函数,就是协程,他有阶段性的结算值 yield $var, 但是代码并不返回,php的调度者接到这个值后,喂给一个generator,generator是个实现了iterator接口的+和协程通讯接口(比如send方法)的实例,所以可以用在for循环里(另个接口负责和协程通讯)。那么generator收到了这个协程的阶段性的值后,他喂给for循环,等for循环下一次循环的时候,他又启动这个协程,协程从上次中断的点继续执行,继续计算,继续yeild值给generator,generator喂for循环,继续循环,直到协程执行完毕。 相关函数 final class Generator implements Iterator { public function rewind(); // 返回到迭代器的第一个元素。 public function valid(); // 返回false如果迭代器已经关闭,否则返回true public function current(); // 返回当前yield值. public function key(); // 返回当前yield键名. public function next(); // 恢复生成器的执行。 public function PS_UNRESERVE_PREFIX_throw(Exception $exception) {};//抛出异常 public function send($value); // 将传入的值作为yield表达式的结果并且恢复发生器的执行。 } 简单例子 简单的迭代器给foreach使用 function my_range($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; } } foreach (my_range(1, 5) as $num) { echo $num; } //output 12345 带send可以交互的例子 function gen() { $ret = (yield 'a'); echo $ret; $ret = (yield 'b'); echo $ret; } $gen = gen(); $ret = $gen->current(); echo $ret; $ret = $gen->send("c"); echo $ret; $ret = $gen->send("d"); echo $ret; //output acbd 带抛出异常的例子 function gen() { try{ $ret = (yield 'a'); echo $ret; $ret = (yield 'b'); echo $ret; } catch (Exception $ex) { echo $ex->getMessage(); } } $gen = gen(); $ret = $gen->current(); echo $ret; $ret = $gen->send("c"); echo $ret; $ret = $gen->throw(new Exception("d")); var_dump($ret); //output acbdNULL 那么能用来干什么呢? 我们来看看,协程可以自己主动出让执行权,把不需要抢占的操作时间(比如socket等待链接)让出来,并且可以和调用方通过yield的方式传递信息。显而易见,他可以用来做多任务调度! PHP中协程实现多任务调度,鸟哥有一篇翻译的文章里有讲解,网上能找到的大部分资料,都跟这篇相关。但是至少在我看来,理解起来还是蛮复杂的。这里针对那篇文章的前半部分做一个笔记,忽略后面关于独立堆栈协程的部分。 function里使用yield关键字,将生成迭代器。这样调用functionName()时,其实得到的是一个迭代器对象,而并没有实际运行程序。 为什么要走系统调用SystemCall这一层呢?模拟进程和系统会话的方式,控制权限。通过给yield表达式传递信息来与调度器通信,yield既是中断也是传递给调度器的方式。 SystemCall 包含一个回调函数,他自己本身可以被执行。被执行时实际上是调用了这个回调函数,入参是某个task和调度器。 SystemCall其实并没有其他作用,只是在协程函数里面跟在yield后面传给调度器来执行。 注意SplQueue塞进去的对象其实是引用(PHP里对象入参都是引用,不只是SplQueue)!外面对象改了,里面也会变。 为什么忽略协程堆栈? 我打算在第二篇文章中,把有赞的zan framework里关于协程的部分抽出来,针对性的说一下包含子协程额多任务调度。当然主要想偷个懒。 不过个人感觉zan框架里的协程部分,比之前说的那篇文章要好理解一些。 上篇文章里提到PHP中协程的引入,可以使PHP编程有新的玩法,不在遵循原本顺序执行的思路,从而应对大访问量和并发操作。 有赞的zan framework就是基于PHP协程的,提供最简单的方式开发面向C10K+的高并发HTTP服务或SOA服务。我并没有深入的学习这个框架,这里只打算把关于协程的部分抽出来学习一下。 zan框架高并发设计思路 粗看之下(不一定对哦),框架应该是用swoole_server + 协程解决高并发访问。 比如Web服务中,swoole的http_server只开启了少数的几个worker进程。我们知道,如果worker的onRequest里使用的是异步方法,则worker的响应是异步处理的,反之则是阻塞的。 zan框架在worker进程中大量使用了PHP协程,所以一个worker进程可以响应很多并发的请求(但是本质上正在执行的只有一个),这应该就是能过达到C10K+的原因吧。 并且协程相对于回调的方式,在PHPer看来更容易接受吧。另外,框架设计里还使用了middleware、连接池、依赖注入等等比较现代的设计,感觉可以更深入的学习一下。 一个类一个类来看 我们把框架里关于协程的部分拆出来看,下面一个类一个类的分析。与鸟哥博客里那个文章的实现相比,有一些相同的地方,更多的是不同。比如那篇文章里,多个任务放到一个schedule里调度,对于后面的实现就比较繁琐。 这里把框架里的代码抽出来,并进行了一定删减,去掉了与其他业务强相关的东西。比如Event、Context、Async等等。实际上Async用于处理MySQL查询的返回值的,框架内部将MySQL的具体操作类封装成了Async的子类,并且yield给调度器来用。 Singal类 Singal类里包含了系统调用所需的信号量。指明了协程在一轮运行之后应该处于的状态。 class Signal { const TASK_SLEEP = 1; const TASK_AWAKE = 2; const TASK_CONTINUE = 3; const TASK_KILLED = 4; const TASK_RUNNING = 5; const TASK_WAIT = 6; const TASK_DONE = 7; public static function isSignal($signal) { if(!$signal) { return false; } if (!is_int($signal)) { return false; } if($signal < 1 ) { return false; } if($signal > 7) { return false; } return true; } } Task类 Task包装了具体协程函数,并提供相应的get set方法。与网上流行的那篇文章(以下简称那文)不同的是,我们的scheduler是内置于Task里的,在run方法里实现具体的调度。 这里我们省略了Context,并且让taskId自增。 class Task { protected $taskId = 0; protected $parentId = 0; protected $coroutine = null; //这里忽略了context 保存的是当前http请求的相关信息,可以通过系统调用的方式操作 protected $context = null; protected $sendValue = null; protected $scheduler = null; protected $status = 0; public function __construct(Generator $coroutine, $taskId = 0, $parentId = 0) { $this->coroutine = $coroutine; if(isset($GLOBALS['stTaskId']) && $taskId == 0){ global $stTaskId; $taskId = $stTaskId ++; } $this->taskId = $taskId; $this->parentId = $parentId; $this->scheduler = new Scheduler($this); } /** * 静态方法调用 * @param $coroutine * @param int $taskId * @param int $parentId * @return Task */ public static function execute($coroutine, $taskId = 0, $parentId = 0) { if ($coroutine instanceof Generator) { if(isset($GLOBALS['stTaskId']) && $taskId == 0){ global $stTaskId; $taskId = $stTaskId ++; } $task = new Task($coroutine, $taskId, $parentId); $task->run(); return $task; } return $coroutine; } public function run() { while (true) { try { if ($this->status == Signal::TASK_KILLED){ $this->fireTaskDoneEvent(); break; } $this->status = $this->scheduler->schedule(); //以下几种状态表示信号量,实际上已经从while里跳出来了。如果需要继续的话,会在其他地方重启。 switch ($this->status) { case Signal::TASK_KILLED: case Signal::TASK_SLEEP: case Signal::TASK_WAIT: return null; case Signal::TASK_DONE: $this->fireTaskDoneEvent(); return null; } } catch (Exception $e) { $this->scheduler->throwException($e); } } } public function send($value) { $this->sendValue = $value; return $this->coroutine->send($value); } public function getTaskId() { return $this->taskId; } public function getContext() { return $this->context; } public function getSendValue() { return $this->sendValue; } public function getResult() { return $this->sendValue; } public function getStatus() { return $this->status; } public function setStatus($signal) { $this->status = $signal; } public function getCoroutine() { return $this->coroutine; } public function setCoroutine(Generator $coroutine) { $this->coroutine = $coroutine; } public function fireTaskDoneEvent() { echo "Task done $this->taskId\n"; } } Scheduler类 scheduler类负责: 获取Task里的协程函数跑完一轮的返回值 根据返回值的类型采取不同的处理方式,如系统调用、子协程、普通yield值、检查协程栈等等。 在子协程的调用过程中,负责父子协程的进栈出栈,yield值的传递等等。 class Scheduler { private $task = null; private $stack = null; public function __construct(Task $task) { $this->task = $task; $this->stack = new SplStack(); } public function schedule() { $coroutine = $this->task->getCoroutine(); $value = $coroutine->current(); $signal = $this->handleSysCall($value); if ($signal !== null) return $signal; $signal = $this->handleCoroutine($value); if ($signal !== null) return $signal; $signal = $this->handleYieldValue($value); if ($signal !== null) return $signal; $signal = $this->handleTaskStack($value); if ($signal !== null) return $signal; $signal = $this->checkTaskDone($value); if ($signal !== null) return $signal; return Signal::TASK_DONE; } public function isStackEmpty() { return $this->stack->isEmpty(); } public function throwException($e, $isFirstCall = false) { if ($this->isStackEmpty()) { $this->task->getCoroutine()->throw($e); return; } try{ if ($isFirstCall) { $coroutine = $this->task->getCoroutine(); } else { $coroutine = $this->stack->pop(); } $this->task->setCoroutine($coroutine); $coroutine->throw($e); $this->task->run(); }catch (Exception $e){ $this->throwException($e); } } /** * 处理系统调用 * @param $value * @return mixed|null */ private function handleSysCall($value) { if (!($value instanceof SysCall) && !is_subclass_of($value, SysCall::class) ) { return null; } echo $this->task->getTaskId()."| SYSCALL\n"; //走系统调用 实际上因为__invoke 走的是 $value($this->task); $signal = call_user_func($value, $this->task); if (Signal::isSignal($signal)) { return $signal; } return null; } /** * 处理子协程 * @param $value * @return int|null */ private function handleCoroutine($value) { if (!($value instanceof Generator)) { return null; } echo $this->task->getTaskId()."| COROUTINE\n"; //获取当前的协程 入栈 $coroutine = $this->task->getCoroutine(); $this->stack->push($coroutine); //将新的协程设为当前的协程 $this->task->setCoroutine($value); return Signal::TASK_CONTINUE; } /** * 处理协程栈 * @param $value * @return int|null */ private function handleTaskStack($value) { //能够跑到这里说明当前协程已经跑完了 valid()==false了 需要看下栈里是否还有以前的协程 if ($this->isStackEmpty()) { return null; } echo $this->task->getTaskId()."| TASKSTACK\n"; //出栈 设置为当前运行的协程 $coroutine = $this->stack->pop(); $this->task->setCoroutine($coroutine); //这个sendvalue可能是从刚跑完的协程那里得到的 把它当做send值传给老协程 让他继续跑 $value = $this->task->getSendValue(); $this->task->send($value); return Signal::TASK_CONTINUE; } /** * 处理普通的yield值 * @param $value * @return int|null */ private function handleYieldValue($value) { $coroutine = $this->task->getCoroutine(); if (!$coroutine->valid()) { return null; } // if($this->task->getTaskId() == 2){ // // }else{ echo $this->task->getTaskId()."| YIELD VALUE\n"; // } //如果协程后面没有yield了 这里发出send以后valid就变成false了 并且current变成NULL $status = $this->task->send($value); return Signal::TASK_CONTINUE; } private function checkTaskDone($value) { $coroutine = $this->task->getCoroutine(); if ($coroutine->valid()) { return null; } echo $this->task->getTaskId()."| CHECKDONE\n"; return Signal::TASK_DONE; } } SysCall类 与那文的思路相同,系统调用类一般作为yield后面跟着的值吐给外层的调用方来执行,并且可能返回响应的信号量,标识这个Task是继续运行还是进入等待状态中。 不同的是这里的__invoke入参不需要Scheduler。 class SysCall { protected $callback = null; public function __construct(\Closure $callback) { $this->callback = $callback; } public function __invoke(Task $task) { return call_user_func($this->callback, $task); } } 组装起来! 基本的组件就是上面的几个类了,下面举一些实际的例子,说明如何利用这几个看似简陋的组件来搞大新闻。 延迟执行任务 function taskSleep($ms) { return new SysCall(function (Task $task) use ($ms) { swoole_timer_after($ms, function() use($task){ $task->send("this is send value in sleep function."); $task->run(); }); return Signal::TASK_SLEEP; }); } function delay(){ yield taskSleep(2000); } function gen(){ echo "gen1\n"; yield 1; echo "gen2\n"; yield 2; echo "gen3\n"; yield 3; } //Task::execute(delay(), 1); 亦可 (new Task(delay(), 1))->run(); (new Task(gen(), 2))->run(); /** output 1| SYSCALL gen1 2| YIELD VALUE gen2 2| YIELD VALUE gen3 2| YIELD VALUE 2| CHECKDONE Task done 2 //2秒以后// 1| CHECKDONE Task done 1 **/ taskSleep是个系统调用,告诉调度器我要睡眠了(传递给他一个Signal::TASK_SLEEP)。具体说明时候唤醒呢,要等swoole_timer_after2秒以后将它唤醒。 我们这里同时跑了两个任务,从输出来看第一个任务的延时执行,并不会阻塞第二个任务。可以清楚地看到,我们的协程是可以实现多任务并行处理的(当然实际上并不是并行)。 独立堆栈的子协程 function justReturnValue(){ yield (delay()); yield 'yield value 2'; } function gen2(){ $ret1 = (yield "yield value 1"); echo "[ret] $ret1\n"; $ret2 = (yield justReturnValue()); echo "[ret] $ret2\n"; } (new Task(gen2(), 1))->run(); /** output 1| YIELD VALUE [ret] yield value 1 1| COROUTINE 1| COROUTINE 1| SYSCALL // 2秒以后 // 1| TASKSTACK 1| YIELD VALUE 1| TASKSTACK [ret] yield value 2 1| CHECKDONE Task done 1 **/ gen2里有一个子协程justReturnValue的调用,而justReturnValue里也有delay的子协程调用。通过输出可以清楚的看到,父子协程进栈出栈的顺序,以及出栈的协程会将吐出来的值交给原先的协程。 实现一个非阻塞IO的Web服务 参照那文里的实现,我们也可以写一个自己的Web服务。首先还是来说明一下要做什么,以及思路。 直接引用那文的说法: 有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候,它创建一个新任务来处理新连接。 Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个Web服务器来说,这有点不太高效。因为服务器在一个时间点上只能处理一个连接。 解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”. 为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数 传统的做法中,创建一个套接字,等待新连接,然后读取、发送、关闭。这些都是阻塞的,会花时间在这些抢占资源的步骤上。如果我们使用协程的方式,可以先将等待操作的任务yield掉,之后结合stream_select方法,选择出可以继续操作的任务将其resume。 通俗的说,可以理解为大家一起挤公交车,原先必须一个一个上,但是上车以后要找公交卡,要刷卡或者投币,操作完了以后下一个乘客才能继续。如果使用协程的话,第一个乘客A上车以后,挂起到一边找公交卡,不影响第二个乘客B上车。等到A掏出公交卡以后,直接插队刷卡上车。虽然还是一个一个排队上车,但是找卡的时间里其他乘客不会干等了。 socket的状态 首先定义2个全局数组保存所有用到的socket。再定义2个系统调用将socket设置进数组里,并且返回等待信号量让Task挂起。 $waitingForRead = []; $waitingForWrite = []; function waitForRead($socket) { return new SysCall( function(Task $task) use ($socket) { global $waitingForRead; if (isset($waitingForRead[(int) $socket])) { $waitingForRead[(int) $socket][1][] = $task; } else { $waitingForRead[(int) $socket] = [$socket, [$task]]; } //设置完了不让他往下走 return Signal::TASK_WAIT; } ); } function waitForWrite($socket) { return new SysCall( function(Task $task) use ($socket) { global $waitingForWrite; if (isset($waitingForWrite[(int) $socket])) { $waitingForWrite[(int) $socket][1][] = $task; } else { $waitingForWrite[(int) $socket] = [$socket, [$task]]; } //设置完了不让他往下走 return Signal::TASK_WAIT; } ); } 选择可以操作的socket 注册一个任务,不断检查我们的全局数组,直到有socket就绪了,将其对应的任务唤醒。 function ioPoll($timeout) { global $waitingForRead; global $waitingForWrite; $rSocks = []; foreach ($waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // dummy //stream_select 方法会直接修改入参 只保留就绪的socket数组 if (false === stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $waitingForRead[(int) $socket]; unset($waitingForRead[(int) $socket]); foreach ($tasks as $task) { $task->send("ready for read"); $task->run(); } } foreach ($wSocks as $socket) { list(, $tasks) = $waitingForWrite[(int) $socket]; unset($waitingForWrite[(int) $socket]); foreach ($tasks as $task) { $task->send("ready for write"); $task->run(); } } } function ioPollTask() { global $waitingForRead; global $waitingForWrite; while (true) { if(count($waitingForRead) <=1 && count($waitingForWrite) <=1){ //如果等待检查的socket只有1个 则用阻塞的方式等待 ioPoll(null); }else{ //否则设为0超时 ioPoll(0); } yield; } } 封装socket 将socket封装一下,定义了必须的4个方法。 class CoSocket { protected $socket; public function __construct($socket) { $this->socket = $socket; } public function accept() { //等待本socket就绪 yield waitForRead($this->socket); //就绪以后会继续走到这里 返回给外层一个客户端连接socket yield stream_socket_accept($this->socket, 0); } public function read($size) { //等待本socket就绪 yield waitForRead($this->socket); //就绪以后回把读取到的内容 返回给外层 yield fread($this->socket, $size); } public function write($string) { //等待本socket就绪 yield waitForWrite($this->socket); //就绪以后把响应写给客户端 fwrite($this->socket, $string); } public function close() { @fclose($this->socket); } } 处理客户端新连接 服务端socket接受到新的连接以后,创建新的任务。下面是这个任务里实际运行的协程。 function handleClient(CoSocket $socket) { $data = (yield $socket->read(8192)); $msg = "Received following request:\n\n$data"; $msgLength = strlen($msg); //响应报文由状态行(HTTP版本、状态码)+HTTP首部字段(响应首部字段、通用首部字段、实体首部字段)组成。 //空行(CR+LF)分隔首部与报文主体。所以这里留个空行在打印$msg $response = <<<RES HTTP/1.1 200 OK\r Content-Type: text/plain\r Content-Length: $msgLength\r Connection: close\r \r $msg RES; yield $socket->write($response); yield $socket->close(); } 开启服务! 直接创建一个包含server协程的任务,和一个不断刷新stream_select的任务。之后的流程都交给ioPollTask来调度了。 //定义全局的taskId自增用 static $stTaskId = 1; function server($port){ echo "Starting server at port $port...\n"; //这里抛出的异常会被scheduler和task抛来抛去 最后还是到这里catch一下 try{ $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); //设置为读写非阻塞 stream_set_blocking($socket, 0); $socket = new CoSocket($socket); while (true) { $clientSocket = (yield $socket->accept()); $clientCoSocket = new CoSocket($clientSocket); //为新的链接创建Task Task::execute(handleClient($clientCoSocket)); } }catch (Exception $e){ echo $e->getMessage(); } } //创建服务端socket的task 1 Task::execute(server(8000)); //不断刷新socket_select的task 2 Task::execute(ioPollTask()); 运行效果 开启服务后,我们先直接用curl访问,观测一下得到的结果。 ➜ ~ curl -d "a=123&b=456" http://localhost:8000 Received following request: POST / HTTP/1.1 Host: localhost:8000 User-Agent: curl/7.51.0 Accept: */* Content-Length: 11 Content-Type: application/x-www-form-urlencoded a=123&b=456 可以看到服务端吐出了我们发送给他的信息,包括HTTP请求行、首部字段和正文。如果我们在浏览器里访问的话,正文内容会丰富许多,会有Cookie,UA等等,如下: Received following request: GET / HTTP/1.1 Host: localhost:8000 Connection: keep-alive Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8 DontTrackMeHere: gzip, deflate, sdch, br Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4 Cookie: Phpstorm-f86ac615=34137ba0-5113-4922-b809-b6fa20dbf937 不足的地方 zan framework里的协程调度,并没有采用任务队列的方式。可能是因为他只是针对单独的http或者tcp请求来设计的吧,一般是链式调用。由于这个原因,所以没法设置具体某个任务的执行顺序。当然实际效果跟那文里是相同的,说到底还是由ioPollTask来驱动。 小结 我们通过几个例子加深了对PHP中协程用法的理解。需要注意的是,在协程中(本文构造的这种结构)我们要避免使用死循环,除非循环里yield的结果可以将其挂起并出让控制权给其他协程。 比如上文的Web服务器中,因为有不能主动挂起的ioPollTask,所以不能实现在响应时延迟几秒的效果。因为即使使用了taskSleep这种系统调用,也会因为ioPollTask死循环导致不能获取控制权无法执行 最后修改:2023 年 08 月 10 日 © 允许规范转载 赞 如果觉得我的文章对你有用,请随意赞赏