1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace mpyw\Co\Internal; |
4
|
|
|
use mpyw\Co\CURLException; |
5
|
|
|
use mpyw\RuntimePromise\Deferred; |
6
|
|
|
|
7
|
|
|
/**
 * Base class for schedulers that drive a cURL multi handle.
 *
 * Concrete schedulers decide how handles are added (add()), whether any
 * work remains (isEmpty()) and what happens right after a handle has been
 * taken off the multi handle (interruptConsume()). This class supplies the
 * shared polling / resolving logic.
 */
abstract class AbstractScheduler
{
    /**
     * cURL multi handle.
     * @var resource
     */
    protected $mh;

    /**
     * Options.
     * @var CoOption
     */
    protected $options;

    /**
     * cURL handles that have already been dispatched.
     * consume() removes entries using the string cast of the handle as key.
     * @var array
     */
    protected $added = [];

    /**
     * Pending deferreds.
     * resolveEntries() looks them up by the string cast of the cURL handle.
     * @var Deferred[]
     */
    protected $deferreds = [];

    /**
     * Constructor.
     * Initialize cURL multi handle.
     * @param CoOption $options
     * @param resource $mh curl_multi
     */
    abstract public function __construct(CoOption $options, $mh);

    /**
     * Call curl_multi_add_handle() or push into queue.
     * @param resource $ch
     * @param Deferred $deferred
     */
    abstract public function add($ch, Deferred $deferred = null);

    /**
     * Are there no cURL handles?
     * @return bool
     */
    abstract public function isEmpty();

    /**
     * Do something with a consumed handle.
     * Invoked by consume() once per completed entry, after the handle has
     * been removed from the multi handle and from $added.
     * @param array $entry Entry as returned by curl_multi_info_read().
     */
    abstract protected function interruptConsume(array $entry);

    /**
     * Poll completed cURL entries, consume cURL queue and resolve them.
     *
     * All completed entries are collected first; each one is then detached
     * from the multi handle and handed to interruptConsume(); finally the
     * matching deferreds are settled.
     */
    public function consume()
    {
        $completed = $this->readCompletedEntries();
        foreach ($completed as $done) {
            $ch = $done['handle'];
            curl_multi_remove_handle($this->mh, $ch);
            unset($this->added[(string)$ch]);
            $this->interruptConsume($done);
        }
        $this->resolveEntries($completed);
    }

    /**
     * Poll completed cURL entries.
     * @return array List of entries returned by curl_multi_info_read().
     */
    protected function readCompletedEntries()
    {
        $polled = [];
        // DO NOT call curl_multi_add_handle() until polling done
        for (;;) {
            $info = curl_multi_info_read($this->mh);
            if (!$info) {
                // Nothing more to read.
                break;
            }
            $polled[] = $info;
        }
        return $polled;
    }

    /**
     * Resolve polled cURLs.
     *
     * Entries without a registered deferred are skipped. A registered
     * deferred is removed from $deferreds before it is settled: resolved
     * with curl_multi_getcontent() when the result is CURLE_OK, rejected
     * with a CURLException otherwise.
     * @param array $entries Polled cURL entries.
     */
    protected function resolveEntries(array $entries)
    {
        foreach ($entries as $polled) {
            $ch = $polled['handle'];
            $key = (string)$ch;
            if (!isset($this->deferreds[$key])) {
                continue;
            }
            $pending = $this->deferreds[$key];
            unset($this->deferreds[$key]);
            if ($polled['result'] === CURLE_OK) {
                $pending->resolve(curl_multi_getcontent($ch));
            } else {
                $pending->reject(new CURLException(curl_error($ch), $polled['result'], $ch));
            }
        }
    }
}
106
|
|
|
|