Completed
Push — master ( 93f1d6...93c8d2 )
by Ryosuke
03:15
created

CURLPool::addOrEnqueue()   B

Complexity

Conditions 8
Paths 7

Size

Total Lines 21
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 8

Importance

Changes 5
Bugs 1 Features 0
Metric Value
c 5
b 1
f 0
dl 0
loc 21
ccs 13
cts 13
cp 1
rs 7.1428
cc 8
eloc 14
nc 7
nop 2
crap 8
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
/**
 * Pool of cURL easy handles driven through a single cURL multi handle.
 *
 * Handles are dispatched immediately while the number of running handles is
 * below the configured "concurrency" option; otherwise they wait in a FIFO
 * queue and are dispatched as running handles complete.
 */
class CURLPool
{
    /**
     * Options.
     * @var CoOption
     */
    private $options;

    /**
     * cURL multi handle.
     * @var resource
     */
    private $mh;

    /**
     * cURL handles that have not been dispatched yet, keyed by handle id.
     * @var array
     */
    private $queue = [];

    /**
     * cURL handles that have already been dispatched, keyed by handle id.
     * @var array
     */
    private $added = [];

    /**
     * Deferreds waiting for their cURL handle to complete, keyed by handle id.
     * @var Deferred[]
     */
    private $deferreds = [];

    /**
     * Constructor.
     * Initialize the cURL multi handle and apply the pipelining flags.
     * @param CoOption $options Must provide the "pipeline" and "multiplex"
     *                          entries here, and "concurrency" / "interval"
     *                          for the other methods.
     */
    public function __construct(CoOption $options)
    {
        $this->options = $options;
        $this->mh = curl_multi_init();
        // Bit 0 comes from "pipeline", bit 1 from "multiplex".
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
    }

    /**
     * Call curl_multi_add_handle() or push into the queue.
     *
     * When curl_multi_add_handle() fails, the associated Deferred (the one
     * passed in, or the one stored earlier while the handle was queued) is
     * rejected with a RuntimeException and forgotten.
     *
     * @param resource $ch       cURL easy handle.
     * @param Deferred $deferred Optional Deferred settled when $ch finishes.
     * @throws \InvalidArgumentException If $ch is already running or queued.
     */
    public function addOrEnqueue($ch, $deferred = null)
    {
        $id = self::handleId($ch);
        if (isset($this->added[$id]) || isset($this->queue[$id])) {
            throw new \InvalidArgumentException("The cURL handle is already enqueued: $id");
        }
        if (count($this->added) >= $this->options['concurrency']) {
            // Concurrency limit reached: postpone the handle.
            $this->queue[$id] = $ch;
            if ($deferred) {
                $this->deferreds[$id] = $deferred;
            }
            return;
        }
        $errno = curl_multi_add_handle($this->mh, $ch);
        if ($errno !== CURLM_OK) {
            // @codeCoverageIgnoreStart
            // A handle re-dispatched from the queue arrives without $deferred;
            // its Deferred was stored at enqueue time and must not be left
            // pending forever.
            if (!$deferred && isset($this->deferreds[$id])) {
                $deferred = $this->deferreds[$id];
            }
            unset($this->deferreds[$id]);
            if ($deferred) {
                $msg = curl_multi_strerror($errno) . ": $id";
                $deferred->reject(new \RuntimeException($msg));
            }
            return;
            // @codeCoverageIgnoreEnd
        }
        $this->added[$id] = $ch;
        if ($deferred) {
            $this->deferreds[$id] = $deferred;
        }
    }

    /**
     * Run the curl_multi_exec() loop until no handle is running or queued.
     *
     * Each completed handle settles its Deferred, if any: resolved with the
     * response content on CURLE_OK, rejected with a CURLException otherwise.
     *
     * @throws \LogicException If cURL still reports active transfers after
     *                         the loop has finished.
     */
    public function wait()
    {
        curl_multi_exec($this->mh, $active); // Start requests.
        do {
            // Wait for events. curl_multi_select() returns -1 on failure
            // without blocking, so sleep instead to avoid a busy loop.
            if (curl_multi_select($this->mh, $this->options['interval']) < 0) {
                usleep((int)($this->options['interval'] * 1000000));
            }
            curl_multi_exec($this->mh, $active);
            foreach ($this->readEntries() as $entry) {
                $handle = $entry['handle'];
                $id = self::handleId($handle);
                $r = $entry['result'] === CURLE_OK
                    ? curl_multi_getcontent($handle)
                    : new CURLException(curl_error($handle), $entry['result'], $handle);
                if (!isset($this->deferreds[$id])) {
                    continue;
                }
                $deferred = $this->deferreds[$id];
                unset($this->deferreds[$id]);
                if ($r instanceof CURLException) {
                    $deferred->reject($r);
                } else {
                    $deferred->resolve($r);
                }
            }
        } while ($this->added || $this->queue);
        // All requests must be done when reached here.
        if ($active) {
            // @codeCoverageIgnoreStart
            throw new \LogicException('Unreachable statement.');
            // @codeCoverageIgnoreEnd
        }
    }

    /**
     * Read completed cURL handles.
     *
     * Every completed handle is removed from the multi handle and, if the
     * queue is not empty, the oldest queued handle is dispatched in its place.
     *
     * @return array Entries as returned by curl_multi_info_read().
     */
    private function readEntries()
    {
        // Collect every pending message first, then mutate the multi handle.
        $entries = [];
        while ($entry = curl_multi_info_read($this->mh)) {
            $entries[] = $entry;
        }
        foreach ($entries as $entry) {
            curl_multi_remove_handle($this->mh, $entry['handle']);
            unset($this->added[self::handleId($entry['handle'])]);
            if ($this->queue) {
                // The Deferred of the queued handle, if any, is already stored.
                $this->addOrEnqueue(array_shift($this->queue));
            }
        }
        return $entries;
    }

    /**
     * Build the array key identifying a cURL handle.
     *
     * Resources keep their string form ("Resource id #N"). Objects (cURL
     * handles are CurlHandle instances as of PHP 8.0) cannot be cast to
     * string, so their object hash is used instead.
     *
     * @param resource|object $ch cURL easy handle.
     * @return string
     */
    private static function handleId($ch)
    {
        return is_resource($ch) ? (string)$ch : spl_object_hash($ch);
    }
}
130