Completed
Push — master ( 93f1d6...93c8d2 )
by Ryosuke
03:15
created

Co::async()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 4
Bugs 0 Features 0
Metric Value
c 4
b 0
f 0
dl 0
loc 10
ccs 7
cts 7
cp 1
rs 9.4285
cc 2
eloc 6
nc 2
nop 2
crap 2
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\Utils;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\Internal\GeneratorContainer;
7
use mpyw\Co\Internal\CURLPool;
8
9
use mpyw\RuntimePromise\Deferred;
10
use mpyw\RuntimePromise\PromiseInterface;
11
use function mpyw\RuntimePromise\all;
12
13
class Co implements CoInterface
14
{
15
    /**
     * Instance driving the Co::wait() call that is currently running.
     * Set at the start of Co::wait() and reset to null when it finishes.
     * @var Co|null
     */
    private static $self;

    /**
     * Options built from the array handed to Co::wait().
     * @var CoOption
     */
    private $options;

    /**
     * cURL request pool, constructed from $options.
     * @var CURLPool
     */
    private $pool;
32
33
    /**
     * Overwrite the CoOption defaults.
     *
     * Thin static facade: the array is forwarded untouched to
     * CoOption::setDefault().
     *
     * @param array $options New default options.
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }
41
42
    /**
     * Fetch the current CoOption defaults.
     *
     * Thin static facade over CoOption::getDefault().
     *
     * @return array Whatever CoOption::getDefault() reports.
     */
    public static function getDefaultOptions()
    {
        return CoOption::getDefault();
    }
50
51
    /**
     * Wait until value is recursively resolved to return it.
     * This function call must be atomic: only one Co::wait() may run at a time.
     *
     * @param  mixed $value   Value to resolve.
     * @param  array $options Options for this run; passed to the CoOption constructor.
     * @return mixed The result of start() for $value.
     * @throws \BadMethodCallException If Co::wait() is already running.
     * @codeCoverageIgnore
     */
    public static function wait($value, array $options = [])
    {
        // The re-entrancy guard must run BEFORE the try block. If it were
        // inside, the finally clause below would clear self::$self even when
        // the call is rejected, wiping the state of the outer run that is
        // still in progress and breaking later Co::async() calls.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }

        // This method is excluded from coverage because the coverage analyzer
        // does not support returning from inside a try block that has a
        // finally clause.
        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new CURLPool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always release the slot, whether start() returned or threw.
            self::$self = null;
        }
    }
75
76
    /**
     * Begin resolving a value recursively without waiting for the result.
     * Only valid while a Co::wait() call is in progress.
     *
     * @param  mixed $value   Value to resolve.
     * @param  array $options Accepted for signature compatibility; this method does not read it.
     * @throws \BadMethodCallException If no Co::wait() call is currently running.
     */
    public static function async($value, array $options = [])
    {
        if (self::$self) {
            // Hand the value to the running instance; false = do not block.
            self::$self->start($value, false);
            return;
        }

        throw new \BadMethodCallException(
            'Co::async() must be called along with Co::wait(). ' .
            'This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.'
        );
    }
92
93
    /**
     * Private on purpose: instances are created only inside this class
     * (see Co::wait()), never by outside code.
     */
    private function __construct()
    {
    }
97
98
    /**
     * Start resolving.
     *
     * The value is wrapped in a generator, normalized, and handed to
     * processGeneratorContainer() together with a fresh Deferred whose
     * outcome is captured locally.
     *
     * @param  mixed $value Value to resolve.
     * @param  bool  $wait  Whether to block on the cURL pool and report the outcome.
     * @return mixed If $wait, the resolved value; otherwise null.
     * @throws mixed If $wait and the deferred was rejected, the rejection value is thrown.
     */
    private function start($value, $wait = true)
    {
        $resolved = null;
        $failure = null;

        // Capture whichever way the deferred settles.
        $deferred = new Deferred;
        $deferred->promise()->then(
            function ($result) use (&$resolved) {
                $resolved = $result;
            },
            function ($reason) use (&$failure) {
                $failure = $reason;
            }
        );

        // For convenience, every value is wrapped into a generator that
        // yields it and then hands the result back via RETURN_WITH.
        $wrapper = function () use ($value) {
            yield CoInterface::RETURN_WITH => (yield $value);
        };
        $container = Utils::normalize($wrapper, $this->options);
        $this->processGeneratorContainer($container, $deferred);

        // Fire-and-forget callers are done here.
        if (!$wait) {
            return;
        }

        // Block until the pool is drained, then report the outcome.
        $this->pool->wait();
        if ($failure) {
            throw $failure;
        }
        return $resolved;
    }
132
133
    /**
     * Handle resolving generators.
     *
     * Performs one step for the given container: either settles $deferred
     * when the generator has finished, or normalizes the current yielded
     * value, feeds the outcome back into the generator, and continues.
     *
     * @param  GeneratorContainer $gc       Generator being driven.
     * @param  Deferred           $deferred Settled once the generator finishes.
     */
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        // No more yields: report the final outcome and stop.
        if (!$gc->valid()) {
            $this->settleFinishedGenerator($gc, $deferred);
            return;
        }

        // Normalize the value the generator is currently yielding.
        try {
            $yielded = Utils::normalize($gc->current(), $gc->getOptions(), $gc->key());
        } catch (\RuntimeException $e) {
            // Normalization failed:
            //   - throw into the generator when it accepts exceptions,
            //   - otherwise send the exception in as an ordinary value.
            if ($gc->throwAcceptable()) {
                $gc->throw_($e);
            } else {
                $gc->send($e);
            }
            $this->processGeneratorContainer($gc, $deferred);
            return;
        }

        $pending = Utils::getYieldables($yielded);
        if ($pending) {
            // Resolve every yieldable first, then feed the filled-in value
            // (or the failure) into the generator and take the next step.
            $this->promiseAll($pending, $gc->throwAcceptable())->then(
                self::getApplier($yielded, $pending, [$gc, 'send']),
                [$gc, 'throw_']
            )->always(function () use ($gc, $deferred) {
                $this->processGeneratorContainer($gc, $deferred);
            });
            return;
        }

        // Nothing to resolve: send the value straight back and continue.
        $gc->send($yielded);
        $this->processGeneratorContainer($gc, $deferred);
    }

    /**
     * Settle the deferred for a generator that has no more yields.
     *
     * @param  GeneratorContainer $gc       Finished generator.
     * @param  Deferred           $deferred Deferred to resolve or reject.
     */
    private function settleFinishedGenerator(GeneratorContainer $gc, Deferred $deferred)
    {
        // An exception thrown inside the generator becomes a rejection.
        if ($gc->thrown()) {
            $deferred->reject($gc->getReturnOrThrown());
            return;
        }

        try {
            $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getOptions());
            $pending = Utils::getYieldables($returned);
            if (!$pending) {
                // Plain value: propagate it as-is.
                $deferred->resolve($returned);
                return;
            }
            // The returned value still contains yieldables: chain on them.
            $this->promiseAll($pending, true)->then(
                self::getApplier($returned, $pending, [$deferred, 'resolve']),
                [$deferred, 'reject']
            );
        } catch (\RuntimeException $e) {
            // Exception raised while normalizing the returned value.
            $deferred->reject($e);
        }
    }
200
201
    /**
     * Return function that apply changes in yieldables.
     *
     * The returned closure takes the resolved results (keyed by the same
     * hash as $yieldables), writes each one into $yielded at the position
     * given by that yieldable's 'keylist', and passes the filled-in value
     * to $next.
     *
     * @param  mixed    $yielded    Value containing the yieldables.
     * @param  array    $yieldables Entries keyed by hash, each with a 'keylist' path into $yielded.
     * @param  callable $next       Receives the filled-in value.
     * @return \Closure function (array $results): void
     */
    private static function getApplier($yielded, $yieldables, callable $next)
    {
        return function (array $results) use ($yielded, $yieldables, $next) {
            foreach ($results as $hash => $resolved) {
                // Walk down to the slot described by the key list.
                $current = &$yielded;
                foreach ($yieldables[$hash]['keylist'] as $key) {
                    $current = &$current[$key];
                }
                $current = $resolved;
                // Drop the reference binding. If it stayed alive, the slot
                // would still be a PHP reference while $next runs, and any
                // array copy made by the receiver would share that slot, so
                // writes to the copy would leak back into the original.
                unset($current);
            }
            $next($yielded);
        };
    }
220
221
    /**
     * Promise all changes in yieldables are prepared.
     *
     * Creates one promise per yieldable, keyed by the string form of its
     * value, dispatches each yieldable to the cURL pool or to the generator
     * processor, and combines the promises with all().
     *
     * @param  array $yieldables       Entries each carrying a 'value'.
     * @param  bool  $throw_acceptable When false, rejections are delivered as resolved values.
     * @return PromiseInterface
     */
    private function promiseAll($yieldables, $throw_acceptable)
    {
        $promises = [];

        foreach ($yieldables as $yieldable) {
            $target = $yieldable['value'];

            // The promise the caller observes.
            $exposed = new Deferred;
            $promises[(string)$target] = $exposed->promise();

            // The deferred actually handed to the worker. When the caller
            // cannot accept exceptions, an intermediate deferred forwards
            // both outcomes to resolve() on the exposed one.
            $sink = $exposed;
            if (!$throw_acceptable) {
                $sink = new Deferred;
                $absorb = function ($any) use ($exposed) {
                    $exposed->resolve($any);
                };
                $sink->promise()->then($absorb, $absorb);
            }

            if (Utils::isCurl($target)) {
                // Add or enqueue cURL handles.
                $this->pool->addOrEnqueue($target, $sink);
            } elseif (Utils::isGeneratorContainer($target)) {
                // Drive generators.
                $this->processGeneratorContainer($target, $sink);
            }
        }

        return all($promises);
    }
256
}
257