Completed
Push — master ( 94b61d...0770f8 )
by Ryosuke
03:14
created

Co::enqueue()   B

Complexity

Conditions 7
Paths 7

Size

Total Lines 21
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7

Importance

Changes 4
Bugs 1 Features 1
Metric Value
c 4
b 1
f 1
dl 0
loc 21
ccs 14
cts 14
cp 1
rs 7.551
cc 7
eloc 13
nc 7
nop 1
crap 7
1
<?php
2
3
namespace mpyw\Co;
4
5
/**
6
 * Asynchronous cURL executor simply based on resource and Generator.
7
 * http://github.com/mpyw/co
8
 *
9
 * @author mpyw
10
 * @license MIT
11
 */
12
13
class Co
14
{
15
16
    /**
17
     * Special constants used for Generator yielding keys.
18
     *
19
     * @const Co::RETURN_WITH  Treat yielded value as returned value.
20
     *                         This is for PHP 5.5 ~ 5.6.
21
     * @const Co::UNSAFE       Allow current yield to throw Exceptions.
22
     * @const Co::SAFE         Forbid current yield to throw Exceptions.
23
     *                         Exceptions are just to be returned.
24
     */
25
    const RETURN_WITH = '__RETURN_WITH__';
26
    const RETURN_ = '__RETURN_WITH__'; // alias
27
    const RET = '__RETURN_WITH__'; // alias
28
    const RTN = '__RETURN_WITH__'; // alias
29
    const UNSAFE = '__UNSAFE__';
30
    const SAFE = '__SAFE__';
31
32
    /**
33
     * Static default options.
34
     */
35
    private static $defaults = array(
36
        'throw' => true, // Throw CURLExceptions?
37
        'pipeline' => false, // Use HTTP/1.1 pipelining?
38
        'multiplex' => true, // Use HTTP/2 multiplexing?
39
        'interval' => 0.5, // curl_multi_select() timeout
40
        'concurrency' => 6, // Limit of TCP connections
41
    );
42
43
    /**
44
     * Execution instance is stored here.
45
     */
46
    private static $self;
47
48
    /**
49
     * Instance properties
50
     *
51
     * *Stack ID* means...
52
     *   - Generator ID
53
     *   - "wait" (Co::wait calls)
54
     *   - "async" (Co::async calls)
55
     */
56
    private $options = array();
57
    private $mh;                          // curl_multi_init()
58
    private $count = 0;                   // count(curl_multi_add_handle called)
59
    private $queue = array();             // cURL resources over concurrency limits are temporalily stored here
60
    private $tree = array();              // array<*Stack ID*, mixed>
61
    private $values = array();            // array<*Stack ID*|*cURL ID*, Generator|resource>
62
    private $value_to_parent = array();   // array<*Stack ID*|*cURL ID*, *Stack ID*>
63
    private $value_to_children = array(); // array<*Stack ID*, array<*Stack ID*|*cURL ID*, true>>
64
    private $value_to_keylist = array();  // array<*Stack ID*|*cURL ID*, array<mixed>>
65
66
    /**
67
     * Override or get default settings.
68
     *
69
     * @access public
70
     * @static
71
     * @param array $options
72
     */
73
    public static function setDefaultOptions(array $options)
74
    {
75
        self::$defaults = self::validateOptions($options);
76
    }
77
    public static function getDefaultOptions()
78
    {
79
        return self::$defaults;
80
    }
81
82
    /**
83
     * Wait all cURL requests to be completed.
84
     * Options override static defaults.
85
     *
86
     * @access public
87
     * @static
88
     * @param mixed $value
89
     * @param array $options
90
     * @see self::__construct()
91
     */
92
    public static function wait($value, array $options = array())
93
    {
94
        $options = self::validateOptions($options) + self::$defaults;
95
        // This function call must be atomic.
96
        try {
97
            if (self::$self) {
98
                throw new \BadMethodCallException(
99
                    'Co::wait() is already running. Use Co::async() instead.'
100
                );
101
            }
102
            self::$self = new self($options);
103
            if (self::$self->initialize($value, 'wait')) {
104
                self::$self->run();
105
            }
106
            $result = self::$self->tree['wait'];
107
            self::$self = null;
108
            return $result;
109
        } catch (\Throwable $e) { } catch (\Exception $e) { } // For both PHP7+ and PHP5
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
110
        self::$self = null;
111
        throw $e;
112
    }
113
114
    /**
115
     * Parallel execution along with Co::async().
116
     * This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.
117
     *
118
     * @access public
119
     * @static
120
     * @param mixed $value
121
     * @see self::__construct()
122
     */
123
    public static function async($value)
124
    {
125
        // This function must be called along with Co::wait().
126
        if (!self::$self) {
127
            throw new \BadMethodCallException(
128
                'Co::async() must be called along with Co::wait(). ' .
129
                'This method is mainly expected to be used in CURLOPT_WRITEFUNCTION callback.'
130
            );
131
        }
132
        self::$self->initialize($value, 'async');
133
    }
134
135
    /**
136
     * Internal constructor.
137
     *
138
     * @access private
139
     * @param array $options
140
     * @see self::initialize(), self::run()
141
     */
142 4
    private function __construct(array $options)
143 4
    {
144 4
        $this->mh = curl_multi_init();
145 4
        if (function_exists('curl_multi_setopt')) {
146 4
            $flags = ($options['pipeline'] ? 1 : 0) | ($options['multiplex'] ? 2 : 0);
147 4
            curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
148
        }
149 4
        $this->options = $options;
150 4
    }
151
152
    /**
153
     * Call curl_multi_add_handle or push into waiting queue.
154
     *
155
     * @access private
156
     * @param resource $curl
157
     */
158 1
    private function enqueue($curl)
159 1
    {
160 1
        if (!$this->options['concurrency'] || $this->count < $this->options['concurrency']) {
161
            // If within concurrency limit...
162 1
            if (CURLM_OK !== $errno = curl_multi_add_handle($this->mh, $curl)) {
163 1
                $msg = curl_multi_strerror($errno) . ": $curl";
164 1
                $class = $errno === 7 || $errno === CURLE_FAILED_INIT
165 1
                    ? 'InvalidArgumentException'
166 1
                    : 'RuntimeException'
167
                ;
168 1
                throw new $class($msg);
169
            }
170 1
            ++$this->count;
171
        } else {
172
            // Else...
173 1
            if (isset($this->queue[(string)$curl])) {
174 1
                throw new \InvalidArgumentException("The cURL resource is already enqueued: $curl");
175
            }
176 1
            $this->queue[(string)$curl] = $curl;
177
        }
178 1
    }
179
180
    /**
181
     * Set or overwrite tree of return values.
182
     *
183
     * @access private
184
     * @param mixed $value mixed
185
     * @param string $parent_hash *Stack ID*
186
     * @param array $keylist Queue of keys for its hierarchy.
187
     */
188 1
    private function setTree($value, $parent_hash, array $keylist = array())
189 1
    {
190 1
        $current = &$this->tree[$parent_hash];
191 1
        while (null !== $key = array_shift($keylist)) {
192 1
            if (!is_array($current)) {
193 1
                $current = array();
194
            }
195 1
            $current = &$current[$key];
196
        }
197 1
        $current = $value;
198 1
    }
199
200
    /**
201
     * Unset tree of return values.
202
     *
203
     * @access private
204
     * @param string $hash *Stack ID* or *cURL ID*
205
     */
206 1
    private function unsetTree($hash)
207 1
    {
208 1
        if (isset($this->tree[$hash])) {
209 1
            foreach (self::flatten($this->tree[$hash]) as $v) {
210 1
                if (self::isGenerator($v)) {
211 1
                    $this->unsetTree(spl_object_hash($v));
212
                }
213
            }
214 1
            unset($this->tree[$hash]);
215
        }
216 1
    }
217
218
    /**
219
     * Set table of dependencies.
220
     *
221
     * @access private
222
     * @param Generator|resource $value
223
     * @param string $parent_hash *Stack ID* or *cURL ID*
224
     * @param array $keylist Queue of keys for its hierarchy.
225
     */
226 1
    private function setTable($value, $parent_hash, array $keylist = array())
227 1
    {
228 1
        $hash = is_object($value) ? spl_object_hash($value) : (string)$value;
229 1
        $this->values[$hash] = $value;
230 1
        $this->value_to_parent[$hash] = $parent_hash;
231 1
        $this->value_to_children[$parent_hash][$hash] = true;
232 1
        $this->value_to_keylist[$hash] = $keylist;
233 1
    }
234
235
    /**
236
     * Unset table of dependencies.
237
     *
238
     * @access private
239
     * @param string $hash *Stack ID* or *cURL ID*
240
     */
241
    private function unsetTable($hash)
242
    {
243
        $parent_hash = $this->value_to_parent[$hash];
244
        // Clear self table.
245
        unset($this->queue[$hash]);
246
        unset($this->values[$hash]);
247
        unset($this->value_to_parent[$hash]);
248
        unset($this->value_to_keylist[$hash]);
249
        // Clear descendants tables.
250
        // (This is required for cases that
251
        //  some cURL resources are abondoned because of Exceptions thrown)
252
        if (isset($this->value_to_children[$hash])) {
253
            foreach ($this->value_to_children[$hash] as $child => $_) {
254
                $this->unsetTable($child);
255
            }
256
            unset($this->value_to_children[$hash]);
257
        }
258
        // Clear reference from ancestor table.
259
        if (isset($this->value_to_children[$parent_hash][$hash])) {
260
            unset($this->value_to_children[$parent_hash][$hash]);
261
        }
262
    }
263
264
    /**
265
     * Run curl_multi_exec() loop.
266
     *
267
     * @access private
268
     * @see self::updateCurl(), self::enqueue()
269
     */
270
    private function run()
271
    {
272
        curl_multi_exec($this->mh, $active); // Start requests.
273
        do {
274
            curl_multi_select($this->mh, $this->options['interval']); // Wait events.
275
            curl_multi_exec($this->mh, $active); // Update resources.
276
            // NOTE: DO NOT call curl_multi_remove_handle
277
            //       or curl_multi_add_handle while looping curl_multi_info_read!
278
            $entries = array();
279
            do if ($entry = curl_multi_info_read($this->mh, $remains)) {
280
                $entries[] = $entry;
281
            } while ($remains);
282
            // Remove done and consume queue.
283
            foreach ($entries as $entry) {
284
                curl_multi_remove_handle($this->mh, $entry['handle']);
285
                --$this->count;
286
                if ($curl = array_shift($this->queue)) {
287
                    $this->enqueue($curl);
288
                }
289
            }
290
            // Update cURL and Generator stacks.
291
            foreach ($entries as $entry) {
292
                $this->updateCurl($entry['handle'], $entry['result']);
293
            }
294
        } while ($this->count > 0 || $this->queue);
295
        // All request must be done when reached here.
296
        if ($active) {
297
            throw new \LogicException('Unreachable statement.');
298
        }
299
    }
300
301
    /**
302
     * Unset table of dependencies.
303
     *
304
     * @access private
305
     * @param mixed $value
306
     * @param string $parent_hash  *Stack ID* or *cURL ID*
307
     * @param array $keylist       Queue of keys for its hierarchy.
308
     * @return bool                Enqueued?
309
     */
310
    private function initialize($value, $parent_hash, array $keylist = array())
311
    {
312
        $value = self::normalize($value);
313
        // Array or Traversable
314
        if (self::isArrayLike($value)) {
315
            $this->setTree($value, $parent_hash, $keylist);
316
            $enqueued = false;
317
            foreach ($value as $k => $v) {
318
                // Append current key and call recursively
319
                $tmp_keylist = $keylist;
320
                $tmp_keylist[] = $k;
321
                $enqueued = $this->initialize($v, $parent_hash, $tmp_keylist) || $enqueued;
322
            }
323
            return $enqueued;
324
        }
325
        // Generator
326
        if (self::isGenerator($value)) {
327
            $hash = spl_object_hash($value);
328
            if (isset($this->values[$hash])) {
329
                throw new \InvalidArgumentException("The Genertor is already running: #$hash");
330
            }
331
            $this->setTree($value, $parent_hash, $keylist);
332
            while (self::isGeneratorRunning($value)) {
333
                $current = self::normalize($value->current());
334
                // Call recursively
335
                $enqueued = $this->initialize($current, $hash);
336
                if ($enqueued) { // If cURL resource found?
337
                    $this->setTable($value, $parent_hash, $keylist);
338
                    return true;
339
                }
340
                // Search more...
341
                $value->send($current);
342
            }
343
            $value = self::getGeneratorReturn($value);
344
            // Replace current tree with new value
345
            $this->unsetTree($hash);
346
            return $this->initialize($value, $parent_hash, $keylist);
347
        }
348
        // cURL resource
349
        if (self::isCurl($value)) {
350
            $this->enqueue($value);
351
            $this->setTree($value, $parent_hash, $keylist);
352
            $this->setTable($value, $parent_hash, $keylist);
353
            return true;
354
        }
355
        // Other
356
        $this->setTree($value, $parent_hash, $keylist);
357
        return false;
358
    }
359
360
    /**
361
     * Update tree with cURL result.
362
     *
363
     * @access private
364
     * @param resource $value
365
     * @param int $errno
366
     * @see self::updateGenerator()
367
     */
368
    private function updateCurl($value, $errno)
369
    {
370
        $hash = (string)$value;
371
        if (!isset($this->values[$hash])) {
372
            return;
373
        }
374
        $parent_hash = $this->value_to_parent[$hash]; // *Stack ID*
375
        $parent = isset($this->values[$parent_hash]) ? $this->values[$parent_hash] : null; // Generator or null
376
        $keylist = $this->value_to_keylist[$hash];
377
        $result =
378
            $errno === CURLE_OK
379
            ? curl_multi_getcontent($value)
380
            : new CURLException(curl_error($value), $errno, $value)
381
        ;
382
        $this->setTree($result, $parent_hash, $keylist);
383
        $this->unsetTable($hash);
384
        if ($errno !== CURLE_OK && $parent && $this->canThrow($parent)) {// Error and is to be thrown into Generator?
385
            $this->unsetTree($hash); // No more needed
386
            $parent->throw($result);
387
            $this->updateGenerator($parent);
388
        } elseif ($errno !== CURLE_OK && !$parent && $this->options['throw']) { // Error and is to be thrown globally?
389
            $this->unsetTree($hash); // No more needed
390
            throw $result;
391
        } elseif ($parent_hash === 'async') { // Co::async() complete?
392
            $this->unsetTree($hash); // No more needed
393
        } elseif ($parent && !$this->value_to_children[$parent_hash]) { // Generator complete?
394
            $this->unsetTree($hash); // No more needed
395
            $result = $this->tree[$parent_hash];
396
            $parent->send($result);
397
            $this->updateGenerator($parent);
398
        }
399
    }
400
401
    /**
402
     * Check current Generator can throw a CURLException.
403
     *
404
     * @access private
405
     * @param Generator $value
406
     * @return bool
407
     */
408
    private function canThrow(\Generator $value)
409
    {
410
        while (true) {
411
            $key = $value->key();
412
            if ($key === self::SAFE) {
413
                return false;
414
            }
415
            if ($key === self::UNSAFE) {
416
                return true;
417
            }
418
            $parent_hash = $this->value_to_parent[spl_object_hash($value)];
419
            if (!isset($this->values[$parent_hash])) {
420
                return $this->options['throw'];
421
            }
422
            $value = $this->values[$parent_hash];
423
        }
424
    }
425
426
    /**
427
     * Update tree with updateCurl() result.
428
     *
429
     * @access private
430
     * @param Generator $value
431
     */
432
    private function updateGenerator(\Generator $value)
433
    {
434
        $hash = spl_object_hash($value);
435
        if (!isset($this->values[$hash])) {
436
            return;
437
        }
438
        while (self::isGeneratorRunning($value)) {
439
            $current = self::normalize($value->current());
440
            $enqueued = $this->initialize($current, $hash);
441
            if ($enqueued) { // cURL resource found?
442
                return;
443
            }
444
            // Search more...
445
            $value->send($current);
446
        }
447
        $value = self::getGeneratorReturn($value);
448
        $parent_hash = $this->value_to_parent[$hash];
449
        $parent = isset($this->values[$parent_hash]) ? $this->values[$parent_hash] : null;
450
        $keylist = $this->value_to_keylist[$hash];
451
        $this->unsetTable($hash);
452
        $this->unsetTree($hash);
453
        $enqueued = $this->initialize($value, $parent_hash, $keylist);
454
        if (!$enqueued && $parent && !$this->value_to_children[$parent_hash]) { // Generator complete?
455
            // Traverse parent stack.
456
            $next = $this->tree[$parent_hash];
457
            $this->unsetTree($parent_hash);
458
            $parent->send($next);
459
            $this->updateGenerator($parent);
460
        }
461
    }
462
463
    /**
464
     * Validate options.
465
     *
466
     * @access private
467
     * @static
468
     * @param array $options
469
     * @return array
470
     */
471 1
    private static function validateOptions(array $options)
472 1
    {
473 1
        foreach ($options as $key => $value) {
474 1
            if (in_array($key, array('throw', 'pipeline', 'multiplex'), true)) {
475 1
                $value = filter_var($value, FILTER_VALIDATE_BOOLEAN, array(
476 1
                    'flags' => FILTER_NULL_ON_FAILURE,
477
                ));
478 1
                if ($value === null) {
479 1
                    throw new \InvalidArgumentException("Option[$key] must be boolean.");
480
                }
481 1
            } elseif ($key === 'interval') {
482 1
                $value = filter_var($value, FILTER_VALIDATE_FLOAT);
483 1
                if ($value === false || $value < 0.0) {
484 1
                    throw new \InvalidArgumentException("Option[interval] must be positive float or zero.");
485
                }
486 1
            } elseif ($key === 'concurrency') {
487 1
                $value = filter_var($value, FILTER_VALIDATE_INT);
488 1
                if ($value === false || $value < 0) {
489 1
                    throw new \InvalidArgumentException("Option[concurrency] must be positive integer or zero.");
490
                }
491
            } else {
492
                throw new \InvalidArgumentException("Unknown option: $key");
493
            }
494 1
            $options[$key] = $value;
495
        }
496 1
        return $options;
497
    }
498
499
    /**
500
     * Normalize value.
501
     *
502
     * @access private
503
     * @static
504
     * @param mixed $value
505
     * @return miexed
506
     */
507 1
    private static function normalize($value)
508 1
    {
509 1
        while ($value instanceof \Closure) {
510 1
            $value = $value();
511
        }
512 1
        if (self::isArrayLike($value)
513 1
            && !is_array($value)
514 1
            && !$value->valid()) {
515 1
            $value = array();
516
        }
517 1
        return $value;
518
    }
519
520
    /**
521
     * Check if a Generator is running.
522
     * This method supports psuedo return with Co::RETURN_WITH.
523
     *
524
     * @access private
525
     * @static
526
     * @param Generator $value
527
     * @return bool
528
     */
529 1
    private static function isGeneratorRunning(\Generator $value)
530 1
    {
531 1
        $value->current();
532 1
        return $value->valid() && $value->key() !== self::RETURN_WITH; // yield Co::RETURN_WITH => XX
533
    }
534
535
    /**
536
     * Get return value from a Generator.
537
     * This method supports psuedo return with Co::RETURN_WITH.
538
     *
539
     * @access private
540
     * @static
541
     * @param Generator $value
542
     * @return bool
543
     */
544 1
    private static function getGeneratorReturn(\Generator $value)
545 1
    {
546 1
        $value->current();
547 1
        if ($value->valid() && $value->key() === self::RETURN_WITH) {  // yield Co::RETURN_WITH => XX
548 1
            return $value->current();
549
        }
550 1
        if ($value->valid()) {
551 1
            throw new \LogicException('Unreachable statement.');
552
        }
553 1
        return method_exists($value, 'getReturn') ? $value->getReturn() : null;
554
    }
555
556
    /**
557
     * Check if value is a valid cURL resource.
558
     *
559
     * @access private
560
     * @static
561
     * @param mixed $value
562
     * @return bool
563
     */
564 1
    private static function isCurl($value)
565 1
    {
566 1
        return is_resource($value) && get_resource_type($value) === 'curl';
567
    }
568
569
    /**
570
     * Check if value is a valid Generator.
571
     *
572
     * @access private
573
     * @static
574
     * @param mixed $value
575
     * @return bool
576
     */
577 2
    private static function isGenerator($value)
578 2
    {
579 2
        return $value instanceof \Generator;
580
    }
581
582
    /**
583
     * Check if value is a valid array or Traversable, not a Generator.
584
     *
585
     * @access private
586
     * @static
587
     * @param mixed $value
588
     * @return bool
589
     */
590 4
    private static function isArrayLike($value)
591 4
    {
592 4
        return $value instanceof \Traversable && !$value instanceof \Generator
593 4
               || is_array($value);
594
    }
595
596
    /**
597
     * Flatten an array or a Traversable.
598
     *
599
     * @access private
600
     * @static
601
     * @param mixed $value
602
     * @return array
603
     */
604 2
    private static function flatten($value, array &$carry = array())
605 2
    {
606 2
        if (!self::isArrayLike($value)) {
607 2
            $carry[] = $value;
608
        } else {
609 2
            foreach ($value as $v) {
610 2
                self::flatten($v, $carry);
611
            }
612
        }
613 2
        return func_num_args() <= 1 ? $carry : null;
614
    }
615
616
}
617