Completed
Push — master ( 7857b0...9c5fad )
by Ryosuke
02:51
created

Co::start()   B

Complexity

Conditions 6
Paths 2

Size

Total Lines 25
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 6

Importance

Changes 8
Bugs 2 Features 0
Metric Value
c 8
b 2
f 0
dl 0
loc 25
ccs 16
cts 16
cp 1
rs 8.439
cc 6
eloc 16
nc 2
nop 3
crap 6
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\Utils;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\Internal\GeneratorContainer;
7
use mpyw\Co\Internal\CURLPool;
8
9
use mpyw\RuntimePromise\Deferred;
10
use mpyw\RuntimePromise\PromiseInterface;
11
12
class Co implements CoInterface
13
{
14
    /**
15
     * Instance of myself.
16
     * @var Co
17
     */
18
    private static $self;
19
20
    /**
21
     * Options.
22
     * @var CoOption
23
     */
24
    private $options;
25
26
    /**
27
     * cURL request pool object.
28
     * @var CURLPool
29
     */
30
    private $pool;
31
32
    /**
     * Replace the defaults that CoOption applies to new option sets.
     *
     * The array is forwarded untouched to CoOption::setDefault().
     *
     * @param array $options Values to install as the new defaults.
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }
40
41
    /**
     * Fetch the current CoOption defaults.
     *
     * Returns whatever CoOption::getDefault() reports.
     *
     * @return array
     */
    public static function getDefaultOptions()
    {
        $defaults = CoOption::getDefault();
        return $defaults;
    }
49
50
    /**
     * Wait until value is recursively resolved, then return it.
     *
     * This call must be atomic: only one Co::wait() may be running at a time.
     * Use Co::async() to schedule additional work from inside a running wait.
     *
     * @param  mixed $value   Value to resolve.
     * @param  array $options Options handed to CoOption for this run.
     * @return mixed          Whatever start() returns for $value.
     * @throws \BadMethodCallException If Co::wait() is already running.
     */
    public static function wait($value, array $options = [])
    {
        // The re-entrancy guard must stay OUTSIDE the try/finally below.
        // Otherwise a rejected nested call would still run the finally block
        // and clear the instance that belongs to the outer, running wait().
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }
        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new CURLPool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always release the running instance, even when resolving throws.
            self::$self = null;
        }
        // @codeCoverageIgnoreStart
    }
    // @codeCoverageIgnoreEnd
73
74
    /**
     * Schedule a value for recursive resolution without waiting for it.
     *
     * Only usable while Co::wait() is running, because the work is attached
     * to the instance that Co::wait() created.
     *
     * @param  mixed $value Value to resolve.
     * @param  mixed $throw Null to use the configured behaviour, or anything
     *                      FILTER_VALIDATE_BOOLEAN accepts to override it.
     * @throws \BadMethodCallException   If Co::wait() is not running.
     * @throws \InvalidArgumentException If $throw is neither null nor boolean-like.
     */
    public static function async($value, $throw = null)
    {
        $running = self::$self;
        if (!$running) {
            throw new \BadMethodCallException('Co::async() must be called along with Co::wait(). ');
        }

        if ($throw !== null) {
            // FILTER_NULL_ON_FAILURE turns unparsable input into null
            // so that it can be told apart from a valid false.
            $parsed = filter_var($throw, FILTER_VALIDATE_BOOLEAN, [
                'flags' => FILTER_NULL_ON_FAILURE,
            ]);
            if ($parsed === null) {
                throw new \InvalidArgumentException("\$throw must be null or boolean.");
            }
            $throw = $parsed;
        }

        $running->start($value, false, $throw);
    }
95
96
    /**
     * Private on purpose: instances are only created inside this class.
     */
    private function __construct()
    {
    }
100
101
    /**
     * Start resolving a value.
     *
     * The value is wrapped into a single-yield generator so that every input
     * travels through the same generator-processing path.
     *
     * @param  mixed    $value
     * @param  bool     $wait   Whether to run the pool and return the result.
     * @param  mixed    $throw  Used for Co::async() overrides. Null falls back
     *                          to the 'throw' option.
     * @return mixed    If $wait, the resolved value. Otherwise null.
     */
    private function start($value, $wait = true, $throw = null)
    {
        $rootDeferred = new Deferred;

        // For convenience, every value is wrapped into a generator.
        // The yield key is null when throwing is enabled and
        // CoInterface::SAFE when it is not.
        $wrapper = function () use ($throw, $value, &$resolved) {
            try {
                $throwEnabled = ($throw !== null) ? $throw : $this->options['throw'];
                $key = $throwEnabled ? null : CoInterface::SAFE;
                $resolved = (yield $key => $value);
            } catch (\RuntimeException $e) {
                // Hand the failure to the pool instead of letting it escape here.
                $this->pool->reserveHaltException($e);
            }
        };
        $container = Utils::normalize($wrapper);

        // The deferred is handed over whether or not we wait.
        $this->processGeneratorContainer($container, $rootDeferred);

        if (!$wait) {
            return;
        }

        // Only a waiting caller runs the pool and receives the resolved value.
        $this->pool->wait();
        return $resolved;
    }
133
134
    /**
     * Handle resolving generators.
     *
     * Drives the generator until it either needs asynchronous work (a delay
     * or yieldables; processing resumes from the promise callbacks) or has no
     * more yields, in which case $deferred is settled.
     *
     * @param  GeneratorContainer $gc
     * @param  Deferred           $deferred Settled once the generator finishes.
     */
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        // Plain yielded values are answered in a loop rather than by calling
        // this method again, so a generator that yields many non-yieldable
        // values does not deepen the call stack with every yield.
        while ($gc->valid()) {
            // Check delay request yields
            if ($gc->key() === CoInterface::DELAY) {
                $dfd = new Deferred;
                $this->pool->addDelay($gc->current(), $dfd);
                $dfd->promise()->then(function () use ($gc) {
                    $gc->send(null);
                })->always(function () use ($gc, $deferred) {
                    // Continue
                    $this->processGeneratorContainer($gc, $deferred);
                });
                return;
            }

            // Now we normalize yielded value
            $yielded = Utils::normalize($gc->current());
            $yieldables = Utils::getYieldables($yielded);
            if ($yieldables) {
                // Chain resolver; exceptions are acceptable unless the yield
                // key is CoInterface::SAFE.
                $this->promiseAll($yieldables, $gc->key() !== CoInterface::SAFE)->then(
                    self::getApplier($yielded, $yieldables, [$gc, 'send']),
                    [$gc, 'throw_']
                )->always(function () use ($gc, $deferred) {
                    // Continue
                    $this->processGeneratorContainer($gc, $deferred);
                });
                return;
            }

            // If there are no yieldables, send yielded value back into
            // generator and look at its next yield.
            $gc->send($yielded);
        }

        // Generator has no more yields.

        // If exception has been thrown in generator, we have to propagate it as rejected value
        if ($gc->thrown()) {
            $deferred->reject($gc->getReturnOrThrown());
            return;
        }

        // Now we normalize returned value
        $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getYieldKey());
        $yieldables = Utils::getYieldables($returned);

        // If normalized value contains yieldables, we have to chain resolver
        if ($yieldables) {
            $this->promiseAll($yieldables, true)->then(
                self::getApplier($returned, $yieldables, [$deferred, 'resolve']),
                [$deferred, 'reject']
            );
            return;
        }

        // Propagate normalized returned value
        $deferred->resolve($returned);
    }
196
197
    /**
     * Build a callback that writes resolved results back into a value.
     *
     * For each result, the 'keylist' of the yieldable with the same hash is
     * followed into $yielded and the entry found there is replaced by the
     * resolved value. The patched value is then passed to $next.
     *
     * @param  mixed    $yielded    Value containing the yieldables.
     * @param  array    $yieldables Map of hash => ['keylist' => array, ...].
     * @param  callable $next       Receives the patched value.
     * @return \Closure Function taking an array of hash => resolved value.
     */
    private static function getApplier($yielded, $yieldables, callable $next)
    {
        return function (array $results) use ($yielded, $yieldables, $next) {
            foreach ($results as $hash => $resolved) {
                // Walk down by reference to the slot this yieldable came from.
                $slot = &$yielded;
                $path = $yieldables[$hash]['keylist'];
                foreach ($path as $step) {
                    $slot = &$slot[$step];
                }
                $slot = $resolved;
                // Break the reference so the next iteration starts clean.
                unset($slot);
            }
            $next($yielded);
        };
    }
217
218
    /**
     * Create one promise per yieldable and combine them.
     *
     * Each yieldable gets its own Deferred, keyed by the string form of its
     * value. cURL handles go to the pool; generator containers are processed
     * right away.
     *
     * @param  array $yieldables       Entries with a 'value' key.
     * @param  bool  $throw_acceptable False to treat rejections as resolutions.
     * @return PromiseInterface
     */
    private function promiseAll($yieldables, $throw_acceptable)
    {
        $pending = [];
        foreach ($yieldables as $entry) {
            $target = $entry['value'];

            $sink = new Deferred;
            $pending[(string)$target] = $sink->promise();

            // If caller cannot accept exception, put a second deferred in
            // front whose resolution AND rejection both resolve the first.
            if (!$throw_acceptable) {
                $exposed = $sink;
                $sink = new Deferred;
                $swallow = function ($any) use ($exposed) {
                    $exposed->resolve($any);
                };
                $sink->promise()->then($swallow, $swallow);
            }

            if (Utils::isCurl($target)) {
                // Add or enqueue cURL handles
                $this->pool->addOrEnqueue($target, $sink);
            } elseif (Utils::isGeneratorContainer($target)) {
                // Process generators
                $this->processGeneratorContainer($target, $sink);
            }
        }
        return \mpyw\RuntimePromise\all($pending);
    }
253
}
254