Passed
Push — master ( d34844...01c7f3 )
by Mihai
03:03
created

BaseAmqp   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 272
Duplicated Lines 0 %

Test Coverage

Coverage 38.14%

Importance

Changes 6
Bugs 3 Features 0
Metric Value
eloc 97
c 6
b 3
f 0
dl 0
loc 272
ccs 37
cts 97
cp 0.3814
rs 9.28
wmc 39

18 Methods

Rating   Name   Duplication   Size   Complexity  
A setQueueOptions() 0 3 1
A reconnect() 0 7 2
A setExchangeOptions() 0 11 3
A setEventDispatcher() 0 5 1
A setRoutingKey() 0 3 1
A disableAutoSetupFabric() 0 3 1
A setupFabric() 0 8 3
A close() 0 14 6
A queueDeclare() 0 17 5
A setLogger() 0 3 1
A __construct() 0 12 3
A queueBind() 0 5 2
A setChannel() 0 3 1
A getChannel() 0 7 3
A exchangeDeclare() 0 15 2
A __destruct() 0 3 1
A getEventDispatcher() 0 3 1
A dispatchEvent() 0 6 2
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AMQPEvent;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\NullLogger;
10
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
11
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
12
13
abstract class BaseAmqp
14
{
15
    protected $conn;
16
    protected $ch;
17
    protected $consumerTag;
18
    protected $exchangeDeclared = false;
19
    protected $queueDeclared = false;
20
    protected $routingKey = '';
21
    protected $autoSetupFabric = true;
22
    protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2);
23
24
    /**
25
     * @var LoggerInterface
26
     */
27
    protected $logger;
28
29
    protected $exchangeOptions = array(
30
        'passive' => false,
31
        'durable' => true,
32
        'auto_delete' => false,
33
        'internal' => false,
34
        'nowait' => false,
35
        'arguments' => null,
36
        'ticket' => null,
37
        'declare' => true,
38
    );
39
40
    protected $queueOptions = array(
41
        'name' => '',
42
        'passive' => false,
43
        'durable' => true,
44
        'exclusive' => false,
45
        'auto_delete' => false,
46
        'nowait' => false,
47
        'arguments' => null,
48
        'ticket' => null,
49
        'declare' => true,
50
    );
51
52
    /**
53
     * @var EventDispatcherInterface|null
54
     */
55
    protected $eventDispatcher = null;
56
57
    /**
58
     * @param AbstractConnection   $conn
59
     * @param AMQPChannel|null $ch
60
     * @param null             $consumerTag
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $consumerTag is correct as it would always require null to be passed?
Loading history...
61
     */
62 60
    public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $consumerTag = null)
63
    {
64 60
        $this->conn = $conn;
65 60
        $this->ch = $ch;
66
67 60
        if ($conn->connectOnConstruct()) {
68 1
            $this->getChannel();
69
        }
70
71 60
        $this->consumerTag = empty($consumerTag) ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid()) : $consumerTag;
72
73 60
        $this->logger = new NullLogger();
74 60
    }
75
76 41
    public function __destruct()
77
    {
78 41
        $this->close();
79 41
    }
80
81 41
    public function close()
82
    {
83 41
        if ($this->ch) {
84
            try {
85 35
                $this->ch->close();
86
            } catch (\Exception $e) {
87
                // ignore on shutdown
88
            }
89
        }
90
91 41
        if ($this->conn && $this->conn->isConnected()) {
92
            try {
93
                $this->conn->close();
94
            } catch (\Exception $e) {
95
                // ignore on shutdown
96
            }
97
        }
98 41
    }
99
100
    public function reconnect()
101
    {
102
        if (!$this->conn->isConnected()) {
103
            return;
104
        }
105
106
        $this->conn->reconnect();
107
    }
108
109
    /**
110
     * @return AMQPChannel
111
     */
112 16
    public function getChannel()
113
    {
114 16
        if (empty($this->ch) || null === $this->ch->getChannelId()) {
115 1
            $this->ch = $this->conn->channel();
116
        }
117
118 16
        return $this->ch;
119
    }
120
121
    /**
122
     * @param  AMQPChannel $ch
123
     *
124
     * @return void
125
     */
126 13
    public function setChannel(AMQPChannel $ch)
127
    {
128 13
        $this->ch = $ch;
129 13
    }
130
131
    /**
132
     * @throws \InvalidArgumentException
133
     * @param  array                     $options
134
     * @return void
135
     */
136
    public function setExchangeOptions(array $options = array())
137
    {
138
        if (!isset($options['name'])) {
139
            throw new \InvalidArgumentException('You must provide an exchange name');
140
        }
141
142
        if (empty($options['type'])) {
143
            throw new \InvalidArgumentException('You must provide an exchange type');
144
        }
145
146
        $this->exchangeOptions = array_merge($this->exchangeOptions, $options);
147
    }
148
149
    /**
150
     * @param  array $options
151
     * @return void
152
     */
153
    public function setQueueOptions(array $options = array())
154
    {
155
        $this->queueOptions = array_merge($this->queueOptions, $options);
156
    }
157
158
    /**
159
     * @param  string $routingKey
160
     * @return void
161
     */
162
    public function setRoutingKey($routingKey)
163
    {
164
        $this->routingKey = $routingKey;
165
    }
166
167
    public function setupFabric()
168
    {
169
        if (!$this->exchangeDeclared) {
170
            $this->exchangeDeclare();
171
        }
172
173
        if (!$this->queueDeclared) {
174
            $this->queueDeclare();
175
        }
176
    }
177
178
    /**
179
     * disables the automatic SetupFabric when using a consumer or producer
180
     */
181 12
    public function disableAutoSetupFabric()
182
    {
183 12
        $this->autoSetupFabric = false;
184 12
    }
185
186
    /**
187
     * @param LoggerInterface $logger
188
     */
189
    public function setLogger($logger)
190
    {
191
        $this->logger = $logger;
192
    }
193
194
    /**
195
     * Declares exchange
196
     */
197
    protected function exchangeDeclare()
198
    {
199
        if ($this->exchangeOptions['declare']) {
200
            $this->getChannel()->exchange_declare(
201
                $this->exchangeOptions['name'],
202
                $this->exchangeOptions['type'],
203
                $this->exchangeOptions['passive'],
204
                $this->exchangeOptions['durable'],
205
                $this->exchangeOptions['auto_delete'],
206
                $this->exchangeOptions['internal'],
207
                $this->exchangeOptions['nowait'],
208
                $this->exchangeOptions['arguments'],
209
                $this->exchangeOptions['ticket']);
210
211
            $this->exchangeDeclared = true;
212
        }
213
    }
214
215
    /**
216
     * Declares queue, creates if needed
217
     */
218
    protected function queueDeclare()
219
    {
220
        if ($this->queueOptions['declare']) {
221
            list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
222
                $this->queueOptions['durable'], $this->queueOptions['exclusive'],
223
                $this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
224
                $this->queueOptions['arguments'], $this->queueOptions['ticket']);
225
226
            if (isset($this->queueOptions['routing_keys']) && count($this->queueOptions['routing_keys']) > 0) {
227
                foreach ($this->queueOptions['routing_keys'] as $routingKey) {
228
                    $this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey);
229
                }
230
            } else {
231
                $this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey);
232
            }
233
234
            $this->queueDeclared = true;
235
        }
236
    }
237
238
    /**
239
     * Binds queue to an exchange
240
     *
241
     * @param string $queue
242
     * @param string $exchange
243
     * @param string $routing_key
244
     */
245
    protected function queueBind($queue, $exchange, $routing_key)
246
    {
247
        // queue binding is not permitted on the default exchange
248
        if ('' !== $exchange) {
249
            $this->getChannel()->queue_bind($queue, $exchange, $routing_key);
250
        }
251
    }
252
253
    /**
254
     * @param EventDispatcherInterface $eventDispatcher
255
     *
256
     * @return BaseAmqp
257
     */
258 20
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
259
    {
260 20
        $this->eventDispatcher = $eventDispatcher;
261
262 20
        return $this;
263
    }
264
265
    /**
266
     * @param string $eventName
267
     * @param AMQPEvent  $event
268
     */
269 43
    protected function dispatchEvent($eventName, AMQPEvent $event)
270
    {
271 43
        if ($this->getEventDispatcher() instanceof ContractsEventDispatcherInterface) {
272 19
            $this->getEventDispatcher()->dispatch(
273 19
                $event,
274 19
                $eventName
0 ignored issues
show
Unused Code introduced by
The call to Symfony\Contracts\EventD...erInterface::dispatch() has too many arguments starting with $eventName. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

274
            $this->getEventDispatcher()->/** @scrutinizer ignore-call */ dispatch(

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
275
            );
276
        }
277 43
    }
278
279
    /**
280
     * @return EventDispatcherInterface|null
281
     */
282 42
    public function getEventDispatcher()
283
    {
284 42
        return $this->eventDispatcher;
285
    }
286
}
287