Completed
Push — master ( 063d73...cb743b )
by Pol-Valentin
03:09
created

PhpAmqpLibDriver::checkIfWeShouldRetry()   A

Complexity

Conditions 6
Paths 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 11
rs 9.2222
c 0
b 0
f 0
cc 6
nc 2
nop 1
1
<?php
2
3
namespace Burrow\Driver;
4
5
use Assert\AssertionFailedException;
6
use Burrow\Driver;
7
use Burrow\Exception\ConsumerException;
8
use Burrow\Exception\TimeoutException;
9
use Burrow\Message;
10
use Burrow\QueueHandler;
11
use Exception;
12
use PhpAmqpLib\Channel\AMQPChannel;
13
use PhpAmqpLib\Connection\AbstractConnection;
14
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
15
use PhpAmqpLib\Exception\AMQPExceptionInterface;
16
use PhpAmqpLib\Exception\AMQPIOException;
17
use PhpAmqpLib\Exception\AMQPSocketException;
18
use PhpAmqpLib\Exception\AMQPTimeoutException;
19
use PhpAmqpLib\Message\AMQPMessage;
20
use PhpAmqpLib\Wire\AMQPTable;
21
22
/**
23
 * @codeCoverageIgnore
24
 */
25
class PhpAmqpLibDriver implements Driver
26
{
27
    // Keys of the AMQP message properties read from and written to AMQPMessage.
    const DELIVERY_MODE = 'delivery_mode';
    const CONTENT_TYPE = 'content_type';
    const APPLICATION_HEADERS = 'application_headers';
    const CORRELATION_ID = 'correlation_id';
    const REPLY_TO = 'reply_to';

    /** @var AbstractConnection Connection handed to the constructor. */
    private $connection;

    /** @var AMQPChannel|null Channel opened on first use by getChannel(). */
    private $channel;

    /** @var bool When true, the wait() loop stops iterating. */
    private $stop = false;

    /** @var bool True while a publish retry is running, so a second failure is not retried. */
    private $retryPublish = false;
43
44
    /**
     * Build a driver around an existing php-amqplib connection.
     *
     * Only the connection is stored here; no channel is opened by the constructor.
     *
     * @param AbstractConnection $connection Connection used for every broker operation.
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
    }
53
54
    /**
     * Declare a queue on the broker.
     *
     * The queue is declared durable when $type is self::QUEUE_DURABLE and
     * exclusive when $type is self::QUEUE_EXCLUSIVE; with any other value both
     * flags are false. It is never passive and never auto-deleted.
     *
     * @param string $queueName Queue name (empty string by default).
     * @param string $type      Queue type, compared against the QUEUE_* constants.
     *
     * @return string First element of the value returned by queue_declare().
     */
    public function declareSimpleQueue($queueName = '', $type = self::QUEUE_DURABLE)
    {
        $isDurable = (self::QUEUE_DURABLE === $type);
        $isExclusive = (self::QUEUE_EXCLUSIVE === $type);

        $declaration = $this->getChannel()->queue_declare($queueName, false, $isDurable, $isExclusive, false);
        list($declaredName) = $declaration;

        return $declaredName;
    }
71
72
    /**
     * Declare an exchange
     *
     * The exchange is declared non-passive, durable and not auto-deleted.
     *
     * @param string $exchangeName
     * @param string $type
     *
     * @return string The exchange name.
     */
    public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT)
    {
        $result = $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false);

        // Unlike queue.declare-ok, the AMQP exchange.declare-ok reply has no
        // fields, so the call is not expected to hand back a name. Destructuring
        // its result therefore produced null; fall back to the requested name.
        if (is_array($result) && isset($result[0])) {
            return $result[0];
        }

        return $exchangeName;
    }
86
87
    /**
     * Bind a queue to an exchange through the driver's channel.
     *
     * @param string $exchange   Exchange to bind to.
     * @param string $queueName  Queue being bound.
     * @param string $routingKey Routing key of the binding (empty string by default).
     *
     * @return void
     */
    public function bindQueue($exchange, $queueName, $routingKey = '')
    {
        $channel = $this->getChannel();
        $channel->queue_bind($queueName, $exchange, $routingKey);
    }
99
100
    /**
     * Declare a queue, then bind it to an exchange.
     *
     * The queue is declared through declareSimpleQueue() with its default type.
     *
     * @param string $exchange   Exchange to bind to.
     * @param string $queueName  Queue to declare and bind.
     * @param string $routingKey Routing key of the binding (empty string by default).
     *
     * @return void
     */
    public function declareAndBindQueue($exchange, $queueName, $routingKey = '')
    {
        // The queue has to exist before it can be bound.
        $this->declareSimpleQueue($queueName);

        $this->bindQueue($exchange, $queueName, $routingKey);
    }
113
114
    /**
     * Remove a queue from the broker.
     *
     * @param string $queueName Name of the queue to delete.
     *
     * @return void
     */
    public function deleteQueue($queueName)
    {
        $channel = $this->getChannel();
        $channel->queue_delete($queueName);
    }
124
125
    /**
     * Remove an exchange from the broker.
     *
     * @param string $exchangeName Name of the exchange to delete.
     *
     * @return void
     */
    public function deleteExchange($exchangeName)
    {
        $channel = $this->getChannel();
        $channel->exchange_delete($exchangeName);
    }
135
136
    /**
     * Send a message to an exchange.
     *
     * When publishing fails, checkIfWeShouldRetry() decides whether the
     * exception is rethrown or the publication is attempted again through
     * retryPublish().
     *
     * @param string  $exchangeName Target exchange.
     * @param Message $message      Message providing the body, properties and routing key.
     *
     * @return void
     */
    public function publish($exchangeName, Message $message)
    {
        try {
            $channel = $this->getChannel();
            $amqpMessage = new AMQPMessage($message->getBody(), self::getMessageProperties($message));

            $channel->basic_publish($amqpMessage, $exchangeName, $message->getRoutingKey());
        } catch (Exception $failure) {
            // Rethrows unless the failure is one of the retryable connection errors.
            $this->checkIfWeShouldRetry($failure);
            $this->retryPublish($exchangeName, $message);
        }
    }
157
158
    /**
     * Consume the queue
     *
     * Registers a consumer on the queue with a prefetch count of 1, then blocks
     * in wait() until the consumer is gone or consumption is stopped. Each
     * delivery is converted to a Burrow Message carrying its delivery tag and
     * the queue name before being handed to $callback. When the callback
     * returns QueueHandler::STOP_CONSUMING, the wait loop ends.
     *
     * @param string $queueName
     * @param callable $callback Receives the Burrow Message.
     * @param int $timeout       Forwarded to wait().
     * @param bool $autoAck      Passed as the no_ack flag of basic_consume().
     *
     * @return void
     * @throws AssertionFailedException
     * @throws \OutOfBoundsException
     * @throws \InvalidArgumentException
     * @throws Exception
     */
    public function consume($queueName, callable $callback, $timeout = 0, $autoAck = true)
    {
        $this->stop = false;

        // Explicit values of the documented types (int prefetch size, bool
        // global flag) instead of null.
        $this->getChannel()->basic_qos(0, 1, false);
        $this->getChannel()->basic_consume(
            $queueName,
            '',
            false,
            $autoAck,
            false,
            false,
            function (AMQPMessage $message) use ($callback, $queueName) {
                $burrowMessage = new Message(
                    $message->getBody(),
                    '', // Impossible to retrieve here
                    self::getHeaders($message),
                    self::getCorrelationId($message),
                    self::getReplyTo($message)
                );
                $burrowMessage->setDeliveryTag($message->delivery_info['delivery_tag']);
                $burrowMessage->setQueue($queueName);

                $success = $callback($burrowMessage);
                if ($success === QueueHandler::STOP_CONSUMING) {
                    $this->stop = true;
                }
            }
        );

        $this->wait($timeout);
    }
204
205
    /**
     * Acknowledge a consumed message, identified by its delivery tag.
     *
     * @param Message $message Message whose delivery tag is acknowledged.
     *
     * @return void
     */
    public function ack(Message $message)
    {
        $deliveryTag = $message->getDeliveryTag();

        $this->getChannel()->basic_ack($deliveryTag);
    }
216
217
    /**
     * Acknowledge an error during the consumption of the message.
     *
     * The message is rejected through basic_reject(), using its delivery tag.
     *
     * @param Message $message Message to reject.
     * @param bool    $requeue Forwarded to basic_reject() as the requeue flag.
     *
     * @return void
     */
    public function nack(Message $message, $requeue = true)
    {
        $deliveryTag = $message->getDeliveryTag();

        $this->getChannel()->basic_reject($deliveryTag, $requeue);
    }
229
230
    /**
     * Close the connection
     *
     * Also raises the stop flag so that a running wait() loop ends.
     *
     * @return void
     */
    public function close()
    {
        $this->stop = true;

        // Going through getChannel() here would open a brand-new channel on a
        // driver that never used one, only to close it straight away.
        if (null !== $this->channel) {
            $this->channel->close();
        }

        $this->connection->close();
    }
242
243
    /**
     * Return the driver's channel, opening it on the connection the first time.
     *
     * @return AMQPChannel
     */
    private function getChannel()
    {
        if (null !== $this->channel) {
            return $this->channel;
        }

        $this->channel = $this->connection->channel();

        return $this->channel;
    }
254
255
    /**
     * Block on the channel while it still has callbacks and no stop was requested.
     *
     * @param int $timeout Timeout forwarded to the channel's wait() call.
     *
     * @throws Exception TimeoutException on an AMQP timeout, ConsumerException
     *                   for any other AMQP exception; anything else is
     *                   rethrown untouched.
     */
    private function wait($timeout)
    {
        while (true) {
            // The channel is fetched again on every pass, as it may have been
            // replaced since the previous one.
            if (!count($this->getChannel()->callbacks) || $this->stop) {
                return;
            }

            try {
                $this->getChannel()->wait(null, false, $timeout);
            } catch (AMQPTimeoutException $timedOut) {
                throw TimeoutException::build($timedOut, $timeout);
            } catch (Exception $failure) {
                if (!($failure instanceof AMQPExceptionInterface)) {
                    throw $failure;
                }

                throw ConsumerException::build($failure);
            }
        }
    }
275
276
    /**
     * Build the AMQP property array for an outgoing message.
     *
     * Delivery mode is always 2 and the content type always 'text/plain'. The
     * correlation id and reply-to entries are only present when the message
     * returns a non-null value for them.
     *
     * @param Message $message
     *
     * @return array
     */
    private static function getMessageProperties(Message $message)
    {
        $properties = [
            self::DELIVERY_MODE => 2,
            self::CONTENT_TYPE => 'text/plain',
            self::APPLICATION_HEADERS => new AMQPTable($message->getHeaders()),
        ];

        $correlationId = $message->getCorrelationId();
        if (null !== $correlationId) {
            $properties[self::CORRELATION_ID] = $correlationId;
        }

        $replyTo = $message->getReplyTo();
        if (null !== $replyTo) {
            $properties[self::REPLY_TO] = $replyTo;
        }

        return $properties;
    }
301
302
    /**
     * Extract the application headers of a received message as native data.
     *
     * @param AMQPMessage $message
     *
     * @return array Native header data, or an empty array when the property is absent.
     *
     * @throws \OutOfBoundsException
     */
    private static function getHeaders(AMQPMessage $message)
    {
        if (!$message->has(self::APPLICATION_HEADERS)) {
            return [];
        }

        return $message->get(self::APPLICATION_HEADERS)->getNativeData();
    }
314
315
    /**
     * Read the correlation id of a received message.
     *
     * @param AMQPMessage $message
     *
     * @return string The correlation id, or an empty string when the property is absent.
     *
     * @throws \OutOfBoundsException
     */
    private static function getCorrelationId(AMQPMessage $message)
    {
        if (!$message->has(self::CORRELATION_ID)) {
            return '';
        }

        return $message->get(self::CORRELATION_ID);
    }
327
328
    /**
     * Read the reply-to property of a received message.
     *
     * @param AMQPMessage $message
     *
     * @return string The reply-to value, or an empty string when the property is absent.
     *
     * @throws \OutOfBoundsException
     */
    private static function getReplyTo(AMQPMessage $message)
    {
        if (!$message->has(self::REPLY_TO)) {
            return '';
        }

        return $message->get(self::REPLY_TO);
    }
340
341
    /**
     * Reconnect and publish the message a second time.
     *
     * The retry flag is raised for the duration of the attempt so that
     * checkIfWeShouldRetry() rethrows if this second publication fails too.
     *
     * @param string  $exchangeName
     * @param Message $message
     *
     * @return void
     *
     * @throws Exception Whatever reconnect() or the second publish() throws.
     */
    private function retryPublish($exchangeName, Message $message)
    {
        $this->retryPublish = true;

        try {
            $this->connection->reconnect();
            // Drop the old channel so getChannel() opens a new one.
            $this->channel = null;
            $this->publish($exchangeName, $message);
        } finally {
            // Always lower the flag: left raised after a failed retry, it would
            // stop every later publish() from ever retrying.
            $this->retryPublish = false;
        }
    }
349
350
    /**
     * Rethrow the exception unless the failed publication may be retried.
     *
     * A retry is allowed only for a closed-connection, timeout, I/O or socket
     * exception, and only when no retry is already in progress.
     *
     * @param Exception $exception Failure caught while publishing.
     *
     * @return void
     *
     * @throws Exception The given exception, when no retry should happen.
     */
    private function checkIfWeShouldRetry(Exception $exception)
    {
        $isConnectionFailure = $exception instanceof AMQPConnectionClosedException
            || $exception instanceof AMQPTimeoutException
            || $exception instanceof AMQPIOException
            || $exception instanceof AMQPSocketException;

        if (!$isConnectionFailure) {
            throw $exception;
        }

        $alreadyRetrying = ($this->retryPublish !== false);
        if ($alreadyRetrying) {
            throw $exception;
        }
    }
361
}
362