@@ -18,6 +18,9 @@ |
||
| 18 | 18 | return $consumer; |
| 19 | 19 | } |
| 20 | 20 | |
| 21 | + /** |
|
| 22 | + * @param string $queueName |
|
| 23 | + */ |
|
| 21 | 24 | protected function getQueueCallback($queueName) |
| 22 | 25 | { |
| 23 | 26 | $callbacks = $this->container->getParameter('kaliop_queueing_kinesis.default.consumers'); |
@@ -14,7 +14,7 @@ |
||
| 14 | 14 | |
| 15 | 15 | public function __construct(Cache $cacheService) |
| 16 | 16 | { |
| 17 | - $this->cache = $cacheService; |
|
| 17 | + $this->cache = $cacheService; |
|
| 18 | 18 | } |
| 19 | 19 | |
| 20 | 20 | /** |
@@ -10,7 +10,7 @@ |
||
| 10 | 10 | protected $properties = array(); |
| 11 | 11 | protected $contentType; |
| 12 | 12 | |
| 13 | - public function __construct($body, array $properties = array(), $contentType='application/json') |
|
| 13 | + public function __construct($body, array $properties = array(), $contentType = 'application/json') |
|
| 14 | 14 | { |
| 15 | 15 | $this->body = $body; |
| 16 | 16 | $this->properties = $properties; |
@@ -44,7 +44,7 @@ |
||
| 44 | 44 | return array('info', 'list', 'delete'); |
| 45 | 45 | } |
| 46 | 46 | |
| 47 | - public function executeAction($action, array $arguments=array()) |
|
| 47 | + public function executeAction($action, array $arguments = array()) |
|
| 48 | 48 | { |
| 49 | 49 | switch ($action) { |
| 50 | 50 | case 'delete': |
@@ -70,7 +70,7 @@ discard block |
||
| 70 | 70 | public function batchPublish(array $messages) |
| 71 | 71 | { |
| 72 | 72 | $records = aray(); |
| 73 | - foreach($messages as $message) { |
|
| 73 | + foreach ($messages as $message) { |
|
| 74 | 74 | $records[] = array( |
| 75 | 75 | 'Data' => $message['msgBody'], |
| 76 | 76 | 'PartitionKey' => $message['routingKey'] |
@@ -121,7 +121,7 @@ discard block |
||
| 121 | 121 | */ |
| 122 | 122 | public function setContentType($contentType) |
| 123 | 123 | { |
| 124 | - if($contentType != 'application/json') { |
|
| 124 | + if ($contentType != 'application/json') { |
|
| 125 | 125 | throw new \Exception("Unsupported content-type for message serialization: $contentType. Only 'application/json' is supported"); |
| 126 | 126 | } |
| 127 | 127 | |
@@ -53,7 +53,7 @@ discard block |
||
| 53 | 53 | */ |
| 54 | 54 | public function setCallback($callback) |
| 55 | 55 | { |
| 56 | - if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) { |
|
| 56 | + if (!$callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) { |
|
| 57 | 57 | throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface'); |
| 58 | 58 | } |
| 59 | 59 | $this->callback = $callback; |
@@ -107,7 +107,7 @@ discard block |
||
| 107 | 107 | * @param int $timeout |
| 108 | 108 | * @return nothing |
| 109 | 109 | */ |
| 110 | - public function consume($amount, $timeout=0) |
|
| 110 | + public function consume($amount, $timeout = 0) |
|
| 111 | 111 | { |
| 112 | 112 | $iterator = $this->getInitialMessageIterator(); |
| 113 | 113 | |
@@ -117,7 +117,7 @@ discard block |
||
| 117 | 117 | $remaining = $timeout; |
| 118 | 118 | } |
| 119 | 119 | |
| 120 | - while(true) { |
|
| 120 | + while (true) { |
|
| 121 | 121 | $reqTime = microtime(true); |
| 122 | 122 | $result = $this->client->getRecords(array( |
| 123 | 123 | 'ShardIterator' => $iterator, |
@@ -131,7 +131,7 @@ discard block |
||
| 131 | 131 | $this->sequenceNumberStore->save($this->streamName, $this->shardId, $last['SequenceNumber']); |
| 132 | 132 | } |
| 133 | 133 | |
| 134 | - foreach($records as $record) { |
|
| 134 | + foreach ($records as $record) { |
|
| 135 | 135 | $data = $record['Data']; |
| 136 | 136 | unset($record['Data']); |
| 137 | 137 | $this->callback->receive(new Message($data, $record)); |