AMQPSubscribeEventQueue   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 154
Duplicated Lines 20.78 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 12
lcom 1
cbo 4
dl 32
loc 154
ccs 50
cts 50
cp 1
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A publish() 0 8 1
A subscribe() 0 26 3
A unsubscribe() 12 12 2
A declareQueue() 0 8 2
A handle() 20 20 3

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
/**
4
 * GpsLab component.
5
 *
6
 * @author    Peter Gribanov <[email protected]>
7
 * @copyright Copyright (c) 2011, Peter Gribanov
8
 * @license   http://opensource.org/licenses/MIT
9
 */
10
11
namespace GpsLab\Domain\Event\Queue\Subscribe;
12
13
use GpsLab\Domain\Event\Event;
14
use GpsLab\Domain\Event\Queue\Serializer\Serializer;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use Psr\Log\LoggerInterface;
18
19
/**
 * Event queue backed by a single AMQP queue that supports both publishing
 * events and subscribing handlers to them.
 *
 * Events are serialized with the injected Serializer before being published
 * and deserialized again before being passed to the subscribed handlers.
 * The queue declaration and the consumer registration are both performed
 * lazily, on first use.
 */
class AMQPSubscribeEventQueue implements SubscribeEventQueue
{
    /**
     * @var AMQPChannel
     */
    private $channel;

    /**
     * @var Serializer
     */
    private $serializer;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Handlers invoked, in subscription order, for every received event.
     *
     * @var callable[]
     */
    private $handlers = [];

    /**
     * @var string
     */
    private $queue_name = '';

    /**
     * Whether the consumer callback has already been registered on the channel.
     *
     * @var bool
     */
    private $subscribed = false;

    /**
     * Whether the queue has already been declared by this instance.
     *
     * @var bool
     */
    private $declared = false;

    /**
     * @param AMQPChannel     $channel
     * @param Serializer      $serializer
     * @param LoggerInterface $logger
     * @param string          $queue_name
     */
    public function __construct(AMQPChannel $channel, Serializer $serializer, LoggerInterface $logger, $queue_name)
    {
        $this->channel = $channel;
        $this->serializer = $serializer;
        $this->logger = $logger;
        $this->queue_name = $queue_name;
    }

    /**
     * Publish event to queue.
     *
     * The event is serialized first, so a serialization failure happens
     * before the queue is declared or anything is sent.
     *
     * @param Event $event
     *
     * @return bool Always true; failures surface as exceptions from the serializer or the channel
     */
    public function publish(Event $event)
    {
        $this->send($this->serializer->serialize($event));

        return true;
    }

    /**
     * Subscribe on event queue.
     *
     * The handler is registered and, on the first call only, a consumer is
     * attached to the channel. The method then waits on the channel for as
     * long as the channel reports that it is consuming.
     *
     * @throws \ErrorException
     *
     * @param callable $handler
     */
    public function subscribe(callable $handler)
    {
        $this->handlers[] = $handler;

        // lazy subscribe: the consumer is registered once and shared by all handlers
        if (!$this->subscribed) {
            $this->declareQueue();

            // Positional flags follow php-amqplib's basic_consume() signature.
            $this->channel->basic_consume(
                $this->queue_name,
                '',    // consumer tag
                false, // no_local
                true,  // no_ack
                false, // exclusive
                false, // nowait
                function (AMQPMessage $message) {
                    $this->handle($message->body);
                }
            );

            $this->subscribed = true;
        }

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    /**
     * Unsubscribe on event queue.
     *
     * Handlers are matched strictly: a closure or object must be the very
     * same instance that was subscribed, and an array callable must reference
     * the same object instance and the same method name.
     *
     * @param callable $handler
     *
     * @return bool True if the handler was registered and has been removed, false otherwise
     */
    public function unsubscribe(callable $handler)
    {
        // Strict search: a loose comparison would compare objects inside array
        // callables property by property, which can match a different (merely
        // equal) instance and can recurse deeply on large object graphs.
        $index = array_search($handler, $this->handlers, true);

        if ($index === false) {
            return false;
        }

        unset($this->handlers[$index]);

        return true;
    }

    /**
     * Declare the queue on the channel the first time it is needed.
     */
    private function declareQueue()
    {
        // lazy declare queue
        if (!$this->declared) {
            // Positional flags follow php-amqplib's queue_declare() signature:
            // passive, durable, exclusive, auto_delete.
            $this->channel->queue_declare($this->queue_name, false, false, false, false);
            $this->declared = true;
        }
    }

    /**
     * Send an already serialized message body to the queue.
     *
     * @param string $body
     */
    private function send($body)
    {
        $this->declareQueue();
        $this->channel->basic_publish(new AMQPMessage($body), '', $this->queue_name);
    }

    /**
     * Deserialize a received message and pass the event to every handler.
     *
     * If deserialization fails, the failure is logged as critical and the
     * raw message is published back to the same queue; no handler is called.
     * Exceptions thrown by handlers are not caught here.
     *
     * @param string $message
     */
    private function handle($message)
    {
        try {
            $event = $this->serializer->deserialize($message);
        } catch (\Exception $e) { // catch only deserialize exception
            // it's a critical error
            // it is necessary to react quickly to it
            $this->logger->critical('Failed denormalize a event in the AMQP queue', [$message, $e->getMessage()]);

            // put the raw message back so denormalization can be retried later
            $this->send($message);

            return; // no event for handle
        }

        foreach ($this->handlers as $handler) {
            call_user_func($handler, $event);
        }
    }
}
173