1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/* |
4
|
|
|
* This file is part of the slince/spike package. |
5
|
|
|
* |
6
|
|
|
* (c) Slince <[email protected]> |
7
|
|
|
* |
8
|
|
|
* For the full copyright and license information, please view the LICENSE |
9
|
|
|
* file that was distributed with this source code. |
10
|
|
|
*/ |
11
|
|
|
|
12
|
|
|
namespace Slince\Common; |
13
|
|
|
|
14
|
|
|
use React\Promise; |
15
|
|
|
use React\Stream\ReadableStreamInterface; |
16
|
|
|
use Spike\Common\Exception\RuntimeException; |
17
|
|
|
use Spike\Common\Protocol\HttpHeaderParser; |
18
|
|
|
use Spike\Common\Protocol\StreamingJsonParser; |
19
|
|
|
|
20
|
|
|
/**
 * Feeds every chunk emitted by a readable stream into a streaming JSON
 * parser and hands the parser's output to a callback.
 *
 * - On each "data" event the chunk is pushed into the parser; when push()
 *   returns a non-empty value, $resolve is invoked with that value.
 * - On "error" the data listener is detached and, if $reject was given, it is
 *   invoked with a RuntimeException wrapping the stream error.
 * - On "close" an empty string is pushed into the parser and $resolve is
 *   invoked with whatever push() returns, empty or not.
 *
 * If the stream is not readable when this function is called, no listener is
 * registered and neither callback is ever invoked.
 *
 * @param ReadableStreamInterface $stream       stream to read chunks from
 * @param callable                $resolve      invoked with the value returned by the parser's push()
 * @param callable                $reject       optional; invoked with a RuntimeException on stream error
 * @param StreamingJsonParser     $streamParser optional parser; a new one is created when omitted
 */
function jsonBuffer(ReadableStreamInterface $stream, callable $resolve, callable $reject = null, StreamingJsonParser $streamParser = null)
{
    // stream is not readable => nothing to buffer; no callback is invoked
    if (!$stream->isReadable()) {
        return;
    }
    if (null === $streamParser) {
        $streamParser = new StreamingJsonParser();
    }
    $bufferer = function ($data) use ($resolve, $streamParser) {
        $parsed = $streamParser->push($data);
        // explicit emptiness test instead of an implicit cast to boolean
        if (!empty($parsed)) {
            $resolve($parsed);
        }
    };
    $stream->on('data', $bufferer);
    $stream->on('error', function ($error) use ($stream, $bufferer, $reject) {
        // stop feeding the parser once the stream has failed
        $stream->removeListener('data', $bufferer);
        if (null !== $reject) {
            $reject(new RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
        }
    });
    $stream->on('close', function () use ($resolve, $streamParser) {
        // pushing an empty chunk hands over whatever the parser returns at end of stream
        $resolve($streamParser->push(''));
    });
}
50
|
|
|
|
51
|
|
|
/**
 * Feeds every chunk emitted by a readable stream into an HTTP header parser
 * and returns a promise for the parser's output.
 *
 * - On each "data" event the chunk is pushed into the parser; when push()
 *   returns a non-empty value, the promise is resolved with that value.
 * - On "error" the promise is rejected with a RuntimeException wrapping the
 *   stream error.
 * - On "close" an empty string is pushed into the parser and the promise is
 *   resolved with whatever push() returns.
 * - Cancelling the promise rejects it with a RuntimeException.
 *
 * Whenever the promise is rejected, the data listener is detached from the
 * stream before the rejection reason is re-thrown to the caller.
 *
 * If the stream is not readable when this function is called, a promise
 * already resolved with an empty string is returned.
 *
 * @param ReadableStreamInterface $stream stream to read chunks from
 * @param HttpHeaderParser        $parser optional parser; a new one is created when omitted
 *
 * @return Promise\PromiseInterface
 */
function httpHeaderBuffer(ReadableStreamInterface $stream, HttpHeaderParser $parser = null)
{
    // stream is not readable => resolve immediately with an empty buffer
    if (!$stream->isReadable()) {
        return Promise\resolve('');
    }
    if (null === $parser) {
        $parser = new HttpHeaderParser();
    }
    // Filled in by the resolver below (captured by reference) so the rejection
    // handler can detach the very same listener.
    $bufferer = null;
    $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$bufferer, $parser) {
        $bufferer = function ($data) use ($resolve, $parser) {
            $parsed = $parser->push($data);
            // explicit emptiness test instead of an implicit cast to boolean
            if (!empty($parsed)) {
                $resolve($parsed);
            }
        };
        $stream->on('data', $bufferer);
        $stream->on('error', function ($error) use ($reject) {
            $reject(new RuntimeException('An error occured on the underlying stream while buffering', 0, $error));
        });
        $stream->on('close', function () use ($resolve, $parser) {
            // pushing an empty chunk hands over whatever the parser returns at end of stream
            $resolve($parser->push(''));
        });
    }, function ($_, $reject) {
        $reject(new RuntimeException('Cancelled buffering'));
    });

    // NOTE(review): $bufferer is captured by value here, so this relies on the
    // resolver above having already run inside the Promise constructor - confirm
    // against the installed react/promise version.
    return $promise->then(null, function ($error) use ($bufferer, $stream) {
        // promise rejected => stop buffering, then propagate the reason
        $stream->removeListener('data', $bufferer);
        throw $error;
    });
}
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.