Completed
Push — master ( a542f0...920293 )
by Rémi
04:39
created

PhpAmqpLibDriver::consume()   B

Complexity

Conditions 2
Paths 1

Size

Total Lines 32
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 32
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 23
nc 1
nop 4
1
<?php
2
3
namespace Burrow\Driver;
4
5
use Burrow\Driver;
6
use Burrow\Exception\ConsumerException;
7
use Burrow\Exception\TimeoutException;
8
use Burrow\Message;
9
use Burrow\QueueHandler;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
use PhpAmqpLib\Exception\AMQPExceptionInterface;
13
use PhpAmqpLib\Exception\AMQPTimeoutException;
14
use PhpAmqpLib\Message\AMQPMessage;
15
use PhpAmqpLib\Wire\AMQPTable;
16
17
/**
18
 * @codeCoverageIgnore
19
 */
20
class PhpAmqpLibDriver implements Driver
21
{
22
    const DELIVERY_MODE = 'delivery_mode';
23
    const CONTENT_TYPE = 'content_type';
24
    const APPLICATION_HEADERS = 'application_headers';
25
    const CORRELATION_ID = 'correlation_id';
26
    const REPLY_TO = 'reply_to';
27
28
    /** @var AbstractConnection */
29
    private $connection;
30
31
    /** @var AMQPChannel */
32
    private $channel;
33
34
    /** @var bool */
35
    private $stop = false;
36
37
    /**
     * Build a driver on top of an existing php-amqplib connection.
     *
     * Only the connection is stored here; no channel is requested from it
     * at construction time.
     *
     * @param AbstractConnection $connection Connection used for every broker operation
     */
    public function __construct(AbstractConnection $connection)
    {
        $this->connection = $connection;
    }
46
47
    /**
     * Declare a queue on the broker and return its name.
     *
     * The queue is declared durable when $type is self::QUEUE_DURABLE and
     * exclusive when $type is self::QUEUE_EXCLUSIVE; any other value gives a
     * queue that is neither.
     *
     * @param string $queueName Name of the queue to declare (empty string by default)
     * @param string $type      One of the QUEUE_* constants
     *
     * @return string First element of the queue_declare() reply
     */
    public function declareSimpleQueue($queueName = '', $type = self::QUEUE_DURABLE)
    {
        $isDurable = (self::QUEUE_DURABLE === $type);
        $isExclusive = (self::QUEUE_EXCLUSIVE === $type);

        $channel = $this->getChannel();

        // Positional flags: passive = false, durable, exclusive, auto_delete = false.
        list($declaredName) = $channel->queue_declare($queueName, false, $isDurable, $isExclusive, false);

        return $declaredName;
    }
64
65
    /**
     * Declare a durable, non auto-deleted exchange.
     *
     * @param  string $exchangeName Name of the exchange to declare
     * @param  string $type         Exchange type (fanout by default)
     *
     * @return string The name of the declared exchange
     */
    public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT)
    {
        // Positional flags: passive = false, durable = true, auto_delete = false.
        $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false);

        // Unlike queue.declare-ok, the exchange.declare-ok reply carries no
        // payload, so destructuring the return value of exchange_declare()
        // always produced null. The broker never renames an exchange, so the
        // requested name is the declared name.
        return $exchangeName;
    }
79
80
    /**
     * Attach an already declared queue to an exchange.
     *
     * @param  string $exchange   Exchange the queue is bound to
     * @param  string $queueName  Queue to bind
     * @param  string $routingKey Binding key (empty string by default)
     * @return void
     */
    public function bindQueue($exchange, $queueName, $routingKey = '')
    {
        $channel = $this->getChannel();

        // Note the argument order expected by the library: queue first, then exchange.
        $channel->queue_bind($queueName, $exchange, $routingKey);
    }
92
93
    /**
     * Declare a queue with the default type, then bind it to an exchange.
     *
     * Shortcut for declareSimpleQueue() followed by bindQueue().
     *
     * @param  string $exchange   Exchange the queue is bound to
     * @param  string $queueName  Queue to declare and bind
     * @param  string $routingKey Binding key (empty string by default)
     * @return void
     */
    public function declareAndBindQueue($exchange, $queueName, $routingKey = '')
    {
        // Step 1: make sure the queue exists (default type of declareSimpleQueue()).
        $this->declareSimpleQueue($queueName);

        // Step 2: route messages from the exchange to it.
        $this->bindQueue($exchange, $queueName, $routingKey);
    }
106
107
    /**
     * Remove a queue from the broker.
     *
     * @param string $queueName Queue to delete
     * @return void
     */
    public function deleteQueue($queueName)
    {
        $channel = $this->getChannel();
        $channel->queue_delete($queueName);
    }
117
118
    /**
     * Remove an exchange from the broker.
     *
     * @param string $exchangeName Exchange to delete
     * @return void
     */
    public function deleteExchange($exchangeName)
    {
        $channel = $this->getChannel();
        $channel->exchange_delete($exchangeName);
    }
128
129
    /**
     * Send a message to an exchange.
     *
     * The Burrow message is converted to an AMQPMessage carrying the same
     * body, with the properties computed by getMessageProperties(), and is
     * published with the message's own routing key.
     *
     * @param string  $exchangeName Target exchange
     * @param Message $message      Message to publish
     *
     * @return void
     */
    public function publish($exchangeName, Message $message)
    {
        $channel = $this->getChannel();

        $amqpMessage = new AMQPMessage(
            $message->getBody(),
            self::getMessageProperties($message)
        );
        $routingKey = $message->getRoutingKey();

        $channel->basic_publish($amqpMessage, $exchangeName, $routingKey);
    }
145
146
    /**
     * Consume the queue
     *
     * Registers a consumer on the queue and then blocks in wait() until the
     * callback returns QueueHandler::STOP_CONSUMING, the driver is closed,
     * or the channel has no registered callback left.
     *
     * @param string   $queueName Queue to consume from
     * @param callable $callback  Called with a Burrow Message for each delivery
     * @param int      $timeout   Passed to wait(); maximum wait for a delivery
     * @param bool     $autoAck   Forwarded as the no_ack flag of basic_consume()
     *
     * @return void
     * @throws \Exception
     */
    public function consume($queueName, callable $callback, $timeout = 0, $autoAck = true)
    {
        $this->stop = false;

        $channel = $this->getChannel();

        // prefetch_size = 0 (no limit), prefetch_count = 1, global = false.
        // Explicit int/bool values replace the former nulls: same frame on the
        // wire, but matching the types basic_qos() documents.
        $channel->basic_qos(0, 1, false);

        $channel->basic_consume(
            $queueName,
            '',       // consumer tag
            false,    // no_local
            $autoAck, // no_ack
            false,    // exclusive
            false,    // nowait
            function (AMQPMessage $message) use ($callback, $queueName) {
                $burrowMessage = new Message(
                    $message->getBody(),
                    '', // Impossible to retrieve here
                    self::getHeaders($message),
                    self::getCorrelationId($message),
                    self::getReplyTo($message)
                );
                // The delivery tag is what ack()/nack() later send back to the broker.
                $burrowMessage->setDeliveryTag($message->delivery_info['delivery_tag']);
                $burrowMessage->setQueue($queueName);

                $success = $callback($burrowMessage);
                if ($success === QueueHandler::STOP_CONSUMING) {
                    // Picked up by the loop in wait() after this delivery.
                    $this->stop = true;
                }
            }
        );

        $this->wait($timeout);
    }
189
190
    /**
     * Confirm to the broker that a message was received.
     *
     * @param Message $message Message whose delivery tag is acknowledged
     *
     * @return void
     */
    public function ack(Message $message)
    {
        $deliveryTag = $message->getDeliveryTag();

        $this->getChannel()->basic_ack($deliveryTag);
    }
201
202
    /**
     * Acknowledge an error during the consumption of the message.
     *
     * Implemented with basic_reject() on the message's delivery tag.
     *
     * @param Message $message Message being rejected
     * @param bool    $requeue Forwarded as the requeue flag of basic_reject()
     *
     * @return void
     */
    public function nack(Message $message, $requeue = true)
    {
        $deliveryTag = $message->getDeliveryTag();

        $this->getChannel()->basic_reject($deliveryTag, $requeue);
    }
214
215
    /**
     * Close the connection
     *
     * Flags the consume loop to stop, closes the channel if one was opened,
     * then closes the underlying connection.
     *
     * @return void
     */
    public function close()
    {
        $this->stop = true;

        // Read the property directly instead of calling getChannel(): the
        // accessor would open a brand new channel on the connection only for
        // it to be closed on the next line.
        if (null !== $this->channel) {
            $this->channel->close();
        }

        $this->connection->close();
    }
227
228
    /**
     * Return the channel used by this driver, opening it on first use.
     *
     * @return AMQPChannel
     */
    private function getChannel()
    {
        if (null !== $this->channel) {
            return $this->channel;
        }

        // First call: ask the connection for a channel and keep it for reuse.
        $this->channel = $this->connection->channel();

        return $this->channel;
    }
239
240
    /**
     * Block on the channel until consumption is stopped or no callback is
     * registered on the channel any more.
     *
     * Library timeouts are converted with TimeoutException::build(), other
     * php-amqplib exceptions with ConsumerException::build(); anything else
     * propagates untouched.
     *
     * @param int $timeout Forwarded as the timeout argument of AMQPChannel::wait()
     *
     * @throws \Exception
     */
    private function wait($timeout)
    {
        $channel = $this->getChannel();

        while (!$this->stop && count($channel->callbacks)) {
            try {
                $channel->wait(null, false, $timeout);
            } catch (AMQPTimeoutException $timeoutError) {
                // Must stay ahead of the generic library clause below.
                throw TimeoutException::build($timeoutError, $timeout);
            } catch (AMQPExceptionInterface $libraryError) {
                throw ConsumerException::build($libraryError);
            }
        }
    }
260
261
    /**
     * Build the AMQP property array for an outgoing message.
     *
     * Delivery mode, content type and application headers are always set;
     * correlation id and reply-to are added only when the message defines
     * them (i.e. the getter does not return null).
     *
     * @param Message $message Message about to be published
     *
     * @return array Properties keyed by the class property-name constants
     */
    private static function getMessageProperties(Message $message)
    {
        $properties = [];
        $properties[self::DELIVERY_MODE] = 2; // 2 is the AMQP "persistent" delivery mode
        $properties[self::CONTENT_TYPE] = 'text/plain';
        $properties[self::APPLICATION_HEADERS] = new AMQPTable($message->getHeaders());

        $correlationId = $message->getCorrelationId();
        if (null !== $correlationId) {
            $properties[self::CORRELATION_ID] = $correlationId;
        }

        $replyTo = $message->getReplyTo();
        if (null !== $replyTo) {
            $properties[self::REPLY_TO] = $replyTo;
        }

        return $properties;
    }
287
288
    /**
     * Extract the application headers of an incoming message.
     *
     * @param AMQPMessage $message Message received from the broker
     *
     * @return array Native form of the header table, or an empty array when
     *               the message has no application headers
     */
    private static function getHeaders(AMQPMessage $message)
    {
        if (!$message->has(self::APPLICATION_HEADERS)) {
            return [];
        }

        $headerTable = $message->get(self::APPLICATION_HEADERS);

        return $headerTable->getNativeData();
    }
298
299
    /**
     * Read the correlation id of an incoming message.
     *
     * @param AMQPMessage $message Message received from the broker
     *
     * @return string The correlation id, or an empty string when the property is absent
     */
    private static function getCorrelationId(AMQPMessage $message)
    {
        if (!$message->has(self::CORRELATION_ID)) {
            return '';
        }

        return $message->get(self::CORRELATION_ID);
    }
309
310
    /**
     * Read the reply-to property of an incoming message.
     *
     * @param AMQPMessage $message Message received from the broker
     *
     * @return string The reply-to value, or an empty string when the property is absent
     */
    private static function getReplyTo(AMQPMessage $message)
    {
        if (!$message->has(self::REPLY_TO)) {
            return '';
        }

        return $message->get(self::REPLY_TO);
    }
320
}
321