Completed
Push — master ( 495cea...8436d0 )
by Ryosuke
03:22
created

Co::getApplier()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 14
ccs 10
cts 10
cp 1
rs 9.4285
cc 3
eloc 9
nc 1
nop 3
crap 3
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\Utils;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\Internal\GeneratorContainer;
7
use mpyw\Co\Internal\CURLPool;
8
9
use mpyw\RuntimePromise\Deferred;
10
use mpyw\RuntimePromise\PromiseInterface;
11
12
class Co implements CoInterface
13
{
14
    /**
15
     * Instance of myself.
16
     * @var Co
17
     */
18
    private static $self;
19
20
    /**
21
     * Options.
22
     * @var CoOption
23
     */
24
    private $options;
25
26
    /**
27
     * cURL request pool object.
28
     * @var CURLPool
29
     */
30
    private $pool;
31
32
    /**
     * Overwrite the defaults held by CoOption.
     *
     * Thin static facade: the array is forwarded unchanged to
     * CoOption::setDefault().
     *
     * @param array $options Values forwarded to CoOption::setDefault().
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }
40
41
    /**
     * Fetch the current CoOption defaults.
     *
     * Thin static facade over CoOption::getDefault().
     *
     * @return array Whatever CoOption::getDefault() returns.
     */
    public static function getDefaultOptions()
    {
        $defaults = CoOption::getDefault();

        return $defaults;
    }
49
50
    /**
     * Wait until value is recursively resolved, then return it.
     *
     * Only one Co::wait() call may be running at a time: the running instance
     * is kept in self::$self for the duration of the call and cleared again
     * when the call finishes, whether it returns or throws.
     *
     * @param  mixed $value   Value to be resolved.
     * @param  array $options Options passed to the CoOption constructor.
     * @return mixed Result of start() for $value.
     * @throws \BadMethodCallException When another Co::wait() call is already running.
     */
    public static function wait($value, array $options = [])
    {
        // Reject re-entrant calls BEFORE entering the try block. If this check
        // lived inside the try, the finally clause below would reset
        // self::$self and thereby destroy the instance owned by the outer,
        // still-running Co::wait() call.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }

        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new CURLPool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always release the slot so that a later Co::wait() can run.
            self::$self = null;
        }
        // @codeCoverageIgnoreStart
    }
72
    // @codeCoverageIgnoreEnd
73
74
    /**
     * Start resolving a value without waiting for its result.
     *
     * Only usable while a Co::wait() call is running: the value is handed to
     * the running instance's start() with waiting disabled.
     *
     * @param  mixed $value Value to be resolved.
     * @param  mixed $throw Null to pass no override, or a value accepted by
     *                      FILTER_VALIDATE_BOOLEAN; the validated boolean is
     *                      passed to start() as its $throw argument.
     * @throws \BadMethodCallException   When no Co::wait() call is running.
     * @throws \InvalidArgumentException When $throw is neither null nor boolean-like.
     */
    public static function async($value, $throw = null)
    {
        $running = self::$self;
        if (!$running) {
            throw new \BadMethodCallException('Co::async() must be called along with Co::wait(). ');
        }

        $override = null;
        if ($throw !== null) {
            // With FILTER_NULL_ON_FAILURE, anything that is not boolean-like
            // comes back as null instead of false.
            $filter_options = ['flags' => FILTER_NULL_ON_FAILURE];
            $override = filter_var($throw, FILTER_VALIDATE_BOOLEAN, $filter_options);
            if ($override === null) {
                throw new \InvalidArgumentException("\$throw must be null or boolean.");
            }
        }

        $running->start($value, false, $override);
    }
95
96
    /**
     * Private constructor: the class cannot be instantiated from outside.
     */
    private function __construct()
    {
    }
100
101
    /**
     * Start resolving a value.
     *
     * The value is wrapped into a generator function that yields it exactly
     * once, so every input goes through the same generator handling path.
     *
     * @param  mixed $value Value to be resolved.
     * @param  bool  $wait  Whether to wait on the pool and return the result.
     * @param  mixed $throw When not null, used to reconfigure the "throw"
     *                      option (Co::async() passes its override here).
     * @return mixed When $wait is true, the value sent back into the wrapper
     *               generator for its single yield; null otherwise.
     */
    private function start($value, $wait = true, $throw = null)
    {
        $deferred = new Deferred;
        $resolved = null;

        // Whatever is sent back for the single yield lands in $resolved via
        // the by-reference binding.
        $wrapper = function () use ($value, &$resolved) {
            try {
                $resolved = (yield $value);
            } catch (\RuntimeException $e) {
                // RuntimeExceptions surfacing at the yield are handed to the pool.
                $this->pool->reserveHaltException($e);
            }
        };

        if ($throw === null) {
            $options = $this->options;
        } else {
            $options = $this->options->reconfigure(['throw' => $throw]);
        }

        $container = Utils::normalize($wrapper, $options);
        $this->processGeneratorContainer($container, $deferred);

        if (!$wait) {
            return;
        }

        // Only the waiting variant runs the pool and reports a result.
        $this->pool->wait();
        return $resolved;
    }
129
130
    /**
     * Drive a generator container until it finishes or has to wait.
     *
     * While the generator is valid, each yielded value is normalized:
     *  - a yield keyed with CoInterface::DELAY is registered with the pool
     *    via addDelay() and processing resumes from the promise callbacks;
     *  - a value containing yieldables is resolved through promiseAll() and
     *    processing resumes from the promise callbacks;
     *  - any other value is sent straight back and the loop continues.
     * Once the generator is no longer valid, $deferred is rejected with the
     * thrown value or resolved with the (normalized, recursively resolved)
     * return value.
     *
     * @param  GeneratorContainer $gc       Generator container to advance.
     * @param  Deferred           $deferred Settled when the generator is done.
     */
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        // Re-enters this method once asynchronous work has settled.
        $resume = function () use ($gc, $deferred) {
            $this->processGeneratorContainer($gc, $deferred);
        };

        // Loop rather than recurse for plain values: the previous tail
        // self-call added one stack frame per non-yieldable yield, so call
        // depth grew with the number of such yields (and could run into
        // nesting limits such as Xdebug's max_nesting_level).
        while ($gc->valid()) {
            // Check delay request yields
            if ($gc->key() === CoInterface::DELAY) {
                $dfd = new Deferred;
                $this->pool->addDelay($gc->current(), $dfd);
                $dfd->promise()->then(function () use ($gc) {
                    $gc->send(null);
                })->always($resume);
                return;
            }

            // Now we normalize yielded value
            $yielded = Utils::normalize($gc->current(), $gc->getOptions(), $gc->key());
            $yieldables = Utils::getYieldables($yielded);

            if ($yieldables) {
                // Chain resolver: on success the patched value is sent back
                // into the generator, on failure the reason goes to throw_().
                $this->promiseAll($yieldables, $gc->throwAcceptable())->then(
                    self::getApplier($yielded, $yieldables, [$gc, 'send']),
                    [$gc, 'throw_']
                )->always($resume);
                return;
            }

            // No yieldables: send the yielded value back and keep going.
            $gc->send($yielded);
        }

        // The generator has no more yields.

        // If exception has been thrown in generator, propagate it as rejected value
        if ($gc->thrown()) {
            $deferred->reject($gc->getReturnOrThrown());
            return;
        }

        // Now we normalize returned value
        $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getOptions());
        $yieldables = Utils::getYieldables($returned);

        // If normalized value contains yieldables, we have to chain resolver
        if ($yieldables) {
            $this->promiseAll($yieldables, true)->then(
                self::getApplier($returned, $yieldables, [$deferred, 'resolve']),
                [$deferred, 'reject']
            );
            return;
        }

        // Propagate normalized returned value
        $deferred->resolve($returned);
    }
192
193
    /**
     * Build a callback that writes resolved results back into a yielded value.
     *
     * The returned closure takes an array of results. For every entry it looks
     * up the 'keylist' stored in $yieldables under the same key, follows that
     * key path into its own copy of $yielded and replaces the value found
     * there (an empty keylist replaces the whole value). The patched copy is
     * then passed to $next.
     *
     * @param  mixed    $yielded    Value containing the placeholders.
     * @param  array    $yieldables Entries keyed like the results; each holds
     *                              a 'keylist' array describing the path.
     * @param  callable $next       Receives the patched value.
     * @return \Closure Callback taking the array of results.
     */
    private static function getApplier($yielded, $yieldables, callable $next)
    {
        return function (array $results) use ($yielded, $yieldables, $next) {
            foreach ($results as $hash => $resolved) {
                $path = $yieldables[$hash]['keylist'];

                // Walk down by reference so the final assignment lands inside
                // the nested structure.
                $slot = &$yielded;
                foreach ($path as $segment) {
                    $slot = &$slot[$segment];
                }
                $slot = $resolved;

                // Break the reference before the next iteration rebinds it.
                unset($slot);
            }

            $next($yielded);
        };
    }
213
214
    /**
     * Create one promise per yieldable and combine them.
     *
     * Each yieldable's 'value' is cast to string to key its promise. cURL
     * handles are passed to the pool via addOrEnqueue(); generator containers
     * are processed immediately.
     *
     * @param  array $yieldables       Entries holding the yieldable under 'value'.
     * @param  bool  $throw_acceptable When false, a rejection is delivered to
     *                                 the combined promise as a resolved value.
     * @return PromiseInterface Result of \mpyw\RuntimePromise\all() over the
     *                          collected promises.
     */
    private function promiseAll($yieldables, $throw_acceptable)
    {
        $promises = [];

        foreach ($yieldables as $yieldable) {
            $target = $yieldable['value'];

            $dfd = new Deferred;
            $promises[(string)$target] = $dfd->promise();

            if (!$throw_acceptable) {
                // The caller cannot accept exceptions: put a second deferred
                // in front whose resolution AND rejection both resolve the
                // one exposed through $promises.
                $exposed = $dfd;
                $dfd = new Deferred;
                $absorb = function ($any) use ($exposed) {
                    $exposed->resolve($any);
                };
                $dfd->promise()->then($absorb, $absorb);
            }

            if (Utils::isCurl($target)) {
                // Add or enqueue cURL handles
                $this->pool->addOrEnqueue($target, $dfd);
            } elseif (Utils::isGeneratorContainer($target)) {
                // Process generators
                $this->processGeneratorContainer($target, $dfd);
            }
        }

        return \mpyw\RuntimePromise\all($promises);
    }
249
}
250