Completed
Push — master ( 0b9fa7...ac04d3 )
by Ryosuke
03:05
created

AbstractScheduler::add()

Size

Total Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 1
ccs 0
cts 0
cp 0
nc 1
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\CURLException;
5
use mpyw\RuntimePromise\Deferred;
6
7
abstract class AbstractScheduler
8
{
9
    /**
10
     * cURL multi handle.
11
     * @var resource
12
     */
13
    protected $mh;
14
15
    /**
16
     * Options.
17
     * @var CoOption
18
     */
19
    protected $options;
20
21
    /**
22
     * cURL handles those have been already dispatched.
23
     * @var array
24
     */
25
    protected $added = [];
26
27
    /**
28
     * Deferreds.
29
     * @var Deferred
30
     */
31
    protected $deferreds = [];
32
33
    /**
34
     * Constructor.
35
     * Initialize cURL multi handle.
36
     * @param CoOption $options
37
     * @param resource $mh      curl_multi
38
     */
39
    abstract public function __construct(CoOption $options, $mh);
40
41
    /**
42
     * Call curl_multi_add_handle() or push into queue.
43
     * @param resource $ch
44
     * @param Deferred $deferred
45
     */
46
    abstract public function add($ch, Deferred $deferred = null);
47
48
    /**
49
     * Are there no cURL handles?
50
     * @return bool
51
     */
52
    abstract public function isEmpty();
53
54
    /**
55
     * Do somthing with consumed handle.
56
     * @param array $entry
57
     */
58
    abstract protected function interruptConsume(array $entry);
59
60
    /**
61
     * Poll completed cURL entries, consume cURL queue and resolve them.
62
     */
63 21
    public function consume()
64 21
    {
65 21
        $entries = $this->readCompletedEntries();
66 21
        foreach ($entries as $entry) {
67 10
            curl_multi_remove_handle($this->mh, $entry['handle']);
68 10
            unset($this->added[(string)$entry['handle']]);
69 10
            $this->interruptConsume($entry);
70
        }
71 21
        $this->resolveEntries($entries);
72 21
    }
73
74
    /**
75
     * Poll completed cURL entries.
76
     * @return array
77
     */
78 21
    protected function readCompletedEntries()
79 21
    {
80 21
        $entries = [];
81
        // DO NOT call curl_multi_add_handle() until polling done
82 21
        while ($entry = curl_multi_info_read($this->mh)) {
83 10
            $entries[] = $entry;
84
        }
85 21
        return $entries;
86
    }
87
88
    /**
89
     * Resolve polled cURLs.
90
     * @param  array $entries Polled cURL entries.
91
     */
92 21
    protected function resolveEntries(array $entries)
93 21
    {
94 21
        foreach ($entries as $entry) {
95 10
            if (!isset($this->deferreds[(string)$entry['handle']])) {
96 1
                continue;
97
            }
98 9
            $deferred = $this->deferreds[(string)$entry['handle']];
99 9
            unset($this->deferreds[(string)$entry['handle']]);
100 9
            $entry['result'] === CURLE_OK
101 9
                ? $deferred->resolve(curl_multi_getcontent($entry['handle']))
102 9
                : $deferred->reject(new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']));
103
        }
104 21
    }
105
}
106