Completed
Push — master ( f391b7...edf64b )
by Ryosuke
02:42
created

Co   D

Complexity

Total Complexity 110

Size/Duplication

Total Lines 614
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Test Coverage

Coverage 34.57%

Importance

Changes 17
Bugs 7 Features 2
Metric Value
wmc 110
c 17
b 7
f 2
lcom 1
cbo 2
dl 0
loc 614
ccs 93
cts 269
cp 0.3457
rs 4.8159

23 Methods

Rating   Name   Duplication   Size   Complexity  
A setDefaultOptions() 0 4 1
A getDefaultOptions() 0 4 1
B wait() 0 27 5
A async() 0 11 2
A __construct() 0 9 4
C enqueue() 0 23 7
A setTable() 0 8 2
B unsetTable() 0 24 5
D run() 0 30 9
C initialize() 0 49 9
C updateCurl() 0 32 13
B canThrow() 0 17 5
C updateGenerator() 0 30 8
D validateOptions() 0 27 10
B normalize() 0 12 5
A isGeneratorRunning() 0 5 2
B getGeneratorReturn() 0 11 5
A isCurl() 0 4 2
A isGenerator() 0 4 1
A isArrayLike() 0 5 3
A flatten() 0 11 4
A setTree() 0 11 3
A unsetTree() 0 11 4

How to fix   Complexity   

Complex Class

Complex classes like Co often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Co, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace mpyw\Co;
4
5
/**
6
 * Asynchronous cURL executor simply based on resource and Generator.
7
 * http://github.com/mpyw/co
8
 *
9
 * @author mpyw
10
 * @license MIT
11
 */
12
13
class Co
14
{
15
16
    /**
17
     * Special constants used for Generator yielding keys.
18
     *
19
     * @const Co::RETURN_WITH  Treat yielded value as returned value.
20
     *                         This is for PHP 5.5 ~ 5.6.
21
     * @const Co::UNSAFE       Allow current yield to throw Exceptions.
22
     * @const Co::SAFE         Forbid current yield to throw Exceptions.
23
     *                         Exceptions are just to be returned.
24
     */
25
    const RETURN_WITH = '__RETURN_WITH__';
26
    const RETURN_ = '__RETURN_WITH__'; // alias
27
    const RET = '__RETURN_WITH__'; // alias
28
    const RTN = '__RETURN_WITH__'; // alias
29
    const UNSAFE = '__UNSAFE__';
30
    const SAFE = '__SAFE__';
31
32
    /**
33
     * Static default options.
34
     */
35
    private static $defaults = array(
36
        'throw' => true, // Throw CURLExceptions?
37
        'pipeline' => false, // Use HTTP/1.1 pipelining?
38
        'multiplex' => true, // Use HTTP/2 multiplexing?
39
        'interval' => 0.5, // curl_multi_select() timeout
40
        'concurrency' => 6, // Limit of TCP connections
41
    );
42
43
    /**
44
     * Execution instance is stored here.
45
     */
46
    private static $self;
47
48
    /**
49
     * Instance properties
50
     *
51
     * *Stack ID* means...
52
     *   - Generator ID
53
     *   - "wait" (Co::wait calls)
54
     *   - "async" (Co::async calls)
55
     */
56
    private $options = array();
57
    private $mh;                          // curl_multi_init()
58
    private $count = 0;                   // count(curl_multi_add_handle called)
59
    private $queue = array();             // cURL resources over concurrency limits are temporalily stored here
60
    private $tree = array();              // array<*Stack ID*, mixed>
61
    private $values = array();            // array<*Stack ID*|*cURL ID*, Generator|resource>
62
    private $value_to_parent = array();   // array<*Stack ID*|*cURL ID*, *Stack ID*>
63
    private $value_to_children = array(); // array<*Stack ID*, array<*Stack ID*|*cURL ID*, true>>
64
    private $value_to_keylist = array();  // array<*Stack ID*|*cURL ID*, array<mixed>>
65
66
    /**
67
     * Override or get default settings.
68
     *
69
     * @access public
70
     * @static
71
     * @param array $options
72
     */
73
    public static function setDefaultOptions(array $options)
74
    {
75
        self::$defaults = self::validateOptions($options);
76
    }
77
    public static function getDefaultOptions()
78
    {
79
        return self::$defaults;
80
    }
81
82
    /**
83
     * Wait all cURL requests to be completed.
84
     * Options override static defaults.
85
     *
86
     * @access public
87
     * @static
88
     * @param mixed $value
89
     * @param array $options
90
     * @see self::__construct()
91
     */
92
    public static function wait($value, array $options = array())
93
    {
94
95
        $options = self::validateOptions($options) + self::$defaults;
96
        // This function call must be atomic.
97
        try {
98
            if (self::$self) {
99
                throw new \BadMethodCallException(
100
                    'Co::wait() is already running. Use Co::async() instead.'
101
                );
102
            }
103
            self::$self = new self($options);
104
            $enqueued = self::$self->initialize($value, 'wait');
105
            if ($enqueued) {
106
                self::$self->run();
107
            }
108
            $result = self::$self->tree['wait'];
109
            self::$self = null;
110
            return $result;
111
        } catch (\Throwable $e) { // for PHP 7+
112
            self::$self = null;
113
            throw $e;
114
        } catch (\Exception $e) { // for PHP 5
115
            self::$self = null;
116
            throw $e;
117
        }
118
    }
119
120
    /**
121
     * Parallel execution along with Co::async().
122
     * This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.
123
     *
124
     * @access public
125
     * @static
126
     * @param mixed $value
127
     * @see self::__construct()
128
     */
129
    public static function async($value)
130
    {
131
        // This function must be called along with Co::wait().
132
        if (!self::$self) {
133
            throw new \BadMethodCallException(
134
                'Co::async() must be called along with Co::wait(). ' .
135
                'This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.'
136
            );
137
        }
138
        self::$self->initialize($value, 'async');
139
    }
140
141
    /**
142
     * Internal constructor.
143
     *
144
     * @access private
145
     * @param array $options
146
     * @see self::initialize(), self::run()
147
     */
148 3
    private function __construct(array $options)
149 3
    {
150 3
        $this->mh = curl_multi_init();
151 3
        if (function_exists('curl_multi_setopt')) {
152 3
            $flags = ($options['pipeline'] ? 1 : 0) | ($options['multiplex'] ? 2 : 0);
153 3
            curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
154
        }
155 3
        $this->options = $options;
156 3
    }
157
158
    /**
159
     * Call curl_multi_add_handle or push into waiting queue.
160
     *
161
     * @access private
162
     * @param resource $curl
163
     */
164 1
    private function enqueue($curl)
165 1
    {
166 1
        if (!$this->options['concurrency'] || $this->count < $this->options['concurrency']) {
167
            // If within concurrency limit...
168 1
            if (CURLM_OK !== $errno = curl_multi_add_handle($this->mh, $curl)) {
169 1
                $msg = curl_multi_strerror($errno) . ": $curl";
170 1
                if ($errno === 7 || $errno === CURLE_FAILED_INIT) {
171
                    // These errors are caused by users mistake.
172 1
                    throw new \InvalidArgumentException($msg);
173
                } else {
174
                    // These errors are by internal reason.
175
                    throw new \RuntimeException($msg);
176
                }
177
            }
178 1
            ++$this->count;
179
        } else {
180
            // Else...
181 1
            if (isset($this->queue[(string)$curl])) {
182 1
                throw new \InvalidArgumentException("The cURL resource is already enqueued: $curl");
183
            }
184 1
            $this->queue[(string)$curl] = $curl;
185
        }
186 1
    }
187
188
    /**
189
     * Set or overwrite tree of return values.
190
     *
191
     * @access private
192
     * @param mixed $value mixed
193
     * @param string $parent_hash *Stack ID*
194
     * @param array $keylist Queue of keys for its hierarchy.
195
     */
196 1
    private function setTree($value, $parent_hash, array $keylist = array())
197 1
    {
198 1
        $current = &$this->tree[$parent_hash];
199 1
        while (null !== $key = array_shift($keylist)) {
200 1
            if (!is_array($current)) {
201 1
                $current = array();
202
            }
203 1
            $current = &$current[$key];
204
        }
205 1
        $current = $value;
206 1
    }
207
208
    /**
209
     * Unset tree of return values.
210
     *
211
     * @access private
212
     * @param string $hash *Stack ID* or *cURL ID*
213
     */
214 1
    private function unsetTree($hash)
215 1
    {
216 1
        if (isset($this->tree[$hash])) {
217 1
            foreach (self::flatten($this->tree[$hash]) as $v) {
218 1
                if (self::isGenerator($v)) {
219 1
                    $this->unsetTree(spl_object_hash($v));
220
                }
221
            }
222 1
            unset($this->tree[$hash]);
223
        }
224 1
    }
225
226
    /**
227
     * Set table of dependencies.
228
     *
229
     * @access private
230
     * @param Generator|resource $value
231
     * @param string $parent_hash *Stack ID* or *cURL ID*
232
     * @param array $keylist Queue of keys for its hierarchy.
233
     */
234
    private function setTable($value, $parent_hash, array $keylist = array())
235
    {
236
        $hash = is_object($value) ? spl_object_hash($value) : (string)$value;
237
        $this->values[$hash] = $value;
238
        $this->value_to_parent[$hash] = $parent_hash;
239
        $this->value_to_children[$parent_hash][$hash] = true;
240
        $this->value_to_keylist[$hash] = $keylist;
241
    }
242
243
    /**
244
     * Unset table of dependencies.
245
     *
246
     * @access private
247
     * @param string $hash *Stack ID* or *cURL ID*
248
     */
249
    private function unsetTable($hash)
250
    {
251
        $parent_hash = $this->value_to_parent[$hash];
252
        // Clear self table.
253
        if (isset($this->queue[$hash])) {
254
            unset($this->queue[$hash]);
255
        }
256
        unset($this->values[$hash]);
257
        unset($this->value_to_parent[$hash]);
258
        unset($this->value_to_keylist[$hash]);
259
        // Clear descendants tables.
260
        // (This is required for cases that
261
        //  some cURL resources are abondoned because of Exceptions thrown)
262
        if (isset($this->value_to_children[$hash])) {
263
            foreach ($this->value_to_children[$hash] as $child => $_) {
264
                $this->unsetTable($child);
265
            }
266
            unset($this->value_to_children[$hash]);
267
        }
268
        // Clear reference from ancestor table.
269
        if (isset($this->value_to_children[$parent_hash][$hash])) {
270
            unset($this->value_to_children[$parent_hash][$hash]);
271
        }
272
    }
273
274
    /**
275
     * Run curl_multi_exec() loop.
276
     *
277
     * @access private
278
     * @see self::updateCurl(), self::enqueue()
279
     */
280
    private function run()
281
    {
282
        curl_multi_exec($this->mh, $active); // Start requests.
283
        do {
284
            curl_multi_select($this->mh, $this->options['interval']); // Wait events.
285
            curl_multi_exec($this->mh, $active); // Update resources.
286
            // NOTE: DO NOT call curl_multi_remove_handle
287
            //       or curl_multi_add_handle while looping curl_multi_info_read!
288
            $entries = array();
289
            do if ($entry = curl_multi_info_read($this->mh, $remains)) {
290
                $entries[] = $entry;
291
            } while ($remains);
292
            // Remove done and consume queue.
293
            foreach ($entries as $entry) {
294
                curl_multi_remove_handle($this->mh, $entry['handle']);
295
                --$this->count;
296
                if ($curl = array_shift($this->queue)) {
297
                    $this->enqueue($curl);
298
                }
299
            }
300
            // Update cURL and Generator stacks.
301
            foreach ($entries as $entry) {
302
                $this->updateCurl($entry['handle'], $entry['result']);
303
            }
304
        } while ($this->count > 0 || $this->queue);
305
        // All request must be done when reached here.
306
        if ($active) {
307
            throw new \LogicException('Unreachable statement.');
308
        }
309
    }
310
311
    /**
312
     * Unset table of dependencies.
313
     *
314
     * @access private
315
     * @param mixed $value
316
     * @param string $parent_hash  *Stack ID* or *cURL ID*
317
     * @param array $keylist       Queue of keys for its hierarchy.
318
     * @return bool                Enqueued?
319
     */
320
    private function initialize($value, $parent_hash, array $keylist = array())
321
    {
322
        $value = self::normalize($value);
323
        // Array or Traversable
324
        if (self::isArrayLike($value)) {
325
            $this->setTree($value, $parent_hash, $keylist);
326
            $enqueued = false;
327
            foreach ($value as $k => $v) {
328
                // Append current key and call recursively
329
                $tmp_keylist = $keylist;
330
                $tmp_keylist[] = $k;
331
                $enqueued = $this->initialize($v, $parent_hash, $tmp_keylist) || $enqueued;
332
            }
333
            return $enqueued;
334
        }
335
        // Generator
336
        if (self::isGenerator($value)) {
337
            $hash = spl_object_hash($value);
338
            if (isset($this->values[$hash])) {
339
                throw new \InvalidArgumentException("The Genertor is already running: #$hash");
340
            }
341
            $this->setTree($value, $parent_hash, $keylist);
342
            while (self::isGeneratorRunning($value)) {
343
                $current = self::normalize($value->current());
344
                // Call recursively
345
                $enqueued = $this->initialize($current, $hash);
346
                if ($enqueued) { // If cURL resource found?
347
                    $this->setTable($value, $parent_hash, $keylist);
348
                    return true;
349
                }
350
                // Search more...
351
                $value->send($current);
352
            }
353
            $value = self::getGeneratorReturn($value);
354
            // Replace current tree with new value
355
            $this->unsetTree($hash);
356
            return $this->initialize($value, $parent_hash, $keylist);
357
        }
358
        // cURL resource
359
        if (self::isCurl($value)) {
360
            $this->enqueue($value);
361
            $this->setTree($value, $parent_hash, $keylist);
362
            $this->setTable($value, $parent_hash, $keylist);
363
            return true;
364
        }
365
        // Other
366
        $this->setTree($value, $parent_hash, $keylist);
367
        return false;
368
    }
369
370
    /**
371
     * Update tree with cURL result.
372
     *
373
     * @access private
374
     * @param resource $value
375
     * @param int $errno
376
     * @see self::updateGenerator()
377
     */
378
    private function updateCurl($value, $errno)
379
    {
380
        $hash = (string)$value;
381
        if (!isset($this->values[$hash])) {
382
            return;
383
        }
384
        $parent_hash = $this->value_to_parent[$hash]; // *Stack ID*
385
        $parent = isset($this->values[$parent_hash]) ? $this->values[$parent_hash] : null; // Generator or null
386
        $keylist = $this->value_to_keylist[$hash];
387
        $result =
388
            $errno === CURLE_OK
389
            ? curl_multi_getcontent($value)
390
            : new CURLException(curl_error($value), $errno, $value)
391
        ;
392
        $this->setTree($result, $parent_hash, $keylist);
393
        $this->unsetTable($hash);
394
        if ($errno !== CURLE_OK && $parent && $this->canThrow($parent)) {// Error and is to be thrown into Generator?
395
            $this->unsetTree($hash); // No more needed
396
            $parent->throw($result);
397
            $this->updateGenerator($parent);
398
        } elseif ($errno !== CURLE_OK && !$parent && $this->options['throw']) { // Error and is to be thrown globally?
399
            $this->unsetTree($hash); // No more needed
400
            throw $result;
401
        } elseif ($parent_hash === 'async') { // Co::async() complete?
402
            $this->unsetTree($hash); // No more needed
403
        } elseif ($parent && !$this->value_to_children[$parent_hash]) { // Generator complete?
404
            $this->unsetTree($hash); // No more needed
405
            $result = $this->tree[$parent_hash];
406
            $parent->send($result);
407
            $this->updateGenerator($parent);
408
        }
409
    }
410
411
    /**
412
     * Check current Generator can throw a CURLException.
413
     *
414
     * @access private
415
     * @param Generator $value
416
     * @return bool
417
     */
418
    private function canThrow(\Generator $value)
419
    {
420
        while (true) {
421
            $key = $value->key();
422
            if ($key === self::SAFE) {
423
                return false;
424
            }
425
            if ($key === self::UNSAFE) {
426
                return true;
427
            }
428
            $parent_hash = $this->value_to_parent[spl_object_hash($value)];
429
            if (!isset($this->values[$parent_hash])) {
430
                return $this->options['throw'];
431
            }
432
            $value = $this->values[$parent_hash];
433
        }
434
    }
435
436
    /**
437
     * Update tree with updateCurl() result.
438
     *
439
     * @access private
440
     * @param Generator $value
441
     */
442
    private function updateGenerator(\Generator $value)
443
    {
444
        $hash = spl_object_hash($value);
445
        if (!isset($this->values[$hash])) {
446
            return;
447
        }
448
        while (self::isGeneratorRunning($value)) {
449
            $current = self::normalize($value->current());
450
            $enqueued = $this->initialize($current, $hash);
451
            if ($enqueued) { // cURL resource found?
452
                return;
453
            }
454
            // Search more...
455
            $value->send($current);
456
        }
457
        $value = self::getGeneratorReturn($value);
458
        $parent_hash = $this->value_to_parent[$hash];
459
        $parent = isset($this->values[$parent_hash]) ? $this->values[$parent_hash] : null;
460
        $keylist = $this->value_to_keylist[$hash];
461
        $this->unsetTable($hash);
462
        $this->unsetTree($hash);
463
        $enqueued = $this->initialize($value, $parent_hash, $keylist);
464
        if (!$enqueued && $parent && !$this->value_to_children[$parent_hash]) { // Generator complete?
465
            // Traverse parent stack.
466
            $next = $this->tree[$parent_hash];
467
            $this->unsetTree($parent_hash);
468
            $parent->send($next);
469
            $this->updateGenerator($parent);
470
        }
471
    }
472
473
    /**
474
     * Validate options.
475
     *
476
     * @access private
477
     * @static
478
     * @param array $options
479
     * @return array
480
     */
481 1
    private static function validateOptions(array $options)
482 1
    {
483 1
        foreach ($options as $key => $value) {
484 1
            if (in_array($key, array('throw', 'pipeline', 'multiplex'), true)) {
485 1
                $value = filter_var($value, FILTER_VALIDATE_BOOLEAN, array(
486 1
                    'flags' => FILTER_NULL_ON_FAILURE,
487
                ));
488 1
                if ($value === null) {
489 1
                    throw new \InvalidArgumentException("Option[$key] must be boolean.");
490
                }
491 1
            } elseif ($key === 'interval') {
492 1
                $value = filter_var($value, FILTER_VALIDATE_FLOAT);
493 1
                if ($value === false || $value < 0.0) {
494 1
                    throw new \InvalidArgumentException("Option[interval] must be positive float or zero.");
495
                }
496 1
            } elseif ($key === 'concurrency') {
497 1
                $value = filter_var($value, FILTER_VALIDATE_INT);
498 1
                if ($value === false || $value < 0) {
499 1
                    throw new \InvalidArgumentException("Option[concurrency] must be positive integer or zero.");
500
                }
501
            } else {
502
                throw new \InvalidArgumentException("Unknown option: $key");
503
            }
504 1
            $options[$key] = $value;
505
        }
506 1
        return $options;
507
    }
508
509
    /**
510
     * Normalize value.
511
     *
512
     * @access private
513
     * @static
514
     * @param mixed $value
515
     * @return miexed
516
     */
517 1
    private static function normalize($value)
518 1
    {
519 1
        while ($value instanceof \Closure) {
520 1
            $value = $value();
521
        }
522 1
        if (self::isArrayLike($value)
523 1
            && !is_array($value)
524 1
            && !$value->valid()) {
525 1
            $value = array();
526
        }
527 1
        return $value;
528
    }
529
530
    /**
531
     * Check if a Generator is running.
532
     * This method supports psuedo return with Co::RETURN_WITH.
533
     *
534
     * @access private
535
     * @static
536
     * @param Generator $value
537
     * @return bool
538
     */
539 1
    private static function isGeneratorRunning(\Generator $value)
540 1
    {
541 1
        $value->current();
542 1
        return $value->valid() && $value->key() !== self::RETURN_WITH; // yield Co::RETURN_WITH => XX
543
    }
544
545
    /**
546
     * Get return value from a Generator.
547
     * This method supports psuedo return with Co::RETURN_WITH.
548
     *
549
     * @access private
550
     * @static
551
     * @param Generator $value
552
     * @return bool
553
     */
554 1
    private static function getGeneratorReturn(\Generator $value)
555 1
    {
556 1
        $value->current();
557 1
        if ($value->valid() && $value->key() === self::RETURN_WITH) {  // yield Co::RETURN_WITH => XX
558 1
            return $value->current();
559
        }
560 1
        if ($value->valid()) {
561 1
            throw new \LogicException('Unreachable statement.');
562
        }
563 1
        return method_exists($value, 'getReturn') ? $value->getReturn() : null;
564
    }
565
566
    /**
567
     * Check if value is a valid cURL resource.
568
     *
569
     * @access private
570
     * @static
571
     * @param mixed $value
572
     * @return bool
573
     */
574 1
    private static function isCurl($value)
575 1
    {
576 1
        return is_resource($value) && get_resource_type($value) === 'curl';
577
    }
578
579
    /**
580
     * Check if value is a valid Generator.
581
     *
582
     * @access private
583
     * @static
584
     * @param mixed $value
585
     * @return bool
586
     */
587 2
    private static function isGenerator($value)
588 2
    {
589 2
        return $value instanceof \Generator;
590
    }
591
592
    /**
593
     * Check if value is a valid array or Traversable, not a Generator.
594
     *
595
     * @access private
596
     * @static
597
     * @param mixed $value
598
     * @return bool
599
     */
600 4
    private static function isArrayLike($value)
601 4
    {
602 4
        return $value instanceof \Traversable && !$value instanceof \Generator
603 4
               || is_array($value);
604
    }
605
606
    /**
607
     * Flatten an array or a Traversable.
608
     *
609
     * @access private
610
     * @static
611
     * @param mixed $value
612
     * @return array
613
     */
614 2
    private static function flatten($value, array &$carry = array())
615 2
    {
616 2
        if (!self::isArrayLike($value)) {
617 2
            $carry[] = $value;
618
        } else {
619 2
            foreach ($value as $v) {
620 2
                self::flatten($v, $carry);
621
            }
622
        }
623 2
        return func_num_args() <= 1 ? $carry : null;
624
    }
625
626
}
627