Completed
Push — master ( 6ae8fd...612217 )
by Ryosuke
03:27
created

Co   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 241
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 87.62%

Importance

Changes 32
Bugs 9 Features 4
Metric Value
wmc 26
c 32
b 9
f 4
lcom 1
cbo 6
dl 0
loc 241
ccs 92
cts 105
cp 0.8762
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A setDefaultOptions() 0 4 1
A getDefaultOptions() 0 4 1
A wait() 0 14 2
A async() 0 10 2
A __construct() 0 1 1
B start() 0 28 3
C processGeneratorContainer() 0 62 8
A getApplier() 0 13 3
B promiseAll() 0 29 5
1
<?php
2
3
namespace mpyw\Co;
4
use mpyw\Co\Internal\Utils;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\Internal\GeneratorContainer;
7
use mpyw\Co\Internal\CURLPool;
8
9
use mpyw\RuntimePromise\Deferred;
10
use mpyw\RuntimePromise\PromiseInterface;
11
use function mpyw\RuntimePromise\all;
12
13
class Co implements CoInterface
14
{
15
    /**
     * Instance created by the Co::wait() call currently in progress.
     * Co::wait() assigns it on entry and resets it to null on exit, and
     * Co::async() refuses to run while it is empty.
     * @var Co|null
     */
19
    private static $self;
20
21
    /**
     * Options for this run, built in Co::wait() from the array passed to it.
     * @var CoOption
     */
25
    private $options;
26
27
    /**
     * cURL request pool object, constructed in Co::wait() with $options.
     * @var CURLPool
     */
31
    private $pool;
32
33
    /**
     * Overwrite the defaults held by CoOption.
     *
     * Thin static facade: the array is forwarded untouched to
     * CoOption::setDefault().
     *
     * @param array $options Values handed to CoOption::setDefault().
     */
    public static function setDefaultOptions(array $options)
    {
        CoOption::setDefault($options);
    }
41
42
    /**
     * Read the defaults currently held by CoOption.
     *
     * Thin static facade over CoOption::getDefault().
     *
     * @return array Whatever CoOption::getDefault() returns.
     */
    public static function getDefaultOptions()
    {
        $defaults = CoOption::getDefault();
        return $defaults;
    }
50
51
    /**
     * Wait until value is recursively resolved, then return it.
     *
     * Only one wait() may be in progress at a time; a nested call is rejected
     * and the run already in progress is left untouched.
     *
     * @param  mixed $value   Value to resolve.
     * @param  array $options Options for this run, passed to CoOption.
     * @return mixed          The resolved value returned by start().
     * @throws \BadMethodCallException If Co::wait() is already running.
     */
    public static function wait($value, array $options = [])
    {
        // The re-entrancy check must happen *before* the try block. If it were
        // inside, the finally clause below would also run for the rejected
        // nested call and wipe out the instance owned by the outer call.
        if (self::$self) {
            throw new \BadMethodCallException('Co::wait() is already running. Use Co::async() instead.');
        }
        try {
            self::$self = new self;
            self::$self->options = new CoOption($options);
            self::$self->pool = new CURLPool(self::$self->options);
            return self::$self->start($value);
        } finally {
            // Always release the slot so that a later Co::wait() can run,
            // whether start() returned or threw.
            self::$self = null;
        }
    }
72
73
    /**
     * Start resolving a value without waiting for the result.
     *
     * Only usable while a Co::wait() call is in progress; the value is handed
     * to that running instance.
     *
     * @param  mixed $value   Value to resolve.
     * @param  array $options Accepted for signature symmetry with Co::wait();
     *                        this method does not read it.
     * @throws \BadMethodCallException If no Co::wait() call is in progress.
     */
    public static function async($value, array $options = [])
    {
        $running = self::$self;
        if ($running) {
            // Second argument false: start() returns without draining the pool.
            $running->start($value, false);
            return;
        }
        throw new \BadMethodCallException(
            'Co::async() must be called along with Co::wait(). ' .
            'This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.'
        );
    }
89
90
    /**
     * Private on purpose: instances are only created inside Co::wait().
     */
    private function __construct()
    {
    }
94
95
    /**
     * Start resolving.
     *
     * The value is wrapped in a generator, normalized, and handed to
     * processGeneratorContainer() together with a Deferred whose outcome is
     * captured locally.
     *
     * @param  mixed $value Value to resolve.
     * @param  bool  $wait  Whether to drain the cURL pool and report the outcome.
     * @return mixed        If $wait, the resolved value; otherwise null.
     * @throws mixed        If $wait and the Deferred was rejected, the rejection
     *                      value is thrown as-is.
     */
    private function start($value, $wait = true)
    {
        $result = null;
        $failure = null;

        // Capture whichever way the deferred settles.
        $onResolved = function ($r) use (&$result) {
            $result = $r;
        };
        $onRejected = function ($e) use (&$failure) {
            $failure = $e;
        };
        $deferred = new Deferred;
        $deferred->promise()->then($onResolved, $onRejected);

        // For convenience, every value is wrapped into a generator: it yields
        // the value, then yields what was sent back under the RETURN_WITH key.
        $wrapper = function () use ($value) {
            yield CoInterface::RETURN_WITH => (yield $value);
        };
        $container = Utils::normalize($wrapper, $this->options);
        $this->processGeneratorContainer($container, $deferred);

        // Fire-and-forget mode: the captured outcome is not reported.
        if (!$wait) {
            return;
        }

        $this->pool->wait();
        if ($failure) {
            throw $failure;
        }
        return $result;
    }
129
130
    /**
     * Handle resolving generators.
     *
     * Advances $gc until it either finishes or yields something that has to be
     * awaited. While the generator is still valid:
     *   - a yielded value without yieldables is sent straight back in;
     *   - a RuntimeException raised by normalization is thrown into the
     *     generator if it accepts exceptions, otherwise sent in as a value;
     *   - a yielded value containing yieldables is awaited via promiseAll(),
     *     and this method is re-entered once that promise settles.
     * When the generator has finished, $deferred is rejected with the thrown
     * exception or resolved with the (recursively resolved) return value.
     *
     * Steps that complete synchronously are processed in a loop rather than by
     * recursion, so stack depth does not grow with the number of such yields.
     *
     * @param  GeneratorContainer $gc       Generator being driven.
     * @param  Deferred           $deferred Settled with the generator's outcome.
     */
    private function processGeneratorContainer(GeneratorContainer $gc, Deferred $deferred)
    {
        while ($gc->valid()) {
            // Now we normalize yielded value
            try {
                $yielded = Utils::normalize($gc->current(), $gc->getOptions(), $gc->key());
            } catch (\RuntimeException $e) {
                // If exception thrown in normalization...
                //   - If generator accepts exception, we throw it into generator
                //   - If generator does not accept exception, we assume it as non-exception value
                $gc->throwAcceptable() ? $gc->throw_($e) : $gc->send($e);
                continue;
            }

            // Search yieldables from yielded value
            $yieldables = Utils::getYieldables($yielded);
            if (!$yieldables) {
                // If there are no yieldables, send yielded value back into generator
                $gc->send($yielded);
                continue;
            }

            // Chain resolver; the generator is resumed from the callback, so
            // this invocation is done.
            $this->promiseAll($yieldables, $gc->throwAcceptable())->then(
                self::getApplier($yielded, $yieldables, [$gc, 'send']),
                [$gc, 'throw_']
            )->always(function () use ($gc, $deferred) {
                // Continue
                $this->processGeneratorContainer($gc, $deferred);
            });
            return;
        }

        // Generator has no more yields.
        // If exception has been thrown in generator, we have to propagate it as rejected value
        if ($gc->thrown()) {
            $deferred->reject($gc->getReturnOrThrown());
            return;
        }

        // Now we normalize returned value
        try {
            $returned = Utils::normalize($gc->getReturnOrThrown(), $gc->getOptions());
            $yieldables = Utils::getYieldables($returned);
            // If normalized value contains yieldables, we have to chain resolver
            if ($yieldables) {
                $this->promiseAll($yieldables, true)->then(
                    self::getApplier($returned, $yieldables, [$deferred, 'resolve']),
                    [$deferred, 'reject']
                );
                return;
            }
            // Propagate normalized returned value
            $deferred->resolve($returned);
        } catch (\RuntimeException $e) {
            // Propagate exception thrown in normalization
            $deferred->reject($e);
        }
    }
197
198
    /**
     * Build a callback that writes resolved results back into a value.
     *
     * The returned closure takes results keyed the same way as $yieldables.
     * For each result it follows that yieldable's 'keylist' path into its own
     * copy of $yielded, overwrites the slot found there, and finally calls
     * $next with the patched value. An empty keylist replaces the whole value.
     *
     * @param  mixed    $yielded    Value containing the yieldables.
     * @param  array    $yieldables Entries with a 'keylist' path, keyed like the results.
     * @param  callable $next       Receives the patched value.
     * @return \Closure             Callback accepting the array of results.
     */
    private static function getApplier($yielded, $yieldables, callable $next)
    {
        $applier = function (array $results) use ($yielded, $yieldables, $next) {
            foreach ($results as $hash => $resolved) {
                $path = $yieldables[$hash]['keylist'];
                // Walk down by reference so the final assignment lands inside $yielded.
                $slot = &$yielded;
                foreach ($path as $segment) {
                    $slot = &$slot[$segment];
                }
                $slot = $resolved;
            }
            $next($yielded);
        };
        return $applier;
    }
217
218
    /**
     * Start every yieldable and return one promise covering all of them.
     *
     * Each yieldable gets its own Deferred, whose promise is stored under the
     * string cast of the yieldable's value. cURL handles are given to the
     * pool; generator containers are driven by processGeneratorContainer().
     *
     * @param  array $yieldables       Entries whose 'value' is a cURL handle
     *                                 or a generator container.
     * @param  bool  $throw_acceptable When false, a rejection is delivered to
     *                                 the caller as a resolved value instead.
     * @return PromiseInterface        Result of all() over the collected promises.
     */
    private function promiseAll($yieldables, $throw_acceptable)
    {
        $promises = [];
        foreach ($yieldables as $yieldable) {
            $target = $yieldable['value'];
            $dfd = new Deferred;
            $promises[(string)$target] = $dfd->promise();

            if (!$throw_acceptable) {
                // If caller cannot accept exception, put a second deferred in
                // front of the exposed one and forward both outcomes to
                // resolve(), so a rejection arrives as a plain value.
                $exposed = $dfd;
                $absorb = function ($any) use ($exposed) {
                    $exposed->resolve($any);
                };
                $dfd = new Deferred;
                $dfd->promise()->then($absorb, $absorb);
            }

            if (Utils::isCurl($target)) {
                // Add or enqueue cURL handles
                $this->pool->addOrEnqueue($target, $dfd);
            } elseif (Utils::isGeneratorContainer($target)) {
                // Process generators
                $this->processGeneratorContainer($target, $dfd);
            }
        }
        return all($promises);
    }
253
}
254