AbstractScheduler::interruptConsume()
last analyzed

Size

Total Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 1
ccs 0
cts 0
cp 0
c 0
b 0
f 0
nc 1
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\CURLException;
5
use React\Promise\PromiseInterface;
6
7
abstract class AbstractScheduler
8
{
9
    /**
10
     * cURL multi handle.
11
     * @var resource
12
     */
13
    protected $mh;
14
15
    /**
16
     * Options.
17
     * @var CoOption
18
     */
19
    protected $options;
20
21
    /**
22
     * cURL handles those have been already dispatched.
23
     * @var array
24
     */
25
    protected $added = [];
26
27
    /**
28
     * Deferreds.
29
     * @var array
30
     */
31
    protected $deferreds = [];
32
33
    /**
34
     * Constructor.
35
     * Initialize cURL multi handle.
36
     * @param CoOption $options
37
     * @param resource $mh      curl_multi
38
     */
39
    abstract public function __construct(CoOption $options, $mh);
40
41
    /**
42
     * Call curl_multi_add_handle() or push into queue.
43
     * @param resource $ch
44
     * @return PromiseInterface
45
     */
46
    abstract public function add($ch);
47
48
    /**
49
     * Are there no cURL handles?
50
     * @return bool
51
     */
52
    abstract public function isEmpty();
53
54
    /**
55
     * Do somthing with consumed handle.
56
     */
57
    abstract protected function interruptConsume();
58
59
    /**
60
     * Poll completed cURL entries, consume cURL queue and resolve them.
61
     */
62 33
    public function consume()
63 33
    {
64 33
        $entries = $this->readCompletedEntries();
65 33
        foreach ($entries as $entry) {
66 16
            curl_multi_remove_handle($this->mh, $entry['handle']);
67 16
            unset($this->added[(string)$entry['handle']]);
68 16
            $this->interruptConsume();
69
        }
70 33
        $this->resolveEntries($entries);
71 33
    }
72
73
    /**
74
     * Poll completed cURL entries.
75
     * @return array
76
     */
77 33
    protected function readCompletedEntries()
78 33
    {
79 33
        $entries = [];
80
        // DO NOT call curl_multi_add_handle() until polling done
81 33
        while ($entry = curl_multi_info_read($this->mh)) {
82 16
            $entries[] = $entry;
83
        }
84 33
        return $entries;
85
    }
86
87
    /**
88
     * Resolve polled cURLs.
89
     * @param  array $entries Polled cURL entries.
90
     */
91 33
    protected function resolveEntries(array $entries)
92 33
    {
93 33
        foreach ($entries as $entry) {
94 16
            $deferred = $this->deferreds[(string)$entry['handle']];
95 16
            unset($this->deferreds[(string)$entry['handle']]);
96 16
            $entry['result'] === CURLE_OK
97 14
                ? $deferred->resolve(curl_multi_getcontent($entry['handle']))
98 16
                : $deferred->reject(new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']));
99
        }
100 33
    }
101
}
102