Completed
Pull Request — master (#10)
by Marc
03:01
created
src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueReader.php 2 patches
Unused Use Statements   -1 removed lines patch added patch discarded remove patch
@@ -10,7 +10,6 @@
 block discarded – undo
10 10
 use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
11 11
 use PhpAmqpLib\Channel\AMQPChannel;
12 12
 use PhpAmqpLib\Connection\AMQPLazyConnection;
13
-use PhpAmqpLib\Exception\AMQPRuntimeException;
14 13
 use PhpAmqpLib\Exception\AMQPTimeoutException;
15 14
 use Psr\Log\LoggerInterface;
16 15
 
Please login to merge, or discard this patch.
Spacing   +5 added lines, -5 removed lines patch added patch discarded remove patch
@@ -96,17 +96,17 @@  discard block
 block discarded – undo
96 96
      * @throws ReaderException
97 97
      * @throws TimeoutReaderException
98 98
      */
99
-    public function read(callable $callback, $timeout=0)
99
+    public function read(callable $callback, $timeout = 0)
100 100
     {
101 101
         $this->initialize();
102 102
         $this->messageHandler->setCallback($callback);
103 103
 
104 104
         try {
105 105
             $this->consume($timeout);
106
-        } catch(AMQPTimeoutException $e) {
106
+        } catch (AMQPTimeoutException $e) {
107 107
             $this->stopConsuming();
108 108
             throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e);
109
-        } catch(\Exception $e) {
109
+        } catch (\Exception $e) {
110 110
             $this->stopConsuming();
111 111
             throw new ReaderException("Error occurred while reading", 0, $e);
112 112
         }
@@ -165,7 +165,7 @@  discard block
 block discarded – undo
165 165
     protected function consume($timeout)
166 166
     {
167 167
         if ($this->consumerTag === '') {
168
-            $this->logger->debug('Waiting for messages on queue:'.$this->queueConfig->getName());
168
+            $this->logger->debug('Waiting for messages on queue:' . $this->queueConfig->getName());
169 169
             $this->consumerTag = $this->channel->basic_consume(
170 170
                 $this->queueConfig->getName(),
171 171
                 '',
@@ -206,7 +206,7 @@  discard block
 block discarded – undo
206 206
         if ($this->consumerTag) {
207 207
             try {
208 208
                 $this->channel->basic_cancel($this->consumerTag);
209
-            } catch(\Exception $e) {
209
+            } catch (\Exception $e) {
210 210
             }
211 211
 
212 212
             $this->consumerTag = '';
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Event/Subscriber.php 1 patch
Spacing   +6 added lines, -6 removed lines patch added patch discarded remove patch
@@ -43,16 +43,16 @@  discard block
 block discarded – undo
43 43
         return $this;
44 44
     }
45 45
 
46
-    public function start($timeout=0)
46
+    public function start($timeout = 0)
47 47
     {
48
-        if(!isset($this->subscriptors[0])) {
48
+        if (!isset($this->subscriptors[0])) {
49 49
             throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
50 50
         }
51 51
 
52
-        while(true) {
52
+        while (true) {
53 53
             try {
54 54
                 $this->queueReader->read(array($this, 'notify'), $timeout);
55
-            } catch(TimeoutReaderException $e) {
55
+            } catch (TimeoutReaderException $e) {
56 56
                 break;
57 57
             }
58 58
         }
@@ -64,8 +64,8 @@  discard block
 block discarded – undo
64 64
     public function notify(DomainEvent $domainEvent)
65 65
     {
66 66
         $this->logger->info('Domain Event received, notifying subscribers');
67
-        foreach($this->subscriptors as $subscriptor) {
68
-            if($subscriptor->isSubscribed($domainEvent)) {
67
+        foreach ($this->subscriptors as $subscriptor) {
68
+            if ($subscriptor->isSubscribed($domainEvent)) {
69 69
                 $subscriptor->notify($domainEvent);
70 70
             }
71 71
         }
Please login to merge, or discard this patch.
src/Cmp/Queues/Domain/Task/Consumer.php 1 patch
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -25,12 +25,12 @@
 block discarded – undo
25 25
      * @param callable $callback Callable that'll be invoked when a message is received
26 26
      * @param int      $timeout (optional) If specified, the process will block a max of $timeout seconds. Indefinitely if 0
27 27
      */
28
-    public function consume(callable $callback, $timeout=0)
28
+    public function consume(callable $callback, $timeout = 0)
29 29
     {
30
-        while(true) {
30
+        while (true) {
31 31
             try {
32 32
                 $this->queueReader->read($callback, $timeout);
33
-            } catch(TimeoutReaderException $e) {
33
+            } catch (TimeoutReaderException $e) {
34 34
                 break;
35 35
             }
36 36
         }
Please login to merge, or discard this patch.