Completed
Push — master ( e2ef70...127eb9 )
by Ryosuke
03:24
created

CURLPool::add()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 14
ccs 8
cts 8
cp 1
rs 9.2
cc 4
eloc 9
nc 4
nop 2
crap 4
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles that have not been dispatched yet.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles that have already been dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * Deferreds waiting on cURL handles, keyed by the handle cast to string.
37
     * @var Deferred[]
38
     */
39
    private $deferreds = [];
40
41
    /**
42
     * Used for halting loop.
43
     * @var \RuntimeException
44
     */
45
    private $haltException;
46
47
    /**
48
     * TCP connection counter.
49
     * @var ConnectionCounter
50
     */
51
    private $counter;
52
53
    /**
54
     * Delay controller.
55
     * @var Delayer
56
     */
57
    private $delayer;
58
59
    /**
     * Constructor.
     *
     * Keeps the given options, creates the connection counter and the
     * delayer, then opens a cURL multi handle and sets CURLMOPT_PIPELINING
     * on it. The option value is the sum of two flags: "pipeline"
     * contributes 1 and "multiplex" contributes 2.
     *
     * @param CoOption $options Options; the "pipeline" and "multiplex"
     *                          entries are read here.
     */
    public function __construct(CoOption $options)
    {
        $this->options = $options;
        $this->counter = new ConnectionCounter($options);
        $this->delayer = new Delayer();
        $this->mh = curl_multi_init();

        // Each flag is cast to 0 or 1 first, so the sum is a value in 0..3.
        $pipelineBit = (int)$options['pipeline'];
        $multiplexBit = (int)$options['multiplex'] * 2;
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $pipelineBit + $multiplexBit);
    }
73
74
    /**
     * Register a cURL handle: push it into the queue when the connection
     * counter reports the pool as filled for it, otherwise hand it to
     * curl_multi_add_handle() immediately.
     *
     * @param resource $ch       cURL handle.
     * @param Deferred $deferred Optional Deferred to associate with the handle.
     * @throws \UnexpectedValueException If the handle is already dispatched or queued.
     */
    public function addOrEnqueue($ch, Deferred $deferred = null)
    {
        $id = (string)$ch;
        if (isset($this->added[$id]) || isset($this->queue[$id])) {
            throw new \UnexpectedValueException("The cURL handle is already enqueued: $ch");
        }

        if ($this->counter->isPoolFilled($ch)) {
            $this->enqueue($ch, $deferred);
            return;
        }

        $this->add($ch, $deferred);
    }
88
89
    /**
     * Call curl_multi_add_handle().
     *
     * On success the handle is recorded as dispatched, its destination is
     * registered with the connection counter, and the Deferred (if given)
     * is stored under the handle's string key.
     *
     * On failure nothing is recorded and the Deferred associated with the
     * handle is rejected with a \RuntimeException. That Deferred is either
     * the one passed in, or, for a handle promoted from the queue (which
     * arrives here without a Deferred), the one stored earlier by enqueue().
     *
     * @param resource $ch       cURL handle.
     * @param Deferred $deferred Optional Deferred to associate with the handle.
     */
    private function add($ch, Deferred $deferred = null)
    {
        $id = (string)$ch;
        $errno = curl_multi_add_handle($this->mh, $ch);
        if ($errno !== CURLM_OK) {
            // @codeCoverageIgnoreStart
            $msg = curl_multi_strerror($errno) . ": $ch";
            // A handle taken from the queue is re-submitted without its
            // Deferred; look it up so that it is not left pending forever.
            if (!$deferred && isset($this->deferreds[$id])) {
                $deferred = $this->deferreds[$id];
            }
            // The handle was never dispatched, so no completion entry will
            // ever settle a stored Deferred for it.
            unset($this->deferreds[$id]);
            if ($deferred) {
                $deferred->reject(new \RuntimeException($msg));
            }
            return;
            // @codeCoverageIgnoreEnd
        }
        $this->added[$id] = $ch;
        $this->counter->addDestination($ch);
        if ($deferred) {
            $this->deferreds[$id] = $deferred;
        }
    }
108
109
    /**
     * Push a handle into the queue of not-yet-dispatched handles.
     *
     * The handle is stored under its string key; when a Deferred is given
     * it is stored under the same key.
     *
     * @param resource $ch       cURL handle.
     * @param Deferred $deferred Optional Deferred to associate with the handle.
     */
    private function enqueue($ch, Deferred $deferred = null)
    {
        $id = (string)$ch;
        $this->queue[$id] = $ch;
        if ($deferred) {
            $this->deferreds[$id] = $deferred;
        }
    }
119
120
    /**
     * Add delay.
     *
     * Simply forwards both arguments to the delayer.
     *
     * @param int      $time     Delay handed to Delayer::add() as-is
     *                           (unit not visible here; presumably seconds, confirm).
     * @param Deferred $deferred Deferred handed to Delayer::add() together with the delay.
     */
    public function addDelay($time, Deferred $deferred)
    {
        $this->delayer->add($time, $deferred);
    }
129
130
    /**
     * Run curl_multi_exec() loop.
     *
     * Each iteration waits (on cURL activity or on the delayer), drives the
     * transfers, collects finished entries, lets the delayer resolve what
     * is due, and finally settles the Deferreds of the finished entries.
     * The loop ends once a halt exception has been reserved, or when no
     * handle is dispatched or queued and the delayer is empty.
     *
     * @throws \RuntimeException The exception previously handed to
     *                           reserveHaltException(), if any.
     */
    public function wait()
    {
        curl_multi_exec($this->mh, $active); // Start requests.
        do {
            if ($this->added || $this->queue) {
                // cURL handles are pending: block in curl_multi_select().
                // A negative return value means select failed, so sleep
                // for the same interval (converted to microseconds) instead.
                if (curl_multi_select($this->mh, $this->options['interval']) < 0) {
                    usleep($this->options['interval'] * 1000000);
                }
            } else {
                // Only delays remain: let the delayer do the sleeping.
                $this->delayer->sleep();
            }
            curl_multi_exec($this->mh, $active);
            $entries = $this->consume();
            $this->delayer->consumeAndResolve();
            $this->resolve($entries);
        } while (
            !$this->haltException
            && ($this->added || $this->queue || !$this->delayer->empty())
        );

        if (!$this->haltException) {
            return;
        }
        throw $this->haltException;
    }
152
153
    /**
     * Reserve an exception for halting the loop.
     *
     * wait() checks the reserved value at the end of each iteration; once
     * it is set, the loop stops and wait() throws it.
     *
     * @param \RuntimeException $e Exception to be thrown from wait().
     */
    public function reserveHaltException($e)
    {
        $this->haltException = $e;
    }
160
161
    /**
     * Poll completed cURL entries and consume cURL queue.
     *
     * Every entry reported by curl_multi_info_read() is collected first.
     * Then, per entry, the handle is removed from the multi handle, from
     * the dispatched set and from the connection counter, and, if the queue
     * is not empty, its first handle is passed to addOrEnqueue().
     *
     * @return array Entries as returned by curl_multi_info_read().
     */
    private function consume()
    {
        // DO NOT call curl_multi_add_handle() until polling done
        $finished = [];
        while (true) {
            $info = curl_multi_info_read($this->mh);
            if (!$info) {
                break;
            }
            $finished[] = $info;
        }

        foreach ($finished as $info) {
            $handle = $info['handle'];
            curl_multi_remove_handle($this->mh, $handle);
            unset($this->added[(string)$handle]);
            $this->counter->removeDestination($handle);
            if ($this->queue) {
                $this->addOrEnqueue(array_shift($this->queue));
            }
        }

        return $finished;
    }
180
181
    /**
     * Resolve polled cURLs.
     *
     * Entries whose handle has no stored Deferred are skipped. Otherwise
     * the Deferred is removed from the map and then settled: resolved with
     * the handle's content when the result is CURLE_OK, rejected with a
     * CURLException in every other case.
     *
     * @param  array $entries Polled cURL entries.
     */
    private function resolve($entries)
    {
        foreach ($entries as $entry) {
            $handle = $entry['handle'];
            $id = (string)$handle;
            if (!isset($this->deferreds[$id])) {
                continue;
            }

            $deferred = $this->deferreds[$id];
            unset($this->deferreds[$id]);

            if ($entry['result'] !== CURLE_OK) {
                $deferred->reject(new CURLException(curl_error($handle), $entry['result'], $handle));
                continue;
            }
            $deferred->resolve(curl_multi_getcontent($handle));
        }
    }
198
}
199