Completed
Push — master ( 98b476...faa49d )
by Rémi
18s queued 11s
created

PeclAmqpDriver::generateConsumerTag()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 3
1
<?php
2
3
namespace Burrow\Driver;
4
5
use Assert\AssertionFailedException;
6
use Burrow\Driver;
7
use Burrow\Exception\ConsumerException;
8
use Burrow\Exception\TimeoutException;
9
use Burrow\Message;
10
11
/**
 * Burrow driver implemented on top of the PECL "amqp" extension
 * (\AMQPConnection, \AMQPChannel, \AMQPQueue, \AMQPExchange).
 *
 * The connection is only opened, and the channel only created, the first
 * time an operation actually needs them (see getChannel()).
 *
 * @codeCoverageIgnore
 */
class PeclAmqpDriver implements Driver
{
    // Keys of the attribute array handed to \AMQPExchange::publish().
    const DELIVERY_MODE = 'delivery_mode';
    const CONTENT_TYPE = 'content_type';
    const APPLICATION_HEADERS = 'headers';
    const CORRELATION_ID = 'correlation_id';
    const REPLY_TO = 'reply_to';

    /** @var \AMQPConnection */
    private $connection;

    /** @var \AMQPChannel|null Lazily created channel; null until first use and after close() */
    private $channel;

    /**
     * PeclAmqpDriver constructor.
     *
     * The connection is stored as-is; it is not opened here.
     *
     * @param \AMQPConnection $connection
     */
    public function __construct(\AMQPConnection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Declare a queue.
     *
     * The queue is declared durable unless $type is self::QUEUE_EXCLUSIVE,
     * in which case it is declared exclusive instead.
     *
     * @param string $queueName Name of the queue; when empty no name is set on the queue object
     * @param string $type      self::QUEUE_DURABLE or self::QUEUE_EXCLUSIVE
     *
     * @return string The queue name as reported by the queue object after declaration
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     */
    public function declareSimpleQueue($queueName = '', $type = self::QUEUE_DURABLE)
    {
        $flag = AMQP_DURABLE;
        if ($type === self::QUEUE_EXCLUSIVE) {
            $flag = AMQP_EXCLUSIVE;
        }

        $queue = $this->getQueue($queueName);
        $queue->setFlags($flag);
        $queue->declareQueue();

        return $queue->getName();
    }

    /**
     * Declare a durable exchange
     *
     * @param string $exchangeName
     * @param string $type         Exchange type, fanout by default
     *
     * @return string The exchange name as reported by the exchange object after declaration
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPExchangeException
     */
    public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT)
    {
        $exchange = $this->getExchange($exchangeName);
        $exchange->setType($type);
        $exchange->setFlags(AMQP_DURABLE);
        $exchange->declareExchange();

        return $exchange->getName();
    }

    /**
     * Bind an existing queue to an exchange
     *
     * @param string $exchange
     * @param string $queueName
     * @param string $routingKey
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     */
    public function bindQueue($exchange, $queueName, $routingKey = '')
    {
        $queue = $this->getQueue($queueName);
        $queue->bind($exchange, $routingKey);
    }

    /**
     * Declare a queue with the default (durable) type and bind it to an exchange
     *
     * @param string $exchange
     * @param string $queueName
     * @param string $routingKey
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     */
    public function declareAndBindQueue($exchange, $queueName, $routingKey = '')
    {
        $this->declareSimpleQueue($queueName);
        $this->bindQueue($exchange, $queueName, $routingKey);
    }

    /**
     * Delete a queue
     *
     * @param string $queueName
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     */
    public function deleteQueue($queueName)
    {
        $this->getQueue($queueName)->delete();
    }

    /**
     * Delete an exchange
     *
     * @param string $exchangeName
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPExchangeException
     */
    public function deleteExchange($exchangeName)
    {
        $this->getExchange($exchangeName)->delete();
    }

    /**
     * Publish a message in the exchange
     *
     * The body and routing key come from the message; the AMQP attributes
     * are built by getMessageProperties().
     *
     * @param string  $exchangeName
     * @param Message $message
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPExchangeException
     */
    public function publish($exchangeName, Message $message)
    {
        $exchange = $this->getExchange($exchangeName);
        $exchange->publish(
            $message->getBody(),
            $message->getRoutingKey(),
            AMQP_NOPARAM,
            self::getMessageProperties($message)
        );
    }

    /**
     * Consume the queue
     *
     * Each received envelope is converted to a Burrow Message (carrying its
     * delivery tag and the queue name, which ack()/nack() need later) and
     * handed to $callback. The consumer is always cancelled on exit, whether
     * consumption ended normally or with an exception.
     *
     * @param string   $queueName
     * @param callable $callback  Must return false if you want to consume only one message
     * @param int      $timeout   Read timeout applied to the connection before consuming
     * @param bool     $autoAck   When true messages are consumed with AMQP_AUTOACK
     *
     * @return void
     *
     * @throws ConsumerException        When the queue raises any error other than a timeout
     * @throws TimeoutException         When the queue reports 'Consumer timeout exceed'
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     * @throws \InvalidArgumentException
     * @throws AssertionFailedException
     */
    public function consume($queueName, callable $callback, $timeout = 0, $autoAck = true)
    {
        $this->connection->setReadTimeout($timeout);
        $this->getChannel()->setPrefetchCount(1);
        $queue = $this->getQueue($queueName);
        $flags = $autoAck ? AMQP_AUTOACK : AMQP_NOPARAM;
        $consumerTag = self::generateConsumerTag();
        try {
            $queue->consume(function (\AMQPEnvelope $message) use ($callback, $queueName) {

                $burrowMessage = new Message(
                    $message->getBody(),
                    $message->getRoutingKey(),
                    $message->getHeaders(),
                    $message->getCorrelationId(),
                    $message->getReplyTo()
                );
                $burrowMessage->setDeliveryTag($message->getDeliveryTag());
                $burrowMessage->setQueue($queueName);

                // The callback's return value is passed back to the extension's consume loop.
                return $callback($burrowMessage);
            }, $flags, $consumerTag);
        } catch (\AMQPQueueException $e) {
            // A timeout is only recognisable by its message, so it is matched literally.
            if ($e->getMessage() === 'Consumer timeout exceed') {
                throw TimeoutException::build($e, $timeout);
            }
            throw ConsumerException::build($e);
        } finally {
            $queue->cancel($consumerTag);
        }
    }

    /**
     * Acknowledge the reception of the message.
     *
     * Uses the queue name and delivery tag stored on the message.
     *
     * @param Message $message
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     */
    public function ack(Message $message)
    {
        $queue = $this->getQueue($message->getQueue());
        $queue->ack($message->getDeliveryTag());
    }

    /**
     * Acknowledge an error during the consumption of the message
     *
     * @param Message $message
     * @param bool    $requeue When true the nack is sent with AMQP_REQUEUE
     *
     * @return void
     *
     * @throws \AMQPConnectionException
     * @throws \AMQPChannelException
     * @throws \AMQPQueueException
     */
    public function nack(Message $message, $requeue = true)
    {
        $queue = $this->getQueue($message->getQueue());
        $queue->nack($message->getDeliveryTag(), $requeue ? AMQP_REQUEUE : AMQP_NOPARAM);
    }

    /**
     * Close the connection
     *
     * @return void
     */
    public function close()
    {
        $this->connection->disconnect();

        // Forget the cached channel: it belongs to the connection that was just
        // closed, so the next getChannel() call must reconnect and open a new one.
        $this->channel = null;
    }

    /**
     * Return the channel, connecting and creating it on first use.
     *
     * @return \AMQPChannel
     *
     * @throws \AMQPConnectionException
     */
    private function getChannel()
    {
        if (null === $this->channel) {
            $this->connection->connect();
            $this->channel = new \AMQPChannel($this->connection);
        }

        return $this->channel;
    }

    /**
     * Build a queue object on the current channel.
     *
     * @param string $queueName Name to set on the queue; an empty name is not set at all
     *
     * @return \AMQPQueue
     *
     * @throws \AMQPQueueException
     * @throws \AMQPConnectionException
     */
    private function getQueue($queueName)
    {
        $queue = new \AMQPQueue($this->getChannel());

        // Compare against the empty string instead of relying on truthiness,
        // otherwise a queue literally named "0" would be treated as unnamed.
        if ('' !== (string) $queueName) {
            $queue->setName($queueName);
        }

        return $queue;
    }

    /**
     * Build an exchange object on the current channel.
     *
     * @param string $exchangeName Name to set on the exchange; an empty name is not set at all
     *
     * @return \AMQPExchange
     *
     * @throws \AMQPExchangeException
     * @throws \AMQPConnectionException
     */
    private function getExchange($exchangeName)
    {
        $exchange = new \AMQPExchange($this->getChannel());

        // Same reasoning as getQueue(): "0" is a valid name but a falsy string.
        if ('' !== (string) $exchangeName) {
            $exchange->setName($exchangeName);
        }

        return $exchange;
    }

    /**
     * Returns the message parameters
     *
     * Delivery mode, content type and headers are always present; the
     * correlation id and reply-to are only added when the message has them.
     *
     * @param Message $message
     *
     * @return array
     */
    private static function getMessageProperties(Message $message)
    {
        $properties = [
            self::DELIVERY_MODE => 2,
            self::CONTENT_TYPE => 'text/plain',
            self::APPLICATION_HEADERS => $message->getHeaders(),
        ];

        if ($message->getCorrelationId() !== null) {
            $properties[self::CORRELATION_ID] = $message->getCorrelationId();
        }

        if ($message->getReplyTo() !== null) {
            $properties[self::REPLY_TO] = $message->getReplyTo();
        }

        return $properties;
    }

    /**
     * Generate a consumer tag made of a prefix and a random suffix.
     *
     * @param string $prefix Text placed in front of the random part
     * @param int    $length Number of random characters to append
     * @param string $chars  Alphabet the random characters are drawn from (must not be empty)
     *
     * @return string
     */
    private static function generateConsumerTag(
        $prefix = 'consumer-tags',
        $length = 10,
        $chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    ) {
        // Repeat the alphabet often enough for the shuffled pool to hold at least
        // $length characters; ceil() yields a float, str_repeat() wants an int.
        $repetitions = (int) ceil($length / strlen($chars));
        $pool = str_shuffle(str_repeat($chars, $repetitions));

        // Slice from offset 0: starting at 1 would return one character too few
        // whenever $length is an exact multiple of the alphabet size.
        return $prefix . substr($pool, 0, $length);
    }
}
359