SqsQueue::queryForOneMessage()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 0
dl 0
loc 5
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Querabilis\Envelope;
8
use Initx\Querabilis\Exception\IllegalStateException;
9
use Initx\Querabilis\Queue;
10
use JMS\Serializer\SerializerInterface;
11
use Ramsey\Uuid\Uuid;
12
13
final class SqsQueue implements Queue
14
{
15
    use HasFallbackSerializer;
16
    use HasDefaultRemoveAndElement;
17
18
    /**
19
     * @var SqsClient
20
     */
21
    private $client;
22
23
    /**
24
     * @var string
25
     */
26
    private $queueName;
27
28
    /**
29
     * @var string
30
     */
31
    private $queueUrl = '';
32
33
    /**
34
     * @var SerializerInterface
35
     */
36
    private $serializer;
37
38 4
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
39
    {
40 4
        $this->client = $client;
41 4
        $this->queueName = $queueName;
42 4
        $this->serializer = $this->fallbackSerializer($serializer);
43 4
    }
44
45 2
    public function add(Envelope $envelope): bool
46
    {
47 2
        if (!$this->offer($envelope)) {
48 1
            throw new IllegalStateException("Could not add to queue $this->queueName");
49
        }
50
51 1
        return true;
52
    }
53
54 2
    public function offer(Envelope $envelope): bool
55
    {
56 2
        $this->resolveQueueUrl();
57
58 2
        $serialized = $this->serializer->serialize($envelope, 'json');
59
        $args = [
60 2
            'QueueUrl' => $this->queueUrl,
61 2
            'MessageBody' => $serialized,
62
        ];
63
64 2
        if ($this->isFifo()) {
65 1
            $args['MessageGroupId'] = self::class;
66 1
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
67
        }
68
69 2
        return (bool)$this->client->sendMessage($args);
70
    }
71
72 2
    public function poll(): ?Envelope
73
    {
74 2
        $this->resolveQueueUrl();
75 2
        $result = $this->queryForOneMessage();
76 2
        $messages = $result->get('Messages');
77
78 2
        if ($messages && \count($messages)) {
79 1
            $message = $result->get('Messages')[0];
80 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
81
82
            // remove message from queue
83 1
            $this->client->deleteMessage([
84 1
                'QueueUrl' => $this->queueUrl,
85 1
                'ReceiptHandle' => $message['ReceiptHandle'],
86
            ]);
87
88 1
            return $envelope;
89
        }
90
91
        // no messages
92 1
        return null;
93
    }
94
95 2
    public function peek(): ?Envelope
96
    {
97 2
        $this->resolveQueueUrl();
98 2
        $result = $this->queryForOneMessage();
99 2
        $messages = $result->get('Messages');
100
101 2
        if ($messages && \count($messages)) {
102 1
            $message = $result->get('Messages')[0];
103
104 1
            return $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
105
        }
106
107
        // no messages
108 1
        return null;
109
    }
110
111 4
    private function resolveQueueUrl(): void
112
    {
113 4
        if (!empty($this->queueUrl)) {
114
            return;
115
        }
116
117 4
        $result = $this->client->getQueueUrl(['QueueName' => $this->queueName]);
118 4
        $this->queueUrl = $result->get('QueueUrl');
119
120 4
        if (empty($this->queueUrl)) {
121
            throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
122
        }
123 4
    }
124
125 2
    private function queryForOneMessage(): Result
126
    {
127 2
        return $this->client->receiveMessage([
128 2
            'MaxNumberOfMessages' => 1,
129 2
            'QueueUrl' => $this->queueUrl,
130
        ]);
131
    }
132
133 2
    private function isFifo(): bool
134
    {
135 2
        return $this->endsWith($this->queueName, '.fifo');
136
    }
137
138 2
    private function endsWith(string $haystack, string $needle): bool
139
    {
140 2
        $length = strlen($needle);
141
142 2
        return substr($haystack, -$length) === $needle;
143
    }
144
}
145