1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp; |
4
|
|
|
|
5
|
|
|
use FuseSource\Stomp\Stomp as BaseClient; |
6
|
|
|
use FuseSource\Stomp\Frame; |
7
|
|
|
use FuseSource\Stomp\Message\Map; |
8
|
|
|
use FuseSource\Stomp\Exception\StompException; |
9
|
|
|
use Kaliop\QueueingBundle\Adapter\ForcedStopException; |
10
|
|
|
|
11
|
|
|
class Client extends BaseClient |
12
|
|
|
{ |
13
|
|
|
public $debug = false; |
14
|
|
|
protected $forceStop = false; |
15
|
|
|
protected $forceStopReason; |
16
|
|
|
protected $dispatchSignals = false; |
17
|
|
|
|
18
|
|
|
/** |
19
|
|
|
* Connect to server. Reimplemented to sniff out Apollo |
20
|
|
|
* |
21
|
|
|
* @param string $username |
22
|
|
|
* @param string $password |
23
|
|
|
* @return boolean |
24
|
|
|
* @throws StompException |
25
|
|
|
*/ |
26
|
|
|
public function connect ($username = '', $password = '') |
27
|
|
|
{ |
28
|
|
|
$this->_makeConnection(); |
29
|
|
|
if ($username != '') { |
30
|
|
|
$this->_username = $username; |
31
|
|
|
} |
32
|
|
|
if ($password != '') { |
33
|
|
|
$this->_password = $password; |
34
|
|
|
} |
35
|
|
|
$headers = array('login' => $this->_username , 'passcode' => $this->_password); |
36
|
|
|
if ($this->clientId != null) { |
37
|
|
|
$headers["client-id"] = $this->clientId; |
38
|
|
|
} |
39
|
|
|
$frame = new Frame("CONNECT", $headers); |
40
|
|
|
$this->_writeFrame($frame); |
41
|
|
|
$frame = $this->readFrame(); |
42
|
|
|
|
43
|
|
|
if ($frame instanceof Frame && $frame->command == 'CONNECTED') { |
44
|
|
|
$this->_sessionId = $frame->headers["session"]; |
45
|
|
|
if (isset($frame->headers['server']) && false !== stristr(trim($frame->headers['server']), 'rabbitmq')) { |
46
|
|
|
$this->brokerVendor = 'RMQ'; |
47
|
|
|
} |
48
|
|
|
if (isset($frame->headers['server']) && false !== strpos($frame->headers['server'], 'apache-apollo/')) { |
49
|
|
|
$this->brokerVendor = 'Apollo'; |
50
|
|
|
} |
51
|
|
|
return true; |
52
|
|
|
} else { |
53
|
|
|
if ($frame instanceof Frame) { |
54
|
|
|
throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body); |
55
|
|
|
} else { |
56
|
|
|
throw new StompException("Connection not acknowledged"); |
57
|
|
|
} |
58
|
|
|
} |
59
|
|
|
} |
60
|
|
|
|
61
|
|
|
/** |
62
|
|
|
* Register to listen to a given destination. |
63
|
|
|
* Reimplemented to support Apollo, and subscriptions to both ActiveMQ queues and topics |
64
|
|
|
* |
65
|
|
|
* @param string $destination Destination queue |
66
|
|
|
* @param array $properties |
67
|
|
|
* @param boolean $sync Perform request synchronously |
68
|
|
|
* @return boolean |
69
|
|
|
* @throws StompException |
70
|
|
|
*/ |
71
|
|
|
public function subscribe ($destination, $properties = null, $sync = null) |
72
|
|
|
{ |
73
|
|
|
$headers = array('ack' => 'client'); |
74
|
|
|
if ($this->brokerVendor == 'AMQ') { |
75
|
|
|
$headers['activemq.prefetchSize'] = $this->prefetchSize; |
76
|
|
|
} else if ($this->brokerVendor == 'RMQ') { |
77
|
|
|
$headers['prefetch-count'] = $this->prefetchSize; |
78
|
|
|
} |
79
|
|
|
|
80
|
|
|
if ($this->clientId != null) { |
81
|
|
|
if ($this->brokerVendor == 'AMQ') { |
82
|
|
|
if (strpos($destination, '/queue/') !== 0) { |
83
|
|
|
$headers['activemq.subscriptionName'] = $this->clientId; |
84
|
|
|
} |
85
|
|
|
} else if ($this->brokerVendor == 'RMQ' || $this->brokerVendor == 'Apollo') { |
86
|
|
|
$headers['id'] = $this->clientId; |
87
|
|
|
} |
88
|
|
|
} |
89
|
|
|
|
90
|
|
|
if (isset($properties)) { |
91
|
|
|
foreach ($properties as $name => $value) { |
92
|
|
|
$headers[$name] = $value; |
93
|
|
|
} |
94
|
|
|
} |
95
|
|
|
$headers['destination'] = $destination; |
96
|
|
|
$frame = new Frame('SUBSCRIBE', $headers); |
97
|
|
|
$this->_prepareReceipt($frame, $sync); |
|
|
|
|
98
|
|
|
$this->_writeFrame($frame); |
99
|
|
|
if ($this->_waitForReceipt($frame, $sync) == true) { |
|
|
|
|
100
|
|
|
$this->_subscriptions[$destination] = $properties; |
101
|
|
|
return true; |
102
|
|
|
} else { |
103
|
|
|
return false; |
104
|
|
|
} |
105
|
|
|
} |
106
|
|
|
|
107
|
|
|
/** |
108
|
|
|
* Write frame to server. Reimplemented to add debug support |
109
|
|
|
* |
110
|
|
|
* @param Frame $stompFrame |
111
|
|
|
* @throws StompException |
112
|
|
|
*/ |
113
|
|
|
protected function _writeFrame (Frame $stompFrame) |
114
|
|
|
{ |
115
|
|
|
if (!is_resource($this->_socket)) { |
116
|
|
|
throw new StompException('Socket connection hasn\'t been established'); |
117
|
|
|
} |
118
|
|
|
|
119
|
|
|
$data = $stompFrame->__toString(); |
120
|
|
|
$r = fwrite($this->_socket, $data, strlen($data)); |
121
|
|
|
if ($r === false || $r == 0) { |
122
|
|
|
$this->_reconnect(); |
123
|
|
|
$this->_writeFrame($stompFrame); |
124
|
|
|
} else { |
125
|
|
|
if ($this->debug) { |
126
|
|
|
echo "STOMP FRAME SENT:\n ".str_replace("\n", "\n ", $data)."\n"; |
127
|
|
|
} |
128
|
|
|
} |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
/** |
132
|
|
|
* Read response frame from server |
133
|
|
|
* |
134
|
|
|
* @return Frame False when no frame to read |
135
|
|
|
*/ |
136
|
|
|
public function readFrame () |
137
|
|
|
{ |
138
|
|
|
if (!empty($this->_waitbuf)) { |
139
|
|
|
return array_shift($this->_waitbuf); |
140
|
|
|
} |
141
|
|
|
|
142
|
|
|
if (!$this->hasFrameToRead()) { |
143
|
|
|
return false; |
|
|
|
|
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
$rb = 1024; |
147
|
|
|
$data = ''; |
148
|
|
|
$end = false; |
149
|
|
|
|
150
|
|
|
do { |
151
|
|
|
$read = fgets($this->_socket, $rb); |
152
|
|
|
if ($read === false || $read === "") { |
153
|
|
|
$this->_reconnect(); |
154
|
|
|
return $this->readFrame(); |
155
|
|
|
} |
156
|
|
|
$data .= $read; |
157
|
|
|
if (strpos($data, "\x00") !== false) { |
158
|
|
|
$end = true; |
159
|
|
|
$data = trim($data, "\n"); |
160
|
|
|
} |
161
|
|
|
$len = strlen($data); |
162
|
|
|
} while ($len < 2 || $end == false); |
|
|
|
|
163
|
|
|
|
164
|
|
|
if ($this->debug) { |
165
|
|
|
echo "STOMP FRAME RECEIVED:\n ".str_replace("\n", "\n ", $data)."\n"; |
166
|
|
|
} |
167
|
|
|
|
168
|
|
|
list ($header, $body) = explode("\n\n", $data, 2); |
169
|
|
|
$header = explode("\n", $header); |
170
|
|
|
$headers = array(); |
171
|
|
|
$command = null; |
172
|
|
|
foreach ($header as $v) { |
173
|
|
|
if (isset($command)) { |
174
|
|
|
list ($name, $value) = explode(':', $v, 2); |
175
|
|
|
$headers[$name] = $value; |
176
|
|
|
} else { |
177
|
|
|
$command = $v; |
178
|
|
|
} |
179
|
|
|
} |
180
|
|
|
$frame = new Frame($command, $headers, trim($body)); |
181
|
|
|
if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { |
182
|
|
|
return new Map($frame); |
183
|
|
|
} else { |
184
|
|
|
return $frame; |
185
|
|
|
} |
186
|
|
|
return $frame; |
|
|
|
|
187
|
|
|
} |
188
|
|
|
|
189
|
|
|
/** |
190
|
|
|
* Make socket connection to the server |
191
|
|
|
* Reimplemented to support forcestop |
192
|
|
|
* |
193
|
|
|
* @throws StompException |
194
|
|
|
*/ |
195
|
|
|
protected function _makeConnection() |
196
|
|
|
{ |
197
|
|
|
if (count($this->_hosts) == 0) { |
198
|
|
|
throw new StompException("No broker defined"); |
199
|
|
|
} |
200
|
|
|
|
201
|
|
|
// force disconnect, if previous established connection exists |
202
|
|
|
$this->disconnect(); |
203
|
|
|
|
204
|
|
|
$i = $this->_currentHost; |
205
|
|
|
$att = 0; |
206
|
|
|
$connected = false; |
207
|
|
|
$connect_errno = null; |
208
|
|
|
$connect_errstr = null; |
209
|
|
|
|
210
|
|
|
while (! $connected && $att ++ < $this->_attempts) { |
211
|
|
|
if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') { |
212
|
|
|
$i = rand(0, count($this->_hosts) - 1); |
213
|
|
|
} else { |
214
|
|
|
$i = ($i + 1) % count($this->_hosts); |
215
|
|
|
} |
216
|
|
|
$broker = $this->_hosts[$i]; |
217
|
|
|
$host = $broker[0]; |
218
|
|
|
$port = $broker[1]; |
219
|
|
|
$scheme = $broker[2]; |
220
|
|
|
if ($port == null) { |
221
|
|
|
$port = $this->_defaultPort; |
222
|
|
|
} |
223
|
|
|
if ($this->_socket != null) { |
224
|
|
|
fclose($this->_socket); |
225
|
|
|
$this->_socket = null; |
226
|
|
|
} |
227
|
|
|
|
228
|
|
|
$this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds); |
229
|
|
|
|
230
|
|
|
$this->maybeStopClient(); |
231
|
|
|
|
232
|
|
|
if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) { |
233
|
|
|
throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})"); |
234
|
|
|
} else if (is_resource($this->_socket)) { |
235
|
|
|
$connected = true; |
236
|
|
|
$this->_currentHost = $i; |
237
|
|
|
break; |
238
|
|
|
} |
239
|
|
|
} |
240
|
|
|
if (! $connected) { |
241
|
|
|
throw new StompException("Could not connect to a broker"); |
242
|
|
|
} |
243
|
|
|
} |
244
|
|
|
|
245
|
|
|
public function setHandleSignals($doHandle) |
246
|
|
|
{ |
247
|
|
|
$this->dispatchSignals = $doHandle; |
248
|
|
|
} |
249
|
|
|
|
250
|
|
|
public function forceStop($reason = '') |
251
|
|
|
{ |
252
|
|
|
$this->forceStop = true; |
253
|
|
|
$this->forceStopReason = $reason; |
254
|
|
|
} |
255
|
|
|
|
256
|
|
|
/** |
257
|
|
|
* Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss |
258
|
|
|
* |
259
|
|
|
* @throws ForcedStopException |
260
|
|
|
*/ |
261
|
|
|
protected function maybeStopClient() |
262
|
|
|
{ |
263
|
|
|
if ($this->dispatchSignals) { |
264
|
|
|
pcntl_signal_dispatch(); |
265
|
|
|
} |
266
|
|
|
|
267
|
|
|
if ($this->forceStop) { |
268
|
|
|
throw new ForcedStopException($this->forceStopReason); |
269
|
|
|
} |
270
|
|
|
} |
271
|
|
|
} |
272
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.