Co::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 1
ccs 0
cts 0
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\TypeUtils;
5
use mpyw\Co\Internal\ControlUtils;
6
use mpyw\Co\Internal\YieldableUtils;
7
use mpyw\Co\Internal\CoOption;
8
use mpyw\Co\Internal\GeneratorContainer;
9
use mpyw\Co\Internal\Pool;
10
use React\Promise\Deferred;
11
use React\Promise\PromiseInterface;
12
use React\Promise\ExtendedPromiseInterface;
13
use React\Promise\FulfilledPromise;
14
use React\Promise\RejectedPromise;
15
16
class Co implements CoInterface
17
{
18
    /**
     * The Co instance of the current run.
     * Assigned at the beginning of Co::wait() and reset to null in its
     * finally clause; Co::async() and Co::isRunning() test it for truthiness
     * to decide whether a run is in progress.
     * @var Co|null
     */
22
    private static $self;
23
24
    /**
     * Options of the current run.
     * Built in Co::wait() from the $options array given by the caller.
     * @var CoOption
     */
28
    private $options;
29
30
    /**
     * cURL request pool object.
     * Created in Co::wait() from the options of the current run.
     * @var Pool
     */
34
    private $pool;
35
36
    /**
     * Running cURL or Generator identifiers.
     * Handed to YieldableUtils::getYieldables() when yieldables are collected;
     * entries of resolved yieldables are removed again with array_diff_key().
     * @var array
     */
40
    private $runners = [];
41
42
    /**
     * Replace the default options used by subsequent runs.
     *
     * Thin static facade: the array is forwarded unchanged to
     * CoOption::setDefault().
     *
     * @param array $options New default options.
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }
50
51
    /**
     * Fetch the current default options.
     *
     * Thin static facade over CoOption::getDefault().
     *
     * @return array Default options as an array.
     */
    public static function getDefaultOptions()
    {
        $defaults = CoOption::getDefault();
        return $defaults;
    }
59
60
    /**
     * Wait until value is recursively resolved to return it.
     * This function call must be atomic: only one Co::wait() may be running
     * at a time. Use Co::async() to add work from inside a running call.
     *
     * @param  mixed $value   Value to be resolved.
     * @param  array $options Options for this run, passed to the CoOption constructor.
     * @return mixed          The value returned by the internal start() call.
     * @throws \BadMethodCallException If Co::wait() is already running.
     */
    public static function wait($value, array $options = [])
    {
        // Reject nested calls BEFORE entering the try block. If this check
        // lived inside it, the finally clause below would also run for the
        // rejected call and clear the instance that belongs to the outer,
        // still-running Co::wait(), breaking later Co::async() calls and
        // making Co::isRunning() report false.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }
        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new Pool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always unregister the instance created above, even when
            // option parsing or resolution throws.
            self::$self = null;
        }
        // @codeCoverageIgnoreStart
    }
82
    // @codeCoverageIgnoreEnd
83
84
    /**
     * Start resolving a value without waiting for its result.
     * Only usable while Co::wait() is running.
     *
     * @param  mixed $value Value to be resolved.
     * @param  mixed $throw Null to use the option of the current run, or a
     *                      boolean-like value (as accepted by
     *                      FILTER_VALIDATE_BOOLEAN) to override it.
     * @throws \BadMethodCallException   If Co::wait() is not running.
     * @throws \InvalidArgumentException If $throw is neither null nor boolean-like.
     */
    public static function async($value, $throw = null)
    {
        if (!self::$self) {
            throw new \BadMethodCallException('Co::async() must be called along with Co::wait(). ');
        }

        // Null means "no override"; anything else has to validate as boolean.
        $override = null;
        if ($throw !== null) {
            $filterOptions = ['flags' => FILTER_NULL_ON_FAILURE];
            $override = filter_var($throw, FILTER_VALIDATE_BOOLEAN, $filterOptions);
            if ($override === null) {
                throw new \InvalidArgumentException("\$throw must be null or boolean.");
            }
        }

        // Second argument false: do not wait for the pool here.
        self::$self->start($value, false, $override);
    }
105
106
    /**
     * Tell whether a Co::wait() call is currently in progress.
     *
     * @return bool True while the static instance is set, false otherwise.
     */
    public static function isRunning()
    {
        // The static slot holds either a Co instance or null.
        return self::$self !== null;
    }
114
115
    /**
     * Private on purpose: instances are only created inside Co::wait().
     */
    private function __construct()
    {
    }
119
120
    /**
     * Start resolving.
     *
     * @param  mixed $value Value to be resolved.
     * @param  bool  $wait  Whether to block on the pool until everything is done.
     * @param  mixed $throw Used for Co::async() overrides.
     * @return mixed If $wait, the resolved value; otherwise null.
     */
    private function start($value, $wait = true, $throw = null)
    {
        // The root generator receives this variable by reference and stores
        // the resolved value in it.
        $return = null;

        // For convenience, every value is wrapped into a root generator.
        $root = $this->getRootGenerator($throw, $value, $return);
        $container = YieldableUtils::normalize($root);
        $promise = $this->processGeneratorContainerRunning($container);

        // Expected to always hold; done() is used to unwrap thrown exceptions.
        if ($promise instanceof ExtendedPromiseInterface) {
            $promise->done();
        }

        // Asynchronous starts neither wait nor return anything.
        if (!$wait) {
            return null;
        }

        $this->pool->wait();
        return $return;
    }
143
144
    /**
     * Dispatch a generator container to the handler matching its state.
     *
     * @param  GeneratorContainer $gc
     * @return PromiseInterface
     */
    private function processGeneratorContainer(GeneratorContainer $gc)
    {
        // A valid container still has something to yield.
        if ($gc->valid()) {
            return $this->processGeneratorContainerRunning($gc);
        }
        return $this->processGeneratorContainerDone($gc);
    }
155
156
    /**
     * Handle a generator container that has finished.
     *
     * @param  GeneratorContainer $gc
     * @return PromiseInterface Rejected with the thrown exception, or
     *                          fulfilled with the (recursively resolved)
     *                          returned value.
     */
    private function processGeneratorContainerDone(GeneratorContainer $gc)
    {
        // An exception thrown inside the generator propagates as a rejection.
        if ($gc->thrown()) {
            return new RejectedPromise($gc->getReturnOrThrown());
        }

        // Normalize the returned value and look for yieldables inside it.
        $raw = $gc->getReturnOrThrown();
        $yieldKey = $gc->getYieldKey();
        $returned = YieldableUtils::normalize($raw, $yieldKey);
        $yieldables = YieldableUtils::getYieldables($returned, [], $this->runners);

        // Nothing left to resolve: propagate the normalized value directly.
        if (!$yieldables) {
            return new FulfilledPromise($returned);
        }

        // Otherwise chain a resolver: the deferred receives either the applied
        // result or the rejection reason of the combined promise.
        $deferred = new Deferred;
        $pending = $this->promiseAll($yieldables, true);
        $onFulfilled = YieldableUtils::getApplier($returned, $yieldables, [$deferred, 'resolve']);
        $settled = $pending->then($onFulfilled, [$deferred, 'reject']);

        return $settled->then(function () use ($yieldables, $deferred) {
            // These yieldables are no longer running.
            $this->runners = array_diff_key($this->runners, $yieldables);
            return $deferred->promise();
        });
    }
189
190
    /**
     * Handle a generator container that is still running.
     *
     * @param  GeneratorContainer $gc
     * @return PromiseInterface
     */
    private function processGeneratorContainerRunning(GeneratorContainer $gc)
    {
        // Delay requests: resume the generator once the pool's delay resolves.
        if ($gc->key() === CoInterface::DELAY) {
            $delayed = $this->pool->addDelay($gc->current());
            return $delayed->then(function () use ($gc) {
                $gc->send(null);
                return $this->processGeneratorContainer($gc);
            });
        }

        // Normalize the yielded value and look for yieldables inside it.
        $yielded = YieldableUtils::normalize($gc->current());
        $yieldables = YieldableUtils::getYieldables($yielded, [], $this->runners);

        if ($yieldables) {
            // Chain a resolver; rejections are passed through unless the
            // value was yielded with the SAFE key.
            $throwAcceptable = $gc->key() !== CoInterface::SAFE;
            $pending = $this->promiseAll($yieldables, $throwAcceptable);
            $onFulfilled = YieldableUtils::getApplier($yielded, $yieldables, [$gc, 'send']);
            $settled = $pending->then($onFulfilled, [$gc, 'throw_']);

            return $settled->then(function () use ($gc, $yieldables) {
                // These yieldables are no longer running; continue.
                $this->runners = array_diff_key($this->runners, $yieldables);
                return $this->processGeneratorContainer($gc);
            });
        }

        // No yieldables: send the yielded value straight back and continue.
        $gc->send($yielded);
        return $this->processGeneratorContainer($gc);
    }
227
228
    /**
     * Build the root wrapper generator.
     *
     * It yields $value once and stores whatever is sent back into $return.
     * Anything thrown inside it is handed to the pool via
     * reserveHaltException().
     *
     * @param  mixed  $throw   Null to use the 'throw' option, otherwise the override.
     * @param  mixed  $value   Value to be resolved.
     * @param  mixed  &$return Receives the resolved value.
     * @return \Generator
     */
    private function getRootGenerator($throw, $value, &$return)
    {
        try {
            // The override wins when given; otherwise consult the options.
            $throwing = $throw !== null ? $throw : $this->options['throw'];
            // A SAFE key is used when exceptions must not be thrown.
            $key = $throwing ? null : CoInterface::SAFE;
            $return = (yield $key => $value);
            return;
        } catch (\Throwable $e) {
            // Handled below.
        } catch (\Exception $e) {
            // Handled below.
        }
        $this->pool->reserveHaltException($e);
    }
247
248
    /**
     * Combine the promises of all given yieldables into a single promise.
     *
     * @param  array $yieldables       Entries whose 'value' is a cURL handle
     *                                 or a generator container.
     * @param  bool  $throw_acceptable Whether rejections may be passed to the caller.
     * @return PromiseInterface
     */
    private function promiseAll(array $yieldables, $throw_acceptable)
    {
        $promises = [];
        foreach ($yieldables as $yieldable) {
            $target = $yieldable['value'];
            if (TypeUtils::isCurl($target)) {
                // Add or enqueue cURL handles
                $promises[(string)$target] = $this->pool->addCurl($target);
            } elseif (TypeUtils::isGeneratorContainer($target)) {
                // Process generators
                $promises[(string)$target] = $this->processGeneratorContainer($target);
            }
        }

        if ($throw_acceptable) {
            return \React\Promise\all($promises);
        }

        // The caller cannot accept exceptions, so each promise is mapped
        // through safePromise() to treat rejected values as resolved.
        $safePromises = array_map(
            ['\mpyw\Co\Internal\YieldableUtils', 'safePromise'],
            $promises
        );
        return \React\Promise\all($safePromises);
    }
279
280
    /**
     * Wrap value with the Generator that returns the first successful result.
     * If all yieldables failed, AllFailedException is thrown.
     * If no yieldables found, AllFailedException is thrown.
     *
     * The work is delegated to ControlUtils::anyOrRace() with the 'reverse'
     * callback; its result is handed back under the RETURN_WITH key.
     *
     * @param  mixed $value
     * @return \Generator Resolved value.
     * @throws AllFailedException
     */
    public static function any($value)
    {
        $resolved = (yield ControlUtils::anyOrRace(
            $value,
            ['\mpyw\Co\Internal\ControlUtils', 'reverse'],
            'Co::any() failed.'
        ));
        yield Co::RETURN_WITH => $resolved;
        // @codeCoverageIgnoreStart
    }
298
    // @codeCoverageIgnoreEnd
299
300
    /**
301
     * Wrap value with the Generator that returns the first result.
302
     * If no yieldables found, AllFailedException is thrown.
303
     *
304
     * @param  mixed $value
305
     * @return \Generator Resolved value.
306
     * @throws \RuntimeException|AllFailedException
307
     */
308 3
    public static function race($value)
309 3
    {
310 3
        yield Co::RETURN_WITH => (yield ControlUtils::anyOrRace(
311
            $value,
312 3
            ['\mpyw\Co\Internal\ControlUtils', 'fail'],
313 3
            'Co::race() failed.'
314
        ));
315
        // @codeCoverageIgnoreStart
316
    }
317
    // @codeCoverageIgnoreEnd
318
319
    /**
320
     * Wrap value with the Generator that returns the all results.
321
     * Normally you don't have to use this method, just yield an array that contains yieldables.
322
     * You should use only with Co::race() or Co::any().
323
     *
324
     * @param  mixed $value
325
     * @return \Generator Resolved value.
326
     * @throws \RuntimeException
327
     */
328 1
    public static function all($value)
329 1
    {
330 1
        yield Co::RETURN_WITH => (yield $value);
331
        // @codeCoverageIgnoreStart
332
    }
333
    // @codeCoverageIgnoreEnd
334
}
335