Completed
Push — master ( de5c9f...2ef464 )
by Daniel
21s queued 11s
created

QueueClient::doReceiveMessage()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
c 0
b 0
f 0
dl 0
loc 9
rs 10
cc 3
nc 2
nop 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Jellyfish\QueueRabbitMq;
6
7
use Jellyfish\Queue\MessageInterface;
8
use Jellyfish\Queue\MessageMapperInterface;
9
use Jellyfish\Queue\QueueClientInterface;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class QueueClient implements QueueClientInterface
15
{
16
    /**
17
     * @var \PhpAmqpLib\Connection\AbstractConnection
18
     */
19
    protected $connection;
20
21
    /**
22
     * @var \Jellyfish\QueueRabbitMq\AmqpMessageFactoryInterface
23
     */
24
    protected $amqpMessageFactory;
25
26
    /**
27
     * @var \Jellyfish\Queue\MessageMapperInterface
28
     */
29
    protected $messageMapper;
30
31
    /**
32
     * @var \PhpAmqpLib\Channel\AMQPChannel|null
33
     */
34
    protected $channel;
35
36
    /**
37
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
38
     * @param \Jellyfish\QueueRabbitMq\AmqpMessageFactoryInterface $amqpMessageFactory
39
     * @param \Jellyfish\Queue\MessageMapperInterface $messageMapper
40
     */
41
    public function __construct(
42
        AbstractConnection $connection,
43
        AmqpMessageFactoryInterface $amqpMessageFactory,
44
        MessageMapperInterface $messageMapper
45
    ) {
46
        $this->connection = $connection;
47
        $this->amqpMessageFactory = $amqpMessageFactory;
48
        $this->messageMapper = $messageMapper;
49
    }
50
51
    /**
52
     * @param string $queueName
53
     *
54
     * @return \Jellyfish\Queue\MessageInterface|null
55
     */
56
    public function receiveMessage(string $queueName): ?MessageInterface
57
    {
58
        $this->declareQueue($queueName);
59
60
        return $this->doReceiveMessage($queueName);
61
    }
62
63
    /**
64
     * @param string $queueName
65
     *
66
     * @return \Jellyfish\Queue\MessageInterface|null
67
     */
68
    protected function doReceiveMessage(string $queueName): ?MessageInterface
69
    {
70
        $messageAsJson = $this->getChannel()->basic_get($queueName, true);
71
72
        if ($messageAsJson === null || !($messageAsJson instanceof AMQPMessage)) {
73
            return null;
74
        }
75
76
        return $this->messageMapper->fromJson($messageAsJson->getBody());
77
    }
78
79
    /**
80
     * @param string $queueName
81
     * @param int $count
82
     *
83
     * @return \Jellyfish\Queue\MessageInterface[]
84
     */
85
    public function receiveMessages(string $queueName, int $count): array
86
    {
87
        $receivedMessages = [];
88
        $this->declareQueue($queueName);
89
90
        for ($i = 0; $i < $count; $i++) {
91
            $receivedMessage = $this->doReceiveMessage($queueName);
92
93
            if ($receivedMessage === null) {
94
                return $receivedMessages;
95
            }
96
97
            $receivedMessages[] = $receivedMessage;
98
        }
99
100
        return $receivedMessages;
101
    }
102
103
    /**
104
     * @param string $queueName
105
     * @param \Jellyfish\Queue\MessageInterface $message
106
     *
107
     * @return \Jellyfish\Queue\QueueClientInterface
108
     */
109
    public function sendMessage(string $queueName, MessageInterface $message): QueueClientInterface
110
    {
111
        $amqpMessageBody = $this->messageMapper->toJson($message);
112
        $amqpMessage = $this->amqpMessageFactory->create($amqpMessageBody);
113
114
        $this->declareQueue($queueName);
115
        $this->getChannel()->basic_publish($amqpMessage, '', $queueName);
116
117
        return $this;
118
    }
119
120
    /**
121
     * @param string $queueName
122
     *
123
     * @return \Jellyfish\Queue\QueueClientInterface
124
     */
125
    protected function declareQueue(string $queueName): QueueClientInterface
126
    {
127
        $this->getChannel()->queue_declare($queueName, false, true);
128
129
        return $this;
130
    }
131
132
    /**
133
     * @return \PhpAmqpLib\Channel\AMQPChannel
134
     */
135
    protected function getChannel(): AMQPChannel
136
    {
137
        if ($this->channel === null) {
138
            $this->channel = $this->connection->channel();
139
        }
140
141
        return $this->channel;
142
    }
143
144
    public function __destruct()
145
    {
146
        if ($this->connection->isConnected()) {
147
            $this->connection->close();
148
        }
149
    }
150
}
151