Completed
Push — master ( 89ca27...29786d )
by Ryosuke
03:21
created

Co::safeDeferred()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 9
ccs 8
cts 8
cp 1
rs 9.6666
cc 1
eloc 6
nc 1
nop 1
crap 1
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\Utils;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\Internal\GeneratorContainer;
7
use mpyw\Co\Internal\Pool;
8
9
use mpyw\RuntimePromise\Deferred;
10
use mpyw\RuntimePromise\PromiseInterface;
11
12
class Co implements CoInterface
{
    /**
     * The instance driving the currently running Co::wait() call,
     * or null when no Co::wait() call is in progress.
     * @var Co|null
     */
    private static $self;

    /**
     * Options for the current run.
     * @var CoOption
     */
    private $options;

    /**
     * cURL request pool object.
     * @var Pool
     */
    private $pool;

    /**
     * Running cURL or Generator identifiers.
     * @var array
     */
    private $runners = [];

    /**
     * Overwrite CoOption default.
     * @param array $options
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }

    /**
     * Get CoOption default as array.
     * @return array
     */
    public static function getDefaultOptions()
    {
        return CoOption::getDefault();
    }

    /**
     * Wait until value is recursively resolved to return it.
     * This function call must be atomic.
     * @param  mixed $value
     * @param  array $options
     * @return mixed
     * @throws \BadMethodCallException If another Co::wait() call is already running.
     */
    public static function wait($value, array $options = [])
    {
        // The re-entrancy guard must stay OUTSIDE the try block: the finally
        // clause below clears self::$self, and a rejected nested call must not
        // wipe the instance that belongs to the outer, still-running call.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }
        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new Pool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always release the slot, whether start() returned or threw.
            self::$self = null;
        }
        // @codeCoverageIgnoreStart
    }
    // @codeCoverageIgnoreEnd

    /**
     * Value is recursively resolved, but we never wait it.
     * This function must be called along with Co::wait().
     * @param  mixed $value
     * @param  mixed $throw  null to use the "throw" option of the running Co::wait(),
     *                       or any value FILTER_VALIDATE_BOOLEAN accepts to override it.
     * @throws \BadMethodCallException   If no Co::wait() call is running.
     * @throws \InvalidArgumentException If $throw is neither null nor boolean-like.
     */
    public static function async($value, $throw = null)
    {
        if (!self::$self) {
            throw new \BadMethodCallException('Co::async() must be called along with Co::wait(). ');
        }
        if ($throw !== null) {
            // FILTER_NULL_ON_FAILURE makes filter_var() return null (instead of
            // false) for input that is not boolean-like, so false stays usable.
            $throw = filter_var($throw, FILTER_VALIDATE_BOOLEAN, [
                'flags' => FILTER_NULL_ON_FAILURE,
            ]);
            if ($throw === null) {
                throw new \InvalidArgumentException("\$throw must be null or boolean.");
            }
        }
        self::$self->start($value, false, $throw);
    }

    /**
     * External instantiation is forbidden.
     */
    private function __construct() {}

    /**
     * Start resolving.
     * @param  mixed $value
     * @param  bool  $wait
     * @param  mixed $throw  Used for Co::async() overrides.
     * @return mixed If $wait, return resolved value. Otherwise null.
     */
    private function start($value, $wait = true, $throw = null)
    {
        $deferred = new Deferred;
        $return = null;
        // For convenience, all values are wrapped into generator.
        // The root generator writes its result into $return by reference.
        $con = Utils::normalize($this->getRootGenerator($throw, $value, $return));
        // The promise of $deferred is not observed here;
        // the result is delivered through $return instead.
        $this->processGeneratorContainerRunning($con, $deferred);
        // We have to wait $return only if $wait
        if ($wait) {
            $this->pool->wait();
            return $return;
        }
    }

    /**
     * Handle resolving generators.
     * Dispatches to the "running" or "done" handler depending on $gc->valid().
     * @param  GeneratorContainer $gc
     * @param  Deferred           $deferred
     */
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        if ($gc->valid()) {
            $this->processGeneratorContainerRunning($gc, $deferred);
        } else {
            $this->processGeneratorContainerDone($gc, $deferred);
        }
    }

    /**
     * Handle resolving generators already done.
     * @param  GeneratorContainer $gc
     * @param  Deferred           $deferred  Rejected with the thrown value, or resolved
     *                                       with the (recursively resolved) returned value.
     */
    private function processGeneratorContainerDone(GeneratorContainer $gc, Deferred $deferred)
    {
        // If exception has been thrown in generator, we have to propagate it as rejected value
        if ($gc->thrown()) {
            $deferred->reject($gc->getReturnOrThrown());
            return;
        }

        // Now we normalize returned value
        $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getYieldKey());
        $yieldables = Utils::getYieldables($returned, [], $this->runners);

        // If normalized value contains yieldables, we have to chain resolver
        if ($yieldables) {
            $this
            ->promiseAll($yieldables, true)
            ->then(
                self::getApplier($returned, $yieldables, [$deferred, 'resolve']),
                [$deferred, 'reject']
            )
            ->always(function () use ($yieldables) {
                // These yieldables are settled; drop them from the running set.
                $this->runners = array_diff_key($this->runners, $yieldables);
            });
            return;
        }

        // Propagate normalized returned value
        $deferred->resolve($returned);
    }

    /**
     * Handle resolving generators still running.
     * @param  GeneratorContainer $gc
     * @param  Deferred           $deferred
     */
    private function processGeneratorContainerRunning(GeneratorContainer $gc, Deferred $deferred)
    {
        // Check delay request yields
        if ($gc->key() === CoInterface::DELAY) {
            $dfd = new Deferred;
            $this->pool->addDelay($gc->current(), $dfd);
            $dfd
            ->promise()
            ->then(function () use ($gc) {
                // Delay is over: resume the generator with no value.
                $gc->send(null);
            })
            ->always(function () use ($gc, $deferred) {
                $this->processGeneratorContainer($gc, $deferred);
            });
            return;
        }

        // Now we normalize yielded value
        $yielded = Utils::normalize($gc->current());
        $yieldables = Utils::getYieldables($yielded, [], $this->runners);
        if (!$yieldables) {
            // If there are no yieldables, send yielded value back into generator
            $gc->send($yielded);
            // Continue
            $this->processGeneratorContainer($gc, $deferred);
            return;
        }

        // Chain resolver.
        // A yield keyed with CoInterface::SAFE must not receive exceptions.
        $this
        ->promiseAll($yieldables, $gc->key() !== CoInterface::SAFE)
        ->then(
            self::getApplier($yielded, $yieldables, [$gc, 'send']),
            [$gc, 'throw_']
        )->always(function () use ($gc, $deferred, $yieldables) {
            // Continue
            $this->runners = array_diff_key($this->runners, $yieldables);
            $this->processGeneratorContainer($gc, $deferred);
        });
    }

    /**
     * Return root wrapper generator.
     * It yields $value once (keyed with CoInterface::SAFE when exceptions must
     * not be thrown) and stores whatever is sent back into $return.
     * A \RuntimeException thrown into it is handed to the pool as a halt exception.
     * @param  mixed  $throw   null to use the "throw" option, otherwise the override.
     * @param  mixed  $value
     * @param  mixed  &$return
     * @return \Generator
     */
    private function getRootGenerator($throw, $value, &$return)
    {
        try {
            // An explicit override wins over the configured option.
            $throwing = $throw !== null ? $throw : $this->options['throw'];
            $key = $throwing ? null : CoInterface::SAFE;
            $return = (yield $key => $value);
        } catch (\RuntimeException $e) {
            $this->pool->reserveHaltException($e);
        }
    }

    /**
     * Return function that apply changes in yieldables.
     * The returned closure writes every resolved result into its place inside
     * $yielded (located by the yieldable's "keylist") and passes the patched
     * structure to $next.
     * @param  mixed    $yielded
     * @param  array    $yieldables
     * @param  callable $next
     * @return \Closure function (array $results): void
     */
    private static function getApplier($yielded, array $yieldables, callable $next)
    {
        return function (array $results) use ($yielded, $yieldables, $next) {
            foreach ($results as $hash => $resolved) {
                // Walk down to the slot by reference, then overwrite it.
                $current = &$yielded;
                foreach ($yieldables[$hash]['keylist'] as $key) {
                    $current = &$current[$key];
                }
                $current = $resolved;
                // Break the reference before the next iteration rebinds it.
                unset($current);
            }
            $next($yielded);
        };
    }

    /**
     * Promise all changes in yieldables are prepared.
     * @param  array $yieldables
     * @param  bool  $throw_acceptable
     * @return PromiseInterface
     */
    private function promiseAll(array $yieldables, $throw_acceptable)
    {
        $promises = [];
        foreach ($yieldables as $yieldable) {
            $dfd = new Deferred;
            // The promise is keyed by the string form of the yieldable value.
            $promises[(string)$yieldable['value']] = $dfd->promise();
            // If caller cannot accept exception,
            // we handle rejected value as resolved.
            if (!$throw_acceptable) {
                $dfd = self::safeDeferred($dfd);
            }
            if (Utils::isCurl($yieldable['value'])) {
                // Add or enqueue cURL handles
                $this->pool->addCurl($yieldable['value'], $dfd);
            } elseif (Utils::isGeneratorContainer($yieldable['value'])) {
                // Process generators
                $this->processGeneratorContainer($yieldable['value'], $dfd);
            }
        }
        return \mpyw\RuntimePromise\all($promises);
    }

    /**
     * Return Deferred that absorbs rejects.
     * Both resolving and rejecting the returned Deferred resolve $original_dfd
     * with the given value.
     * @param  Deferred $original_dfd
     * @return Deferred
     */
    private static function safeDeferred(Deferred $original_dfd)
    {
        $dfd = new Deferred;
        $absorber = function ($any) use ($original_dfd) {
            $original_dfd->resolve($any);
        };
        $dfd->promise()->then($absorber, $absorber);
        return $dfd;
    }
}
309