1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace mpyw\Co\Internal; |
4
|
|
|
use mpyw\Co\CURLException; |
5
|
|
|
use React\Promise\PromiseInterface; |
6
|
|
|
|
7
|
|
|
abstract class AbstractScheduler
{
    /**
     * The cURL multi handle that completed transfers are polled from.
     * @var resource
     */
    protected $mh;

    /**
     * Scheduler options.
     * @var CoOption
     */
    protected $options;

    /**
     * cURL handles that have already been dispatched.
     * consume() drops entries using the string-cast handle as the key.
     * @var array
     */
    protected $added = [];

    /**
     * Pending deferreds, looked up by the string-cast cURL handle.
     * Each one is resolved or rejected in resolveEntries().
     * @var array
     */
    protected $deferreds = [];

    /**
     * Constructor.
     * Initialize the scheduler with its options and cURL multi handle.
     * @param CoOption $options
     * @param resource $mh curl_multi
     */
    abstract public function __construct(CoOption $options, $mh);

    /**
     * Call curl_multi_add_handle() or push the handle into a queue.
     * @param resource $ch
     * @return PromiseInterface
     */
    abstract public function add($ch);

    /**
     * Are there no cURL handles?
     * @return bool
     */
    abstract public function isEmpty();

    /**
     * Hook invoked by consume() once per completed handle, right after that
     * handle has been removed from the multi handle and from $added.
     */
    abstract protected function interruptConsume();

    /**
     * Poll completed cURL entries, detach each finished handle, then settle
     * the matching deferreds.
     */
    public function consume()
    {
        $completed = $this->readCompletedEntries();

        foreach ($completed as $info) {
            $handle = $info['handle'];

            // Detach the finished handle before giving the subclass its turn.
            curl_multi_remove_handle($this->mh, $handle);
            unset($this->added[(string)$handle]);

            $this->interruptConsume();
        }

        // Deferreds are settled only after every finished handle is detached.
        $this->resolveEntries($completed);
    }

    /**
     * Drain the multi handle's message queue.
     * @return array Entries as returned by curl_multi_info_read().
     */
    protected function readCompletedEntries()
    {
        $completed = [];

        // DO NOT call curl_multi_add_handle() until polling done
        while (true) {
            $info = curl_multi_info_read($this->mh);
            if (!$info) {
                break;
            }
            $completed[] = $info;
        }

        return $completed;
    }

    /**
     * Settle the deferred belonging to each polled cURL entry.
     * Entries whose result is CURLE_OK resolve with the handle's content;
     * every other result rejects with a CURLException.
     * @param array $entries Polled cURL entries.
     */
    protected function resolveEntries(array $entries)
    {
        foreach ($entries as $entry) {
            $handle = $entry['handle'];
            $key = (string)$handle;

            // Take the deferred out of the pending map before settling it.
            $deferred = $this->deferreds[$key];
            unset($this->deferreds[$key]);

            if ($entry['result'] === CURLE_OK) {
                $deferred->resolve(curl_multi_getcontent($handle));
            } else {
                $deferred->reject(new CURLException(curl_error($handle), $entry['result'], $handle));
            }
        }
    }
}
102
|
|
|
|