Completed
Push — master ( c897cd...d28680 )
by Olivier
11:16
created

PhpAmqpLibMessageProvider::get()   C

Complexity

Conditions 7
Paths 7

Size

Total Lines 34
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 34
rs 6.7272
cc 7
eloc 20
nc 7
nop 0
1
<?php
2
3
namespace Swarrot\Broker\MessageProvider;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Wire\AMQPArray;
7
use Swarrot\Broker\Message;
8
9
/**
 * Message provider that pulls messages one at a time from a single queue
 * through a php-amqplib channel.
 */
class PhpAmqpLibMessageProvider implements MessageProviderInterface
{
    /**
     * Channel every broker call of this provider goes through.
     *
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Name of the queue messages are fetched from.
     *
     * @var string
     */
    private $queueName;

    /**
     * Stores the channel and the queue name; no broker call is made here.
     *
     * @param AMQPChannel $channel   channel used for get/ack/nack
     * @param string      $queueName queue polled by get()
     */
    public function __construct(AMQPChannel $channel, $queueName)
    {
        $this->queueName = $queueName;
        $this->channel = $channel;
    }

    /**
     * Fetches a single message from the queue with basic_get and converts it
     * into a Message carrying the body, the known properties, the
     * application headers and the delivery tag as its id.
     *
     * {@inheritdoc}
     *
     * @return Message|null null when basic_get yields no message
     */
    public function get()
    {
        $amqpMessage = $this->channel->basic_get($this->queueName);

        // Nothing waiting in the queue: there is nothing to convert.
        if (null === $amqpMessage) {
            return null;
        }

        // Names looked up on the fetched message; only those it actually
        // carries end up in the resulting property list.
        $knownKeys = [
            'content_type', 'delivery_mode', 'content_encoding', 'type', 'timestamp', 'priority', 'expiration',
            'app_id', 'message_id', 'reply_to', 'correlation_id', 'user_id', 'cluster_id', 'channel', 'consumer_tag',
            'delivery_tag', 'redelivered', 'exchange', 'routing_key',
        ];

        $properties = [];
        foreach ($knownKeys as $name) {
            if (!$amqpMessage->has($name)) {
                continue;
            }

            $properties[$name] = $amqpMessage->get($name);
        }

        // The 'headers' entry is always present, even when the message has none.
        $headers = [];
        if ($amqpMessage->has('application_headers')) {
            foreach ($amqpMessage->get('application_headers') as $headerName => $entry) {
                // Index 1 of each entry holds the header value; AMQPArray
                // wrappers are turned into native PHP data, everything else
                // is passed through as-is.
                $headerValue = $entry[1];
                $headers[$headerName] = $headerValue instanceof AMQPArray
                    ? $headerValue->getNativeData()
                    : $headerValue;
            }
        }
        $properties['headers'] = $headers;

        $deliveryTag = $amqpMessage->get('delivery_tag');

        return new Message($amqpMessage->body, $properties, $deliveryTag);
    }

    /**
     * Acknowledges the given message on the channel, using the message id as
     * the delivery tag.
     *
     * {@inheritdoc}
     */
    public function ack(Message $message)
    {
        $deliveryTag = $message->getId();

        $this->channel->basic_ack($deliveryTag);
    }

    /**
     * Negatively acknowledges the given message on the channel, using the
     * message id as the delivery tag.
     *
     * {@inheritdoc}
     *
     * @param Message $message message to reject
     * @param bool    $requeue forwarded as-is to basic_nack
     */
    public function nack(Message $message, $requeue = false)
    {
        $deliveryTag = $message->getId();

        // The second argument is always false (php-amqplib's "multiple"
        // flag), so the call targets this delivery tag only.
        $this->channel->basic_nack($deliveryTag, false, $requeue);
    }

    /**
     * Returns the queue name this provider was constructed with.
     *
     * {@inheritdoc}
     *
     * @return string
     */
    public function getQueueName()
    {
        return $this->queueName;
    }
}
93