Completed
Push — master ( ed5639...43ce2c )
by Ryosuke
03:05
created

Co   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 248
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 100%

Importance

Changes 36
Bugs 10 Features 5
Metric Value
wmc 27
c 36
b 10
f 5
lcom 1
cbo 6
dl 0
loc 248
ccs 97
cts 97
cp 1
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A setDefaultOptions() 0 4 1
A getDefaultOptions() 0 4 1
A wait() 0 16 2
A async() 0 10 2
A __construct() 0 1 1
A start() 0 21 3
C processGeneratorContainer() 0 73 9
A getApplier() 0 13 3
B promiseAll() 0 29 5
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\Utils;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\Internal\GeneratorContainer;
7
use mpyw\Co\Internal\CURLPool;
8
9
use mpyw\RuntimePromise\Deferred;
10
use mpyw\RuntimePromise\PromiseInterface;
11
12
class Co implements CoInterface
13
{
14
    /**
15
     * Instance of myself.
16
     * @var Co
17
     */
18
    private static $self;
19
20
    /**
21
     * Options.
22
     * @var CoOption
23
     */
24
    private $options;
25
26
    /**
27
     * cURL request pool object.
28
     * @var CURLPool
29
     */
30
    private $pool;
31
32
    /**
33
     * Overwrite CoOption default.
34
     * @param array $options
35
     */
36 1
    public static function setDefaultOptions(array $options)
37 1
    {
38 1
        CoOption::setDefault($options);
39 1
    }
40
41
    /**
42
     * Get CoOption default as array.
43
     * @return array
44
     */
45 1
    public static function getDefaultOptions()
46 1
    {
47 1
        return CoOption::getDefault();
48
    }
49
50
    /**
51
     * Wait until value is recursively resolved to return it.
52
     * This function call must be atomic.
53
     * @param  mixed $value
54
     * @param  array $options
55
     * @return mixed
56
     * @codeCoverageIgnore
57
     */
58
    public static function wait($value, array $options = [])
59
    {
60
        // Coverage analyzer does not support...
61
        //   try { return; } finally { }
62
        try {
63
            if (self::$self) {
64
                throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
65
            }
66
            self::$self = new self;
67
            self::$self->options = new CoOption($options);
68
            self::$self->pool = new CURLPool(self::$self->options);
69
            return self::$self->start($value);
70
        } finally {
71
            self::$self = null;
72
        }
73
    }
74
75
    /**
76
     * Value is recursively resolved, but we never wait it.
77
     * This function must be called along with Co::wait().
78
     * @param  mixed $value
79
     * @param  array $options
80
     */
81 3
    public static function async($value, array $options = [])
82 3
    {
83 3
        if (!self::$self) {
84 1
            throw new \BadMethodCallException(
85
                'Co::async() must be called along with Co::wait(). ' .
86 1
                'This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.'
87
            );
88
        }
89 2
        self::$self->start($value, false);
90 2
    }
91
92
    /**
93
     * External instantiation is forbidden.
94
     */
95
    private function __construct() {}
96
97
    /**
98
     * Start resovling.
99
     * @param  mixed    $value
100
     * @param  bool     $wait
101
     * @param  mixed    If $wait, return resolved value.
102
     */
103 13
    private function start($value, $wait = true)
104 13
    {
105 13
        $return = $exception = null;
0 ignored issues
show
Unused Code introduced by
$exception is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
106 13
        $deferred = new Deferred;
107
        // For convenience, all values are wrapped into generator
108
        $genfunc = function () use ($value, &$return) {
109
            try {
110 13
                $return = (yield $value);
111 3
            } catch (\RuntimeException $e) {
1 ignored issue
show
Unused Code introduced by
catch (\RuntimeException...rveHaltException($e); } does not seem to be reachable.

This check looks for unreachable code. It uses sophisticated control flow analysis techniques to find statements which will never be executed.

Unreachable code is most often the result of return, die or exit statements that have been added for debug purposes.

function fx() {
    try {
        doSomething();
        return true;
    }
    catch (\Exception $e) {
        return false;
    }

    return false;
}

In the above example, the last return false will never be executed, because a return statement has already been met in every possible execution path.

Loading history...
112 3
                $this->pool->reserveHaltException($e);
113
            }
114 13
        };
115 13
        $con = Utils::normalize($genfunc, $this->options);
116
        // We have to provide deferred object only if $wait
117 13
        $this->processGeneratorContainer($con, $deferred);
118
        // We have to wait $return only if $wait
119 11
        if ($wait) {
120 11
            $this->pool->wait();
121 9
            return $return;
122
        }
123 2
    }
124
125
    /**
126
     * Handle resolving generators.
127
     * @param  GeneratorContainer $gc
128
     * @param  Deferred           $deferred
129
     */
130 13
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
131 13
    {
132
        // If generator has no more yields...
133 13
        if (!$gc->valid()) {
134
            // If exception has been thrown in generator, we have to propagate it as rejected value
135 11
            if ($gc->thrown()) {
136 3
                $deferred->reject($gc->getReturnOrThrown());
137 3
                return;
138
            }
139
            // Now we normalize returned value
140
            try {
141 11
                $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getOptions());
142 11
                $yieldables = Utils::getYieldables($returned);
143
                // If normalized value contains yieldables, we have to chain resolver
144 11
                if ($yieldables) {
145 2
                    $this->promiseAll($yieldables, true)->then(
146 2
                        self::getApplier($returned, $yieldables, [$deferred, 'resolve']),
147 2
                        [$deferred, 'reject']
148
                    );
149 2
                    return;
150
                }
151
                // Propagate normalized returned value
152 11
                $deferred->resolve($returned);
153 1
            } catch (\RuntimeException $e) {
154
                // Propagate exception thrown in normalization
155 1
                $deferred->reject($e);
156
            }
157 11
            return;
158
        }
159
160
        // Now we normalize yielded value
161
        try {
162
            // Check delay request yields
163 13
            if ($gc->key() === CoInterface::DELAY) {
164 2
                $dfd = new Deferred;
165 2
                $this->pool->addDelay($gc->current(), $dfd);
166
                $dfd->promise()->then(function () use ($gc) {
167 1
                    $gc->send(null);
168
                })->always(function () use ($gc, $deferred) {
169 1
                    $this->processGeneratorContainer($gc, $deferred);
170 2
                });
171 2
                return;
172
            }
173 13
            $yielded = Utils::normalize($gc->current(), $gc->getOptions(), $gc->key());
174 5
        } catch (\RuntimeException $e) {
175
            // If exception thrown in normalization...
176
            //   - If generator accepts exception, we throw it into generator
177
            //   - If generator does not accept exception, we assume it as non-exception value
178 3
            $gc->throwAcceptable() ? $gc->throw_($e) : $gc->send($e);
179
            // Continue
180 3
            $this->processGeneratorContainer($gc, $deferred);
181 3
            return;
182
        }
183
184
        // Search yieldables from yielded value
185 11
        $yieldables = Utils::getYieldables($yielded);
186 11
        if (!$yieldables) {
187
            // If there are no yieldables, send yielded value back into generator
188 6
            $gc->send($yielded);
189
            // Continue
190 6
            $this->processGeneratorContainer($gc, $deferred);
191 6
            return;
192
        }
193
194
        // Chain resolver
195 10
        $this->promiseAll($yieldables, $gc->throwAcceptable())->then(
196 9
            self::getApplier($yielded, $yieldables, [$gc, 'send']),
197 9
            [$gc, 'throw_']
198
        )->always(function () use ($gc, $deferred) {
199
            // Continue
200 9
            $this->processGeneratorContainer($gc, $deferred);
201 9
        });
202 9
    }
203
204
    /**
205
     * Return function that apply changes in yieldables.
206
     * @param  mixed    $yielded
207
     * @param  array    $yieldables
208
     * @param  callable $next
209
     */
210 9
    private static function getApplier($yielded, $yieldables, callable $next)
211 9
    {
212
        return function (array $results) use ($yielded, $yieldables, $next) {
213 8
            foreach ($results as $hash => $resolved) {
214 8
                $current = &$yielded;
215 8
                foreach ($yieldables[$hash]['keylist'] as $key) {
216 6
                    $current = &$current[$key];
217
                }
218 8
                $current = $resolved;
219
            }
220 8
            $next($yielded);
221 9
        };
222
    }
223
224
    /**
225
     * Promise all changes in yieldables are prepared.
226
     * @param  array $yieldables
227
     * @param  bool  $throw_acceptable
228
     * @return PromiseInterface
229
     */
230 10
    private function promiseAll($yieldables, $throw_acceptable)
231 10
    {
232 10
        $promises = [];
233 10
        foreach ($yieldables as $yieldable) {
234 10
            $dfd = new Deferred;
235 10
            $promises[(string)$yieldable['value']] = $dfd->promise();
236
            // If caller cannot accept exception,
237
            // we handle rejected value as resolved.
238 10
            if (!$throw_acceptable) {
239 6
                $original_dfd = $dfd;
240 6
                $dfd = new Deferred;
241 6
                $absorber = function ($any) use ($original_dfd) {
242 5
                    $original_dfd->resolve($any);
243 6
                };
244 6
                $dfd->promise()->then($absorber, $absorber);
245
            }
246
            // Add or enqueue cURL handles
247 10
            if (Utils::isCurl($yieldable['value'])) {
248 4
                $this->pool->addOrEnqueue($yieldable['value'], $dfd);
249 4
                continue;
250
            }
251
            // Process generators
252 9
            if (Utils::isGeneratorContainer($yieldable['value'])) {
253 9
                $this->processGeneratorContainer($yieldable['value'], $dfd);
254 8
                continue;
255
            }
256
        }
257 9
        return \mpyw\RuntimePromise\all($promises);
258
    }
259
}
260