php choose handler,Guzzle 源码分析
Guzzle 是一个非常强大而且稳定的 http client。不同于一般的 cURL 封装组件, Guzzle 内部使用了多种请求方式,来实现 http 请求,cURL 只是最常用的方式,并且 Guzzle 提供了强大的异步、并发功能,使得构建一个 http 请求十分容易而且易拓展。现在 Guzzle 已经被 drupal 整合到核心模块中了,可靠性不言而喻。Guzzle 目前使用了 Psr7 规范,拓展性和兼容性也更加优秀了。之前在一次重构记录中提到过,但是没有深入分析过,这次决定介绍一些使用例子并深入分析其底层实现原理,如果有问题,请留言指出,共同进步。
注意:为了尽量缩减阅读量,部分源码分析只列出了关键步骤。
环境
本文使用的 Guzzle 版本为 6.3.0,composer.json 文件内容为
{
"require": {
"guzzlehttp/guzzle": "^6.3"
}
}
配置
Guzzle 的各种配置都和 http 请求相关,比如是否跟踪 302 跳转,是否携带 cookies,是否使用 ssl、超时等等。
配置项是以数组形式在创建 client 对象的时候传入的,所有的配置都在这里。Guzzle 会提供一个默认配置,会和自定义配置进行合并,并优先自定义配置。
public function __construct(array $config = [])
{
$this->configureDefaults($config);
}
private function configureDefaults(array $config)
{
// 自定义配置和默认配置,在这里合并,并赋值给了成员变量
$this->config = $config + $defaults;
}
比如这样:
$config = [
'allow_redirects' => [
'max' => 5,
'referer' => true,
],
'http_errors' => false,
'decode_content' => true,
'cookies' => true,
'connect_timeout' => 1.5,
'timeout' => 2.5,
'headers' => [
'User-Agent' => 'test client for chengxiaobai.cn',
],
];
$client = new \GuzzleHttp\Client($config);
你也可以在构建请求的时候传入配置,这个时候会和构造方法中传入的配置合并,并且只对当前请求有效。
private function prepareDefaults($options)
{
$defaults = $this->config;
// 这里这是赋值给了局部变量,所以只对当前请求有效
$result = $options + $defaults;
return $result;
}
比如这样:
$client = new \GuzzleHttp\Client($config);
$client->request('GET', 'https://www.chengxiaobai.cn/',
[
'allow_redirects' => [
'max' => 1,
'referer' => false,
],
]);
特殊的 handler 参数
handler 参数比较特殊,它必须是闭包,并且参数为Psr7\Http\Message\RequestInterface 和一个 array 类型的参数,并且必须返回GuzzleHttp\Promise\PromiseInterface 或者在成功时满足Psr7\Http\Message\ResponseInterface。
如果按照面向对象的来描述的话,就是你必须得实现一个这样的接口,Chengxiaobai\handler。
interface Chengxiaobai
{
/**
* handler interface
*
* @param RequestInterface $request
* @param array $options
*
* @return Psr\Http\Message\ResponseInterface | GuzzleHttp\Promise\PromiseInterface
*/
public function handler(Psr\Http\Message\RequestInterface $request,array $options);
}
这样对 handler 结构就很明确了吧。我们看源码怎么解析 handler 配置的。
public function __construct(array $config = [])
{
if (!isset($config['handler'])) {
// 创建一个默认的 handler 栈
$config['handler'] = HandlerStack::create();
} elseif (!is_callable($config['handler'])) {
throw new \InvalidArgumentException('handler must be a callable');
}
}
很明显,如果自定义了 handler 就会放弃 Guzzle 默认提供的 handlerStack 。除非你有足够的把握,请不要随意操作 。
举个自定义 handler 操作的例子,比如对任意一个请求都返回404。
$client = new \GuzzleHttp\Client($config);
$response = $client->request('GET', 'www.chengxiaobai.cn/history.html',
[
'handler' => function (\Psr\Http\Message\RequestInterface $request, array $options) {
return new \GuzzleHttp\Psr7\Response(404);
},
]);
echo $response->getStatusCode();// 404
上面我们说 Guzzle 本身自带了一些 handler ,我们先看看默认创建的 handlerStack 都是些什么,先不管每个 handler 里面的实现,在请求处理阶段会详细说。
public static function create(callable $handler = null)
{
// 这里定义了底层请求实现方法
$stack = new self($handler ?: choose_handler());
// 下面都会添加一些 Middleware 中间件
$stack->push(Middleware::httpErrors(), 'http_errors');
$stack->push(Middleware::redirect(), 'allow_redirects');
$stack->push(Middleware::cookies(), 'cookies');
$stack->push(Middleware::prepareBody(), 'prepare_body');
return $stack;
}
注意 choose_handler 这个方法,这个方法决定了实现请求的底层方法,通过它能让我们对 Guzzle 请求的实现的底层方法有个初步了解,也就是,所有的请求都是通过它发送出去的。仔细看源码注释,很关键。
function choose_handler()
{
$handler = null;
// 判定 curl 方法,如果并发和常规 curl 同时存在
if (function_exists('curl_multi_exec') && function_exists('curl_exec')) {
// 注册并发 curl 为默认请求方式,常规 curl 为同步请求方式
$handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
} elseif (function_exists('curl_exec')) {
// 如果两种 curl 方法同时只有一个存在,则优先常规 curl
$handler = new CurlHandler();
} elseif (function_exists('curl_multi_exec')) {
$handler = new CurlMultiHandler();
}
// 如果 allow_url_fopen 开启
if (ini_get('allow_url_fopen')) {
$handler = $handler
// 已有 handler ? 再注册一个流处理 handler
? Proxy::wrapStreaming($handler, new StreamHandler())
// 否则只有流处理 handler
: new StreamHandler();
} elseif (!$handler) {
throw new \RuntimeException('GuzzleHttp requires cURL, the '
. 'allow_url_fopen ini setting, or a custom HTTP handler.');
}
return $handler;
}
创建完 handler 后,会往 stack 中添加一些 middleware,也就是中间件。简单介绍下,push 函数第一个参数是闭包,第二个参数是字符串,中间件的名字,middleware 主要由闭包组成,可能有的 middleware 嵌套有点多,显得有点复杂,但是无论结构如何复杂,本质上就是用来处理各种请求数据,其结构类型和Chengxiaobai\handler一样。
需要深入了解 Handlers 和 Middleware 的可以点击这里看官方文档,个人觉得需要对闭包掌握的比较好,才能很好的理解其设计思路。
根据上面的源码分析你可能注意到,系统默认提供的 handler 是以对象的形式存在的。但其真正使用的时候是当做闭包使用的,这里介绍的是真正发挥作用的闭包结构,而不是表面的 HandlerStack 对象。后面”处理请求“章节会详细介绍。
构建请求
其实所有的请求在处理上都是异步的,同步请求只不过是异步请求构建后立即要求返回结果,异步转同步。但是异步和同步的请求构建都是类似的,不同处我会说明。
public function request($method, $uri = '', array $options = [])
{
$options[RequestOptions::SYNCHRONOUS] = true;
// requestAsync 就是异步请求,不过直接调用了 wait 转同步
return $this->requestAsync($method, $uri, $options)->wait();
}
请求的 uri 参数
如果你在配置中定义了 base_uri 参数,这个时候可以使用相对地址,如果没有,则不支持相对地址,Guzzle 并没有帮你校验最终 uri 参数是否正确,只有等到请求发出去了,才知道 uri 是否正确。
private function buildUri($uri, array $config)
{
// for BC we accept null which would otherwise fail in uri_for
$uri = Psr7\uri_for($uri === null ? '' : $uri);
if (isset($config['base_uri'])) {
$uri = Psr7\UriResolver::resolve(Psr7\uri_for($config['base_uri']), $uri);
}
// 这里使用了 psr7 规范,返回的是一个实现了 UriInterface 的对象
return $uri->getScheme() === '' && $uri->getHost() !== '' ? $uri->withScheme('http') : $uri;
}
比如这样就会报错
$client = new \GuzzleHttp\Client();
$response = $client->request('GET', '/history.html');
/**
* ountput :
* Fatal error: Uncaught GuzzleHttp\Exception\RequestException:
* cURL error 3: malformed (see http://curl.haxx.se/libcurl/c/libcurl-errors.html)
* in /app/vendor/guzzlehttp/guzzle/src/Handler/CurlFactory.php on line 187
*/
base_uri
uri
result
chengxiaobai.cn/first/
/second
chengxiaobai.cn/second
chengxiaobai.cn/first/
second
chengxiaobai.cn/first/second
chengxiaobai.cn/first
/second
chengxiaobai.cn/second
chengxiaobai.cn/first
second
chengxiaobai.cn/second
保险的情况就是每次都使用绝对路径就好了,但是有时候相对路径在做爬取的时候很有用,依据实际需求使用。
构建 reqsest
Guzzle 内部使用的 request 对象都是 Psr\Http\Message\RequestInterface 的实现,这样只要你能按照 psr7 的规范来就很容易拓展 Guzzle。
这里再次提醒大家,modern php 开发,应该遵循 psr 规范,有利于社区更好的协作和稳健发展。
public function requestAsync($method, $uri = '', array $options = [])
{
$request = new Psr7\Request($method, $uri, $headers, $body, $version);
return $this->transfer($request, $options);
}
丰富 request
transfer 的结构类型和Chengxiaobai\handler一样。
private function transfer(RequestInterface $request, array $options)
{
// 这个方法会根据你的请求类型,构建更具体的请求对象
$request = $this->applyOptions($request, $options);
$handler = $options['handler'];
}
applyOptions 从名字能看出来,这个方法会根据你的配置,构建出相匹配的 request 对象。比如根据请求类型的不同,进行参数 encode,设置 body 比如 json、stream,设置 header 等请求细节。
注意配置传入的是一个引用,所以里面对配置的任何修改,都会影响后续操作。
private function applyOptions(RequestInterface $request, array &$options)
{
// 各种判定,修改 $options,如果有没覆盖到的,会新生成一个 $modify 说明需要重新构建 $request
// 构建新的对象方法
$request = Psr7\modify_request($request, $modify);
return $request;
}
如果没有需要修改的,就直接返回,如果有的话,会重新构建一个新的 request 对象。
注意需要的参数,有些构建参数是从 $changes 取的,但是有些是从原本的 $request 对象取的,本质上就是,有新的就用新的,没有就用老的保持不变。
function modify_request(RequestInterface $request, array $changes)
{
if (!$changes) {
return $request;
}
return new Request(
isset($changes['method']) ? $changes['method'] : $request->getMethod(),
$uri,
$headers,
isset($changes['body']) ? $changes['body'] : $request->getBody(),
isset($changes['version'])
? $changes['version']
: $request->getProtocolVersion()
);
}
promise 简介
关于 promise ,属于 guzzlehttp/promises 类库,是一个很值得学习的类库,有机会我会专门分析下它的实现原理,目前我们还是着重分析请求实现过程。
通看源码会发现,虽然 Guzzle 内部大量使用了 promise 并且夹杂着闭包很复杂,但是 promise 发挥的作用都是一样的。目前可以这么理解,就是 promise 是一个状态机,它有三种状态:等待、满足、拒绝。
下面的这个例子,只是示例会如何执行,promise 规范是有各种要求的,具体见 Promises/A+规范,Guzzle 使用的 promise 也是该规范的一个实现。
$promise = new Promise(
function () {
echo 'wait';
},
function () {
echo 'cancle';
}
);
$promise->then(
function () {
echo 'onFulfilled';
},
function () {
echo 'onRejected';
}
)->then(
function () {
echo 'onFulfilled';
},
function () {
echo 'onRejected';
}
);
从等待状态开始执行,满足了就执行 onFulfilled ,拒绝了就执行 onRejected,一连串下来,依靠不同的状态去执行不同的方法,配合 http 请求要么成功要么失败,不会有第三种状态的场景,就可以很顺畅的理解了。
private function transfer(RequestInterface $request, array $options)
{
$handler = $options['handler'];
try {
return Promise\promise_for($handler($request, $options));
} catch (\Exception $e) {
return Promise\rejection_for($e);
}
}
成功就是 promise_for 失败就是 rejection_for。
promise_for
这个方法主要是用来保证返回的是一个 promise 对象,因为经过 $handler 处理后的值可能是一个 promise 对象( $handler 如何处理紧接着会说),也能是一个 response 对象,也可能是一个异常,所以需要对对数据做一个“清洗转换”,并返回一个满足状态的 promise。
function promise_for($value)
{
// 如果是一个 promise 对象就直接返回
if ($value instanceof PromiseInterface) {
return $value;
}
// 如果是一个包含 then 方法的对象,会把它转换成一个 promise 对象
if (method_exists($value, 'then')) {
// 如果里面有 wait、cancel、resolve、reject 等方法,会把它添加进去作为默认方法,否则置为 null
$wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
$cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
$promise = new Promise($wfn, $cfn);
$value->then([$promise, 'resolve'], [$promise, 'reject']);
return $promise;
}
// 前俩者都不满不足的情况下,直接返回一个满足状态的 promise。
return new FulfilledPromise($value);
}
rejection_for
异常情况会走入到 rejection_for 方法。同理进行“数据清洗”,并返回一个拒绝状态的 promise 。
function rejection_for($reason)
{
if ($reason instanceof PromiseInterface) {
return $reason;
}
return new RejectedPromise($reason);
}
handler 处理
还是 transfer 方法,在传给 promise_for 之前,先调用了一个 $handler,也就是配置中的 handler 函数。接着就是返回一个 Promise 对象,用于外层异步调用。
private function transfer(RequestInterface $request, array $options)
{
$handler = $options['handler'];
try {
// 这里会先调用配置中的 handler 方法
return Promise\promise_for($handler($request, $options));
} catch (\Exception $e) {
return Promise\rejection_for($e);
}
}
处理请求
对于上面的 handler 处理小节,你可能会有疑惑,为什么就调用了 handler 函数,那不是直接开始处理请求了吗?
我们之前介绍过 handler 的数据结构,是一个是 handlerStack 对象,但是其调用本质是一系列组合闭包。但数据结构上是一个对象,怎么使用的时候就成了闭包呢?
当尝试以调用函数的方式调用一个对象时,__invoke() 方法会被自动调用。
有了这个前提,我们看下 handlerStack 源码。
从 handlerStack 的名字上,我们就能知道它是一个”栈“数据结构,其满足”后进先出“的特性。
public function __invoke(RequestInterface $request, array $options)
{
// 这个函数主要是实现 Middleware 中间件操作
$handler = $this->resolve();
//这里下面紧接着会有分析
return $handler($request, $options);
}
public function resolve()
{
// 变量缓存,能优化部分性能
if (!$this->cached) {
// 这个 handler 就是之前选择的 实现请求的底层方法
// 如果没有的话,请求都无法实现,就别折腾了,抛个异常终止吧
if (!($prev = $this->handler)) {
throw new \LogicException('No handler has been specified');
}
// 反转顺序,实现”后进先出“特性,调用每个中间件
foreach (array_reverse($this->stack) as $fn) {
// 中间件的注册是 [$middleware, $name] 形式的
// 所以取第一个元素是其具体实现,第二个参数只是名字
// 调用第一次传入的是 handler,后续传入的就是上一次处理的结果
$prev = $fn[0]($prev);
}
// 所有的都处理完毕,缓存起来
$this->cached = $prev;
}
return $this->cached;
}
上面就是很经典的中间件模型实现,laravel 中实现的略有区别,主要用到了 array_reduce 这个函数,但是原理上大同小异,知道其原理一通百通。
我们再继续看看源码。还是这个方法,不过我们分析其最终调用的实现。
根据 Middleware 流程图,我们知道最后一个调用的是 http_errors,我们就来分析它吧,没有任何特殊性,其他的 Middleware 结构都是一样的,只是有些中间件多次使用了 __invoke() 魔术方法而已。
Middleware 里面闭包结构复杂,好好理解下。
public function __invoke(RequestInterface $request, array $options)
{
// 这个函数主要是实现 Middleware 中间件操作
$handler = $this->resolve();
// 现在我们分析这个
return $handler($request, $options);
}
public static function httpErrors()
{
// 第一次调用返回!传入一个 闭包-A
return function (callable $handler) {
// 第二次调用返回!传入 $request,$options
return function ($request, array $options) use ($handler) {
// Middleware 自己的逻辑判定返回什么样的闭包
if (empty($options['http_errors'])) {
// 第三次调用返回!返回 闭包-A 的处理结果
// 这里根据配置 没有注册 then 函数,直接进行下一步处理
return $handler($request, $options);
}
// 第三次调用返回!返回 闭包-A,附加 promise
// 根据上面我们说到的 promise 特性,这里用 then
// 附加了 闭包-A 处理完毕之后要调用的逻辑
return $handler($request, $options)->then(
function (ResponseInterface $response) use ($request, $handler) {
$code = $response->getStatusCode();
if ($code < 400) {
return $response;
}
throw RequestException::create($request, $response);
}
);
};
};
}
关于返回层数,可以根据 return 来迅速定位,一个 return 就对应一次调用返回。
现在我们先梳理下到这步 handlerStack 被调用的次数,知道这三层闭包分别在哪里被调用了,有利于我们得出最终结果。
// 第一次
public static function create(callable $handler = null)
{
$stack->push(Middleware::httpErrors(), 'http_errors');
}
// 第二次
public function resolve()
{
$prev = $fn[0]($prev);
}
// 第三次
public function __invoke(RequestInterface $request, array $options)
{
$handler($request, $options);
}
最后的结果应该是,如果按照 Middleware 结构应该是这样的:
$handler($request, $options)->then('http_errors')
->then('allow_redirects')
->then('cookies')
->then('prepare_body')
这个 $handler 就是最开始传入的请求实现底层方法。
整个 Middleware 就实现了,传入的时候先处理一遍请求数据,请求完了,通过 then 再处理一遍请求结果。
注意!!!由于 Middleware 的作用不同,可能有的 Middleware 并不会处理请求结果,就不会注册 then 函数。这里描述的是 Middleware 的整个流程,并没有对其中某个做特殊分析,因为其需求场景不同,逻辑处理会有细微变化。
这个 $handler 具体是哪个请求方法呢?还记得 choose_handler() 方法吗,它决定了到底使用哪种底层方法去实现请求,现在我们终于执行到发起请求的步骤了。
再回顾下 choose_handler() 方法。
function choose_handler()
{
$handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
$handler = new CurlHandler();
$handler = new CurlHandler();
$handler = $handler
? Proxy::wrapStreaming($handler, new StreamHandler())
: new StreamHandler();
return $handler;
}
这两个方法都有源码分析,没有印象的可以再回去看看。
不同 $handler 都是在 __invoke() 方法上做文章。
我们分析第一个 $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());。
public static function wrapSync(
callable $default,
callable $sync
) {
return function (RequestInterface $request, array $options) use ($default, $sync) { // 注意这里的三目运算符,判定同步请求选项是否为空
return empty($options[RequestOptions::SYNCHRONOUS])
// 默认是并发请求 new CurlMultiHandler()
? $default($request, $options)
// 这里是同步请求 new CurlHandler()
: $sync($request, $options);
};
}
现在,异步和同步请求终于出现了区别。我们先看同步请求。
同步请求
我们回顾下 request() 方法,注意到一个步骤。
public function request($method, $uri = '', array $options = [])
{
// 这里往配置中添加了一个选项,设置该请求为同步的
$options[RequestOptions::SYNCHRONOUS] = true;
}
所以,这里走的是同步请求,我们来分析 CurlHandler() 。
public function __invoke(RequestInterface $request, array $options)
{
// 如果设置了延迟请求,会在这里阻塞一会
if (isset($options['delay'])) {
usleep($options['delay'] * 1000);
}
// 创建一个 handler 抽象对象
$easy = $this->factory->create($request, $options);
// 执行
curl_exec($easy->handle);
$easy->errno = curl_errno($easy->handle);
// 请求处理结束
return CurlFactory::finish($this, $easy, $this->factory);
}
这里需要分析一下工厂类 CurlFactory,里面大都涉及到 cURL的一些配置,有兴趣的可以看下源码学习,配置的含义官方文档有专门的介绍,我这里就不在分析它们了,主要流程的分析还是不会缺的。
public function create(RequestInterface $request, array $options)
{
if (isset($options['curl']['body_as_string'])) {
$options['_body_as_string'] = $options['curl']['body_as_string'];
unset($options['curl']['body_as_string']);
}
// handle 的一个抽象对象
$easy = new EasyHandle;
$easy->request = $request;
$easy->options = $options;
// 获取默认配置
$conf = $this->getDefaultConf($easy);
// 解析请求方法
$this->applyMethod($easy, $conf);
// 解析配置
$this->applyHandlerOptions($easy, $conf);
// 解析头部
$this->applyHeaders($easy, $conf);
unset($conf['_headers']);
// 解析自定义 curl 配置
if (isset($options['curl'])) {
$conf = array_replace($conf, $options['curl']);
}
// 设置回调函数用于处理返回头
$conf[CURLOPT_HEADERFUNCTION] = $this->createHeaderFn($easy);
// 从 handle池 获取一个 handle,没有就新建一个
$easy->handle = $this->handles
? array_pop($this->handles)
: curl_init();
curl_setopt_array($easy->handle, $conf);
return $easy;
}
public static function finish(
callable $handler,
EasyHandle $easy,
CurlFactoryInterface $factory
) {
// 这里会调用配置用设置的 on_stats 函数
if (isset($easy->options['on_stats'])) {
self::invokeStats($easy);
}
// 有错误的话走错误处理流程
if (!$easy->response || $easy->errno) {
return self::finishError($handler, $easy, $factory);
}
// 释放资源,还到 handle池
$factory->release($easy);
// 处理 流数据
$body = $easy->response->getBody();
if ($body->isSeekable()) {
$body->rewind();
}
// 返回一个满足状态的 promise
return new FulfilledPromise($easy->response);
}
根据源码分析,同步请求在这一步就已经发出了请求,并且回调了配置中的 on_stats 函数,拿到了未经处理的返回值原始返回值,并且同步请求 handler池,也就是复用的请求句柄为3个,这个没有办法修改,写死在代码中的。
异步请求
public function __invoke(RequestInterface $request, array $options)
{
$easy = $this->factory->create($request, $options);
// 为每个请求生成一个 ID
$id = (int) $easy->handle;
// 注册一个 promise,分别是调用执行和关闭方法
$promise = new Promise(
[$this, 'execute'],
// 依据 ID 来关闭请求
function () use ($id) { return $this->cancel($id); }
);
// 添加请求 底层是 curl_multi_add_handle 方法
$this->addRequest(['easy' => $easy, 'deferred' => $promise]);
return $promise;
}
工厂类 CurlFactory 在上面已经分析过,这里不再赘述。但是异步请求这个时候并没有发起最终的请求,先是为每个请求生成一个 ID,然后将请求添加到批处理回话句柄( curl_multi_add_handle )中,最后返回了一个 Promise 对象,里面注册了 execute 函数和 cancel 函数,用于后面发起和关闭请求。
需要注意的就是设定了延迟执行的请求,是在 addRequest() 方法中处理的。后面在”返回结果“章节会讲到延迟请求处理。
流处理
配置中如果 stream 选项不为空,就会启用它,如果你没有 cURL,那就只能用它了。
public function __invoke(RequestInterface $request, array $options)
{
// 如果设置了延迟请求,会在这里阻塞一会
if (isset($options['delay'])) {
usleep($options['delay'] * 1000);
}
// 流处理本身信息较少,所以为了补全一些信息,这里记录处理开始时间
$startTime = isset($options['on_stats']) ? microtime(true) : null;
try {
// 不支持 expect header.
$request = $request->withoutHeader('Expect');
// 当内容为0的时候,依然添加一个头信息
if (0 === $request->getBody()->getSize()) {
$request = $request->withHeader('Content-Length', 0);
}
// 发起请求,然后回调 on_stats 函数
// 解析结果,同样返回一个满足状态的 promise
return $this->createResponse(
$request,
$options,
$this->createStream($request, $options),
$startTime
);
} catch (\InvalidArgumentException $e) {
throw $e;
} catch (\Exception $e) {
// Determine if the error was a networking error.
$message = $e->getMessage();
// This list can probably get more comprehensive.
if (strpos($message, 'getaddrinfo') // DNS lookup failed
|| strpos($message, 'Connection refused')
|| strpos($message, "couldn't connect to host") // error on HHVM
) {
$e = new ConnectException($e->getMessage(), $request, $e);
}
$e = RequestException::wrapException($request, $e);
$this->invokeStats($options, $request, $startTime, null, $e);
return \GuzzleHttp\Promise\rejection_for($e);
}
}
关于流处理,因为其底层实现是 fopen() 函数,其支持的协议比较多,不止有 http,它支持的协议和封装协议在这里可以看到,所以 Guzzle 对其做了一些特殊处理以满足业务需要。
返回结果
根据上面的分析我们已经知道 transfer 方法返回的结果是什么了,然后就是获取返回结果。
同步请求
同步请求因为在 transfer 方法中,实际的请求已经发出去,已经拿到了未经处理的原始返回结果。
public function send(RequestInterface $request, array $options = [])
{
// 我们注意到最后调用的 wait 方法
return $this->sendAsync($request, $options)->wait();
}
在同步请求方法中,直接调用了 wait() 方法,所以直接走 Promise 对象的 wait() 方法及注册的 then()方法。还记得之前的 Middleware 里面注册了一些 then() 方法吗?这里主要就是调用它们了,完成中间件“处理返回结果”的这一步骤,当然还有一些在逻辑处理中注册的 then() 方法,在此不再举例。
异步请求
异步请求在 transfer 方法中返回的是一个 Promise,此时实际请求并没有发送。我们从官方例子来分析发送请求并且获取返回结果的方式。
$promise = $client->requestAsync('GET', 'https://www.chengxiaobai.cn');
$promise->then(
function (ResponseInterface $res) {
echo $res->getStatusCode() . "\n";
},
function (RequestException $e) {
echo $e->getMessage() . "\n";
echo $e->getRequest()->getMethod();
}
);
这种方式是对每个异步请求单独注册 then() 方法,说明这个请求成功了怎么处理,失败了怎么处理。
$client = new Client(['base_uri' => 'https://www.chengxiaobai.cn']);
// 注册多个异步请求,实现并发
$promises = [
'image' => $client->getAsync('/image'),
'png' => $client->getAsync('/image/png'),
'jpeg' => $client->getAsync('/image/jpeg'),
'webp' => $client->getAsync('/image/webp')
];
// 有一个失败就终止
$results = Promise\unwrap($promises);
// 忽略某些请求的异常,保证所有请求都发送出去
$results = Promise\settle($promises)->wait();
这个是设定多个异步请求,实现并发,并选择对部分请求错误是否忽略进行处理。
$client = new Client();
$requests = function ($total) use ($client) {
for ($i = 1; $i < $total; $i++) {
$uri = 'https://www.chengxiaobai.cn/page/' . $i;
// 这里用到了协程
yield function() use ($client, $uri) {
return $client->getAsync($uri.$i);
};
}
};
$pool = new Pool($client, $requests(10), [
// 并发数
'concurrency' => 5,
'fulfilled' => function ($response, $index) {
echo $res->getStatusCode() . "\n";
},
'rejected' => function ($reason, $index) {
echo $e->getMessage() . "\n";
},
]);
// 初始化 Promise
$promise = $pool->promise();
// 发起请求处理
$promise->wait();
这个是对大批量请求做出一个批量处理,类似一个请求池的的概念,设定了出口速率( concurrency ),使用统一的处理逻辑,处理请求池当中的数据。
我们来分析下 Pool 的源码,主要是构造函数。
public function __construct(
ClientInterface $client,
$requests,
array $config = []
) {
// 设定请求池大小
if (isset($config['pool_size'])) {
$config['concurrency'] = $config['pool_size'];
} elseif (!isset($config['concurrency'])) {
// 默认并发数 25
$config['concurrency'] = 25;
}
if (isset($config['options'])) {
$opts = $config['options'];
unset($config['options']);
} else {
$opts = [];
}
// 将请求列表转换为一个迭代器
$iterable = \GuzzleHttp\Promise\iter_for($requests);
$requests = function () use ($iterable, $client, $opts) {
// 遍历请求列表
foreach ($iterable as $key => $rfn) {
// 如果是一个 request 的实现,转换为一个异步请求
if ($rfn instanceof RequestInterface) {
yield $key => $client->sendAsync($rfn, $opts);
} elseif (is_callable($rfn)) {
// 如过是一个闭包,直接调用
yield $key => $rfn($opts);
} else {
throw new \InvalidArgumentException('...');
}
}
};
// 支持迭代的 Promise 对象
$this->each = new EachPromise($requests(), $config);
}
我们可以看到,Pool 模式下,所有的请求配置 $opts 都是一样的,所以每个请求的处理逻辑都是一样的,如果每个请求都有有定制化需求,Pool 模式可能不太适合,当然可以使用修改源码的方式,不过这个已经不符合 Pool 模式设计的初衷了。
不管哪种形式,都可以发现触发最终调用的都是 wait() 方法。这个和 Promise 的规范有关。
我们看下异步如何处理请求的。
还记得异步请求返回的 Promise 吗?
$promise = new Promise(
[$this, 'execute'],
// 依据 ID 来关闭请求
function () use ($id) { return $this->cancel($id); }
);
wait() 方法调用的就是 [$this, 'execute'],我们来分析它的实现。在此之前,我们需要特别说明下延迟请求。
延迟时间
对于延迟请求,同步请求和流请求很好处理,直接阻塞就好了,如果是20个异步请求中包含10个延迟请求,每个延迟时间还不相等,这个时候延迟请求的处理就得好好考虑下了。
在”请求处理“章节我们说过,延迟请求是没有立即加到批处理请求句柄的,它被暂时存放在 $this->delays 队列中。直到你决定发起请求了,延迟请求才被拿出来计算其是否应该被加到批处理请求句柄中。计算逻辑我们从源码看看如何计算阻塞时间。
public function execute()
{
$queue = P\queue();
while ($this->handles || !$queue->isEmpty()) {
// 如果没有在进行的请求,并且延迟请求队列不为空,就开始阻塞
if (!$this->active && $this->delays) {
usleep($this->timeToNext());
}
$this->tick();
}
}
private function timeToNext()
{
$currentTime = microtime(true);
$nextTime = PHP_INT_MAX;
// 找出现有延迟请求队列中最小的延迟时间
foreach ($this->delays as $time) {
if ($time < $nextTime) {
$nextTime = $time;
}
}
return max(0, $nextTime - $currentTime) * 1000000;
}
execute 主要是调用了 tick() 这个方法。
public function tick()
{
// 如果延迟请求队列不为空,处理延迟请求
if ($this->delays) {
$currentTime = microtime(true);
// $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
foreach ($this->delays as $id => $delay) {
// 延迟任务已经达到延迟预期时间,开始处理
if ($currentTime >= $delay) {
// 将它从延迟任务队列中删除
unset($this->delays[$id]);
// 添加到批量请求句柄中
curl_multi_add_handle(
$this->_mh,
$this->handles[$id]['easy']->handle
);
}
}
}
// 执行队列中的任务
P\queue()->run();
// 执行请求
if ($this->active &&
curl_multi_select($this->_mh, $this->selectTimeout) === -1
) {
// See: https://bugs.php.net/bug.php?id=61141
usleep(250);
}
while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
// 获取请求结果信息,移除请求成功的请求
$this->processMessages();
}
然后异步处理流程就很清晰了:
如果延迟请求队列不为空并且当前没有在执行的请求,先阻塞最小的延迟时间,以保证延迟请求队列在每次请求都至少被消耗一个。如果有正在执行的请求或者延迟请求队列不为空,直接执行2。
发起一次批量请求。
获取请求信息,移除成功的请求。
如果请求队列不为空,执行1-3。
从上面的流程,我们可以分析得出,即使你的并发数大于请求数,也并不意味着只请求一次,可能会有重试或者延迟请求造成多次请求。并且根据步骤 1 我们也可以知道,非延迟任务也会跟着一起被阻塞。
和同步请求一样,异步请求下每个请求处理完毕后,都会执行相应的 then() 方法完成返回结果处理。
流请求
流请求和同步请求类似,只是底层实现的方法不一样,流程都是一样,这里不再赘述。
彩蛋
在分析源码的过程中,发现了一个没有被使用的类 GuzzleHttp\Promise\Coroutine ,它也是对 Promise 的一个实现,但是其是通过迭代器来实现的,会不会有协程版的 Promise 呢?我们拭目以待。
本文由程小白创作,本文可自由转载、引用,但需署名作者且注明文章出处。
php choose handler,Guzzle 源码分析相关推荐
- Handler的源码分析
2019独角兽企业重金招聘Python工程师标准>>> 最近复习到了Handler一直只知道怎么用,却没有仔细分析过源码,这下就来看看 为什么要使用handler呢? 因为在安卓应用 ...
- Handler机制源码分析
一.Handler使用上需要注意的几点 1.1 handler使用不当造成的内存泄漏 public class MainActivity extends AppCompatActivity {priv ...
- Wangle源码分析:编解码Handler
2019独角兽企业重金招聘Python工程师标准>>> 前言 编解码是协议相关的,如果没有编解码Handler,那么在处理网络的粘包.拆包时会变得很复杂.除了http之类的公有协议之 ...
- Wangle源码分析:Pipeline、Handler、Context
2019独角兽企业重金招聘Python工程师标准>>> 基本概念 Wangle中的Pipeline和Netty中的Pipeline是很相似的,既可以将它看为一种职责链模式的实现也可以 ...
- Android多线程:深入分析 Handler机制源码(二)
前言 在Android开发的多线程应用场景中,Handler机制十分常用 接下来,深入分析 Handler机制的源码,希望加深理解 目录 1. Handler 机制简介 定义 一套 Android 消 ...
- Wangle源码分析:Service
2019独角兽企业重金招聘Python工程师标准>>> 前言 Wangle中的Service代表一个远程服务(方法),熟悉RPC的朋友肯定知道这就是一个简单的RPC,当然,和一些常见 ...
- Wangle源码分析:ClientBootstrap
2019独角兽企业重金招聘Python工程师标准>>> ClientBootstrap介绍 ClientBootstrap是wangle作为Client端的一个快速启动辅助类,在经过 ...
- Wangle源码分析:ServerBootstrap
2019独角兽企业重金招聘Python工程师标准>>> ServerBootstrap介绍 ServerBootstrap,顾名思义,它是作为Wangle服务端的一个启动 ...
- Wangle源码分析:EventBaseHandler、AsyncSocketHandler
2019独角兽企业重金招聘Python工程师标准>>> 前言 前面的Wangle源码分析系列文章详细的分析了Pipeline.Handler等实现原理,细心的读者可能发现,每次在构造 ...
- Handler机制的源码分析
2019独角兽企业重金招聘Python工程师标准>>> Handler,MessageQueue,Looper的关系 Looper的作用是在线程中处理消息的 MessageQueue ...
最新文章
- 安装完DevExpress14.2.5,如何破解它呢?
- 上接[翻译]ASP.NET 2.0中的健康监测系统(Health Monitoring)(1) - 基本应用
- python音频聚类_Python实现聚类算法AP
- 死锁的四个必要条件,及处理方法
- 新款苹果手机_苹果宣布新系统 性能依旧“压制quot;安卓
- mysql if exists用法_MySQL中EXISTS的用法
- 2021 Spring 自定义注解 +AOP +方法入参
- 2017 Multi-University Training Contest - Team 7:1003. Color the chessboard(...)
- mybatis 插件
- css制作序列帧动画
- vb adodb mysql_PHP_ADODB类使用,MySQL的例子PHP中最通用的数据 - phpStudy
- 直播APP源码开发,直播APP源码搭建,如何优化程序?
- 7教程统计意义_SPSS混合线性模型在生物医药统计中的应用——杏花开生物医药统计...
- java生成随机名字
- rpm的mysql怎么安装_MySQL的rpm安装教程
- 关于电子发票打印报销最优美的姿势——发票大师网页版
- prior 和 priori的区别
- 如何在D盘以管理员身份,运行cmd
- BZOJ[3039]玉蟾宫 悬线法
- 相关系数-excel-CORREL()