Guzzle 是一个非常强大而且稳定的 http client。不同于一般的 cURL 封装组件, Guzzle 内部使用了多种请求方式,来实现 http 请求,cURL 只是最常用的方式,并且 Guzzle 提供了强大的异步、并发功能,使得构建一个 http 请求十分容易而且易拓展。现在 Guzzle 已经被 drupal 整合到核心模块中了,可靠性不言而喻。Guzzle 目前使用了 Psr7 规范,拓展性和兼容性也更加优秀了。之前在一次重构记录中提到过,但是没有深入分析过,这次决定介绍一些使用例子并深入分析其底层实现原理,如果有问题,请留言指出,共同进步。

注意:为了尽量缩减阅读量,部分源码分析只列出了关键步骤。

环境

本文使用的 Guzzle 版本为 6.3.0,composer.json 文件内容为

{

"require": {

"guzzlehttp/guzzle": "^6.3"

}

}

配置

Guzzle 的各种配置都和 http 请求相关,比如是否跟踪 302 跳转,是否携带 cookies,是否使用 ssl、超时等等。

配置项是以数组形式在创建 client 对象的时候传入的,所有的配置都在这里。Guzzle 会提供一个默认配置,会和自定义配置进行合并,并优先自定义配置。

public function __construct(array $config = [])

{

$this->configureDefaults($config);

}

private function configureDefaults(array $config)

{

// 自定义配置和默认配置,在这里合并,并赋值给了成员变量

$this->config = $config + $defaults;

}

比如这样:

$config = [

'allow_redirects' => [

'max' => 5,

'referer' => true,

],

'http_errors' => false,

'decode_content' => true,

'cookies' => true,

'connect_timeout' => 1.5,

'timeout' => 2.5,

'headers' => [

'User-Agent' => 'test client for chengxiaobai.cn',

],

];

$client = new \GuzzleHttp\Client($config);

你也可以在构建请求的时候传入配置,这个时候会和构造方法中传入的配置合并,并且只对当前请求有效。

private function prepareDefaults($options)

{

$defaults = $this->config;

// 这里这是赋值给了局部变量,所以只对当前请求有效

$result = $options + $defaults;

return $result;

}

比如这样:

$client = new \GuzzleHttp\Client($config);

$client->request('GET', 'https://www.chengxiaobai.cn/',

[

'allow_redirects' => [

'max' => 1,

'referer' => false,

],

]);

特殊的 handler 参数

handler 参数比较特殊,它必须是闭包,并且参数为Psr7\Http\Message\RequestInterface 和一个 array 类型的参数,并且必须返回GuzzleHttp\Promise\PromiseInterface 或者在成功时满足Psr7\Http\Message\ResponseInterface。

如果按照面向对象的来描述的话,就是你必须得实现一个这样的接口,Chengxiaobai\handler。

interface Chengxiaobai

{

/**

* handler interface

*

* @param RequestInterface $request

* @param array $options

*

* @return Psr\Http\Message\ResponseInterface | GuzzleHttp\Promise\PromiseInterface

*/

public function handler(Psr\Http\Message\RequestInterface $request,array $options);

}

这样对 handler 结构就很明确了吧。我们看源码怎么解析 handler 配置的。

public function __construct(array $config = [])

{

if (!isset($config['handler'])) {

// 创建一个默认的 handler 栈

$config['handler'] = HandlerStack::create();

} elseif (!is_callable($config['handler'])) {

throw new \InvalidArgumentException('handler must be a callable');

}

}

很明显,如果自定义了 handler 就会放弃 Guzzle 默认提供的 handlerStack 。除非你有足够的把握,请不要随意操作 。

举个自定义 handler 操作的例子,比如对任意一个请求都返回404。

$client = new \GuzzleHttp\Client($config);

$response = $client->request('GET', 'www.chengxiaobai.cn/history.html',

[

'handler' => function (\Psr\Http\Message\RequestInterface $request, array $options) {

return new \GuzzleHttp\Psr7\Response(404);

},

]);

echo $response->getStatusCode();// 404

上面我们说 Guzzle 本身自带了一些 handler ,我们先看看默认创建的 handlerStack 都是些什么,先不管每个 handler 里面的实现,在请求处理阶段会详细说。

public static function create(callable $handler = null)

{

// 这里定义了底层请求实现方法

$stack = new self($handler ?: choose_handler());

// 下面都会添加一些 Middleware 中间件

$stack->push(Middleware::httpErrors(), 'http_errors');

$stack->push(Middleware::redirect(), 'allow_redirects');

$stack->push(Middleware::cookies(), 'cookies');

$stack->push(Middleware::prepareBody(), 'prepare_body');

return $stack;

}

注意 choose_handler 这个方法,这个方法决定了实现请求的底层方法,通过它能让我们对 Guzzle 请求的实现的底层方法有个初步了解,也就是,所有的请求都是通过它发送出去的。仔细看源码注释,很关键。

function choose_handler()

{

$handler = null;

// 判定 curl 方法,如果并发和常规 curl 同时存在

if (function_exists('curl_multi_exec') && function_exists('curl_exec')) {

// 注册并发 curl 为默认请求方式,常规 curl 为同步请求方式

$handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());

} elseif (function_exists('curl_exec')) {

// 如果两种 curl 方法同时只有一个存在,则优先常规 curl

$handler = new CurlHandler();

} elseif (function_exists('curl_multi_exec')) {

$handler = new CurlMultiHandler();

}

// 如果 allow_url_fopen 开启

if (ini_get('allow_url_fopen')) {

$handler = $handler

// 已有 handler ? 再注册一个流处理 handler

? Proxy::wrapStreaming($handler, new StreamHandler())

// 否则只有流处理 handler

: new StreamHandler();

} elseif (!$handler) {

throw new \RuntimeException('GuzzleHttp requires cURL, the '

. 'allow_url_fopen ini setting, or a custom HTTP handler.');

}

return $handler;

}

创建完 handler 后,会往 stack 中添加一些 middleware,也就是中间件。简单介绍下,push 函数第一个参数是闭包,第二个参数是字符串,中间件的名字,middleware 主要由闭包组成,可能有的 middleware 嵌套有点多,显得有点复杂,但是无论结构如何复杂,本质上就是用来处理各种请求数据,其结构类型和Chengxiaobai\handler一样。

需要深入了解 Handlers 和 Middleware 的可以点击这里看官方文档,个人觉得需要对闭包掌握的比较好,才能很好的理解其设计思路。

根据上面的源码分析你可能注意到,系统默认提供的 handler 是以对象的形式存在的。但其真正使用的时候是当做闭包使用的,这里介绍的是真正发挥作用的闭包结构,而不是表面的 HandlerStack 对象。后面”处理请求“章节会详细介绍。

构建请求

其实所有的请求在处理上都是异步的,同步请求只不过是异步请求构建后立即要求返回结果,异步转同步。但是异步和同步的请求构建都是类似的,不同处我会说明。

public function request($method, $uri = '', array $options = [])

{

$options[RequestOptions::SYNCHRONOUS] = true;

// requestAsync 就是异步请求,不过直接调用了 wait 转同步

return $this->requestAsync($method, $uri, $options)->wait();

}

请求的 uri 参数

如果你在配置中定义了 base_uri 参数,这个时候可以使用相对地址,如果没有,则不支持相对地址,Guzzle 并没有帮你校验最终 uri 参数是否正确,只有等到请求发出去了,才知道 uri 是否正确。

private function buildUri($uri, array $config)

{

// for BC we accept null which would otherwise fail in uri_for

$uri = Psr7\uri_for($uri === null ? '' : $uri);

if (isset($config['base_uri'])) {

$uri = Psr7\UriResolver::resolve(Psr7\uri_for($config['base_uri']), $uri);

}

// 这里使用了 psr7 规范,返回的是一个实现了 UriInterface 的对象

return $uri->getScheme() === '' && $uri->getHost() !== '' ? $uri->withScheme('http') : $uri;

}

比如这样就会报错

$client = new \GuzzleHttp\Client();

$response = $client->request('GET', '/history.html');

/**

* ountput :

* Fatal error: Uncaught GuzzleHttp\Exception\RequestException:

* cURL error 3: malformed (see http://curl.haxx.se/libcurl/c/libcurl-errors.html)

* in /app/vendor/guzzlehttp/guzzle/src/Handler/CurlFactory.php on line 187

*/

base_uri

uri

result

chengxiaobai.cn/first/

/second

chengxiaobai.cn/second

chengxiaobai.cn/first/

second

chengxiaobai.cn/first/second

chengxiaobai.cn/first

/second

chengxiaobai.cn/second

chengxiaobai.cn/first

second

chengxiaobai.cn/second

保险的情况就是每次都使用绝对路径就好了,但是有时候相对路径在做爬取的时候很有用,依据实际需求使用。

构建 reqsest

Guzzle 内部使用的 request 对象都是 Psr\Http\Message\RequestInterface 的实现,这样只要你能按照 psr7 的规范来就很容易拓展 Guzzle。

这里再次提醒大家,modern php 开发,应该遵循 psr 规范,有利于社区更好的协作和稳健发展。

public function requestAsync($method, $uri = '', array $options = [])

{

$request = new Psr7\Request($method, $uri, $headers, $body, $version);

return $this->transfer($request, $options);

}

丰富 request

transfer 的结构类型和Chengxiaobai\handler一样。

private function transfer(RequestInterface $request, array $options)

{

// 这个方法会根据你的请求类型,构建更具体的请求对象

$request = $this->applyOptions($request, $options);

$handler = $options['handler'];

}

applyOptions 从名字能看出来,这个方法会根据你的配置,构建出相匹配的 request 对象。比如根据请求类型的不同,进行参数 encode,设置 body 比如 json、stream,设置 header 等请求细节。

注意配置传入的是一个引用,所以里面对配置的任何修改,都会影响后续操作。

private function applyOptions(RequestInterface $request, array &$options)

{

// 各种判定,修改 $options,如果有没覆盖到的,会新生成一个 $modify 说明需要重新构建 $request

// 构建新的对象方法

$request = Psr7\modify_request($request, $modify);

return $request;

}

如果没有需要修改的,就直接返回,如果有的话,会重新构建一个新的 request 对象。

注意需要的参数,有些构建参数是从 $changes 取的,但是有些是从原本的 $request 对象取的,本质上就是,有新的就用新的,没有就用老的保持不变。

function modify_request(RequestInterface $request, array $changes)

{

if (!$changes) {

return $request;

}

return new Request(

isset($changes['method']) ? $changes['method'] : $request->getMethod(),

$uri,

$headers,

isset($changes['body']) ? $changes['body'] : $request->getBody(),

isset($changes['version'])

? $changes['version']

: $request->getProtocolVersion()

);

}

promise 简介

关于 promise ,属于 guzzlehttp/promises 类库,是一个很值得学习的类库,有机会我会专门分析下它的实现原理,目前我们还是着重分析请求实现过程。

通看源码会发现,虽然 Guzzle 内部大量使用了 promise 并且夹杂着闭包很复杂,但是 promise 发挥的作用都是一样的。目前可以这么理解,就是 promise 是一个状态机,它有三种状态:等待、满足、拒绝。

下面的这个例子,只是示例会如何执行,promise 规范是有各种要求的,具体见 Promises/A+规范,Guzzle 使用的 promise 也是该规范的一个实现。

$promise = new Promise(

function () {

echo 'wait';

},

function () {

echo 'cancle';

}

);

$promise->then(

function () {

echo 'onFulfilled';

},

function () {

echo 'onRejected';

}

)->then(

function () {

echo 'onFulfilled';

},

function () {

echo 'onRejected';

}

);

从等待状态开始执行,满足了就执行 onFulfilled ,拒绝了就执行 onRejected,一连串下来,依靠不同的状态去执行不同的方法,配合 http 请求要么成功要么失败,不会有第三种状态的场景,就可以很顺畅的理解了。

private function transfer(RequestInterface $request, array $options)

{

$handler = $options['handler'];

try {

return Promise\promise_for($handler($request, $options));

} catch (\Exception $e) {

return Promise\rejection_for($e);

}

}

成功就是 promise_for 失败就是 rejection_for。

promise_for

这个方法主要是用来保证返回的是一个 promise 对象,因为经过 $handler 处理后的值可能是一个 promise 对象( $handler 如何处理紧接着会说),也能是一个 response 对象,也可能是一个异常,所以需要对对数据做一个“清洗转换”,并返回一个满足状态的 promise。

function promise_for($value)

{

// 如果是一个 promise 对象就直接返回

if ($value instanceof PromiseInterface) {

return $value;

}

// 如果是一个包含 then 方法的对象,会把它转换成一个 promise 对象

if (method_exists($value, 'then')) {

// 如果里面有 wait、cancel、resolve、reject 等方法,会把它添加进去作为默认方法,否则置为 null

$wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;

$cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;

$promise = new Promise($wfn, $cfn);

$value->then([$promise, 'resolve'], [$promise, 'reject']);

return $promise;

}

// 前俩者都不满不足的情况下,直接返回一个满足状态的 promise。

return new FulfilledPromise($value);

}

rejection_for

异常情况会走入到 rejection_for 方法。同理进行“数据清洗”,并返回一个拒绝状态的 promise 。

function rejection_for($reason)

{

if ($reason instanceof PromiseInterface) {

return $reason;

}

return new RejectedPromise($reason);

}

handler 处理

还是 transfer 方法,在传给 promise_for 之前,先调用了一个 $handler,也就是配置中的 handler 函数。接着就是返回一个 Promise 对象,用于外层异步调用。

private function transfer(RequestInterface $request, array $options)

{

$handler = $options['handler'];

try {

// 这里会先调用配置中的 handler 方法

return Promise\promise_for($handler($request, $options));

} catch (\Exception $e) {

return Promise\rejection_for($e);

}

}

处理请求

对于上面的 handler 处理小节,你可能会有疑惑,为什么就调用了 handler 函数,那不是直接开始处理请求了吗?

我们之前介绍过 handler 的数据结构,是一个是 handlerStack 对象,但是其调用本质是一系列组合闭包。但数据结构上是一个对象,怎么使用的时候就成了闭包呢?

当尝试以调用函数的方式调用一个对象时,__invoke() 方法会被自动调用。

有了这个前提,我们看下 handlerStack 源码。

从 handlerStack 的名字上,我们就能知道它是一个”栈“数据结构,其满足”后进先出“的特性。

public function __invoke(RequestInterface $request, array $options)

{

// 这个函数主要是实现 Middleware 中间件操作

$handler = $this->resolve();

//这里下面紧接着会有分析

return $handler($request, $options);

}

public function resolve()

{

// 变量缓存,能优化部分性能

if (!$this->cached) {

// 这个 handler 就是之前选择的 实现请求的底层方法

// 如果没有的话,请求都无法实现,就别折腾了,抛个异常终止吧

if (!($prev = $this->handler)) {

throw new \LogicException('No handler has been specified');

}

// 反转顺序,实现”后进先出“特性,调用每个中间件

foreach (array_reverse($this->stack) as $fn) {

// 中间件的注册是 [$middleware, $name] 形式的

// 所以取第一个元素是其具体实现,第二个参数只是名字

// 调用第一次传入的是 handler,后续传入的就是上一次处理的结果

$prev = $fn[0]($prev);

}

// 所有的都处理完毕,缓存起来

$this->cached = $prev;

}

return $this->cached;

}

上面就是很经典的中间件模型实现,laravel 中实现的略有区别,主要用到了 array_reduce 这个函数,但是原理上大同小异,知道其原理一通百通。

我们再继续看看源码。还是这个方法,不过我们分析其最终调用的实现。

根据 Middleware 流程图,我们知道最后一个调用的是 http_errors,我们就来分析它吧,没有任何特殊性,其他的 Middleware 结构都是一样的,只是有些中间件多次使用了 __invoke() 魔术方法而已。

Middleware 里面闭包结构复杂,好好理解下。

public function __invoke(RequestInterface $request, array $options)

{

// 这个函数主要是实现 Middleware 中间件操作

$handler = $this->resolve();

// 现在我们分析这个

return $handler($request, $options);

}

public static function httpErrors()

{

// 第一次调用返回!传入一个 闭包-A

return function (callable $handler) {

// 第二次调用返回!传入 $request,$options

return function ($request, array $options) use ($handler) {

// Middleware 自己的逻辑判定返回什么样的闭包

if (empty($options['http_errors'])) {

// 第三次调用返回!返回 闭包-A 的处理结果

// 这里根据配置 没有注册 then 函数,直接进行下一步处理

return $handler($request, $options);

}

// 第三次调用返回!返回 闭包-A,附加 promise

// 根据上面我们说到的 promise 特性,这里用 then

// 附加了 闭包-A 处理完毕之后要调用的逻辑

return $handler($request, $options)->then(

function (ResponseInterface $response) use ($request, $handler) {

$code = $response->getStatusCode();

if ($code < 400) {

return $response;

}

throw RequestException::create($request, $response);

}

);

};

};

}

关于返回层数,可以根据 return 来迅速定位,一个 return 就对应一次调用返回。

现在我们先梳理下到这步 handlerStack 被调用的次数,知道这三层闭包分别在哪里被调用了,有利于我们得出最终结果。

// 第一次

public static function create(callable $handler = null)

{

$stack->push(Middleware::httpErrors(), 'http_errors');

}

// 第二次

public function resolve()

{

$prev = $fn[0]($prev);

}

// 第三次

public function __invoke(RequestInterface $request, array $options)

{

$handler($request, $options);

}

最后的结果应该是,如果按照 Middleware 结构应该是这样的:

$handler($request, $options)->then('http_errors')

->then('allow_redirects')

->then('cookies')

->then('prepare_body')

这个 $handler 就是最开始传入的请求实现底层方法。

整个 Middleware 就实现了,传入的时候先处理一遍请求数据,请求完了,通过 then 再处理一遍请求结果。

注意!!!由于 Middleware 的作用不同,可能有的 Middleware 并不会处理请求结果,就不会注册 then 函数。这里描述的是 Middleware 的整个流程,并没有对其中某个做特殊分析,因为其需求场景不同,逻辑处理会有细微变化。

这个 $handler 具体是哪个请求方法呢?还记得 choose_handler() 方法吗,它决定了到底使用哪种底层方法去实现请求,现在我们终于执行到发起请求的步骤了。

再回顾下 choose_handler() 方法。

function choose_handler()

{

$handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());

$handler = new CurlHandler();

$handler = new CurlHandler();

$handler = $handler

? Proxy::wrapStreaming($handler, new StreamHandler())

: new StreamHandler();

return $handler;

}

这两个方法都有源码分析,没有印象的可以再回去看看。

不同 $handler 都是在 __invoke() 方法上做文章。

我们分析第一个 $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());。

public static function wrapSync(

callable $default,

callable $sync

) {

return function (RequestInterface $request, array $options) use ($default, $sync) { // 注意这里的三目运算符,判定同步请求选项是否为空

return empty($options[RequestOptions::SYNCHRONOUS])

// 默认是并发请求 new CurlMultiHandler()

? $default($request, $options)

// 这里是同步请求 new CurlHandler()

: $sync($request, $options);

};

}

现在,异步和同步请求终于出现了区别。我们先看同步请求。

同步请求

我们回顾下 request() 方法,注意到一个步骤。

public function request($method, $uri = '', array $options = [])

{

// 这里往配置中添加了一个选项,设置该请求为同步的

$options[RequestOptions::SYNCHRONOUS] = true;

}

所以,这里走的是同步请求,我们来分析 CurlHandler() 。

public function __invoke(RequestInterface $request, array $options)

{

// 如果设置了延迟请求,会在这里阻塞一会

if (isset($options['delay'])) {

usleep($options['delay'] * 1000);

}

// 创建一个 handler 抽象对象

$easy = $this->factory->create($request, $options);

// 执行

curl_exec($easy->handle);

$easy->errno = curl_errno($easy->handle);

// 请求处理结束

return CurlFactory::finish($this, $easy, $this->factory);

}

这里需要分析一下工厂类 CurlFactory,里面大都涉及到 cURL的一些配置,有兴趣的可以看下源码学习,配置的含义官方文档有专门的介绍,我这里就不在分析它们了,主要流程的分析还是不会缺的。

public function create(RequestInterface $request, array $options)

{

if (isset($options['curl']['body_as_string'])) {

$options['_body_as_string'] = $options['curl']['body_as_string'];

unset($options['curl']['body_as_string']);

}

// handle 的一个抽象对象

$easy = new EasyHandle;

$easy->request = $request;

$easy->options = $options;

// 获取默认配置

$conf = $this->getDefaultConf($easy);

// 解析请求方法

$this->applyMethod($easy, $conf);

// 解析配置

$this->applyHandlerOptions($easy, $conf);

// 解析头部

$this->applyHeaders($easy, $conf);

unset($conf['_headers']);

// 解析自定义 curl 配置

if (isset($options['curl'])) {

$conf = array_replace($conf, $options['curl']);

}

// 设置回调函数用于处理返回头

$conf[CURLOPT_HEADERFUNCTION] = $this->createHeaderFn($easy);

// 从 handle池 获取一个 handle,没有就新建一个

$easy->handle = $this->handles

? array_pop($this->handles)

: curl_init();

curl_setopt_array($easy->handle, $conf);

return $easy;

}

public static function finish(

callable $handler,

EasyHandle $easy,

CurlFactoryInterface $factory

) {

// 这里会调用配置用设置的 on_stats 函数

if (isset($easy->options['on_stats'])) {

self::invokeStats($easy);

}

// 有错误的话走错误处理流程

if (!$easy->response || $easy->errno) {

return self::finishError($handler, $easy, $factory);

}

// 释放资源,还到 handle池

$factory->release($easy);

// 处理 流数据

$body = $easy->response->getBody();

if ($body->isSeekable()) {

$body->rewind();

}

// 返回一个满足状态的 promise

return new FulfilledPromise($easy->response);

}

根据源码分析,同步请求在这一步就已经发出了请求,并且回调了配置中的 on_stats 函数,拿到了未经处理的返回值原始返回值,并且同步请求 handler池,也就是复用的请求句柄为3个,这个没有办法修改,写死在代码中的。

异步请求

public function __invoke(RequestInterface $request, array $options)

{

$easy = $this->factory->create($request, $options);

// 为每个请求生成一个 ID

$id = (int) $easy->handle;

// 注册一个 promise,分别是调用执行和关闭方法

$promise = new Promise(

[$this, 'execute'],

// 依据 ID 来关闭请求

function () use ($id) { return $this->cancel($id); }

);

// 添加请求 底层是 curl_multi_add_handle 方法

$this->addRequest(['easy' => $easy, 'deferred' => $promise]);

return $promise;

}

工厂类 CurlFactory 在上面已经分析过,这里不再赘述。但是异步请求这个时候并没有发起最终的请求,先是为每个请求生成一个 ID,然后将请求添加到批处理回话句柄( curl_multi_add_handle )中,最后返回了一个 Promise 对象,里面注册了 execute 函数和 cancel 函数,用于后面发起和关闭请求。

需要注意的就是设定了延迟执行的请求,是在 addRequest() 方法中处理的。后面在”返回结果“章节会讲到延迟请求处理。

流处理

配置中如果 stream 选项不为空,就会启用它,如果你没有 cURL,那就只能用它了。

public function __invoke(RequestInterface $request, array $options)

{

// 如果设置了延迟请求,会在这里阻塞一会

if (isset($options['delay'])) {

usleep($options['delay'] * 1000);

}

// 流处理本身信息较少,所以为了补全一些信息,这里记录处理开始时间

$startTime = isset($options['on_stats']) ? microtime(true) : null;

try {

// 不支持 expect header.

$request = $request->withoutHeader('Expect');

// 当内容为0的时候,依然添加一个头信息

if (0 === $request->getBody()->getSize()) {

$request = $request->withHeader('Content-Length', 0);

}

// 发起请求,然后回调 on_stats 函数

// 解析结果,同样返回一个满足状态的 promise

return $this->createResponse(

$request,

$options,

$this->createStream($request, $options),

$startTime

);

} catch (\InvalidArgumentException $e) {

throw $e;

} catch (\Exception $e) {

// Determine if the error was a networking error.

$message = $e->getMessage();

// This list can probably get more comprehensive.

if (strpos($message, 'getaddrinfo') // DNS lookup failed

|| strpos($message, 'Connection refused')

|| strpos($message, "couldn't connect to host") // error on HHVM

) {

$e = new ConnectException($e->getMessage(), $request, $e);

}

$e = RequestException::wrapException($request, $e);

$this->invokeStats($options, $request, $startTime, null, $e);

return \GuzzleHttp\Promise\rejection_for($e);

}

}

关于流处理,因为其底层实现是 fopen() 函数,其支持的协议比较多,不止有 http,它支持的协议和封装协议在这里可以看到,所以 Guzzle 对其做了一些特殊处理以满足业务需要。

返回结果

根据上面的分析我们已经知道 transfer 方法返回的结果是什么了,然后就是获取返回结果。

同步请求

同步请求因为在 transfer 方法中,实际的请求已经发出去,已经拿到了未经处理的原始返回结果。

public function send(RequestInterface $request, array $options = [])

{

// 我们注意到最后调用的 wait 方法

return $this->sendAsync($request, $options)->wait();

}

在同步请求方法中,直接调用了 wait() 方法,所以直接走 Promise 对象的 wait() 方法及注册的 then()方法。还记得之前的 Middleware 里面注册了一些 then() 方法吗?这里主要就是调用它们了,完成中间件“处理返回结果”的这一步骤,当然还有一些在逻辑处理中注册的 then() 方法,在此不再举例。

异步请求

异步请求在 transfer 方法中返回的是一个 Promise,此时实际请求并没有发送。我们从官方例子来分析发送请求并且获取返回结果的方式。

$promise = $client->requestAsync('GET', 'https://www.chengxiaobai.cn');

$promise->then(

function (ResponseInterface $res) {

echo $res->getStatusCode() . "\n";

},

function (RequestException $e) {

echo $e->getMessage() . "\n";

echo $e->getRequest()->getMethod();

}

);

这种方式是对每个异步请求单独注册 then() 方法,说明这个请求成功了怎么处理,失败了怎么处理。

$client = new Client(['base_uri' => 'https://www.chengxiaobai.cn']);

// 注册多个异步请求,实现并发

$promises = [

'image' => $client->getAsync('/image'),

'png' => $client->getAsync('/image/png'),

'jpeg' => $client->getAsync('/image/jpeg'),

'webp' => $client->getAsync('/image/webp')

];

// 有一个失败就终止

$results = Promise\unwrap($promises);

// 忽略某些请求的异常,保证所有请求都发送出去

$results = Promise\settle($promises)->wait();

这个是设定多个异步请求,实现并发,并选择对部分请求错误是否忽略进行处理。

$client = new Client();

$requests = function ($total) use ($client) {

for ($i = 1; $i < $total; $i++) {

$uri = 'https://www.chengxiaobai.cn/page/' . $i;

// 这里用到了协程

yield function() use ($client, $uri) {

return $client->getAsync($uri.$i);

};

}

};

$pool = new Pool($client, $requests(10), [

// 并发数

'concurrency' => 5,

'fulfilled' => function ($response, $index) {

echo $res->getStatusCode() . "\n";

},

'rejected' => function ($reason, $index) {

echo $e->getMessage() . "\n";

},

]);

// 初始化 Promise

$promise = $pool->promise();

// 发起请求处理

$promise->wait();

这个是对大批量请求做出一个批量处理,类似一个请求池的的概念,设定了出口速率( concurrency ),使用统一的处理逻辑,处理请求池当中的数据。

我们来分析下 Pool 的源码,主要是构造函数。

public function __construct(

ClientInterface $client,

$requests,

array $config = []

) {

// 设定请求池大小

if (isset($config['pool_size'])) {

$config['concurrency'] = $config['pool_size'];

} elseif (!isset($config['concurrency'])) {

// 默认并发数 25

$config['concurrency'] = 25;

}

if (isset($config['options'])) {

$opts = $config['options'];

unset($config['options']);

} else {

$opts = [];

}

// 将请求列表转换为一个迭代器

$iterable = \GuzzleHttp\Promise\iter_for($requests);

$requests = function () use ($iterable, $client, $opts) {

// 遍历请求列表

foreach ($iterable as $key => $rfn) {

// 如果是一个 request 的实现,转换为一个异步请求

if ($rfn instanceof RequestInterface) {

yield $key => $client->sendAsync($rfn, $opts);

} elseif (is_callable($rfn)) {

// 如过是一个闭包,直接调用

yield $key => $rfn($opts);

} else {

throw new \InvalidArgumentException('...');

}

}

};

// 支持迭代的 Promise 对象

$this->each = new EachPromise($requests(), $config);

}

我们可以看到,Pool 模式下,所有的请求配置 $opts 都是一样的,所以每个请求的处理逻辑都是一样的,如果每个请求都有有定制化需求,Pool 模式可能不太适合,当然可以使用修改源码的方式,不过这个已经不符合 Pool 模式设计的初衷了。

不管哪种形式,都可以发现触发最终调用的都是 wait() 方法。这个和 Promise 的规范有关。

我们看下异步如何处理请求的。

还记得异步请求返回的 Promise 吗?

$promise = new Promise(

[$this, 'execute'],

// 依据 ID 来关闭请求

function () use ($id) { return $this->cancel($id); }

);

wait() 方法调用的就是 [$this, 'execute'],我们来分析它的实现。在此之前,我们需要特别说明下延迟请求。

延迟时间

对于延迟请求,同步请求和流请求很好处理,直接阻塞就好了,如果是20个异步请求中包含10个延迟请求,每个延迟时间还不相等,这个时候延迟请求的处理就得好好考虑下了。

在”请求处理“章节我们说过,延迟请求是没有立即加到批处理请求句柄的,它被暂时存放在 $this->delays 队列中。直到你决定发起请求了,延迟请求才被拿出来计算其是否应该被加到批处理请求句柄中。计算逻辑我们从源码看看如何计算阻塞时间。

public function execute()

{

$queue = P\queue();

while ($this->handles || !$queue->isEmpty()) {

// 如果没有在进行的请求,并且延迟请求队列不为空,就开始阻塞

if (!$this->active && $this->delays) {

usleep($this->timeToNext());

}

$this->tick();

}

}

private function timeToNext()

{

$currentTime = microtime(true);

$nextTime = PHP_INT_MAX;

// 找出现有延迟请求队列中最小的延迟时间

foreach ($this->delays as $time) {

if ($time < $nextTime) {

$nextTime = $time;

}

}

return max(0, $nextTime - $currentTime) * 1000000;

}

execute 主要是调用了 tick() 这个方法。

public function tick()

{

// 如果延迟请求队列不为空,处理延迟请求

if ($this->delays) {

$currentTime = microtime(true);

// $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);

foreach ($this->delays as $id => $delay) {

// 延迟任务已经达到延迟预期时间,开始处理

if ($currentTime >= $delay) {

// 将它从延迟任务队列中删除

unset($this->delays[$id]);

// 添加到批量请求句柄中

curl_multi_add_handle(

$this->_mh,

$this->handles[$id]['easy']->handle

);

}

}

}

// 执行队列中的任务

P\queue()->run();

// 执行请求

if ($this->active &&

curl_multi_select($this->_mh, $this->selectTimeout) === -1

) {

// See: https://bugs.php.net/bug.php?id=61141

usleep(250);

}

while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);

// 获取请求结果信息,移除请求成功的请求

$this->processMessages();

}

然后异步处理流程就很清晰了:

如果延迟请求队列不为空并且当前没有在执行的请求,先阻塞最小的延迟时间,以保证延迟请求队列在每次请求都至少被消耗一个。如果有正在执行的请求或者延迟请求队列不为空,直接执行2。

发起一次批量请求。

获取请求信息,移除成功的请求。

如果请求队列不为空,执行1-3。

从上面的流程,我们可以分析得出,即使你的并发数大于请求数,也并不意味着只请求一次,可能会有重试或者延迟请求造成多次请求。并且根据步骤 1 我们也可以知道,非延迟任务也会跟着一起被阻塞。

和同步请求一样,异步请求下每个请求处理完毕后,都会执行相应的 then() 方法完成返回结果处理。

流请求

流请求和同步请求类似,只是底层实现的方法不一样,流程都是一样,这里不再赘述。

彩蛋

在分析源码的过程中,发现了一个没有被使用的类 GuzzleHttp\Promise\Coroutine ,它也是对 Promise 的一个实现,但是其是通过迭代器来实现的,会不会有协程版的 Promise 呢?我们拭目以待。

本文由程小白创作,本文可自由转载、引用,但需署名作者且注明文章出处。

php choose handler,Guzzle 源码分析相关推荐

  1. Handler的源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 最近复习到了Handler一直只知道怎么用,却没有仔细分析过源码,这下就来看看 为什么要使用handler呢? 因为在安卓应用 ...

  2. Handler机制源码分析

    一.Handler使用上需要注意的几点 1.1 handler使用不当造成的内存泄漏 public class MainActivity extends AppCompatActivity {priv ...

  3. Wangle源码分析:编解码Handler

    2019独角兽企业重金招聘Python工程师标准>>> 前言 编解码是协议相关的,如果没有编解码Handler,那么在处理网络的粘包.拆包时会变得很复杂.除了http之类的公有协议之 ...

  4. Wangle源码分析:Pipeline、Handler、Context

    2019独角兽企业重金招聘Python工程师标准>>> 基本概念 Wangle中的Pipeline和Netty中的Pipeline是很相似的,既可以将它看为一种职责链模式的实现也可以 ...

  5. Android多线程:深入分析 Handler机制源码(二)

    前言 在Android开发的多线程应用场景中,Handler机制十分常用 接下来,深入分析 Handler机制的源码,希望加深理解 目录 1. Handler 机制简介 定义 一套 Android 消 ...

  6. Wangle源码分析:Service

    2019独角兽企业重金招聘Python工程师标准>>> 前言 Wangle中的Service代表一个远程服务(方法),熟悉RPC的朋友肯定知道这就是一个简单的RPC,当然,和一些常见 ...

  7. Wangle源码分析:ClientBootstrap

    2019独角兽企业重金招聘Python工程师标准>>> ClientBootstrap介绍 ClientBootstrap是wangle作为Client端的一个快速启动辅助类,在经过 ...

  8. Wangle源码分析:ServerBootstrap

    2019独角兽企业重金招聘Python工程师标准>>> ServerBootstrap介绍       ServerBootstrap,顾名思义,它是作为Wangle服务端的一个启动 ...

  9. Wangle源码分析:EventBaseHandler、AsyncSocketHandler

    2019独角兽企业重金招聘Python工程师标准>>> 前言 前面的Wangle源码分析系列文章详细的分析了Pipeline.Handler等实现原理,细心的读者可能发现,每次在构造 ...

  10. Handler机制的源码分析

    2019独角兽企业重金招聘Python工程师标准>>> Handler,MessageQueue,Looper的关系 Looper的作用是在线程中处理消息的 MessageQueue ...

最新文章

  1. 安装完DevExpress14.2.5,如何破解它呢?
  2. 上接[翻译]ASP.NET 2.0中的健康监测系统(Health Monitoring)(1) - 基本应用
  3. python音频聚类_Python实现聚类算法AP
  4. 死锁的四个必要条件,及处理方法
  5. 新款苹果手机_苹果宣布新系统 性能依旧“压制quot;安卓
  6. mysql if exists用法_MySQL中EXISTS的用法
  7. 2021 Spring 自定义注解 +AOP +方法入参
  8. 2017 Multi-University Training Contest - Team 7:1003. Color the chessboard(...)
  9. mybatis 插件
  10. css制作序列帧动画
  11. vb adodb mysql_PHP_ADODB类使用,MySQL的例子PHP中最通用的数据 - phpStudy
  12. 直播APP源码开发,直播APP源码搭建,如何优化程序?
  13. 7教程统计意义_SPSS混合线性模型在生物医药统计中的应用——杏花开生物医药统计...
  14. java生成随机名字
  15. rpm的mysql怎么安装_MySQL的rpm安装教程
  16. 关于电子发票打印报销最优美的姿势——发票大师网页版
  17. prior 和 priori的区别
  18. 如何在D盘以管理员身份,运行cmd
  19. BZOJ[3039]玉蟾宫 悬线法
  20. 相关系数-excel-CORREL()

热门文章

  1. jQuery siblings() 方法
  2. TTP229 16路 电容式 触摸开关 数字触摸传感器 模块
  3. Unity为人物模型 添加动效Animator
  4. a轮b轮c轮天使轮区别是什么?
  5. 到底要不要继续坚持做硬件
  6. 解密微信电脑版image文件夹下缓存的用户图片
  7. php容器概念,PHP容器——Pimple运行流程浅析
  8. 使用Faiss来加速计算向量之间的相似度
  9. laravel 验证手机号
  10. 【个人代码及思路】2018年9月CSP第一题:卖菜