1 | <?php |
||
12 | class Co implements CoInterface |
||
13 | { |
||
14 | /** |
||
15 | * Instance of myself. |
||
16 | * @var Co |
||
17 | */ |
||
18 | private static $self; |
||
19 | |||
20 | /** |
||
21 | * Options. |
||
22 | * @var CoOption |
||
23 | */ |
||
24 | private $options; |
||
25 | |||
26 | /** |
||
27 | * cURL request pool object. |
||
28 | * @var CURLPool |
||
29 | */ |
||
30 | private $pool; |
||
31 | |||
32 | /** |
||
33 | * Overwrite CoOption default. |
||
34 | * @param array $options |
||
35 | */ |
||
36 | 1 | public static function setDefaultOptions(array $options) |
|
37 | 1 | { |
|
38 | 1 | CoOption::setDefault($options); |
|
39 | 1 | } |
|
40 | |||
41 | /** |
||
42 | * Get CoOption default as array. |
||
43 | * @return array |
||
44 | */ |
||
45 | 1 | public static function getDefaultOptions() |
|
46 | 1 | { |
|
47 | 1 | return CoOption::getDefault(); |
|
48 | } |
||
49 | |||
50 | /** |
||
51 | * Wait until value is recursively resolved to return it. |
||
52 | * This function call must be atomic. |
||
53 | * @param mixed $value |
||
54 | * @param array $options |
||
55 | * @return mixed |
||
56 | */ |
||
57 | 18 | public static function wait($value, array $options = []) |
|
58 | 18 | { |
|
59 | try { |
||
60 | 18 | if (self::$self) { |
|
61 | 1 | throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.'); |
|
62 | } |
||
63 | 18 | self::$self = new self; |
|
64 | 18 | self::$self->options = new CoOption($options); |
|
65 | 18 | self::$self->pool = new CURLPool(self::$self->options); |
|
66 | 18 | return self::$self->start($value); |
|
67 | } finally { |
||
68 | 18 | self::$self = null; |
|
69 | } |
||
70 | // @codeCoverageIgnoreStart |
||
71 | } |
||
72 | // @codeCoverageIgnoreEnd |
||
73 | |||
74 | /** |
||
75 | * Value is recursively resolved, but we never wait it. |
||
76 | * This function must be called along with Co::wait(). |
||
77 | * @param mixed $value |
||
78 | * @param mixed $throw |
||
79 | */ |
||
80 | 6 | public static function async($value, $throw = null) |
|
81 | 6 | { |
|
82 | 6 | if (!self::$self) { |
|
83 | 1 | throw new \BadMethodCallException('Co::async() must be called along with Co::wait(). '); |
|
84 | } |
||
85 | 5 | if ($throw !== null) { |
|
86 | 3 | $throw = filter_var($throw, FILTER_VALIDATE_BOOLEAN, [ |
|
87 | 3 | 'flags' => FILTER_NULL_ON_FAILURE, |
|
88 | ]); |
||
89 | 3 | if ($throw === null) { |
|
90 | 1 | throw new \InvalidArgumentException("\$throw must be null or boolean."); |
|
91 | } |
||
92 | } |
||
93 | 4 | self::$self->start($value, false, $throw); |
|
94 | 4 | } |
|
95 | |||
96 | /** |
||
97 | * External instantiation is forbidden. |
||
98 | */ |
||
99 | private function __construct() {} |
||
100 | |||
101 | /** |
||
102 | * Start resovling. |
||
103 | * @param mixed $value |
||
104 | * @param bool $wait |
||
105 | * @param mixed $throw Used for Co::async() overrides. |
||
106 | * @param mixed If $wait, return resolved value. |
||
107 | */ |
||
108 | 18 | private function start($value, $wait = true, $throw = null) |
|
109 | 18 | { |
|
110 | 18 | $deferred = new Deferred; |
|
111 | // For convenience, all values are wrapped into generator |
||
112 | $genfunc = function () use ($throw, $value, &$return) { |
||
113 | try { |
||
114 | 18 | if ($throw !== null) { |
|
115 | 2 | $key = $throw ? null : CoInterface::SAFE; |
|
116 | } else { |
||
117 | 18 | $key = $this->options['throw'] ? null : CoInterface::SAFE; |
|
118 | } |
||
119 | 18 | $return = (yield $key => $value); |
|
120 | 4 | } catch (\RuntimeException $e) { |
|
121 | 4 | $this->pool->reserveHaltException($e); |
|
122 | } |
||
123 | 18 | }; |
|
124 | 18 | $con = Utils::normalize($genfunc); |
|
125 | // We have to provide deferred object only if $wait |
||
126 | 18 | $this->processGeneratorContainer($con, $deferred); |
|
127 | // We have to wait $return only if $wait |
||
128 | 15 | if ($wait) { |
|
129 | 15 | $this->pool->wait(); |
|
130 | 11 | return $return; |
|
131 | } |
||
132 | 4 | } |
|
133 | |||
134 | /** |
||
135 | * Handle resolving generators. |
||
136 | * @param GeneratorContainer $gc |
||
137 | * @param Deferred $deferred |
||
138 | */ |
||
139 | 18 | private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred) |
|
140 | 18 | { |
|
141 | // If generator has no more yields... |
||
142 | 18 | if (!$gc->valid()) { |
|
143 | // If exception has been thrown in generator, we have to propagate it as rejected value |
||
144 | 15 | if ($gc->thrown()) { |
|
145 | 10 | $deferred->reject($gc->getReturnOrThrown()); |
|
146 | 10 | return; |
|
147 | } |
||
148 | // Now we normalize returned value |
||
149 | 15 | $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getYieldKey()); |
|
150 | 15 | $yieldables = Utils::getYieldables($returned); |
|
151 | // If normalized value contains yieldables, we have to chain resolver |
||
152 | 15 | if ($yieldables) { |
|
153 | 4 | $this->promiseAll($yieldables, true)->then( |
|
154 | 4 | self::getApplier($returned, $yieldables, [$deferred, 'resolve']), |
|
155 | 4 | [$deferred, 'reject'] |
|
156 | ); |
||
157 | 4 | return; |
|
158 | } |
||
159 | // Propagate normalized returned value |
||
160 | 15 | $deferred->resolve($returned); |
|
161 | 15 | return; |
|
162 | } |
||
163 | |||
164 | // Check delay request yields |
||
165 | 18 | if ($gc->key() === CoInterface::DELAY) { |
|
166 | 3 | $dfd = new Deferred; |
|
167 | 3 | $this->pool->addDelay($gc->current(), $dfd); |
|
168 | $dfd->promise()->then(function () use ($gc) { |
||
169 | 3 | $gc->send(null); |
|
170 | })->always(function () use ($gc, $deferred) { |
||
171 | 3 | $this->processGeneratorContainer($gc, $deferred); |
|
172 | 3 | }); |
|
173 | 3 | return; |
|
174 | } |
||
175 | |||
176 | // Now we normalize yielded value |
||
177 | 18 | $yielded = Utils::normalize($gc->current()); |
|
178 | 18 | $yieldables = Utils::getYieldables($yielded); |
|
179 | 18 | if (!$yieldables) { |
|
180 | // If there are no yieldables, send yielded value back into generator |
||
181 | 16 | $gc->send($yielded); |
|
182 | // Continue |
||
183 | 13 | $this->processGeneratorContainer($gc, $deferred); |
|
184 | 13 | return; |
|
185 | } |
||
186 | |||
187 | // Chain resolver |
||
188 | 18 | $this->promiseAll($yieldables, $gc->key() !== CoInterface::SAFE)->then( |
|
189 | 15 | self::getApplier($yielded, $yieldables, [$gc, 'send']), |
|
190 | 15 | [$gc, 'throw_'] |
|
191 | )->always(function () use ($gc, $deferred) { |
||
192 | // Continue |
||
193 | 15 | $this->processGeneratorContainer($gc, $deferred); |
|
194 | 15 | }); |
|
195 | 15 | } |
|
196 | |||
197 | /** |
||
198 | * Return function that apply changes in yieldables. |
||
199 | * @param mixed $yielded |
||
200 | * @param array $yieldables |
||
201 | * @param callable $next |
||
202 | */ |
||
203 | 15 | private static function getApplier($yielded, $yieldables, callable $next) |
|
204 | 15 | { |
|
205 | return function (array $results) use ($yielded, $yieldables, $next) { |
||
206 | 12 | foreach ($results as $hash => $resolved) { |
|
207 | 12 | $current = &$yielded; |
|
208 | 12 | foreach ($yieldables[$hash]['keylist'] as $key) { |
|
209 | 6 | $current = &$current[$key]; |
|
210 | } |
||
211 | 12 | $current = $resolved; |
|
212 | 12 | unset($current); |
|
213 | } |
||
214 | 12 | $next($yielded); |
|
215 | 15 | }; |
|
216 | } |
||
217 | |||
218 | /** |
||
219 | * Promise all changes in yieldables are prepared. |
||
220 | * @param array $yieldables |
||
221 | * @param bool $throw_acceptable |
||
222 | * @return PromiseInterface |
||
223 | */ |
||
224 | 18 | private function promiseAll($yieldables, $throw_acceptable) |
|
253 | } |
||
254 |