理解生成器

参考官方文档:Generators

生成器让我们快速、简单地实现一个迭代器,而不需要创建一个实现了Iterator接口的类后,再实例化出一个对象。

一个生成器长什么样?如下

1

2

3

4

5

6<?php

function foo() {

……

yield [$someValue];

……

}

与一般函数的区别在于:

它不能return $notNULLValue(不能有,会报语法错误= =PHP Fatal error: Generators cannot return values using "return"),但可以是return;(相当于return NULL;其实当一个函数没有明确进行return时,PHP会自动为函数加入return;)

必须含有yield关键字(当生成器执行的时候,每次执行到yield都会中断,并且将$someValue作为返回值,如果有的话,没有则是返回NULL)。yield的具体语法见:Generator syntax

它会被转换为Generator类的一个对象

生成器是如何执行的?

先看看Generator这个类:

1

2

3

4

5

6

7

8

9

10

11

12<?php

Generator implements Iterator {//实现了Iterator接口,可被foreach迭代访问

/* Methods */

public mixed current ( void ) //获取yielded value

public mixed key ( void ) //获取yielded key

public void next ( void ) //从上一次断点之后继续执行

public void rewind ( void ) //重置迭代

public mixed send ( mixed $value ) //向生成器中传入$value后,从上一次断点之后继续执行

public mixed throw ( Exception $exception ) //向生成器中抛入一个异常,从上一次断点之后继续执行

public bool valid ( void ) //检查迭代是否结束了

public void __wakeup ( void ) //当序列化Generator对象时抛出异常,即Generator对象不能进行序列化

}

生成器的执行是在调用next(), send(), throw()时就会执行一次。

对于rewind()方法,当生成器执行过时,调用该方法将会抛出异常,其文档中的评论有人提到rewind()方法存在的原因可能只是为了兼容Iterator接口= =。

另一个需要注意的是调用一个Generator对象的方法时,都会首先判断Generator对象是否初始化了(这里不是指调用构造函数,而是指Generator对象是否执行过第一次了),没有的话就初始化:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15<?php

function gen() {

$a = 'foo';

$b = (yield $a);//这里其实可以看做两句语句的执行:return $a; $b = $sendedValue;

var_dump($b);

yield 'bar';

}

$gen = gen();

var_dump($gen->current());

var_dump($gen->send('something'));

//output:

//string(3) "foo"

//string(9) "something"

//string(3) "bar"

从Generator的源码就可以发现这一点(Zend/zend_generators.c):

1

2

3

4

5

6

7

8

9

10

11

12

13

14<?php

ZEND_METHOD(Generator, current)

{

……

zend_generator_ensure_initialized(generator TSRMLS_CC);//Generator每个方法中都会调用该函数

……

}

static void zend_generator_ensure_initialized(zend_generator *generator TSRMLS_DC)

{

if (generator->execute_data && !generator->value) {//如果当前的value为null,则让Generator执行一次,即执行到第一个yield处中断

zend_generator_resume(generator TSRMLS_CC);//执行Generator一次

generator->flags |= ZEND_GENERATOR_AT_FIRST_YIELD;

}

}

那么生成器执行的生命周期如下图所示:

一个generator可以看成由多个yield组成,每一次执行都会执行到下一个yield中断并返回产生的值或结束。

其实对于Generator我有两个疑惑:

为何zend_generator_ensure_initialized(generator TSRMLS_CC)不在构造函数中执行,而是放到每个方法中;

为何Generator执行过后,调用rewind()方法无法重置迭代,是实现不了么= =。

一个关于生成器使用的栗子

如果不用生成器,代码是这样子:

1

2

3

4

5<?php

foreach (range(1, 10, 1) as $number) {

echo "$number";

}

//output: 1 2 3 4 5 6 7 8 9 10

如果用生成器,代码是这样子:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24<?php

function xrange($start, $limit, $step = 1) {

if ($start < $limit) {

if ($step <= 0) {

throw new LogicException('Step must be +ve');

}

for ($i = $start; $i <= $limit; $i += $step) {

yield $i;

}

} else {

if ($step >= 0) {

throw new LogicException('Step must be -ve');

}

for ($i = $start; $i >= $limit; $i += $step) {

yield $i;

}

}

}

foreach (xrange(1, 10, 1) as $number) { //生成器是一个Generator对象,继承了Iterator

echo "$number";

}

//output: 1 2 3 4 5 6 7 8 9 10

用了生成器代码量反而翻倍了,其实range()的调用会产生一个数组返回,如果产生一个超大数组的话,那么就会占用掉许多内存。而使用生成器的话就不会产生一个超大数组,生成器每次运行到yield $i的时候就会中断执行,并且返回$i的值。也许有人会说,我这里用一个for循环就搞定了呀!是的,对于上面简单的逻辑是可行的,但如果需要比较复杂的逻辑的话,又需要复用,就可以封装到Generator中了。

理解协程

维基百科对于协程的定义:Coroutine

协程是作为程序的一个组件,通过允许多个入口点挂起和恢复执行,产生非抢占的多任务处理子例程。

个人理解:就像操作系统中任务是由进程去做的,操作系统负责对它们进行调度。而在一个进程中,如果要处理多个任务,可以交给协程去处理,并由进程进行调度,之所以说是非抢占式是因为协程的终止需要当前执行的协程主动交出CPU的使用权,多个协程是不断地交替串行执行的。

PHP的Generator符合成为协程的要求:允许多个入口点挂起和恢复执行,一个Generator对象可以看作为一个协程。

那么在PHP中要使用协程进行多任务的处理,就还差一个调度器。

鸟哥有一篇文章是讲有关协程的:在PHP中使用协程实现多任务调度(文章的最原始出处应该是Cooperative multitasking using coroutines (in PHP!)) 文中就讲解了如何实现一个调度器,并配合非阻塞IO实现一个简单的回显web服务器。

一开始看鸟哥的这篇文章有很多模糊不清的地方(所以才有了此文= =),下文内容将基于鸟哥的文章,再加上自己的理解阐述PHP的协程,画图思路参考了一篇内部文章:TSF高性能后台服务解决方案-总体介绍中的“揭秘TSF:协程”这张图。

实现调度器

基础版的调度器如下:

首先实现“任务”——对Generator的封装:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35<?php

class Task {

protected $taskId;

protected $coroutine; //Generator对象

protected $sendValue = null; //发送到Generator中的值

protected $beforeFirstYield = true; //用于判断是否第一次执行Generator

public function __construct($taskId, Generator $coroutine) {

$this->taskId = $taskId;

$this->coroutine = $coroutine;

}

public function getTaskId() {

return $this->taskId;

}

public function setSendValue($sendValue) {//指定哪些值将被发送到下次的恢复执行中

$this->sendValue = $sendValue;

}

public function run() {

if ($this->beforeFirstYield) {//第一次执行,返回当前的value,上文中已解释过这里的原因

$this->beforeFirstYield = false;

return $this->coroutine->current();

} else {

$retval = $this->coroutine->send($this->sendValue);

$this->sendValue = null;

return $retval;

}

}

public function isFinished() {

return !$this->coroutine->valid();

}

}

调度器:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35<?php

class Scheduler {

protected $maxTaskId = 0;

protected $taskMap = []; // taskId => task

protected $taskQueue;

public function __construct() {

$this->taskQueue = new SplQueue();

}

public function newTask(Generator $coroutine) {

$tid = ++$this->maxTaskId;

$task = new Task($tid, $coroutine);

$this->taskMap[$tid] = $task;

$this->schedule($task);

return $tid;

}

public function schedule(Task $task) {

$this->taskQueue->enqueue($task);

}

public function run() {//多个任务是交替串行执行的

while (!$this->taskQueue->isEmpty()) {//如果队列任务不为空,则不断出队任务,并执行

$task = $this->taskQueue->dequeue();

$task->run();

if ($task->isFinished()) {//判断当前任务是否执行结束了

unset($this->taskMap[$task->getTaskId()]);

} else {//还没执行结束的任务需要重新入队,以便下次调度

$this->schedule($task);

}

}

}

}

以两个简单的任务来测试上述调度器:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40<?php

function task1() {//协程1

for ($i = 1; $i <= 10; ++$i) {

echo "This is task 1 iteration$i.\n";

yield;

}

}

function task2() {//协程2

for ($i = 1; $i <= 5; ++$i) {

echo "This is task 2 iteration$i.\n";

yield;

}

}

$scheduler = new Scheduler;

$scheduler->newTask(task1());

$scheduler->newTask(task2());

$scheduler->run();

//output

/*

This is task 1 iteration 1.

This is task 2 iteration 1.

This is task 1 iteration 2.

This is task 2 iteration 2.

This is task 1 iteration 3.

This is task 2 iteration 3.

This is task 1 iteration 4.

This is task 2 iteration 4.

This is task 1 iteration 5.

This is task 2 iteration 5.

This is task 1 iteration 6.

This is task 1 iteration 7.

This is task 1 iteration 8.

This is task 1 iteration 9.

This is task 1 iteration 10.

*/

输出结果正如期望的:对于前5个迭代来说,两个任务是交替执行的,在第2个任务结束后,只有第一个任务继续执行。

在C语言编程中,进程与内核的通信是通过系统调用实现的,那么协程与调度器之间的通信也是要借助“系统调用”来实现(当然也可以不通过“系统调用”这层抽象来做,只不过加了“系统调用”这层抽象的话代码会优雅很多,因为协程与调度器之间通信时要做的事情可能有很多种,我们可以将不同种类的事情封装到不同的“系统调用”中)。

那么“系统调用”所要做的是什么事情?

产生任务需要的value;

重新调度任务。

先看一个获取任务Id的“系统调用”:

1

2

3

4

5

6

7<?php

function getTaskId() {

return new SystemCall(function(Task $task, Scheduler $scheduler) {

$task->setSendValue($task->getTaskId());//产生任务需要的value,在task->run()时会通过task->coroutine->send(task->sendValue)发送到Generator中

$scheduler->schedule($task);//重新调度任务

});

}

getTaskId()返回了一个SystemCall类的对象,构造方法的参数中传入了一个回调函数。

封装“系统调用”的类如下:

1

2

3

4

5

6

7

8

9

10

11

12

13<?php

class SystemCall {

protected $callback;

public function __construct(callable $callback) {

$this->callback = $callback;

}

public function __invoke(Task $task, Scheduler $scheduler) {//当SystemCall对象被当做函数调用时调用该方法

$callback = $this->callback;

return $callback($task, $scheduler);

}

}

现在,需要修改下调度器的run()方法以支持“系统调用”:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18<?php

public function run() {

while (!$this->taskQueue->isEmpty()) {

$task = $this->taskQueue->dequeue();

$retval = $task->run();

if ($retval instanceof SystemCall) {//如果任务发出了一个系统调用,那么执行该系统调用

$retval($task, $this);

continue;//因为在系统调用中已经重新调度task,并且此时任务肯定还未执行结束,所以直接continue

}

if ($task->isFinished()) {

unset($this->taskMap[$task->getTaskId()]);

} else {

$this->schedule($task);

}

}

}

测试下“系统调用”:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17<?php

function task($max) {

$tid = (yield getTaskId()); //发出一个系统调用,这里可以看成:return getTaskId();$tid = task->sendValue;

for ($i = 1; $i <= $max; ++$i) {

echo "This is task$tiditeration$i.\n";

yield;

}

}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));

$scheduler->newTask(task(5));

$scheduler->run();

//output: 与上个例子相同

再加入2个“系统调用”:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17<?php

function newTask(Generator $coroutine) {//产生新任务的“系统调用”

return new SystemCall(

function(Task $task, Scheduler $scheduler) use ($coroutine) {

$task->setSendValue($scheduler->newTask($coroutine));//返回taskId给调用者

$scheduler->schedule($task);//重新调度发出当前“系统调用”的任务

}

);

}

function killTask($tid) {//杀死某个任务

return new SystemCall(

function(Task $task, Scheduler $scheduler) use ($tid) {

$task->setSendValue($scheduler->killTask($tid));//成功杀死某个任务返回true,否则返回false

$scheduler->schedule($task);//重新调度发出当前“系统调用”的任务

}

);

}

在调度器中需要新增一个killTask()方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19<?php

public function killTask($tid) {

if (!isset($this->taskMap[$tid])) {//判断tid是否存在

return false;

}

unset($this->taskMap[$tid]);

// This is a bit ugly and could be optimized so it does not have to walk the queue,

// but assuming that killing tasks is rather rare I won't bother with it now

foreach ($this->taskQueue as $i => $task) {//遍历任务队列找到tid后从队列中去除它

if ($task->getTaskId() === $tid) {

unset($this->taskQueue[$i]);

break;

}

}

return true;

}

测试下新增的“系统调用”:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37<?php

function childTask() {//子任务

$tid = (yield getTaskId());

while (true) {

echo "Child task$tidstill alive!\n";

yield;

}

}

function task() {//父任务

$tid = (yield getTaskId());

$childTid = (yield newTask(childTask()));//产生一个子任务,并获取返回的子任务id

for ($i = 1; $i <= 6; ++$i) {

echo "Parent task$tiditeration$i.\n";

yield;

if ($i == 3) yield killTask($childTid);//杀死子任务

}

}

$scheduler = new Scheduler;

$scheduler->newTask(task());

$scheduler->run();

//output:

/*

Parent task 1 iteration 1.

Child task 2 still alive!

Parent task 1 iteration 2.

Child task 2 still alive!

Parent task 1 iteration 3.

Child task 2 still alive!

Parent task 1 iteration 4.

Parent task 1 iteration 5.

Parent task 1 iteration 6.

*/

可以看到在第3次迭代之后子任务被杀死了,注意这里的父子关系只是逻辑意义上的。

该调度器的示例代码目标是完成一个web服务器:一个任务负责在套接字上监听是否有新的连接,当有新连接要建立的时候,它创建一个新任务来处理新连接,对于套接字的IO都是非阻塞的。

首先,新增两个新的“系统调用”:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16<?php

function waitForRead($socket) {//等待socket可以读

return new SystemCall(

function(Task $task, Scheduler $scheduler) use ($socket) {

$scheduler->waitForRead($socket, $task);//将task加入等待socket可以读的队列中

}

);

}

function waitForWrite($socket) {//等待socket可以写

return new SystemCall(

function(Task $task, Scheduler $scheduler) use ($socket) {

$scheduler->waitForWrite($socket, $task);//将task加入等待socket可以写的队列中

}

);

}

调度器需要新增两个入事件队列的方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22<?php

// resourceID => [socket, [tasks]]

protected $waitingForRead = []; //等待socket可以读的队列

protected $waitingForWrite = [];//等待socket可以写的队列

public function waitForRead($socket, Task $task) {

if (isset($this->waitingForRead[(int) $socket])) {

$this->waitingFo

rRead[(int) $socket][1][] = $task;//入队

} else {

$this->waitingForRead[(int) $socket] = [$socket, [$task]];//初始化队列

}

}

public function waitForWrite($socket, Task $task) {

if (isset($this->waitingForWrite[(int) $socket])) {

$this->waitingForWrite[(int) $socket][1][] = $task;

} else {

$this->waitingForWrite[(int) $socket] = [$socket, [$task]];

}

}

在Linux C中,epoll负责监听多个文件描述符上发生的事件,在PHP中可以用stream_select()函数模拟epoll,下面将在调度器中增加处理socket读写事件的方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36<?php

protected function ioPoll($timeout) {

$rSocks = [];

foreach ($this->waitingForRead as list($socket)) {

$rSocks[] = $socket;

}

$wSocks = [];

foreach ($this->waitingForWrite as list($socket)) {

$wSocks[] = $socket;

}

$eSocks = []; // dummy

if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {

return;

}

foreach ($rSocks as $socket) {//读事件发生

list(, $tasks) = $this->waitingForRead[(int) $socket];

unset($this->waitingForRead[(int) $socket]);//从队列中去除

foreach ($tasks as $task) {//逐个调度等待该socket上发生的读事件的任务

$this->schedule($task);

}

}

foreach ($wSocks as $socket) {//写事件发生

list(, $tasks) = $this->waitingForWrite[(int) $socket];

unset($this->waitingForWrite[(int) $socket]);//从队列中去除

foreach ($tasks as $task) {//逐个调度等待该socket上发生的写事件的任务

$this->schedule($task);

}

}

}

接下来需要在合适的地方调用ioPoll()方法,可以在调度器调度某个task之后,调用该方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28<?php

public function run() {

while (!$this->taskQueue->isEmpty()) {

$task = $this->taskQueue->dequeue();

$retval = $task->run();

if ($retval instanceof SystemCall) {

$retval($task, $this);

if ($this->taskQueue->isEmpty()) {//如果任务队列为空的话,则阻塞stream_select的调用,直到有新的连接到来

$this->ioPoll(null);

} elseif (!empty($this->waitingForRead) || !empty($this->waitingForWrite)) {

//若socket等待队列不为空

$this->ioPoll(0);//0秒超时,既不阻塞stream_select的调用

}

continue;

}

if ($this->taskQueue->isEmpty()) {

$this->ioPoll(null);

} elseif (!empty($this->waitingForRead) || !empty($this->waitingForWrite)) {

$this->ioPoll(0);

}

if ($task->isFinished()) {

unset($this->taskMap[$task->getTaskId()]);

} else {

$this->schedule($task);

}

}

}

以上方法并不太好,因为每次调度任务的时候都要做一次逻辑判断以及ioPoll()方法调用会比较频繁,比较好的的做法是把ioPoll()方法封装到一个task任务中去执行(就好像一个“常驻任务”,每次调用到它时就去调用ioPoll()方法查看是否有socket事件发生):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29<?php

protected function ioPollTask() {

while (true) {

if ($this->taskQueue->isEmpty()) {

$this->ioPoll(null);//阻塞stream_select的调用,直到有新的连接到来

} else {

$this->ioPoll(0);//0秒超时,既不阻塞stream_select的调用

}

yield;//将yield放到无限循环中,使得该task能“常驻”在调度器队列中

}

}

public function run() {

$this->newTask($this->ioPollTask());//产生“常驻任务”,负责检查socket事件

while (!$this->taskQueue->isEmpty()) {

$task = $this->taskQueue->dequeue();

$retval = $task->run();

if ($retval instanceof SystemCall) {

$retval($task, $this);

continue;

}

if ($task->isFinished()) {

unset($this->taskMap[$task->getTaskId()]);

} else {

$this->schedule($task);

}

}

}

这个时候就该测试下前面完成的成果了:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42<?php

function server($port) {

echo "Starting server at port$port...\n";

$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);

if (!$socket) throw new Exception($errStr, $errNo);

stream_set_blocking($socket, 0);//设置socket非阻塞

while (true) {

yield waitForRead($socket);

$clientSocket = stream_socket_accept($socket, 0);//accept的socket也是非阻塞的

yield newTask(handleClient($clientSocket));//产生新的任务处理客户端请求

}

}

function handleClient($socket) {//负责处理客户端请求的Generator

yield waitForRead($socket);

$data = fread($socket, 8192);

$msg = "Received following request:\n\n$data";

$msgLength = strlen($msg);

$response = <<

HTTP/1.1 200 OK\r

Content-Type: text/plain\r

Content-Length: $msgLength\r

Connection: close\r

\r

$msg

RES;

yield waitForWrite($socket);

fwrite($socket, $response);

fclose($socket);

}

$scheduler = new Scheduler;

$scheduler->newTask(server(8000));

$scheduler->run();

上述测试代码接收localhost:8000上的连接,然后返回发送来的内容作为HTTP响应。

可以使用curl http://localhost:8000/或ab -n 20 -c 20 localhost:8000/进行访问测试。

现在新版调度器如下:

协程栈

假如在一个协程(Generator对象)中有另外一个协程需要执行,按照之前完成的调度器是无法正确工作的:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19<?php

function getSomething() //child coroutine

{

……

yield

……

}

function task()

{

$v = (yield getSomething()); //希望在这里调用子协程,并获得返回值

……

yield

}

$scheduler = new Scheduler;

$scheduler->newTask(task());

$scheduler->run();

注意上面代码中的$v = (yield getSomething());,这里跟之前的那些task的不同之处在于其yield抛出的值是协程,而不是“系统调用”。

那么这个时候就需要一个类似如下图的协程堆栈调用:

首先,实现对子协程返回值的封装:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17<?php

class CoroutineReturnValue {

protected $value;

public function __construct($value) {

$this->value = $value;

}

public function getValue() {

return $this->value;

}

}

function retval($value) {

return new CoroutineReturnValue($value);

}

实现协程栈:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28<?php

function stackedCoroutine(Generator $gen) {//本身也是个协程

$stack = new SplStack;

for (;;) {

$value = $gen->current();

if ($value instanceof Generator) {//如果是子协程调用

$stack->push($gen);//父协程压栈

$gen = $value;//$gen指向当前协程

continue;

}

$isReturnValue = $value instanceof CoroutineReturnValue;

if (!$gen->valid() || $isReturnValue) {//如果当前协程执行完了或者当前协程返回的值是CoroutineReturnValue对象(即该值是要返回给父协程的),则结束当前协程的执行

if ($stack->isEmpty()) {//如果栈空了,那么整个协程栈执行完毕

return;

}

$gen = $stack->pop();//如果协程栈不为空,那么弹出下一个需要执行的协程

$gen->send($isReturnValue ? $value->getValue() : NULL);//如果子协程有返回值给到父协程,则发送过去

continue;

}

$gen->send(yield $gen->key() => $value);

}

}

Task类中的构造函数需要做一点点修改:

1

2

3

4

5<?php

public function __construct($taskId, Generator $coroutine) {

$this->taskId = $taskId;

$this->coroutine = StackedCoroutine($coroutine); //coroutine属性赋值为协程栈

}

现在可以改进下上面的web服务器例子,把socket相关操作都封装到一个类中:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28<?php

class CoSocket {

protected $socket;

public function __construct($socket) {

$this->socket = $socket;//socket资源

}

public function accept() {

yield waitForRead($this->socket);

yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));

}

public function read($size) {

yield waitForRead($this->socket);

yield retval(fread($this->socket, $size));

}

public function write($string) {

yield waitForWrite($this->socket);

fwrite($this->socket, $string);

}

public function close() {

@fclose($this->socket);

}

}

现在server端可以这样写了:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37<?php

function server($port) {

echo "Starting server at port$port...\n";

$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);

if (!$socket) throw new Exception($errStr, $errNo);

stream_set_blocking($socket, 0);

$socket = new CoSocket($socket);

while (true) {

//yield $socket->accept() 是子协程调用

yield newTask(

handleClient(yield $socket->accept())//这里handleClient()函数接收到的参数是一个CoSocket对象

);

}

}

function handleClient($socket) {

$data = (yield $socket->read(8192));//子协程调用

$msg = "Received following request:\n\n$data";

$msgLength = strlen($msg);

$response = <<

HTTP/1.1 200 OK\r

Content-Type: text/plain\r

Content-Length: $msgLength\r

Connection: close\r

\r

$msg

RES;

yield $socket->write($response);

yield $socket->close();

}

错误处理

现在整个web服务器还缺少对出错的处理,无法在需要的时候抛出异常,为了解决这个问题,可以使用Generator的throw()方法。

throw()方法接受一个Exception, 并将其抛出到协程的当前悬挂点, 看看下面代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15<?php

function gen() {

echo "Foo\n";

try {

yield;

} catch (Exception $e) {

echo "Exception:{$e->getMessage()}\n";

}

echo "Bar\n";

}

$gen = gen();

$gen->rewind(); // echos "Foo"

$gen->throw(new Exception('Test')); // echos "Exception: Test"

// and "Bar"

这样的话,当在进行“系统调用”和子协程调用时就可以抛出异常了。

首先需要修改Scheduler::run()方法:

1

2

3

4

5

6

7

8

9

10<?php

if ($retval instanceof SystemCall) {

try {

$retval($task, $this);

} catch (Exception $e) {

$task->setException($e);

$this->schedule($task);

}

continue;

}

再修改Task类以支持异常传递到协程中:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26<?php

class Task {

// ...

protected $exception = null;

public function setException($exception) {

$this->exception = $exception;

}

public function run() {

if ($this->beforeFirstYield) {

$this->beforeFirstYield = false;

return $this->coroutine->current();

} elseif ($this->exception) {//如果当前exception属性非null,那么向协程栈中抛入该异常

$retval = $this->coroutine->throw($this->exception);

$this->exception = null;

return $retval;

} else {

$retval = $this->coroutine->send($this->sendValue);

$this->sendValue = null;

return $retval;

}

}

// ...

}

最后,还需要修改协程栈,以正确处理异常:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50<?php

function stackedCoroutine(Generator $gen) {

$stack = new SplStack;

$exception = null;

for (;;) {

try {

if ($exception) {

$gen->throw($exception);

$exception = null;

continue;

}

$value = $gen->current();

if ($value instanceof Generator) {

$stack->push($gen);

$gen = $value;

continue;

}

$isReturnValue = $value instanceof CoroutineReturnValue;

if (!$gen->valid() || $isReturnValue) {

if ($stack->isEmpty()) {

return;

}

$gen = $stack->pop();

$gen->send($isReturnValue ? $value->getValue() : NULL);

continue;

}

try {

$sendValue = (yield $gen->key() => $value);//处理“系统调用”抛出的Exception

} catch (Exception $e) {

$gen->throw($e);

continue;

}

$gen->send($sendValue);

} catch (Exception $e) {//捕获try代码块中的$gen执行时抛出的异常

if ($stack->isEmpty()) {

throw $e;

}

$gen = $stack->pop();

$exception = $e;

}

}

}

现在,可以在“系统调用”中使用异常抛出了!例如,调用killTask时,当传递一个不存在的tid时,抛出一个异常:

1

2

3

4

5

6

7

8

9

10

11

12<?php

function killTask($tid) {

return new SystemCall(

function(Task $task, Scheduler $scheduler) use ($tid) {

if ($scheduler->killTask($tid)) {

$scheduler->schedule($task);

} else {

throw new InvalidArgumentException('Invalid task ID!');

}

}

);

}

测试代码(记得注释掉,此前在Scheduler::run()方法中设置的“常驻任务”:$this->newTask($this->ioPollTask());):

1

2

3

4

5

6

7

8<?php

function task() {

try {

yield killTask(500);

} catch (Exception $e) {

echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";

}

}

结语

当我们重新审视协程(Task)中的代码时,会发现编写逻辑跟同步编码十分相似,然而,很酷的事情是通过yield发起的操作实际上都是异步执行的!

举个例子说明下协程的用处,例如现在我们的业务代码里需要调用(通过tcp、udp、http……)其他业务系统的服务,该调用会有一定的耗时。

如果使用PHP语言常规的LNMP模式,当客户端访问我们的业务代码时,能同时处理的请求数 = php-fpm开启的进程数。

如果使用PHP的协程(当然前提是要有一个功能完备且强大的调度器),那么同时处理的请求数 = php进程数 * 每个进程能开启的协程数。

支持协程的语言不止是PHP,还有很多其他语言都有相应的机制实现协程,参阅:Coroutine

php携程语比,PHP 协程相关推荐

  1. linux的进程/线程/协程系列5:协程的发展复兴与实现现状

    协程的发展复兴与实现现状 前言 本篇摘要: 1. 协同制的发展史 1.1 协同工作制的提出 1.2 自顶向下,无需协同 1.3 协同式思想的应用 2. 协程的复兴 2.1 高并发带来的问题 2.2 制 ...

  2. 【Kotlin 协程】Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )

    文章目录 一.流的上下文 1.上下文保存 2.流收集函数原型 3.流发射函数原型 4.代码示例 - 查看流发射和收集的协程 5.代码示例 - 不能在不同协程中执行相同流的发射和收集操作 二.修改流发射 ...

  3. 在 Android 开发中使用 Kotlin 协程 (一) -- 初识 Kotlin 协程

    前言 最近在研究 Kotlin 协程,发现功能真的超级强大,很有用,而且很好学,如果你正在或计划使用 Kotlin 开发 Android,那么 Kotlin 协程你一定不能错过! 协程是什么? 我们平 ...

  4. 10-线程,进程,协程,IO多路复用

    - 线程进程介绍 1. 工作最小单元是线程 2. 应用程序 -> 至少有一个进程 -> 至少有一个线程 3. 应用场景: IO密集型:线程 计算密集型:进程 4. GIL,全局解释器锁. ...

  5. python3之协程(3)---greenlet实现协程操作

    原文链接:https://www.cnblogs.com/xybaby/p/6337944.html 正文 在前面的文章中提到python原生的generator是semicoroutine,而gre ...

  6. python中协程的理解_python协程的理解

    一.介绍 什么是并发? 并发的本质就是切换+保存状态 cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制): 1.任务发生阻塞 2.计算任务时间过长,需要让出cpu给高 ...

  7. Kotlin学习笔记22 协程part2 join CoroutineScope 协程vs线程

    参考链接 示例来自bilibili Kotlin语言深入解析 张龙老师的视频 1 Job的join方法 import kotlinx.coroutines.* /*** Job的join方法* 它会挂 ...

  8. 协程asyncio_初识asyncio协程

    初识asyncio协程 一.基本概念 ​ 要想了解学习协程相关知识要先对以下几个概念先行了解: 阻塞 ​ 阻塞状态是指程序未得到某所需计算资源时的挂起状态,简单说就是程序在等待某个操作未执行完前无法执 ...

  9. 【协程】MyCoroutine轻量级协程框架代码详细剖解

    协程是什么 协程是用于解决IO密集型业务的轻量级框架.一个项目用到的IO读写非常多而且操作频繁,操作系统多次进行系统调用时,多个IO读写会出现其中一个IO如果出现长时间阻塞的时候,其他读写已经就绪的I ...

最新文章

  1. 原来AGILE就是这么一回事啊!
  2. 某程序员吐槽:提离职后领导开始演戏,假装不知道我工资低,对我进行挽留,怎么办?...
  3. 财务大数据比赛有python吗-【教改实验班简介】财务大数据分析班
  4. Eclipse新建的Maven项目想修改DynamicWebModule,直接去项目目录下修改
  5. 使用VS2015进行C++开发的6个主要原因
  6. 前端初学者开发学习视频_初学者学习前端开发的实用指南
  7. python 实现对地图的点击_python使用folium库绘制地图点击框
  8. 人工智能、机器学习和深度学习的区别与认识
  9. 开源服务器 Jenkins 曝漏洞,可用于发动 DDoS 攻击
  10. android 二级 滚动,android使用 ScrollerView 实现 可上下滚动的分类栏实例
  11. 前端工程师做事的三重境界:我的进阶之路
  12. Python使用matplotlib可视化模拟烧烤摊每月营业额折线图
  13. 【渝粤教育】广东开放大学 计量基础知识 形成性考核 (48)
  14. 《Redis入门指南(第 2 版)》读后感
  15. 网站建设需要怎么做?个人网站建设教程
  16. WordPress网站建设中实用的简繁切换工具
  17. Difference between Static video and Single image ?静态视频和单张图像的区别
  18. 社团联合会计算机教程,计算机与信息工程学院学生社团联合会
  19. 性能衡量指标-吞吐量与响应时间
  20. 【毕业设计选题】2022通信工程毕业设计题目推荐大全

热门文章

  1. linux免密后还是要输密码,ssh配置免密后依然需要输入密码的问题解决及排查过程...
  2. python 编程环境 微信_微信开发之新浪SAE上配置WeRoBot微信机器人,python,Mac环境...
  3. bzoj1084 [SCOI2005]最大子矩阵 dp
  4. Intel 64/x86_64/IA-32/x86处理器 - 指令格式(1) - 概述
  5. Algorithms Part 1-Question 2-QuickSort-快速排序算法
  6. python脚本文件格式_Python 基础语法_Python脚本文件结构
  7. 求最大和 java_三种算法求最大子段和问题——Java实现
  8. GDC 2012]Epic Games谈在智能手机上制作和台式游戏机同等级的图形游戏的经验
  9. UE4 FBX静态网格物体通道
  10. Android Studio3.x填坑路