Completed
Push — master ( ebcbe7...06aa7a )
by Ryosuke
04:11
created

Co::getApplier()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 14
ccs 10
cts 10
cp 1
rs 9.4285
cc 3
eloc 9
nc 1
nop 3
crap 3
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\TypeUtils;
5
use mpyw\Co\Internal\ControlUtils;
6
use mpyw\Co\Internal\YieldableUtils;
7
use mpyw\Co\Internal\CoOption;
8
use mpyw\Co\Internal\GeneratorContainer;
9
use mpyw\Co\Internal\Pool;
10
use mpyw\Co\Internal\ControlException;
11
12
use mpyw\RuntimePromise\Deferred;
13
use mpyw\RuntimePromise\PromiseInterface;
14
15
class Co implements CoInterface
16
{
17
    /**
     * Instance used by the Co::wait() call that is currently running.
     * Assigned at the start of Co::wait() and reset to null when it finishes,
     * so a non-null value means "a wait is in progress".
     * @var Co|null
     */
21
    private static $self;
22
23
    /**
     * Options of the current run, built from the array passed to Co::wait().
     * @var CoOption
     */
27
    private $options;
28
29
    /**
     * cURL request pool object, constructed with the options of the current run.
     * Receives cURL handles and delay requests and is waited on by start().
     * @var Pool
     */
33
    private $pool;
34
35
    /**
     * Running cURL or Generator identifiers.
     * Handed to YieldableUtils::getYieldables() when collecting yieldables;
     * entries are removed again (by key) once their promise has settled.
     * @var array
     */
39
    private $runners = [];
40
41
    /**
42
     * Overwrite CoOption default.
43
     * @param array $options
44
     */
45 1
    public static function setDefaultOptions(array $options)
    {
        // Thin facade: the overrides are forwarded unchanged to CoOption::setDefault().
        CoOption::setDefault($options);
    }
49
50
    /**
51
     * Get CoOption default as array.
52
     * @return array
53
     */
54 1
    public static function getDefaultOptions()
    {
        // Thin facade: whatever CoOption::getDefault() reports is returned as-is.
        $defaults = CoOption::getDefault();

        return $defaults;
    }
58
59
    /**
60
     * Wait until value is recursively resolved to return it.
61
     * This function call must be atomic.
62
     * @param  mixed $value
63
     * @param  array $options
64
     * @return mixed
65
     */
66 30
    public static function wait($value, array $options = [])
    {
        // Reject re-entrant calls BEFORE entering the try block.
        // If this guard lived inside the try, the finally clause below would also
        // run for the rejected call and wipe the instance that belongs to the
        // outer, still-running Co::wait(), breaking its later Co::async() calls.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }

        try {
            // Set up the per-run state: options first, because the pool is built from them.
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new Pool(self::$self->options);

            // Resolve $value and hand back the resolved result.
            return self::$self->start($value);
        } finally {
            // Always release the slot, including when option parsing or resolution
            // throws, so that a later Co::wait() is able to run.
            self::$self = null;
        }
        // @codeCoverageIgnoreStart
    }
81
    // @codeCoverageIgnoreEnd
82
83
    /**
84
     * Value is recursively resolved, but we never wait for it.
85
     * This function must be called along with Co::wait().
86
     * @param  mixed $value
87
     * @param  mixed $throw
88
     */
89 6
    public static function async($value, $throw = null)
    {
        // Only usable while a Co::wait() call is in progress.
        $running = self::$self;
        if (!$running) {
            throw new \BadMethodCallException('Co::async() must be called along with Co::wait(). ');
        }

        // null means "no override"; any other input has to parse as a boolean.
        $override = null;
        if ($throw !== null) {
            $override = filter_var($throw, FILTER_VALIDATE_BOOLEAN, ['flags' => FILTER_NULL_ON_FAILURE]);
            if ($override === null) {
                throw new \InvalidArgumentException('$throw must be null or boolean.');
            }
        }

        // Start resolving without waiting for the result.
        $running->start($value, false, $override);
    }
104
105
    /**
106
     * External instantiation is forbidden.
107
     */
108
    private function __construct()
    {
        // Intentionally empty: private so that only this class (see Co::wait()) can instantiate it.
    }
109
110
    /**
111
     * Start resolving.
112
     * @param  mixed    $value
113
     * @param  bool     $wait
114
     * @param  mixed    $throw  Used for Co::async() overrides.
115
     * @return mixed    If $wait, the resolved value; otherwise null.
116
     */
117 30
    private function start($value, $wait = true, $throw = null)
    {
        $deferred = new Deferred;

        // Written by reference from inside the root generator once it receives the resolved value.
        $return = null;

        // For convenience every value is wrapped into the root generator and normalized,
        // so all inputs go through the same container-handling path.
        $root = $this->getRootGenerator($throw, $value, $return);
        $container = YieldableUtils::normalize($root);

        // Kick off processing. The deferred is handed over in both modes.
        $this->processGeneratorContainerRunning($container, $deferred);

        if (!$wait) {
            // Co::async() path: nothing is waited for and nothing is returned.
            return;
        }

        // Co::wait() path: let the pool run, then return what the root generator stored.
        $this->pool->wait();

        return $return;
    }
131
132
    /**
133
     * Handle resolving generators.
134
     * @param  GeneratorContainer $gc
135
     * @param  Deferred           $deferred
136
     */
137 26
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        // Dispatch on the container state: still valid -> keep running it,
        // no longer valid -> settle the deferred from its outcome.
        if ($gc->valid()) {
            $this->processGeneratorContainerRunning($gc, $deferred);
        } else {
            $this->processGeneratorContainerDone($gc, $deferred);
        }
    }
143
144
    /**
145
     * Handle resolving generators already done.
146
     * @param  GeneratorContainer $gc
147
     * @param  Deferred           $deferred
148
     */
149 23
    private function processGeneratorContainerDone(GeneratorContainer $gc, Deferred $deferred)
    {
        // An exception thrown inside the generator is propagated as the rejection value.
        if ($gc->thrown()) {
            $deferred->reject($gc->getReturnOrThrown());
            return;
        }

        // Normalize the returned value and collect the yieldables it contains.
        $returned = YieldableUtils::normalize($gc->getReturnOrThrown(), $gc->getYieldKey());
        $yieldables = YieldableUtils::getYieldables($returned, [], $this->runners);

        // Nothing left to resolve: propagate the normalized value directly.
        if (!$yieldables) {
            $deferred->resolve($returned);
            return;
        }

        // Otherwise chain onto the combined promise of all contained yieldables.
        $all = $this->promiseAll($yieldables, true);
        $settled = $all->then(
            YieldableUtils::getApplier($returned, $yieldables, [$deferred, 'resolve']),
            [$deferred, 'reject']
        );

        // Whatever the outcome, these yieldables are no longer running.
        $settled->always(function () use ($yieldables) {
            $this->runners = array_diff_key($this->runners, $yieldables);
        });
    }
178
179
    /**
180
     * Handle resolving generators still running.
181
     * @param  GeneratorContainer $gc
182
     * @param  Deferred           $deferred
183
     */
184 30
    private function processGeneratorContainerRunning(GeneratorContainer $gc, Deferred $deferred)
    {
        // Delay request: register it with the pool, and once its promise is
        // fulfilled send null into the generator, then continue processing.
        if ($gc->key() === CoInterface::DELAY) {
            $timer = new Deferred;
            $this->pool->addDelay($gc->current(), $timer);

            $resume = function () use ($gc) {
                $gc->send(null);
            };
            $proceed = function () use ($gc, $deferred) {
                $this->processGeneratorContainer($gc, $deferred);
            };
            $timer->promise()->then($resume)->always($proceed);
            return;
        }

        // Normalize the yielded value and collect the yieldables it contains.
        $yielded = YieldableUtils::normalize($gc->current());
        $yieldables = YieldableUtils::getYieldables($yielded, [], $this->runners);

        if ($yieldables) {
            // Rejections are passed on unless the generator yielded with the SAFE key.
            $throwAcceptable = $gc->key() !== CoInterface::SAFE;

            $this
                ->promiseAll($yieldables, $throwAcceptable)
                ->then(
                    YieldableUtils::getApplier($yielded, $yieldables, [$gc, 'send']),
                    [$gc, 'throw_']
                )
                ->always(function () use ($gc, $deferred, $yieldables) {
                    // These yieldables are finished; unregister them and continue.
                    $this->runners = array_diff_key($this->runners, $yieldables);
                    $this->processGeneratorContainer($gc, $deferred);
                });
            return;
        }

        // No yieldables: send the yielded value straight back into the generator and continue.
        $gc->send($yielded);
        $this->processGeneratorContainer($gc, $deferred);
    }
224
225
    /**
226
     * Return root wrapper generator.
227
     * @param  mixed  $throw
228
     * @param  mixed  $value
229
     * @param  mixed  &$return
230
     */
231 30
    private function getRootGenerator($throw, $value, &$return)
    {
        try {
            // An explicit $throw wins; otherwise fall back to the 'throw' option of this run.
            $shouldThrow = $throw !== null ? $throw : $this->options['throw'];

            // A null key is used when throwing is wanted, the SAFE key otherwise.
            $key = $shouldThrow ? null : CoInterface::SAFE;

            // Whatever is sent back into this generator becomes the caller's result.
            $return = (yield $key => $value);
        } catch (\RuntimeException $e) {
            // Hand the exception to the pool via reserveHaltException().
            $this->pool->reserveHaltException($e);
        }
    }
244
245
    /**
246
     * Return a promise that combines the promises of all given yieldables.
247
     * @param  array $yieldables
248
     * @param  bool  $throw_acceptable
249
     * @return PromiseInterface
250
     */
251 26
    private function promiseAll(array $yieldables, $throw_acceptable)
    {
        $promises = [];

        foreach ($yieldables as $yieldable) {
            $target = $yieldable['value'];

            // One deferred per yieldable; its promise is keyed by the string form of the value.
            $dfd = new Deferred;
            $promises[(string)$target] = $dfd->promise();

            // When the caller cannot accept exceptions, swap in the safe variant of the deferred.
            if (!$throw_acceptable) {
                $dfd = YieldableUtils::safeDeferred($dfd);
            }

            if (TypeUtils::isCurl($target)) {
                // cURL handles go to the pool.
                $this->pool->addCurl($target, $dfd);
            } elseif (TypeUtils::isGeneratorContainer($target)) {
                // Generator containers are processed right away.
                $this->processGeneratorContainer($target, $dfd);
            }
        }

        return \mpyw\RuntimePromise\all($promises);
    }
275
276
    /**
277
     * Wrap value with the Generator that returns the first successful result.
278
     * If all yieldables failed, AllFailedException is thrown.
279
     * If no yieldables found, AllFailedException is thrown.
280
     *
281
     * @param  mixed $value
282
     * @return mixed Resolved value.
283
     * @throws AllFailedException
284
     */
285 4
    public static function any($value)
    {
        // First yield hands the wrapped value to the runner and receives its result.
        $resolved = (yield ControlUtils::anyOrRace(
            $value,
            ['\mpyw\Co\Internal\ControlUtils', 'reverse'],
            'Co::any() failed.'
        ));

        // Second yield reports that result under the RETURN_WITH key.
        yield Co::RETURN_WITH => $resolved;
        // @codeCoverageIgnoreStart
    }
294
    // @codeCoverageIgnoreEnd
295
296
    /**
297
     * Wrap value with the Generator that returns the first result.
298
     * If no yieldables found, AllFailedException is thrown.
299
     *
300
     * @param  mixed $value
301
     * @return mixed Resolved value.
302
     * @throws \RuntimeException|AllFailedException
303
     */
304 3
    public static function race($value)
    {
        // First yield hands the wrapped value to the runner and receives its result.
        $resolved = (yield ControlUtils::anyOrRace(
            $value,
            ['\mpyw\Co\Internal\ControlUtils', 'fail'],
            'Co::race() failed.'
        ));

        // Second yield reports that result under the RETURN_WITH key.
        yield Co::RETURN_WITH => $resolved;
        // @codeCoverageIgnoreStart
    }
313
    // @codeCoverageIgnoreEnd
314
315
    /**
316
     * Wrap value with the Generator that returns the all results.
317
     * Normally you don't have to use this method, just yield an array that contains yieldables.
318
     * You should use only with Co::race() or Co::any().
319
     *
320
     * @param  mixed $value
321
     * @return mixed Resolved value.
322
     * @throws \RuntimeException
323
     */
324 1
    public static function all($value)
    {
        // Yield the value as-is and receive whatever is sent back for it.
        $resolved = (yield $value);

        // Report that result under the RETURN_WITH key.
        yield Co::RETURN_WITH => $resolved;
        // @codeCoverageIgnoreStart
    }
329
    // @codeCoverageIgnoreEnd
330
}
331