|
1
|
|
|
<?php |
|
2
|
|
|
namespace PHPDaemon\Network; |
|
3
|
|
|
|
|
4
|
|
|
use PHPDaemon\Network; |
|
5
|
|
|
use PHPDaemon\Request; |
|
6
|
|
|
use PHPDaemon\Structures\ObjectStorage; |
|
7
|
|
|
use PHPDaemon\Structures\PriorityQueueCallbacks; |
|
8
|
|
|
|
|
9
|
|
|
/**
 * Network client pattern
 *
 * Base class for client-side pools: it keeps a per-URL set of connections,
 * hands out free ones, opens new ones up to a per-server limit and queues
 * callbacks when that limit has been reached.
 *
 * @package PHPDaemon\Network
 * @author  Vasily Zorin
 */
abstract class Client extends Pool
{
    /**
     * @var array Registered servers, keyed by URL; the value is the weight passed to addServer()
     */
    protected $servers = [];

    /**
     * @var array Per-URL storages (ObjectStorage) holding every connection opened to that URL
     */
    protected $servConn = [];

    /**
     * @var array Per-URL storages (ObjectStorage) holding the connections currently marked as free
     */
    protected $servConnFree = [];

    /**
     * @var integer Maximum number of connections kept per server URL
     */
    protected $maxConnPerServ = 32;

    /**
     * @var boolean When true, getConnection() removes the connection it returns from the free list
     */
    protected $acquireOnGet = false;

    /**
     * @var array Per-URL queues (PriorityQueueCallbacks) of callbacks waiting for a connection
     */
    protected $pending = [];

    /**
     * Setting default config options
     * NOTE(review): intended to override the parent pool's getConfigDefaults() - confirm against Pool
     * @return array
     */
    protected function getConfigDefaults()
    {
        return [
            /* [boolean] Expose? */
            'expose' => 1,

            /* [string|array] Default servers */
            'servers' => '127.0.0.1',

            /* [string] Default server */
            'server' => '127.0.0.1',

            /* [integer] Maximum connections per server */
            'maxconnperserv' => 32,
        ];
    }

    /**
     * Applies config
     *
     * Rebuilds the server list from the comma-separated "servers" option and
     * copies "maxconnperserv" into the pool limit.
     * @return void
     */
    protected function applyConfig()
    {
        parent::applyConfig();

        if (isset($this->config->servers)) {
            // Split on commas, trim each entry and drop the empty ones.
            $servers = array_filter(
                array_map('trim', explode(',', $this->config->servers->value)),
                'mb_orig_strlen'
            );
            $this->servers = [];
            foreach ($servers as $server) {
                $this->addServer($server);
            }
        }

        if (isset($this->config->maxconnperserv)) {
            $this->maxConnPerServ = $this->config->maxconnperserv->value;
        }
    }

    /**
     * Adds server
     * @param  string  $url    Server URL
     * @param  integer $weight Weight
     * @return void
     */
    public function addServer($url, $weight = null)
    {
        $this->servers[$url] = $weight;
    }

    /**
     * Returns available connection from the pool
     *
     * Resolution order when $url is null: the "server" config option, then a
     * random entry of the registered servers. If neither is available the
     * callback (if any) is invoked with false.
     *
     * @param  string   $url           Address
     * @param  callback $cb            onConnected
     * @param  integer  $pri           Optional. Priority
     * @param  \Closure $beforeConnect Called before establishing the connection
     * @call   ( callable $cb )
     * @call   ( string $url = null, callable $cb = null, integer $pri = 0 )
     * @return boolean True when a connection was reused, requested or the callback was queued;
     *                 false when no server could be resolved or connecting failed
     */
    public function getConnection($url = null, $cb = null, $pri = 0, \Closure $beforeConnect = null)
    {
        // Support the short form getConnection(function ...).
        if ($cb === null && $url !== null && !is_string($url)) {
            $cb  = $url;
            $url = null;
        }

        if ($url === null) {
            if (isset($this->config->server->value)) {
                $url = $this->config->server->value;
            } elseif (!empty($this->servers)) {
                $url = array_rand($this->servers);
            } else {
                if ($cb) {
                    $cb(false);
                }
                return false;
            }
        }

        if (isset($this->servConn[$url])) {
            $storage = $this->servConn[$url];
            $free    = $this->servConnFree[$url];

            // Take the first usable free connection, dropping stale entries on the way.
            $conn = false;
            while ($free->count() > 0) {
                $candidate = $free->getFirst();
                if ($candidate->isConnected() && !$candidate->isFinished()) {
                    $conn = $candidate;
                    break;
                }
                $free->detach($candidate);
            }

            if ($conn) {
                if ($this->acquireOnGet) {
                    $free->detach($conn);
                }
                if ($cb !== null) {
                    $conn->onConnected($cb);
                }
                return true;
            }

            // Nothing free and the per-server limit is reached: park the callback
            // until touchPending() is called for this URL.
            if ($storage->count() >= $this->maxConnPerServ) {
                if (!isset($this->pending[$url])) {
                    $this->pending[$url] = new PriorityQueueCallbacks();
                }
                $this->pending[$url]->enqueue($cb, $pri);
                return true;
            }
        } else {
            $this->servConn[$url]     = new ObjectStorage();
            $this->servConnFree[$url] = new ObjectStorage();
        }

        $conn = $this->connect($url, $cb, null, $beforeConnect);
        if (!$conn || $conn->isFinished()) {
            return false;
        }

        $this->servConn[$url]->attach($conn);
        return true;
    }

    /**
     * Detach Connection
     *
     * After detaching, pending callbacks for the connection's URL are retried.
     * @param  object $conn Connection
     * @return void
     */
    public function detach($conn)
    {
        parent::detach($conn);
        $this->touchPending($conn->getUrl());
    }

    /**
     * Mark connection as free
     * @param  ClientConnection $conn Connection
     * @param  string           $url  URL
     * @return void
     */
    public function markConnFree(ClientConnection $conn, $url)
    {
        if (isset($this->servConnFree[$url])) {
            $this->servConnFree[$url]->attach($conn);
        }
    }

    /**
     * Mark connection as busy
     * @param  ClientConnection $conn Connection
     * @param  string           $url  URL
     * @return void
     */
    public function markConnBusy(ClientConnection $conn, $url)
    {
        if (isset($this->servConnFree[$url])) {
            $this->servConnFree[$url]->detach($conn);
        }
    }

    /**
     * Detaches connection from URL
     *
     * Removes the connection from both the free list and the full list of the URL.
     * @param  ClientConnection $conn Connection
     * @param  string           $url  URL
     * @return void
     */
    public function detachConnFromUrl(ClientConnection $conn, $url)
    {
        if (isset($this->servConnFree[$url])) {
            $this->servConnFree[$url]->detach($conn);
        }
        if (isset($this->servConn[$url])) {
            $this->servConn[$url]->detach($conn);
        }
    }

    /**
     * Touch pending "requests for connection"
     *
     * Dequeues waiting callbacks for the URL and re-submits them through
     * getConnection() until one submission returns true or the queue is empty.
     * @param  string $url URL
     * @return void
     */
    public function touchPending($url)
    {
        while (isset($this->pending[$url]) && !$this->pending[$url]->isEmpty()) {
            if (true === $this->getConnection($url, $this->pending[$url]->dequeue())) {
                return;
            }
        }
    }

    /**
     * Returns available connection from the pool by key
     *
     * If $key is an object, $cb is registered on it directly via onConnected().
     * Otherwise the random generator is seeded from crc32($key) before picking a
     * server with array_rand(), with the intent that the same key selects the
     * same server; the generator is reseeded afterwards.
     *
     * @param  string   $key Key
     * @param  callable $cb  Callback
     * @callback $cb ( )
     * @return boolean Success
     */
    public function getConnectionByKey($key, $cb = null)
    {
        if (is_object($key)) {
            return $key->onConnected($cb);
        }

        // array_rand() rejects an empty array (ValueError on PHP 8, warning and
        // null before that), so let getConnection() resolve the default server.
        if (!$this->servers) {
            return $this->getConnection(null, $cb);
        }

        srand(crc32($key));
        $addr = array_rand($this->servers);
        srand();

        return $this->getConnection($addr, $cb);
    }

    /**
     * Returns available connection from the pool
     *
     * Delegates to getConnection() without a URL.
     * @param  callable $cb Callback
     * @callback $cb ( )
     * @return boolean Success
     */
    public function getConnectionRR($cb = null)
    {
        return $this->getConnection(null, $cb);
    }

    /**
     * Sends a request to arbitrary server
     * @param  string   $server     Server
     * @param  string   $data       Data
     * @param  callable $onResponse Called when the request complete
     * @callback $onResponse ( )
     * @return boolean Always true; connection failures are not reported through the return value
     */
    public function requestByServer($server, $data, $onResponse = null)
    {
        $this->getConnection($server, $this->createRequestSender($data, $onResponse));
        return true;
    }

    /**
     * Sends a request to server according to the key
     * @param  string   $key        Key
     * @param  string   $data       Data
     * @param  callable $onResponse Callback called when the request complete
     * @callback $onResponse ( )
     * @return boolean Always true; connection failures are not reported through the return value
     */
    public function requestByKey($key, $data, $onResponse = null)
    {
        $this->getConnectionByKey($key, $this->createRequestSender($data, $onResponse));
        return true;
    }

    /**
     * Builds the onConnected callback shared by requestByServer() and requestByKey()
     *
     * The callback does nothing when it receives a falsy value (getConnection()
     * calls it with false when no server can be resolved) or a connection that
     * is not connected; otherwise it registers $onResponse and writes $data.
     *
     * @param  string   $data       Data
     * @param  callable $onResponse Response callback, may be null
     * @return \Closure
     */
    private function createRequestSender($data, $onResponse)
    {
        return function ($conn) use ($data, $onResponse) {
            if (!$conn || !$conn->isConnected()) {
                return;
            }
            $conn->onResponse($onResponse);
            $conn->write($data);
        };
    }

    /**
     * Called when application instance is going to shutdown
     * @param  boolean $graceful Graceful?
     * @return boolean Ready to shutdown?
     */
    public function onShutdown($graceful = false)
    {
        return $graceful ? true : $this->finish();
    }
}
|
319
|
|
|
|
An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.
If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.