ConsumerWrapper   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 118
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Test Coverage

Coverage 24.32%

Importance

Changes 0
Metric Value
wmc 10
lcom 1
cbo 2
dl 0
loc 118
ccs 9
cts 37
cp 0.2432
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 11 1
A consume() 0 14 3
A setLogger() 0 4 1
A log() 0 6 2
B doConsume() 0 24 3
1
<?php
2
3
namespace Happyr\Mq2phpBundle\Service;
4
5
use Psr\Log\LoggerAwareInterface;
6
use Psr\Log\LoggerInterface;
7
use SimpleBus\Asynchronous\Consumer\SerializedEnvelopeConsumer;
8
9
/**
10
 * This class delegates a message to the CommandConsumer or EventConsumer depending on the queue name.
11
 *
12
 * @author Tobias Nyholm <[email protected]>
13
 */
14
class ConsumerWrapper implements LoggerAwareInterface
15
{
16
    /**
17
     * @var SerializedEnvelopeConsumer
18
     */
19
    private $commandConsumer;
20
21
    /**
22
     * @var SerializedEnvelopeConsumer
23
     */
24
    private $eventConsumer;
25
26
    /**
27
     * @var string
28
     */
29
    private $commandQueueName;
30
31
    /**
32
     * @var string
33
     */
34
    private $eventQueueName;
35
36
    /**
37
     * @var LoggerInterface
38
     */
39
    private $logger;
40
41
    /**
42
     * @param string                     $commandQueueName
43
     * @param string                     $eventQueueName
44
     * @param SerializedEnvelopeConsumer $commandConsumer
45
     * @param SerializedEnvelopeConsumer $eventConsumer
46
     */
47 1
    public function __construct(
48
        $commandQueueName,
49
        $eventQueueName,
50
        SerializedEnvelopeConsumer $commandConsumer = null,
51
        SerializedEnvelopeConsumer $eventConsumer = null
52
    ) {
53 1
        $this->commandConsumer = $commandConsumer;
54 1
        $this->eventConsumer = $eventConsumer;
55 1
        $this->commandQueueName = $commandQueueName;
56 1
        $this->eventQueueName = $eventQueueName;
57 1
    }
58
59
    /**
60
     * @param $queueName
61
     * @param $message
62
     */
63
    public function consume($queueName, $message)
64
    {
65
        $this->log('info', sprintf('Consuming data from queue: %s', $queueName));
66
67
        if ($queueName === $this->eventQueueName) {
68
            $this->doConsume($queueName, $message, $this->eventConsumer);
69
            $this->log('info', sprintf('Data from queue %s was consumed by the event consumer', $queueName));
70
        } elseif ($queueName === $this->commandQueueName) {
71
            $this->doConsume($queueName, $message, $this->commandConsumer);
72
            $this->log('info', sprintf('Data from queue %s was consumed by the command consumer', $queueName));
73
        } else {
74
            $this->doConsume($queueName, $message);
75
        }
76
    }
77
78
    /**
79
     * @param LoggerInterface $logger
80
     */
81 1
    public function setLogger(LoggerInterface $logger)
82
    {
83 1
        $this->logger = $logger;
84 1
    }
85
86
    /**
87
     * @param string $level
88
     * @param string $message
89
     * @param array  $context
90
     */
91
    private function log($level, $message, array $context = [])
92
    {
93
        if ($this->logger) {
94
            $this->logger->log($level, $message, $context);
95
        }
96
    }
97
98
    /**
99
     * Consume a message and make sure we log errors.
100
     *
101
     * @param string                     $queueName
102
     * @param mixed                      $message
103
     * @param SerializedEnvelopeConsumer $consumer
104
     *
105
     * @throws \Exception
106
     */
107
    private function doConsume($queueName, $message, SerializedEnvelopeConsumer $consumer = null)
108
    {
109
        if ($consumer === null) {
110
            $exceptionMessage = sprintf('No consumer was found for queue named "%s"', $queueName);
111
            $this->log('alert', $exceptionMessage, ['message' => $message]);
112
113
            throw new \RuntimeException($exceptionMessage);
114
        }
115
116
        try {
117
            $consumer->consume($message);
118
        } catch (\Exception $e) {
119
            $this->log(
120
                'error',
121
                sprintf('Tried to handle message from queue %s but failed', $queueName),
122
                [
123
                    'exception' => $e,
124
                    'message' => $message,
125
                ]
126
            );
127
128
            throw $e;
129
        }
130
    }
131
}
132