Passed
Pull Request — master (#31)
by Daniel
04:39
created

QueueClient   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 173
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 18
eloc 53
c 0
b 0
f 0
dl 0
loc 173
rs 10

7 Methods

Rating   Name   Duplication   Size   Complexity  
A sendMessage() 0 13 1
A createQueue() 0 13 2
A __construct() 0 6 1
A getQueueUrl() 0 13 2
A declareQueue() 0 9 2
A receiveMessage() 0 17 2
B receiveMessages() 0 31 8
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Jellyfish\QueueSQS;
6
7
use Aws\Sqs\SqsClient;
8
use Jellyfish\Queue\MessageInterface;
9
use Jellyfish\Queue\MessageMapperInterface;
10
use Jellyfish\Queue\QueueClientInterface;
11
use Jellyfish\QueueSQS\Exception\CreateQueueException;
12
13
use function count;
14
use function is_array;
15
16
/**
 * Queue client backed by Amazon SQS.
 *
 * Queues are addressed by name; the queue URL is looked up before every
 * operation and the queue is created when the lookup yields nothing.
 * Message payloads are converted to and from JSON by the injected mapper.
 *
 * NOTE(review): neither receive method deletes the messages it returns, so
 * SQS will make them visible again once the visibility timeout expires.
 * Confirm whether deletion is handled elsewhere.
 */
class QueueClient implements QueueClientInterface
{
    /**
     * Largest value sent as "MaxNumberOfMessages" in a single receive request.
     */
    protected const MAX_MESSAGES_PER_REQUEST = 10;

    /**
     * AWS error codes that signal "the requested queue does not exist".
     * The first is the query-protocol code, the second the JSON-protocol code.
     */
    protected const QUEUE_NOT_FOUND_ERROR_CODES = [
        'AWS.SimpleQueueService.NonExistentQueue',
        'QueueDoesNotExist',
    ];

    /**
     * @var \Aws\Sqs\SqsClient
     */
    protected $sqsClient;

    /**
     * @var \Jellyfish\Queue\MessageMapperInterface
     */
    protected $messageMapper;

    /**
     * @param \Aws\Sqs\SqsClient $sqsClient Client used for every SQS request.
     * @param \Jellyfish\Queue\MessageMapperInterface $messageMapper Converts messages to/from JSON.
     */
    public function __construct(
        SqsClient $sqsClient,
        MessageMapperInterface $messageMapper
    ) {
        $this->sqsClient = $sqsClient;
        $this->messageMapper = $messageMapper;
    }

    /**
     * Returns the URL of the named queue, creating the queue when the lookup
     * does not yield a URL.
     *
     * @param string $queueName
     *
     * @return string
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    protected function declareQueue(string $queueName): string
    {
        return $this->getQueueUrl($queueName) ?? $this->createQueue($queueName);
    }

    /**
     * Looks up the URL of an existing queue.
     *
     * @param string $queueName
     *
     * @return string|null The queue URL, or null when the queue does not exist
     *                     or the response carries no usable "QueueUrl".
     */
    protected function getQueueUrl(string $queueName): ?string
    {
        try {
            $result = $this->sqsClient->getQueueUrl(['QueueName' => $queueName]);
        } catch (\Aws\Sqs\Exception\SqsException $exception) {
            // A missing queue is reported as an exception, not as an empty
            // result. Translate only that case to null so declareQueue() can
            // create the queue; every other failure is propagated untouched.
            if (!in_array($exception->getAwsErrorCode(), self::QUEUE_NOT_FOUND_ERROR_CODES, true)) {
                throw $exception;
            }

            return null;
        }

        $queueUrl = $result->get('QueueUrl');

        return is_string($queueUrl) ? $queueUrl : null;
    }

    /**
     * Creates the named queue and returns its URL.
     *
     * @param string $queueName
     *
     * @return string
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException When the response carries no usable "QueueUrl".
     */
    protected function createQueue(string $queueName): string
    {
        $result = $this->sqsClient->createQueue(['QueueName' => $queueName]);
        $queueUrl = $result->get('QueueUrl');

        // Static analysis flagged that get('QueueUrl') may yield null, which
        // is incompatible with the declared string return type. Validate the
        // value itself instead of only checking that the key is present.
        if (!is_string($queueUrl)) {
            throw new CreateQueueException(sprintf('Could not create queue "%s".', $queueName));
        }

        return $queueUrl;
    }

    /**
     * Receives at most one message from the named queue.
     *
     * @param string $queueName
     *
     * @return \Jellyfish\Queue\MessageInterface|null Null when the response contains no message body.
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    public function receiveMessage(string $queueName): ?MessageInterface
    {
        $result = $this->sqsClient->receiveMessage([
            'QueueUrl' => $this->declareQueue($queueName),
            'MaxNumberOfMessages' => 1,
        ]);

        $messageAsJson = $result->search('Messages[0].Body');

        if ($messageAsJson === null) {
            return null;
        }

        return $this->messageMapper->fromJson($messageAsJson);
    }

    /**
     * Receives up to $count messages from the named queue.
     *
     * Messages are fetched in batches of at most MAX_MESSAGES_PER_REQUEST.
     * Each request asks only for the number of messages still outstanding, so
     * the result never holds more than $count entries. Fetching stops early
     * as soon as a response is empty or smaller than the batch requested.
     *
     * @param string $queueName
     * @param int $count Upper bound on returned messages; values <= 0 yield an empty array.
     *
     * @return \Jellyfish\Queue\MessageInterface[]
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    public function receiveMessages(string $queueName, int $count): array
    {
        $receivedMessages = [];
        // Declared even when $count <= 0, so the queue still gets created.
        $queueUrl = $this->declareQueue($queueName);
        $remaining = $count;

        while ($remaining > 0) {
            $batchSize = min($remaining, self::MAX_MESSAGES_PER_REQUEST);

            $result = $this->sqsClient->receiveMessage([
                'QueueUrl' => $queueUrl,
                'MaxNumberOfMessages' => $batchSize,
            ]);
            $messages = $result->get('Messages');

            if (!is_array($messages) || count($messages) === 0) {
                break;
            }

            foreach ($messages as $message) {
                // Entries without a body cannot be mapped and are skipped.
                if (!isset($message['Body'])) {
                    continue;
                }

                $receivedMessages[] = $this->messageMapper->fromJson($message['Body']);
            }

            $fetched = count($messages);

            // A short batch is taken to mean the queue has nothing more to offer.
            if ($fetched < $batchSize) {
                break;
            }

            // Count every fetched entry, including skipped ones, so the number
            // of requests stays bounded by ceil($count / batch size).
            $remaining -= $fetched;
        }

        return $receivedMessages;
    }

    /**
     * Serialises the message to JSON and sends it to the named queue.
     *
     * @param string $queueName
     * @param \Jellyfish\Queue\MessageInterface $message
     *
     * @return \Jellyfish\Queue\QueueClientInterface This client, for chaining.
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    public function sendMessage(string $queueName, MessageInterface $message): QueueClientInterface
    {
        // Keep the original order: resolve the queue first, then serialise.
        $queueUrl = $this->declareQueue($queueName);
        $messageBody = $this->messageMapper->toJson($message);

        $this->sqsClient->sendMessage([
            'QueueUrl' => $queueUrl,
            'MessageBody' => $messageBody,
        ]);

        return $this;
    }
}
191