Completed
Pull Request — master (#10)
by Marc
03:01
created
Infrastructure/AmqpLib/v26/RabbitMQ/Queue/Config/ConnectionConfig.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -16,7 +16,7 @@
 block discarded – undo
16 16
     protected $password;
17 17
     protected $vHost;
18 18
 
19
-    public function __construct($host, $port, $user, $password, $vHost='/')
19
+    public function __construct($host, $port, $user, $password, $vHost = '/')
20 20
     {
21 21
         $this->host = $host;
22 22
         $this->port = $port;
Please login to merge, or discard this patch.
Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/DelayedQueueWriter.php 1 patch
Spacing   +10 added lines, -10 removed lines patch added patch discarded remove patch
@@ -68,7 +68,7 @@  discard block
 block discarded – undo
68 68
     {
69 69
         $this->delay = $delay;
70 70
         $this->exchangeName = $exchangeName;
71
-        $this->delayedExchangeName = self::DELAY_QUEUE_PREFIX.$this->delay.$this->exchangeName;
71
+        $this->delayedExchangeName = self::DELAY_QUEUE_PREFIX . $this->delay . $this->exchangeName;
72 72
         $this->logger = $logger;
73 73
         $this->channel = $channel;
74 74
     }
@@ -82,15 +82,15 @@  discard block
 block discarded – undo
82 82
     {
83 83
         $this->initialize();
84 84
         try {
85
-            foreach($messages as $message) {
85
+            foreach ($messages as $message) {
86 86
                 $encodedMessage = json_encode($message);
87 87
                 $this->logger->debug('Writing:' . $encodedMessage);
88 88
                 $msg = new AMQPMessage($encodedMessage, array('delivery_mode' => 2));
89 89
                 $this->channel->batch_basic_publish($msg, $this->delayedExchangeName, $message->getName());
90 90
             }
91 91
             $this->channel->publish_batch();
92
-        } catch(\Exception $exception) {
93
-            $this->logger->error('Error writing delayed messages: '.$exception->getMessage());
92
+        } catch (\Exception $exception) {
93
+            $this->logger->error('Error writing delayed messages: ' . $exception->getMessage());
94 94
             throw new WriterException($exception->getMessage(), $exception->getCode());
95 95
         }
96 96
     }
@@ -100,13 +100,13 @@  discard block
 block discarded – undo
100 100
      */
101 101
     protected function initialize()
102 102
     {
103
-        try{
104
-            $delayedQueue = self::DELAY_QUEUE_PREFIX.$this->delay.'Queue';
103
+        try {
104
+            $delayedQueue = self::DELAY_QUEUE_PREFIX . $this->delay . 'Queue';
105 105
 
106
-            $this->logger->info('Creating delayed exchange '.$this->delayedExchangeName);
106
+            $this->logger->info('Creating delayed exchange ' . $this->delayedExchangeName);
107 107
             // Delay Queue
108 108
             $this->channel->exchange_declare($this->delayedExchangeName, 'fanout', false, true, true);
109
-            $this->logger->info('Creating delayed queue '.$delayedQueue);
109
+            $this->logger->info('Creating delayed queue ' . $delayedQueue);
110 110
             $this->channel->queue_declare(
111 111
                 $delayedQueue,
112 112
                 false,
@@ -121,8 +121,8 @@  discard block
 block discarded – undo
121 121
                 ]
122 122
             );
123 123
             $this->channel->queue_bind($delayedQueue, $this->delayedExchangeName);
124
-        } catch(\Exception $exception) {
125
-            $this->logger->error('Error configuring delayed queues: '.$exception->getMessage());
124
+        } catch (\Exception $exception) {
125
+            $this->logger->error('Error configuring delayed queues: ' . $exception->getMessage());
126 126
             throw new WriterException($exception->getMessage(), $exception->getCode());
127 127
         }
128 128
     }
Please login to merge, or discard this patch.
src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueWriter.php 1 patch
Spacing   +7 added lines, -7 removed lines patch added patch discarded remove patch
@@ -64,8 +64,8 @@  discard block
 block discarded – undo
64 64
         $this->initialize();
65 65
         try {
66 66
             $messagesWithDelay = [];
67
-            foreach($messages as $message) {
68
-                if($message->getDelay() > 0) {
67
+            foreach ($messages as $message) {
68
+                if ($message->getDelay() > 0) {
69 69
                     $messagesWithDelay[$message->getDelay()][] = $message;
70 70
                     continue;
71 71
                 }
@@ -75,8 +75,8 @@  discard block
 block discarded – undo
75 75
                 $this->channel->batch_basic_publish($msg, $this->exchangeConfig->getName(), $message->getName());
76 76
             }
77 77
             $this->channel->publish_batch();
78
-            foreach($messagesWithDelay as $delay => $delayedMessages) {
79
-                if(!isset($this->delayedQueueWriterRegistry[$delay])) {
78
+            foreach ($messagesWithDelay as $delay => $delayedMessages) {
79
+                if (!isset($this->delayedQueueWriterRegistry[$delay])) {
80 80
                     $this->delayedQueueWriterRegistry[$delay] = new DelayedQueueWriter(
81 81
                         $this->exchangeConfig->getName(),
82 82
                         $delay,
@@ -86,8 +86,8 @@  discard block
 block discarded – undo
86 86
                 }
87 87
                 $this->delayedQueueWriterRegistry[$delay]->write($delayedMessages);
88 88
             }
89
-        } catch(\Exception $exception) {
90
-            $this->logger->error('Error writing messages: '.$exception->getMessage());
89
+        } catch (\Exception $exception) {
90
+            $this->logger->error('Error writing messages: ' . $exception->getMessage());
91 91
             throw new WriterException($exception->getMessage(), $exception->getCode());
92 92
         }
93 93
     }
@@ -97,7 +97,7 @@  discard block
 block discarded – undo
97 97
      */
98 98
     protected function initialize()
99 99
     {
100
-        if($this->channel) {
100
+        if ($this->channel) {
101 101
             return;
102 102
         }
103 103
         $this->logger->info('Connecting to RabbitMQ');
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Event/DomainEvent.php 2 patches
Indentation   +2 added lines, -2 removed lines patch added patch discarded remove patch
@@ -36,8 +36,8 @@
 block discarded – undo
36 36
     public function __construct($origin, $name, $occurredOn, array $body = [])
37 37
     {
38 38
         $this->setOrigin($origin)
39
-             ->setName($name)
40
-             ->setOccurredOn($occurredOn)
39
+                ->setName($name)
40
+                ->setOccurredOn($occurredOn)
41 41
         ;
42 42
         $this->body = $body;
43 43
     }
Please login to merge, or discard this patch.
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -91,7 +91,7 @@  discard block
 block discarded – undo
91 91
      */
92 92
     protected function setOrigin($origin)
93 93
     {
94
-        if(empty($origin)) {
94
+        if (empty($origin)) {
95 95
             throw new DomainEventException('DomainEvent origin cannot be empty');
96 96
         }
97 97
         $this->origin = $origin;
@@ -105,7 +105,7 @@  discard block
 block discarded – undo
105 105
      */
106 106
     protected function setName($name)
107 107
     {
108
-        if(empty($name)) {
108
+        if (empty($name)) {
109 109
             throw new DomainEventException('DomainEvent name cannot be empty');
110 110
         }
111 111
         $this->name = $name;
@@ -119,7 +119,7 @@  discard block
 block discarded – undo
119 119
      */
120 120
     protected function setOccurredOn($occurredOn)
121 121
     {
122
-        if(!is_null($occurredOn) && !preg_match('/^\d+(\.\d{1,4})?$/', $occurredOn)) { // accepts also microseconds
122
+        if (!is_null($occurredOn) && !preg_match('/^\d+(\.\d{1,4})?$/', $occurredOn)) { // accepts also microseconds
123 123
             throw new DomainEventException("$occurredOn is not a valid unix timestamp.");
124 124
         }
125 125
         $this->occurredOn = $occurredOn;
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Event/Publisher.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -41,7 +41,7 @@
 block discarded – undo
41 41
      */
42 42
     public function publish()
43 43
     {
44
-        if(!isset($this->events[0])) {
44
+        if (!isset($this->events[0])) {
45 45
             throw new DomainEventException('You must add at least 1 DomainEvent in order to publish to queue.');
46 46
         }
47 47
         $this->queueWriter->write($this->events);
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Task/Task.php 2 patches
Indentation   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -31,7 +31,7 @@
 block discarded – undo
31 31
     public function __construct($name, array $body, $delay=0)
32 32
     {
33 33
         $this->setName($name)
34
-             ->setDelay($delay)
34
+                ->setDelay($delay)
35 35
         ;
36 36
         $this->body = $body;
37 37
     }
Please login to merge, or discard this patch.
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -28,7 +28,7 @@  discard block
 block discarded – undo
28 28
      * @param array $body
29 29
      * @param int $delay
30 30
      */
31
-    public function __construct($name, array $body, $delay=0)
31
+    public function __construct($name, array $body, $delay = 0)
32 32
     {
33 33
         $this->setName($name)
34 34
              ->setDelay($delay)
@@ -67,7 +67,7 @@  discard block
 block discarded – undo
67 67
      */
68 68
     protected function setName($name)
69 69
     {
70
-        if(empty($name)) {
70
+        if (empty($name)) {
71 71
             throw new TaskException('Task name cannot be empty');
72 72
         }
73 73
         $this->name = $name;
@@ -81,7 +81,7 @@  discard block
 block discarded – undo
81 81
      */
82 82
     protected function setDelay($delay)
83 83
     {
84
-        if(!is_null($delay) && !preg_match('/^\d+$/', $delay)) {
84
+        if (!is_null($delay) && !preg_match('/^\d+$/', $delay)) {
85 85
             throw new TaskException("Task delay $delay is not a valid delay.");
86 86
         }
87 87
         $this->delay = $delay;
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Task/Producer.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -40,7 +40,7 @@
 block discarded – undo
40 40
      */
41 41
     public function produce()
42 42
     {
43
-        if(!isset($this->tasks[0])) {
43
+        if (!isset($this->tasks[0])) {
44 44
             throw new TaskException('You must add at least 1 Task before producing.');
45 45
         }
46 46
         $this->queueWriter->write($this->tasks);
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Queue/QueueReader.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -17,7 +17,7 @@
 block discarded – undo
17 17
      * @throws ReaderException
18 18
      * @return void
19 19
      */
20
-    public function read(callable $callback, $timeout=0);
20
+    public function read(callable $callback, $timeout = 0);
21 21
 
22 22
     /**
23 23
      * Deletes all messages from the queue
Please login to merge, or discard this patch.
src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueReader.php 2 patches
Unused Use Statements   -1 removed lines patch added patch discarded remove patch
@@ -10,7 +10,6 @@
 block discarded – undo
10 10
 use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
11 11
 use PhpAmqpLib\Channel\AMQPChannel;
12 12
 use PhpAmqpLib\Connection\AMQPLazyConnection;
13
-use PhpAmqpLib\Exception\AMQPRuntimeException;
14 13
 use PhpAmqpLib\Exception\AMQPTimeoutException;
15 14
 use Psr\Log\LoggerInterface;
16 15
 
Please login to merge, or discard this patch.
Spacing   +5 added lines, -5 removed lines patch added patch discarded remove patch
@@ -96,17 +96,17 @@  discard block
 block discarded – undo
96 96
      * @throws ReaderException
97 97
      * @throws TimeoutReaderException
98 98
      */
99
-    public function read(callable $callback, $timeout=0)
99
+    public function read(callable $callback, $timeout = 0)
100 100
     {
101 101
         $this->initialize();
102 102
         $this->messageHandler->setCallback($callback);
103 103
 
104 104
         try {
105 105
             $this->consume($timeout);
106
-        } catch(AMQPTimeoutException $e) {
106
+        } catch (AMQPTimeoutException $e) {
107 107
             $this->stopConsuming();
108 108
             throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e);
109
-        } catch(\Exception $e) {
109
+        } catch (\Exception $e) {
110 110
             $this->stopConsuming();
111 111
             throw new ReaderException("Error occurred while reading", 0, $e);
112 112
         }
@@ -165,7 +165,7 @@  discard block
 block discarded – undo
165 165
     protected function consume($timeout)
166 166
     {
167 167
         if ($this->consumerTag === '') {
168
-            $this->logger->debug('Waiting for messages on queue:'.$this->queueConfig->getName());
168
+            $this->logger->debug('Waiting for messages on queue:' . $this->queueConfig->getName());
169 169
             $this->consumerTag = $this->channel->basic_consume(
170 170
                 $this->queueConfig->getName(),
171 171
                 '',
@@ -206,7 +206,7 @@  discard block
 block discarded – undo
206 206
         if ($this->consumerTag) {
207 207
             try {
208 208
                 $this->channel->basic_cancel($this->consumerTag);
209
-            } catch(\Exception $e) {
209
+            } catch (\Exception $e) {
210 210
             }
211 211
 
212 212
             $this->consumerTag = '';
Please login to merge, or discard this patch.