1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/** |
4
|
|
|
* AppserverIo\Messaging\QueueConnection |
5
|
|
|
* |
6
|
|
|
* NOTICE OF LICENSE |
7
|
|
|
* |
8
|
|
|
* This source file is subject to the Open Software License (OSL 3.0) |
9
|
|
|
* that is available through the world-wide-web at this URL: |
10
|
|
|
* http://opensource.org/licenses/osl-3.0.php |
11
|
|
|
* |
12
|
|
|
* PHP version 5 |
13
|
|
|
* |
14
|
|
|
* @author Tim Wagner <[email protected]> |
15
|
|
|
* @copyright 2015 TechDivision GmbH <[email protected]> |
16
|
|
|
* @license http://opensource.org/licenses/osl-3.0.php Open Software License (OSL 3.0) |
17
|
|
|
* @link https://github.com/appserver-io/messaging |
18
|
|
|
* @link http://www.appserver.io |
19
|
|
|
*/ |
20
|
|
|
|
21
|
|
|
namespace AppserverIo\Messaging; |
22
|
|
|
|
23
|
|
|
use Guzzle\Http\Client; |
24
|
|
|
use Guzzle\Http\Exception\CurlException; |
25
|
|
|
use AppserverIo\Psr\Pms\MessageInterface; |
26
|
|
|
use AppserverIo\Properties\PropertiesInterface; |
27
|
|
|
use AppserverIo\Properties\Properties; |
28
|
|
|
use AppserverIo\Messaging\Utils\PropertyKeys; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* A connection implementation that handles the connection to the message queue. |
32
|
|
|
* |
33
|
|
|
* @author Tim Wagner <[email protected]> |
34
|
|
|
* @copyright 2015 TechDivision GmbH <[email protected]> |
35
|
|
|
* @license http://opensource.org/licenses/osl-3.0.php Open Software License (OSL 3.0) |
36
|
|
|
* @link https://github.com/appserver-io/messaging |
37
|
|
|
* @link http://www.appserver.io |
38
|
|
|
*/ |
39
|
|
|
class QueueConnection |
40
|
|
|
{ |
41
|
|
|
|
42
|
|
|
/** |
43
|
|
|
* The default transport to use. |
44
|
|
|
* |
45
|
|
|
* @var string |
46
|
|
|
*/ |
47
|
|
|
const DEFAULT_TRANSPORT = 'http'; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* The default client sockets IP address. |
51
|
|
|
* |
52
|
|
|
* @var string |
53
|
|
|
*/ |
54
|
|
|
const DEFAULT_ADDRESS = '127.0.0.1'; |
55
|
|
|
|
56
|
|
|
/** |
57
|
|
|
* The default client sockets port. |
58
|
|
|
* |
59
|
|
|
* @var integer |
60
|
|
|
*/ |
61
|
|
|
const DEFAULT_PORT = 8587; |
62
|
|
|
|
63
|
|
|
/** |
64
|
|
|
* The default index file. |
65
|
|
|
* |
66
|
|
|
* @var string |
67
|
|
|
*/ |
68
|
|
|
const DEFAULT_INDEX_FILE = 'index.mq'; |
69
|
|
|
|
70
|
|
|
/** |
71
|
|
|
* The name of the webapp using this client connection. |
72
|
|
|
* |
73
|
|
|
* @var string |
74
|
|
|
*/ |
75
|
|
|
protected $appName; |
76
|
|
|
|
77
|
|
|
/** |
78
|
|
|
* Holds an ArrayList with the initialized sessions. |
79
|
|
|
* |
80
|
|
|
* @var \ArrayObject |
81
|
|
|
*/ |
82
|
|
|
protected $sessions = null; |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* The message queue parser instance. |
86
|
|
|
* |
87
|
|
|
* @var \AppserverIo\Messaging\MessageQueueParser |
88
|
|
|
*/ |
89
|
|
|
protected $parser; |
90
|
|
|
|
91
|
|
|
/** |
92
|
|
|
* The HTTP client we use for connection to the persistence container. |
93
|
|
|
* |
94
|
|
|
* @var \Guzzle\Http\Client |
95
|
|
|
*/ |
96
|
|
|
protected $client; |
97
|
|
|
|
98
|
|
|
/** |
99
|
|
|
* The default properties for the context configuration. |
100
|
|
|
* |
101
|
|
|
* @var array |
102
|
|
|
*/ |
103
|
|
|
protected $defaultProperties = array( |
104
|
|
|
'transport' => QueueConnection::DEFAULT_TRANSPORT, |
105
|
|
|
'address' => QueueConnection::DEFAULT_ADDRESS, |
106
|
|
|
'port' => QueueConnection::DEFAULT_PORT, |
107
|
|
|
'indexFile' => QueueConnection::DEFAULT_INDEX_FILE |
108
|
|
|
); |
109
|
|
|
|
110
|
|
|
/** |
111
|
|
|
* Initializes the connection. |
112
|
|
|
* |
113
|
|
|
* @param string $appName Name of the webapp using this client connection |
114
|
|
|
* @param \AppserverIo\Properties\PropertiesInterface $properties The properties containing the connection parameters |
115
|
|
|
*/ |
116
|
|
|
public function __construct($appName = '', PropertiesInterface $properties = null) |
117
|
|
|
{ |
118
|
|
|
|
119
|
|
|
// set the application name |
120
|
|
|
$this->appName = $appName; |
121
|
|
|
|
122
|
|
|
// initialize the message queue parser and the session |
123
|
|
|
$this->parser = new MessageQueueParser(); |
124
|
|
|
$this->sessions = new \ArrayObject(); |
125
|
|
|
|
126
|
|
|
// initialize the default properties if no ones has been passed |
127
|
|
|
if ($properties == null) { |
128
|
|
|
// initialize the default properties |
129
|
|
|
$properties = new Properties(); |
130
|
|
|
foreach ($this->defaultProperties as $key => $value) { |
131
|
|
|
$properties->setProperty($key, $value); |
132
|
|
|
} |
133
|
|
|
} |
134
|
|
|
|
135
|
|
|
// inject the properties |
136
|
|
|
$this->injectProperties($properties); |
137
|
|
|
} |
138
|
|
|
|
139
|
|
|
/** |
140
|
|
|
* The properties used to create the connection. |
141
|
|
|
* |
142
|
|
|
* @param \AppserverIo\Properties\PropertiesInterface $properties The connection properties |
143
|
|
|
* |
144
|
|
|
* @return void |
145
|
|
|
*/ |
146
|
|
|
public function injectProperties(PropertiesInterface $properties) |
147
|
|
|
{ |
148
|
|
|
$this->properties = $properties; |
|
|
|
|
149
|
|
|
} |
150
|
|
|
|
151
|
|
|
/** |
152
|
|
|
* Return's the properties used to create the connection. |
153
|
|
|
* |
154
|
|
|
* @return \AppserverIo\Properties\PropertiesInterface The connection properties |
155
|
|
|
*/ |
156
|
|
|
public function getProperties() |
157
|
|
|
{ |
158
|
|
|
return $this->properties; |
|
|
|
|
159
|
|
|
} |
160
|
|
|
|
161
|
|
|
/** |
162
|
|
|
* Returns the parser to process the message. |
163
|
|
|
* |
164
|
|
|
* @return \AppserverIo\Messaging\MessageQueueParser The parser instance |
165
|
|
|
*/ |
166
|
|
|
public function getParser() |
167
|
|
|
{ |
168
|
|
|
return $this->parser; |
169
|
|
|
} |
170
|
|
|
|
171
|
|
|
/** |
172
|
|
|
* Sets the clients webapp name |
173
|
|
|
* |
174
|
|
|
* @param string $appName Name of the webapp using this client connection |
175
|
|
|
* |
176
|
|
|
* @return void |
177
|
|
|
*/ |
178
|
|
|
public function setAppName($appName) |
179
|
|
|
{ |
180
|
|
|
$this->appName = $appName; |
181
|
|
|
} |
182
|
|
|
|
183
|
|
|
/** |
184
|
|
|
* Returns the name of the webapp this connection is for |
185
|
|
|
* |
186
|
|
|
* @return string The webapp name |
187
|
|
|
*/ |
188
|
|
|
public function getAppName() |
189
|
|
|
{ |
190
|
|
|
return $this->appName; |
191
|
|
|
} |
192
|
|
|
|
193
|
|
|
/** |
194
|
|
|
* Returns the IP address or domain name of the server the |
195
|
|
|
* message queue is running on. |
196
|
|
|
* |
197
|
|
|
* @return string Holds the server to connect to |
198
|
|
|
*/ |
199
|
|
|
public function getAddress() |
200
|
|
|
{ |
201
|
|
|
return $this->getProperties()->getProperty(PropertyKeys::ADDRESS); |
202
|
|
|
} |
203
|
|
|
|
204
|
|
|
/** |
205
|
|
|
* Returns the port for the connection. |
206
|
|
|
* |
207
|
|
|
* @return integer The port to use |
208
|
|
|
*/ |
209
|
|
|
public function getPort() |
210
|
|
|
{ |
211
|
|
|
return $this->getProperties()->getProperty(PropertyKeys::PORT); |
212
|
|
|
} |
213
|
|
|
|
214
|
|
|
/** |
215
|
|
|
* Returns the transport to use. |
216
|
|
|
* |
217
|
|
|
* @return string The transport to use. |
218
|
|
|
*/ |
219
|
|
|
public function getTransport() |
220
|
|
|
{ |
221
|
|
|
return $this->getProperties()->getProperty(PropertyKeys::TRANSPORT); |
222
|
|
|
} |
223
|
|
|
|
224
|
|
|
/** |
225
|
|
|
* Returns the index file to use. |
226
|
|
|
* |
227
|
|
|
* @return string The index file to use. |
228
|
|
|
*/ |
229
|
|
|
public function getIndexFile() |
230
|
|
|
{ |
231
|
|
|
return $this->getProperties()->getProperty(PropertyKeys::INDEX_FILE); |
232
|
|
|
} |
233
|
|
|
|
234
|
|
|
/** |
235
|
|
|
* Creates the connection to the container. |
236
|
|
|
* |
237
|
|
|
* @return void |
238
|
|
|
*/ |
239
|
|
|
public function connect() |
240
|
|
|
{ |
241
|
|
|
$this->client = new Client($this->getBaseUrl()); |
242
|
|
|
} |
243
|
|
|
|
244
|
|
|
/** |
245
|
|
|
* Shutdown the connection to the container. |
246
|
|
|
* |
247
|
|
|
* @return void |
248
|
|
|
*/ |
249
|
|
|
public function disconnect() |
250
|
|
|
{ |
251
|
|
|
$this->client = null; |
252
|
|
|
} |
253
|
|
|
|
254
|
|
|
/** |
255
|
|
|
* Returns the socket the connection is based on. |
256
|
|
|
* |
257
|
|
|
* @return \Guzzle\Http\Client The socket instance |
258
|
|
|
*/ |
259
|
|
|
public function getSocket() |
260
|
|
|
{ |
261
|
|
|
return $this->client; |
262
|
|
|
} |
263
|
|
|
|
264
|
|
|
/** |
265
|
|
|
* Sends a Message to the server by writing it to the socket. |
266
|
|
|
* |
267
|
|
|
* @param \AppserverIo\Psr\Pms\MessageInterface $message Holds the message to send |
268
|
|
|
* @param boolean $validateResponse If this flag is TRUE, the queue connection validates the response code |
269
|
|
|
* |
270
|
|
|
* @return \AppserverIo\Messaging\QueueResponse The response of the message queue, or null |
271
|
|
|
* |
272
|
|
|
* @throws \Guzzle\Http\Exception\CurlException |
273
|
|
|
* @throws \Exception |
274
|
|
|
*/ |
275
|
|
|
public function send(MessageInterface $message, $validateResponse = false) |
276
|
|
|
{ |
277
|
|
|
// connect to the server if necessary |
278
|
|
|
$this->connect(); |
279
|
|
|
|
280
|
|
|
// serialize the message and write it to the socket |
281
|
|
|
$packed = MessageQueueProtocol::pack($message); |
282
|
|
|
|
283
|
|
|
// invoke the RMC with a number of retries |
284
|
|
|
$maxRetries = 0; |
285
|
|
|
$retry = true; |
286
|
|
|
while ($retry) { |
287
|
|
|
try { |
288
|
|
|
// send a POST request |
289
|
|
|
$request = $this->getSocket()->post($this->getPath(), array('timeout' => 5)); |
290
|
|
|
$request->setBody($packed); |
291
|
|
|
$response = $request->send(); |
292
|
|
|
|
293
|
|
|
$retry = false; |
294
|
|
|
|
|
|
|
|
295
|
|
|
} catch (CurlException $ce) { |
296
|
|
|
$maxRetries++; |
297
|
|
|
if ($maxRetries >= 5) { |
298
|
|
|
$retry = false; |
|
|
|
|
299
|
|
|
throw $ce; |
300
|
|
|
} |
301
|
|
|
} |
302
|
|
|
} |
303
|
|
|
|
304
|
|
|
// check if we should validate the response |
305
|
|
|
if ($validateResponse && $response->getStatusCode() !== 200) { |
|
|
|
|
306
|
|
|
throw new \Exception($response->getBody()); |
307
|
|
|
} |
308
|
|
|
} |
309
|
|
|
|
310
|
|
|
/** |
311
|
|
|
* Prepares path for the connection to the persistence container. |
312
|
|
|
* |
313
|
|
|
* @return string The path to define the persistence container module |
314
|
|
|
*/ |
315
|
|
|
protected function getPath() |
316
|
|
|
{ |
317
|
|
|
return sprintf('/%s/%s', $this->getAppName(), $this->getIndexFile()); |
318
|
|
|
} |
319
|
|
|
|
320
|
|
|
/** |
321
|
|
|
* Prepares the base URL we used for the connection |
322
|
|
|
* to the persistence container. |
323
|
|
|
* |
324
|
|
|
* @return string The default base URL |
325
|
|
|
*/ |
326
|
|
|
protected function getBaseUrl() |
327
|
|
|
{ |
328
|
|
|
return sprintf('%s://%s:%s', $this->getTransport(), $this->getAddress(), $this->getPort()); |
329
|
|
|
} |
330
|
|
|
|
331
|
|
|
/** |
332
|
|
|
* Initializes a new queue session instance, registers it |
333
|
|
|
* in the array with the open sessions and returns it. |
334
|
|
|
* |
335
|
|
|
* @return \AppserverIo\Messaging\QueueSession The initialized queue session instance |
336
|
|
|
*/ |
337
|
|
|
public function createQueueSession() |
338
|
|
|
{ |
339
|
|
|
return $this->sessions[] = new QueueSession($this); |
340
|
|
|
} |
341
|
|
|
} |
342
|
|
|
|
An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.
If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading.