BaseAmqp   A
last analyzed

Complexity

Total Complexity 39

Size/Duplication

Total Lines 292
Duplicated Lines 0 %

Test Coverage

Coverage 56.44%

Importance

Changes 7
Bugs 4 Features 0
Metric Value
eloc 105
c 7
b 4
f 0
dl 0
loc 292
ccs 57
cts 101
cp 0.5644
rs 9.28
wmc 39

19 Methods

Rating   Name   Duplication   Size   Complexity  
A setQueueOptions() 0 3 1
A reconnect() 0 7 2
A setExchangeOptions() 0 11 3
A getEventDispatcher() 0 3 1
A setEventDispatcher() 0 5 1
A setRoutingKey() 0 3 1
A setConsumerOptions() 0 3 1
A disableAutoSetupFabric() 0 3 1
A setupFabric() 0 8 3
A close() 0 14 6
A queueDeclare() 0 23 5
A setLogger() 0 3 1
A __construct() 0 12 2
A queueBind() 0 5 2
A dispatchEvent() 0 6 2
A setChannel() 0 3 1
A __destruct() 0 3 1
A getChannel() 0 7 3
A exchangeDeclare() 0 16 2
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AMQPEvent;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\NullLogger;
10
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
11
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
12
13
/**
 * Shared base for AMQP participants: holds the connection and channel, the
 * exchange/queue/consumer option sets, and the logic that declares the
 * exchange and queue and binds them together ("fabric" setup).
 */
abstract class BaseAmqp
{
    /** @var AbstractConnection Connection handed to the constructor. */
    protected $conn;

    /** @var AMQPChannel|null Channel in use; opened on demand by getChannel(). */
    protected $ch;

    /** @var string Consumer tag; defaults to "PHPPROCESS_<hostname>_<pid>". */
    protected $consumerTag;

    /** @var bool Set to true once exchangeDeclare() has declared the exchange. */
    protected $exchangeDeclared = false;

    /** @var bool Set to true once queueDeclare() has declared and bound the queue. */
    protected $queueDeclared = false;

    /** @var string Routing key used for binding when no 'routing_keys' queue option is set. */
    protected $routingKey = '';

    /** @var bool Cleared by disableAutoSetupFabric(); not read inside this class. */
    protected $autoSetupFabric = true;

    /** @var array Not read inside this class; presumably default message properties for subclasses. */
    protected $basicProperties = ['content_type' => 'text/plain', 'delivery_mode' => 2];

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Exchange settings passed to exchange_declare(). 'name' and 'type' are
     * added through setExchangeOptions(); 'declare' => false skips declaration.
     *
     * @var array
     */
    protected $exchangeOptions = [
        'passive' => false,
        'durable' => true,
        'auto_delete' => false,
        'internal' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null,
        'declare' => true,
    ];

    /**
     * Queue settings passed to queue_declare(). An optional 'routing_keys'
     * list may be merged in through setQueueOptions(); 'declare' => false
     * skips both declaration and binding.
     *
     * @var array
     */
    protected $queueOptions = [
        'name' => '',
        'passive' => false,
        'durable' => true,
        'exclusive' => false,
        'auto_delete' => false,
        'nowait' => false,
        'arguments' => null,
        'ticket' => null,
        'declare' => true,
    ];

    /** @var array Consumer settings; merged with overrides by setConsumerOptions(). */
    protected $consumerOptions = [
        'no_ack' => false,
    ];

    /**
     * @var EventDispatcherInterface|null
     */
    protected $eventDispatcher = null;

    /**
     * Stores the connection and optional channel, opens a channel right away
     * when the connection reports connectOnConstruct(), and installs a
     * NullLogger as the default logger.
     *
     * @param AbstractConnection $conn
     * @param AMQPChannel|null   $ch
     * @param string|null        $consumerTag Generated from host name and process id when null.
     */
    public function __construct(AbstractConnection $conn, ?AMQPChannel $ch = null, $consumerTag = null)
    {
        $this->conn = $conn;
        $this->ch = $ch;

        if ($conn->connectOnConstruct()) {
            $this->getChannel();
        }

        $this->consumerTag = $consumerTag ?? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid());

        $this->logger = new NullLogger();
    }

    /**
     * Closes the channel and connection when the object is destroyed.
     */
    public function __destruct()
    {
        $this->close();
    }

    /**
     * Closes the channel (if one is set) and then the connection (if it is
     * connected). Exceptions raised by either close call are deliberately
     * swallowed so that shutdown never fails.
     *
     * @return void
     */
    public function close()
    {
        if ($this->ch) {
            try {
                $this->ch->close();
            } catch (\Exception $e) {
                // ignore on shutdown
            }
        }

        if ($this->conn && $this->conn->isConnected()) {
            try {
                $this->conn->close();
            } catch (\Exception $e) {
                // ignore on shutdown
            }
        }
    }

    /**
     * Asks the connection to reconnect. Does nothing when the connection
     * reports that it is not currently connected.
     *
     * @return void
     */
    public function reconnect()
    {
        if (!$this->conn->isConnected()) {
            return;
        }

        $this->conn->reconnect();
    }

    /**
     * Returns the current channel, opening a new one on the connection when
     * none is set or the current one no longer has a channel id.
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        if (empty($this->ch) || null === $this->ch->getChannelId()) {
            $this->ch = $this->conn->channel();
        }

        return $this->ch;
    }

    /**
     * Replaces the channel used by this instance.
     *
     * @param  AMQPChannel $ch
     *
     * @return void
     */
    public function setChannel(AMQPChannel $ch)
    {
        $this->ch = $ch;
    }

    /**
     * Merges the given options over the current exchange options.
     *
     * @throws \InvalidArgumentException When 'name' is not set or 'type' is empty.
     * @param  array                     $options Must contain 'name' and a non-empty 'type'.
     * @return void
     */
    public function setExchangeOptions(array $options = [])
    {
        // isset() rather than empty(): an empty-string name is accepted here.
        if (!isset($options['name'])) {
            throw new \InvalidArgumentException('You must provide an exchange name');
        }

        if (empty($options['type'])) {
            throw new \InvalidArgumentException('You must provide an exchange type');
        }

        $this->exchangeOptions = array_merge($this->exchangeOptions, $options);
    }

    /**
     * Merges the given options over the current queue options.
     *
     * @param  array $options
     * @return void
     */
    public function setQueueOptions(array $options = [])
    {
        $this->queueOptions = array_merge($this->queueOptions, $options);
    }

    /**
     * Merges the given options over the current consumer options.
     *
     * @param  array $options
     * @return void
     */
    public function setConsumerOptions(array $options = [])
    {
        $this->consumerOptions = array_merge($this->consumerOptions, $options);
    }

    /**
     * Sets the routing key used for binding when the queue options carry no
     * 'routing_keys' list.
     *
     * @param  string $routingKey
     * @return void
     */
    public function setRoutingKey($routingKey)
    {
        $this->routingKey = $routingKey;
    }

    /**
     * Declares the exchange and the queue, each only if it has not been
     * declared by this instance yet.
     *
     * @return void
     */
    public function setupFabric()
    {
        if (!$this->exchangeDeclared) {
            $this->exchangeDeclare();
        }

        if (!$this->queueDeclared) {
            $this->queueDeclare();
        }
    }

    /**
     * disables the automatic SetupFabric when using a consumer or producer
     *
     * @return void
     */
    public function disableAutoSetupFabric()
    {
        $this->autoSetupFabric = false;
    }

    /**
     * Replaces the logger installed by the constructor.
     *
     * @param LoggerInterface $logger
     *
     * @return void
     */
    public function setLogger($logger)
    {
        $this->logger = $logger;
    }

    /**
     * Declares the exchange from the exchange options and marks it as
     * declared. Does nothing when the 'declare' option is falsy.
     *
     * @return void
     */
    protected function exchangeDeclare()
    {
        if ($this->exchangeOptions['declare']) {
            $this->getChannel()->exchange_declare(
                $this->exchangeOptions['name'],
                $this->exchangeOptions['type'],
                $this->exchangeOptions['passive'],
                $this->exchangeOptions['durable'],
                $this->exchangeOptions['auto_delete'],
                $this->exchangeOptions['internal'],
                $this->exchangeOptions['nowait'],
                $this->exchangeOptions['arguments'],
                $this->exchangeOptions['ticket']
            );

            $this->exchangeDeclared = true;
        }
    }

    /**
     * Declares the queue from the queue options, binds it to the configured
     * exchange once per routing key, and marks it as declared. Does nothing
     * when the 'declare' option is falsy.
     *
     * @return void
     */
    protected function queueDeclare()
    {
        if (!$this->queueOptions['declare']) {
            return;
        }

        $declareResult = $this->getChannel()->queue_declare(
            $this->queueOptions['name'],
            $this->queueOptions['passive'],
            $this->queueOptions['durable'],
            $this->queueOptions['exclusive'],
            $this->queueOptions['auto_delete'],
            $this->queueOptions['nowait'],
            $this->queueOptions['arguments'],
            $this->queueOptions['ticket']
        );

        // The first element of the result is the queue name. When the call
        // yields no result (php-amqplib returns nothing for 'nowait'
        // declarations), fall back to the configured name instead of binding
        // with a null queue name.
        $queueName = $declareResult[0] ?? $this->queueOptions['name'];

        // The queue arguments are reused as the binding arguments.
        $bindArguments = $this->queueOptions['arguments'] ?? [];

        // Use the explicit routing key list when it is non-empty, otherwise
        // the single routing key set through setRoutingKey().
        $routingKeys = $this->queueOptions['routing_keys'] ?? [];
        if (count($routingKeys) === 0) {
            $routingKeys = [$this->routingKey];
        }

        foreach ($routingKeys as $routingKey) {
            $this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey, $bindArguments);
        }

        $this->queueDeclared = true;
    }

    /**
     * Binds queue to an exchange
     *
     * @param string $queue
     * @param string $exchange    Binding is skipped when this is the empty string.
     * @param string $routing_key
     * @param array  $arguments   Extra binding arguments.
     *
     * @return void
     */
    protected function queueBind($queue, $exchange, $routing_key, array $arguments = [])
    {
        // queue binding is not permitted on the default exchange
        if ('' !== $exchange) {
            $this->getChannel()->queue_bind($queue, $exchange, $routing_key, false, $arguments);
        }
    }

    /**
     * Sets the event dispatcher used by dispatchEvent().
     *
     * @param EventDispatcherInterface $eventDispatcher
     *
     * @return BaseAmqp This instance, for chaining.
     */
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;

        return $this;
    }

    /**
     * Dispatches the event under the given name, but only when the configured
     * dispatcher implements the contracts EventDispatcherInterface; otherwise
     * (including when no dispatcher is set) nothing happens.
     *
     * @param string $eventName
     * @param AMQPEvent  $event
     *
     * @return void
     */
    protected function dispatchEvent($eventName, AMQPEvent $event)
    {
        $dispatcher = $this->getEventDispatcher();

        if ($dispatcher instanceof ContractsEventDispatcherInterface) {
            $dispatcher->dispatch($event, $eventName);
        }
    }

    /**
     * @return EventDispatcherInterface|null
     */
    public function getEventDispatcher()
    {
        return $this->eventDispatcher;
    }
}
307