Completed
Push — master ( ec8c5b...0b9fa7 )
by Ryosuke
02:59
created

Scheduler::consume()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 11
ccs 10
cts 10
cp 1
rs 9.4285
cc 3
eloc 8
nc 3
nop 0
crap 3
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\CURLException;
5
use mpyw\RuntimePromise\Deferred;
6
7
class Scheduler
8
{
9
    /**
10
     * cURL multi handle.
11
     * @var resource
12
     */
13
    private $mh;
14
15
    /**
16
     * Options.
17
     * @var CoOption
18
     */
19
    private $options;
20
21
    /**
22
     * cURL handles those have not been dispatched.
23
     * @var array
24
     */
25
    private $queue = [];
26
27
    /**
28
     * cURL handles those have been already dispatched.
29
     * @var array
30
     */
31
    private $added = [];
32
33
    /**
34
     * Deferreds.
35
     * @var Deferred
36
     */
37
    private $deferreds = [];
38
39
    /**
40
     * TCP connection counter.
41
     * @var ConnectionCounter
42
     */
43
    private $counter;
44
45
    /**
46
     * Constructor.
47
     * Initialize cURL multi handle.
48
     * @param CoOption $options
49
     * @param resource $mh      curl_multi
50
     */
51 30
    public function __construct(CoOption $options, $mh)
52 30
    {
53 30
        $this->mh = $mh;
54 30
        $this->options = $options;
55 30
        $this->counter = new ConnectionCounter($options);
56 30
    }
57
58
    /**
59
     * Call curl_multi_add_handle() or push into queue.
60
     * @param resource $ch
61
     * @param Deferred $deferred
62
     */
63 10
    public function add($ch, Deferred $deferred = null)
64 10
    {
65 10
        $this->counter->isPoolFilled($ch)
66 2
            ? $this->addReserved($ch, $deferred)
67 10
            : $this->addImmediate($ch, $deferred);
68 10
    }
69
70
    /**
71
     * Are there no cURL handles?
72
     * @return bool
73
     */
74 21
    public function isEmpty()
75 21
    {
76 21
        return !$this->added && !$this->queue;
77
    }
78
79
    /**
80
     * Call curl_multi_add_handle().
81
     * @param resource $ch
82
     * @param Deferred $deferred
83
     */
84 10
    private function addImmediate($ch, Deferred $deferred = null)
85 10
    {
86 10
        $errno = curl_multi_add_handle($this->mh, $ch);
87 10
        if ($errno !== CURLM_OK) {
88
            // @codeCoverageIgnoreStart
89
            $msg = curl_multi_strerror($errno) . ": $ch";
90
            $deferred && $deferred->reject(new \RuntimeException($msg));
91
            return;
92
            // @codeCoverageIgnoreEnd
93
        }
94 10
        $this->added[(string)$ch] = $ch;
95 10
        $this->counter->addDestination($ch);
96 10
        $deferred && $this->deferreds[(string)$ch] = $deferred;
97 10
    }
98
99
    /**
100
     * Push into queue.
101
     * @param resource $ch
102
     * @param Deferred $deferred
103
     */
104 2
    private function addReserved($ch, Deferred $deferred = null)
105 2
    {
106 2
        $this->queue[(string)$ch] = $ch;
107 2
        $deferred && $this->deferreds[(string)$ch] = $deferred;
108 2
    }
109
110
    /**
111
     * Poll completed cURL entries, consume cURL queue and resolve them.
112
     */
113 21
    public function consume()
114 21
    {
115 21
        $entries = $this->readCompletedEntries();
116 21
        foreach ($entries as $entry) {
117 10
            curl_multi_remove_handle($this->mh, $entry['handle']);
118 10
            unset($this->added[(string)$entry['handle']]);
119 10
            $this->counter->removeDestination($entry['handle']);
120 10
            $this->queue && $this->add(array_shift($this->queue));
121
        }
122 21
        $this->resolveEntries($entries);
123 21
    }
124
125
    /**
126
     * Poll completed cURL entries.
127
     * @return array
128
     */
129 21
    private function readCompletedEntries()
130 21
    {
131 21
        $entries = [];
132
        // DO NOT call curl_multi_add_handle() until polling done
133 21
        while ($entry = curl_multi_info_read($this->mh)) {
134 10
            $entries[] = $entry;
135
        }
136 21
        return $entries;
137
    }
138
139
    /**
140
     * Resolve polled cURLs.
141
     * @param  array $entries Polled cURL entries.
142
     */
143 21
    private function resolveEntries(array $entries)
144 21
    {
145 21
        foreach ($entries as $entry) {
146 10
            if (!isset($this->deferreds[(string)$entry['handle']])) {
147 1
                continue;
148
            }
149 9
            $deferred = $this->deferreds[(string)$entry['handle']];
150 9
            unset($this->deferreds[(string)$entry['handle']]);
151 9
            $entry['result'] === CURLE_OK
152 9
                ? $deferred->resolve(curl_multi_getcontent($entry['handle']))
153 9
                : $deferred->reject(new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']));
154
        }
155 21
    }
156
}
157