|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace mpyw\Co; |
|
4
|
|
|
use mpyw\Co\Internal\Utils; |
|
5
|
|
|
use mpyw\Co\Internal\CoOption; |
|
6
|
|
|
use mpyw\Co\Internal\GeneratorContainer; |
|
7
|
|
|
use mpyw\Co\Internal\CURLPool; |
|
8
|
|
|
|
|
9
|
|
|
use mpyw\RuntimePromise\Deferred; |
|
10
|
|
|
use mpyw\RuntimePromise\PromiseInterface; |
|
11
|
|
|
use function mpyw\RuntimePromise\all; |
|
12
|
|
|
|
|
13
|
|
|
class Co implements CoInterface
{
    /**
     * Instance bound to the Co::wait() call currently in progress.
     * It is null whenever no Co::wait() call is running.
     * @var Co|null
     */
    private static $self;

    /**
     * Options.
     * @var CoOption
     */
    private $options;

    /**
     * cURL request pool object.
     * @var CURLPool
     */
    private $pool;

    /**
     * Overwrite CoOption default.
     * @param array $options
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }

    /**
     * Get CoOption default as array.
     * @return array
     */
    public static function getDefaultOptions()
    {
        return CoOption::getDefault();
    }

    /**
     * Wait until value is recursively resolved to return it.
     * This function call must be atomic.
     * @param  mixed $value
     * @param  array $options
     * @return mixed
     * @throws \BadMethodCallException If another Co::wait() call is already running.
     * @codeCoverageIgnore
     */
    public static function wait($value, array $options = [])
    {
        // The guard must stay outside the try block: if it were inside,
        // the finally clause would also run for a rejected nested call and
        // reset the instance that belongs to the outer, still-running call.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }
        // This method is excluded from coverage because the coverage analyzer
        // does not support returning from inside a try block that has a finally clause.
        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new CURLPool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always release the slot so that the next Co::wait() call can run.
            self::$self = null;
        }
    }

    /**
     * Value is recursively resolved, but we never wait for it.
     * This function must be called along with Co::wait().
     * Nothing is returned, and a rejected result is not rethrown to the caller.
     * @param  mixed $value
     * @param  array $options
     * @throws \BadMethodCallException If no Co::wait() call is running.
     */
    public static function async($value, array $options = [])
    {
        if (!self::$self) {
            throw new \BadMethodCallException(
                'Co::async() must be called along with Co::wait(). ' .
                'This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.'
            );
        }
        self::$self->start($value, false);
    }

    /**
     * External instantiation is forbidden.
     */
    private function __construct() {}

    /**
     * Start resolving.
     * @param  mixed $value
     * @param  bool  $wait
     * @return mixed|null If $wait, the resolved value; otherwise nothing is returned.
     * @throws mixed      If $wait and the value was rejected, the rejection reason is thrown.
     */
    private function start($value, $wait = true)
    {
        $return = $exception = null;
        $deferred = new Deferred;
        // Capture the final outcome so that it can be inspected after the pool has finished.
        $deferred->promise()->then(
            function ($r) use (&$return) {
                $return = $r;
            },
            function ($e) use (&$exception) {
                $exception = $e;
            }
        );
        // For convenience, all values are wrapped into generator
        $genfunc = function () use ($value) {
            yield CoInterface::RETURN_WITH => (yield $value);
        };
        $con = Utils::normalize($genfunc, $this->options);
        // The deferred is passed regardless of $wait;
        // its outcome is only consumed below when $wait is true.
        $this->processGeneratorContainer($con, $deferred);
        // We have to wait $return only if $wait
        if ($wait) {
            $this->pool->wait();
            if ($exception) {
                throw $exception;
            }
            return $return;
        }
    }

    /**
     * Handle resolving generators.
     * Advances $gc one step at a time and settles $deferred once $gc has no more yields.
     * Every step that can be answered immediately re-enters this method recursively.
     * @param GeneratorContainer $gc
     * @param Deferred           $deferred
     */
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        // If generator has no more yields...
        if (!$gc->valid()) {
            // If exception has been thrown in generator, we have to propagate it as rejected value
            if ($gc->thrown()) {
                $deferred->reject($gc->getReturnOrThrown());
                return;
            }
            // Now we normalize returned value
            try {
                $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getOptions());
                $yieldables = Utils::getYieldables($returned);
                // If normalized value contains yieldables, we have to chain resolver
                if ($yieldables) {
                    $this->promiseAll($yieldables, true)->then(
                        self::getApplier($returned, $yieldables, [$deferred, 'resolve']),
                        [$deferred, 'reject']
                    );
                    return;
                }
                // Propagate normalized returned value
                $deferred->resolve($returned);
            } catch (\RuntimeException $e) {
                // Propagate exception thrown in normalization
                $deferred->reject($e);
            }
            return;
        }

        // Now we normalize yielded value
        try {
            $yielded = Utils::normalize($gc->current(), $gc->getOptions(), $gc->key());
        } catch (\RuntimeException $e) {
            // If exception thrown in normalization...
            //   - If generator accepts exception, we throw it into generator
            //   - If generator does not accept exception, we assume it as non-exception value
            $gc->throwAcceptable() ? $gc->throw_($e) : $gc->send($e);
            // Continue
            $this->processGeneratorContainer($gc, $deferred);
            return;
        }

        // Search yieldables from yielded value
        $yieldables = Utils::getYieldables($yielded);
        if (!$yieldables) {
            // If there are no yieldables, send yielded value back into generator
            $gc->send($yielded);
            // Continue
            $this->processGeneratorContainer($gc, $deferred);
            return;
        }

        // Chain resolver
        $this->promiseAll($yieldables, $gc->throwAcceptable())->then(
            self::getApplier($yielded, $yieldables, [$gc, 'send']),
            [$gc, 'throw_']
        )->always(function () use ($gc, $deferred) {
            // Continue
            $this->processGeneratorContainer($gc, $deferred);
        });
    }

    /**
     * Return function that applies resolved results to the yieldables' positions.
     * The returned closure writes each result into a copy of $yielded, following
     * the 'keylist' entry of the matching yieldable as a key path, and then
     * passes the updated value to $next.
     * @param  mixed    $yielded
     * @param  array    $yieldables Entries indexed by the same keys as the results.
     * @param  callable $next
     * @return \Closure
     */
    private static function getApplier($yielded, $yieldables, callable $next)
    {
        return function (array $results) use ($yielded, $yieldables, $next) {
            foreach ($results as $hash => $resolved) {
                // Walk down to the slot where the yieldable was found...
                $current = &$yielded;
                foreach ($yieldables[$hash]['keylist'] as $key) {
                    $current = &$current[$key];
                }
                // ...and overwrite it with the resolved value.
                $current = $resolved;
                // Break the reference so that no slot of $yielded stays aliased
                // when the value is handed over to $next.
                unset($current);
            }
            $next($yielded);
        };
    }

    /**
     * Promise all changes in yieldables are prepared.
     * One promise is created per yieldable, indexed by the string cast of its value.
     * @param  array $yieldables
     * @param  bool  $throw_acceptable
     * @return PromiseInterface
     */
    private function promiseAll($yieldables, $throw_acceptable)
    {
        $promises = [];
        foreach ($yieldables as $yieldable) {
            $target = $yieldable['value'];
            $dfd = new Deferred;
            $promises[(string)$target] = $dfd->promise();
            // If caller cannot accept exception,
            // we handle rejected value as resolved.
            if (!$throw_acceptable) {
                $original_dfd = $dfd;
                // The worker below settles this inner deferred instead;
                // either outcome is forwarded as a resolution of the exposed one.
                $dfd = new Deferred;
                $absorber = function ($any) use ($original_dfd) {
                    $original_dfd->resolve($any);
                };
                $dfd->promise()->then($absorber, $absorber);
            }
            if (Utils::isCurl($target)) {
                // Add or enqueue cURL handles
                $this->pool->addOrEnqueue($target, $dfd);
            } elseif (Utils::isGeneratorContainer($target)) {
                // Process generators
                $this->processGeneratorContainer($target, $dfd);
            }
        }
        return all($promises);
    }
}
|
257
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.