Completed
Push — master ( 613d98...fa04c9 )
by Rémi
20:40
created

PhpAmqpLibDriver   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 291
Duplicated Lines 6.53 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 0
Metric Value
wmc 29
lcom 1
cbo 7
dl 19
loc 291
rs 10
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A declareSimpleQueue() 0 9 1
A declareExchange() 0 6 1
A bindQueue() 0 4 1
A declareAndBindQueue() 0 5 1
A deleteQueue() 0 4 1
A deleteExchange() 0 4 1
A publish() 0 8 1
C consume() 0 43 7
A ack() 0 4 1
A nack() 0 4 1
A close() 0 7 1
A getChannel() 0 8 2
A getMessageProperties() 19 19 3
A getHeaders() 0 5 2
A getCorrelationId() 0 5 2
A getReplyTo() 0 5 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace Burrow\Driver;
4
5
use Burrow\Driver;
6
use Burrow\Exception\ConsumerException;
7
use Burrow\Exception\TimeoutException;
8
use Burrow\Message;
9
use Burrow\QueueHandler;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
use PhpAmqpLib\Exception\AMQPExceptionInterface;
13
use PhpAmqpLib\Exception\AMQPTimeoutException;
14
use PhpAmqpLib\Message\AMQPMessage;
15
use PhpAmqpLib\Wire\AMQPTable;
16
17
/**
18
 * @codeCoverageIgnore
19
 */
20
class PhpAmqpLibDriver implements Driver
21
{
22
    const DELIVERY_MODE = 'delivery_mode';
23
    const CONTENT_TYPE = 'content_type';
24
    const APPLICATION_HEADERS = 'application_headers';
25
    const CORRELATION_ID = 'correlation_id';
26
    const REPLY_TO = 'reply_to';
27
28
    /** @var AbstractConnection */
29
    private $connection;
30
31
    /** @var AMQPChannel */
32
    private $channel;
33
34
    /** @var bool */
35
    private $stop = false;
36
37
    /**
     * Wraps an existing php-amqplib connection.
     *
     * The connection is only stored here; the channel is obtained from it
     * later, on first use.
     *
     * @param AbstractConnection $connection
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
    }
46
47
    /**
     * Declare a queue and return its name
     *
     * The queue is declared durable when $type is self::QUEUE_DURABLE and
     * exclusive when $type is self::QUEUE_EXCLUSIVE; any other value sets
     * neither flag.
     *
     * @param string $queueName
     * @param string $type
     *
     * @return string First element of the queue_declare() result
     */
    public function declareSimpleQueue($queueName = '', $type = self::QUEUE_DURABLE)
    {
        $isDurable   = (self::QUEUE_DURABLE === $type);
        $isExclusive = (self::QUEUE_EXCLUSIVE === $type);

        $channel = $this->getChannel();

        // Only the first element of the declare result is of interest here.
        list($declaredName) = $channel->queue_declare($queueName, false, $isDurable, $isExclusive, false);

        return $declaredName;
    }
64
65
    /**
     * Declare an exchange
     *
     * The exchange is declared with passive = false, durable = true and
     * auto_delete = false.
     *
     * @param  string $exchangeName
     * @param  string $type
     *
     * @return string The name of the declared exchange
     */
    public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT)
    {
        // exchange.declare-ok carries no payload, so exchange_declare() has
        // nothing useful to return; destructuring its result always produced
        // null. Return the name that was declared instead, as documented.
        $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false);

        return $exchangeName;
    }
79
80
    /**
     * Bind a queue to an exchange
     *
     * Note that queue_bind() takes the queue first and the exchange second,
     * i.e. the reverse of this method's parameter order.
     *
     * @param  string $exchange
     * @param  string $queueName
     * @param  string $routingKey
     * @return void
     */
    public function bindQueue($exchange, $queueName, $routingKey = '')
    {
        $channel = $this->getChannel();
        $channel->queue_bind($queueName, $exchange, $routingKey);
    }
92
93
    /**
     * Declare a queue with the default type, then bind it to an exchange
     *
     * Equivalent to calling declareSimpleQueue($queueName) followed by
     * bindQueue($exchange, $queueName, $routingKey).
     *
     * @param  string $exchange
     * @param  string $queueName
     * @param  string $routingKey
     * @return void
     */
    public function declareAndBindQueue($exchange, $queueName, $routingKey = '')
    {
        // Step 1: make sure the queue exists.
        $this->declareSimpleQueue($queueName);

        // Step 2: attach it to the exchange.
        $this->bindQueue($exchange, $queueName, $routingKey);
    }
106
107
    /**
     * Remove a queue through the channel's queue_delete()
     *
     * @param string $queueName
     * @return void
     */
    public function deleteQueue($queueName)
    {
        $channel = $this->getChannel();
        $channel->queue_delete($queueName);
    }
117
118
    /**
     * Remove an exchange through the channel's exchange_delete()
     *
     * @param string $exchangeName
     * @return void
     */
    public function deleteExchange($exchangeName)
    {
        $channel = $this->getChannel();
        $channel->exchange_delete($exchangeName);
    }
128
129
    /**
     * Publish a message to an exchange
     *
     * The Burrow message body and properties are wrapped in an AMQPMessage,
     * which is published using the message's own routing key.
     *
     * @param string  $exchangeName
     * @param Message $message
     *
     * @return void
     */
    public function publish($exchangeName, Message $message)
    {
        $channel = $this->getChannel();

        $amqpMessage = new AMQPMessage(
            $message->getBody(),
            self::getMessageProperties($message)
        );

        $channel->basic_publish($amqpMessage, $exchangeName, $message->getRoutingKey());
    }
145
146
    /**
     * Consume the queue
     *
     * Registers a consumer on the queue, then keeps waiting on the channel
     * until the callback returns QueueHandler::STOP_CONSUMING, the stop flag
     * is raised (see close()) or the channel has no callbacks left.
     *
     * @param string   $queueName
     * @param callable $callback  Invoked with a Burrow\Message for each delivery
     * @param int      $timeout   Handed to every AMQPChannel::wait() call
     * @param bool     $autoAck   Forwarded as the fourth argument of basic_consume()
     *
     * @return void
     * @throws TimeoutException  When waiting on the channel times out
     * @throws ConsumerException When php-amqplib raises any other AMQP exception
     * @throws \Exception        Anything else (e.g. thrown by the callback) is propagated untouched
     */
    public function consume($queueName, callable $callback, $timeout = 0, $autoAck = true)
    {
        $this->stop = false;

        // getChannel() always hands back the same instance, so fetch it once.
        $channel = $this->getChannel();

        // basic_qos() expects (int prefetch_size, int prefetch_count, bool global);
        // pass explicit 0 / false rather than null.
        $channel->basic_qos(0, 1, false);
        $channel->basic_consume(
            $queueName,
            '',
            false,
            $autoAck,
            false,
            false,
            function (AMQPMessage $message) use ($callback, $queueName) {
                $burrowMessage = new Message(
                    $message->getBody(),
                    '', // Impossible to retrieve here
                    self::getHeaders($message),
                    self::getCorrelationId($message),
                    self::getReplyTo($message)
                );
                $burrowMessage->setDeliveryTag($message->delivery_info['delivery_tag']);
                $burrowMessage->setQueue($queueName);

                $success = $callback($burrowMessage);
                if ($success === QueueHandler::STOP_CONSUMING) {
                    $this->stop = true;
                }
            }
        );

        while (count($channel->callbacks) && !$this->stop) {
            try {
                $channel->wait(null, false, $timeout);
            } catch (AMQPTimeoutException $e) {
                // Must stay first: timeouts get their own exception type.
                throw TimeoutException::build($e, $timeout);
            } catch (AMQPExceptionInterface $e) {
                throw ConsumerException::build($e);
            }
            // Non-AMQP exceptions are not caught and bubble up unchanged.
        }
    }
200
201
    /**
     * Acknowledge a message using its delivery tag
     *
     * @param Message $message
     *
     * @return void
     */
    public function ack(Message $message)
    {
        $channel = $this->getChannel();
        $channel->basic_ack($message->getDeliveryTag());
    }
212
213
    /**
     * Negatively acknowledge a message after a failed consumption
     *
     * Implemented with the channel's basic_reject(), using the message's
     * delivery tag.
     *
     * @param Message $message
     * @param bool    $requeue Forwarded to basic_reject()
     *
     * @return void
     */
    public function nack(Message $message, $requeue = true)
    {
        $channel = $this->getChannel();
        $channel->basic_reject($message->getDeliveryTag(), $requeue);
    }
225
226
    /**
     * Close the channel (if one was opened) and the connection
     *
     * Also raises the stop flag so that a running consume() loop ends.
     *
     * @return void
     */
    public function close()
    {
        $this->stop = true;

        // Going through getChannel() here would lazily open a brand new
        // channel only to close it again when the driver was never used.
        if (null !== $this->channel) {
            $this->channel->close();
        }

        $this->connection->close();
    }
238
239
    /**
     * Return the driver's channel, opening it on the connection on first use
     *
     * @return AMQPChannel
     */
    private function getChannel()
    {
        if (null !== $this->channel) {
            return $this->channel;
        }

        $this->channel = $this->connection->channel();

        return $this->channel;
    }
250
251
    /**
     * Build the AMQP property array for an outgoing message
     *
     * Delivery mode 2, a text/plain content type and the message headers
     * (wrapped in an AMQPTable) are always present. The correlation id and
     * the reply-to are only added when the message has a non-null value for
     * them.
     *
     * @param Message $message
     *
     * @return array
     */
    private static function getMessageProperties(Message $message)
    {
        $properties = [];
        $properties[self::DELIVERY_MODE]       = 2;
        $properties[self::CONTENT_TYPE]        = 'text/plain';
        $properties[self::APPLICATION_HEADERS] = new AMQPTable($message->getHeaders());

        $correlationId = $message->getCorrelationId();
        if (null !== $correlationId) {
            $properties[self::CORRELATION_ID] = $correlationId;
        }

        $replyTo = $message->getReplyTo();
        if (null !== $replyTo) {
            $properties[self::REPLY_TO] = $replyTo;
        }

        return $properties;
    }
277
278
    /**
     * Extract the application headers of an incoming message
     *
     * @param AMQPMessage $message
     *
     * @return array Native data of the application_headers property, or an
     *               empty array when the message does not have that property
     */
    private static function getHeaders(AMQPMessage $message)
    {
        if (!$message->has(self::APPLICATION_HEADERS)) {
            return [];
        }

        $headers = $message->get(self::APPLICATION_HEADERS);

        return $headers->getNativeData();
    }
288
289
    /**
     * Read the correlation id of an incoming message
     *
     * @param AMQPMessage $message
     *
     * @return string The correlation_id property, or '' when it is not set
     */
    private static function getCorrelationId(AMQPMessage $message)
    {
        if ($message->has(self::CORRELATION_ID)) {
            return $message->get(self::CORRELATION_ID);
        }

        return '';
    }
299
300
    /**
     * Read the reply-to of an incoming message
     *
     * @param AMQPMessage $message
     *
     * @return string The reply_to property, or '' when it is not set
     */
    private static function getReplyTo(AMQPMessage $message)
    {
        if ($message->has(self::REPLY_TO)) {
            return $message->get(self::REPLY_TO);
        }

        return '';
    }
310
}
311