@@ -240,6 +240,9 @@ |
||
240 | 240 | } |
241 | 241 | } |
242 | 242 | |
243 | + /** |
|
244 | + * @param boolean $doHandle |
|
245 | + */ |
|
243 | 246 | public function setHandleSignals($doHandle) |
244 | 247 | { |
245 | 248 | $this->dispatchSignals = $doHandle; |
@@ -21,7 +21,7 @@ discard block |
||
21 | 21 | * @return boolean |
22 | 22 | * @throws StompException |
23 | 23 | */ |
24 | - public function connect ($username = '', $password = '') |
|
24 | + public function connect($username = '', $password = '') |
|
25 | 25 | { |
26 | 26 | $this->_makeConnection(); |
27 | 27 | if ($username != '') { |
@@ -30,7 +30,7 @@ discard block |
||
30 | 30 | if ($password != '') { |
31 | 31 | $this->_password = $password; |
32 | 32 | } |
33 | - $headers = array('login' => $this->_username , 'passcode' => $this->_password); |
|
33 | + $headers = array('login' => $this->_username, 'passcode' => $this->_password); |
|
34 | 34 | if ($this->clientId != null) { |
35 | 35 | $headers["client-id"] = $this->clientId; |
36 | 36 | } |
@@ -66,7 +66,7 @@ discard block |
||
66 | 66 | * @return boolean |
67 | 67 | * @throws StompException |
68 | 68 | */ |
69 | - public function subscribe ($destination, $properties = null, $sync = null) |
|
69 | + public function subscribe($destination, $properties = null, $sync = null) |
|
70 | 70 | { |
71 | 71 | $headers = array('ack' => 'client'); |
72 | 72 | if ($this->brokerVendor == 'AMQ') { |
@@ -108,7 +108,7 @@ discard block |
||
108 | 108 | * @param Frame $stompFrame |
109 | 109 | * @throws StompException |
110 | 110 | */ |
111 | - protected function _writeFrame (Frame $stompFrame) |
|
111 | + protected function _writeFrame(Frame $stompFrame) |
|
112 | 112 | { |
113 | 113 | if (!is_resource($this->_socket)) { |
114 | 114 | throw new StompException('Socket connection hasn\'t been established'); |
@@ -131,7 +131,7 @@ discard block |
||
131 | 131 | * |
132 | 132 | * @return Frame False when no frame to read |
133 | 133 | */ |
134 | - public function readFrame () |
|
134 | + public function readFrame() |
|
135 | 135 | { |
136 | 136 | if (!empty($this->_waitbuf)) { |
137 | 137 | return array_shift($this->_waitbuf); |
@@ -205,7 +205,7 @@ discard block |
||
205 | 205 | $connect_errno = null; |
206 | 206 | $connect_errstr = null; |
207 | 207 | |
208 | - while (! $connected && $att ++ < $this->_attempts) { |
|
208 | + while (!$connected && $att++ < $this->_attempts) { |
|
209 | 209 | if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') { |
210 | 210 | $i = rand(0, count($this->_hosts) - 1); |
211 | 211 | } else { |
@@ -223,7 +223,7 @@ discard block |
||
223 | 223 | $this->_socket = null; |
224 | 224 | } |
225 | 225 | |
226 | - $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds); |
|
226 | + $this->_socket = @fsockopen($scheme.'://'.$host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds); |
|
227 | 227 | |
228 | 228 | $this->maybeStopClient(); |
229 | 229 | |
@@ -235,7 +235,7 @@ discard block |
||
235 | 235 | break; |
236 | 236 | } |
237 | 237 | } |
238 | - if (! $connected) { |
|
238 | + if (!$connected) { |
|
239 | 239 | throw new StompException("Could not connect to a broker"); |
240 | 240 | } |
241 | 241 | } |
@@ -2,7 +2,6 @@ |
||
2 | 2 | |
3 | 3 | namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp; |
4 | 4 | |
5 | -use Kaliop\QueueingBundle\Service\MessageProducer as BaseMessageProducer; |
|
6 | 5 | use Symfony\Component\DependencyInjection\ContainerAwareInterface; |
7 | 6 | use Symfony\Component\DependencyInjection\ContainerInterface; |
8 | 7 | use InvalidArgumentException; |
@@ -39,7 +39,7 @@ |
||
39 | 39 | return array('list-configured'); |
40 | 40 | } |
41 | 41 | |
42 | - public function executeAction($action, array $arguments=array()) |
|
42 | + public function executeAction($action, array $arguments = array()) |
|
43 | 43 | { |
44 | 44 | switch ($action) { |
45 | 45 | case 'list-configured': |
@@ -59,7 +59,7 @@ discard block |
||
59 | 59 | |
60 | 60 | protected function setClientId() |
61 | 61 | { |
62 | - $newId = $this->subscriptionName . ($this->label != '' ? '_' . $this->label : ''); |
|
62 | + $newId = $this->subscriptionName.($this->label != '' ? '_'.$this->label : ''); |
|
63 | 63 | if ($newId != $this->client->clientId) { |
64 | 64 | $this->client->clientId = $newId; |
65 | 65 | $this->subscribed = false; |
@@ -87,7 +87,7 @@ discard block |
||
87 | 87 | */ |
88 | 88 | public function setCallback($callback) |
89 | 89 | { |
90 | - if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) { |
|
90 | + if (!$callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) { |
|
91 | 91 | throw new \RuntimeException('Can not set callback to Stomp Consumer, as it is not a MessageConsumerInterface'); |
92 | 92 | } |
93 | 93 | $this->callback = $callback; |
@@ -111,7 +111,7 @@ discard block |
||
111 | 111 | * @param int $timeout seconds |
112 | 112 | * @return nothing |
113 | 113 | */ |
114 | - public function consume($amount, $timeout=0) |
|
114 | + public function consume($amount, $timeout = 0) |
|
115 | 115 | { |
116 | 116 | $toConsume = $amount; |
117 | 117 | if ($timeout > 0) { |
@@ -123,7 +123,7 @@ discard block |
||
123 | 123 | |
124 | 124 | $this->subscribe(); |
125 | 125 | |
126 | - while(true) { |
|
126 | + while (true) { |
|
127 | 127 | if ($timeout > 0) { |
128 | 128 | $this->client->setReadTimeout($remaining); |
129 | 129 | } |
@@ -131,7 +131,7 @@ discard block |
||
131 | 131 | $message = $this->client->readFrame(); |
132 | 132 | |
133 | 133 | if ($message !== false) { |
134 | - switch($message->command) |
|
134 | + switch ($message->command) |
|
135 | 135 | { |
136 | 136 | case 'MESSAGE': |
137 | 137 | $this->client->ack($message); |
@@ -160,11 +160,11 @@ discard block |
||
160 | 160 | } |
161 | 161 | } |
162 | 162 | |
163 | - protected function getClientProperties(array $additionalProperties = array(), $command='') |
|
163 | + protected function getClientProperties(array $additionalProperties = array(), $command = '') |
|
164 | 164 | { |
165 | 165 | $result = $additionalProperties; |
166 | 166 | |
167 | - switch($command) |
|
167 | + switch ($command) |
|
168 | 168 | { |
169 | 169 | case 'SUBSCRIBE'; |
170 | 170 | //$result = array_merge(array('persistent' => 'true'), $result); |
@@ -33,7 +33,7 @@ discard block |
||
33 | 33 | public function publish($msgBody, $routingKey = '', $additionalProperties = array()) |
34 | 34 | { |
35 | 35 | $this->connect(); |
36 | - if (! $this->client->send( |
|
36 | + if (!$this->client->send( |
|
37 | 37 | $this->getFullQueueName($routingKey), |
38 | 38 | $msgBody, |
39 | 39 | $this->getClientProperties($additionalProperties), |
@@ -51,8 +51,8 @@ discard block |
||
51 | 51 | { |
52 | 52 | $this->connect(); |
53 | 53 | |
54 | - foreach($messages as $message) { |
|
55 | - if (! $this->client->send( |
|
54 | + foreach ($messages as $message) { |
|
55 | + if (!$this->client->send( |
|
56 | 56 | $this->getFullQueueName(@$message['routingKey']), |
57 | 57 | $message['msgBody'], |
58 | 58 | $this->getClientProperties(@$message['additionalProperties']), |
@@ -84,15 +84,15 @@ |
||
84 | 84 | { |
85 | 85 | $queueName = $this->stompQueueName; |
86 | 86 | if ($routingKey != '') { |
87 | - switch($this->client->brokerVendor) { |
|
87 | + switch ($this->client->brokerVendor) { |
|
88 | 88 | case 'Apollo': |
89 | 89 | $routingKey = str_replace('#', '**', $routingKey); |
90 | - $queueName = rtrim($queueName, '.') . '.' . ltrim($routingKey, '.'); |
|
90 | + $queueName = rtrim($queueName, '.').'.'.ltrim($routingKey, '.'); |
|
91 | 91 | break; |
92 | 92 | case 'AMQ': |
93 | 93 | default: |
94 | 94 | $routingKey = str_replace('#', '>', $routingKey); |
95 | - $queueName = rtrim($queueName, '.') . '.' . ltrim($routingKey, '.'); |
|
95 | + $queueName = rtrim($queueName, '.').'.'.ltrim($routingKey, '.'); |
|
96 | 96 | } |
97 | 97 | } |
98 | 98 | return $queueName; |
@@ -117,7 +117,7 @@ |
||
117 | 117 | * @param string $routingKey |
118 | 118 | * @return Consumer |
119 | 119 | */ |
120 | - public function createConsumer($queueName, $queueDestination, $connectionId, $subscriptionName, $callback=null, $routingKey=null) |
|
120 | + public function createConsumer($queueName, $queueDestination, $connectionId, $subscriptionName, $callback = null, $routingKey = null) |
|
121 | 121 | { |
122 | 122 | $class = $this->container->getParameter('kaliop_queueing.stomp.consumer.class'); |
123 | 123 | $consumer = new $class($this->getConnectionConfig($connectionId)); |