Completed
Branch master (339a46)
by Damian
04:14
created

SqsQueue::element()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 4
nc 2
nop 0
dl 0
loc 9
ccs 5
cts 5
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Querabilis\Envelope;
8
use Initx\Querabilis\Exception\IllegalStateException;
9
use Initx\Querabilis\Exception\NoSuchElementException;
10
use Initx\Querabilis\Queue;
11
use JMS\Serializer\SerializerInterface;
12
use Ramsey\Uuid\Uuid;
13
14
final class SqsQueue implements Queue
15
{
16
    use HasFallbackSerializer;
17
    use HasDefaultRemoveAndElement;
18
19
    /**
20
     * @var SqsClient
21
     */
22
    private $client;
23
24
    /**
25
     * @var string
26
     */
27
    private $queueName;
28
29
    /**
30
     * @var string
31
     */
32
    private $queueUrl = '';
33
34
    /**
35
     * @var SerializerInterface
36
     */
37
    private $serializer;
38
39 4
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
40
    {
41 4
        $this->client = $client;
42 4
        $this->queueName = $queueName;
43 4
        $this->serializer = $this->fallbackSerializer($serializer);
44 4
    }
45
46 2
    public function add(Envelope $envelope): bool
47
    {
48 2
        if (!$this->offer($envelope)) {
49 1
            throw new IllegalStateException("Could not add to queue $this->queueName");
50
        }
51
52 1
        return true;
53
    }
54
55 2
    public function offer(Envelope $envelope): bool
56
    {
57 2
        $this->resolveQueueUrl();
58
59 2
        $serialized = $this->serializer->serialize($envelope, 'json');
60
        $args = [
61 2
            'QueueUrl' => $this->queueUrl,
62 2
            'MessageBody' => $serialized,
63
        ];
64
65 2
        if ($this->isFifo()) {
66 1
            $args['MessageGroupId'] = __CLASS__;
67 1
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
68
        }
69
70 2
        return (bool)$this->client->sendMessage($args);
71
    }
72
73 2
    public function poll(): ?Envelope
74
    {
75 2
        $this->resolveQueueUrl();
76 2
        $result = $this->queryForOneMessage();
77
78 2
        if ($result->get('Messages') && count($result->get('Messages'))) {
79 1
            $message = $result->get('Messages')[0];
80 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
81
82
            // remove message from queue
83 1
            $this->client->deleteMessage([
84 1
                'QueueUrl' => $this->queueUrl,
85 1
                'ReceiptHandle' => $message['ReceiptHandle'],
86
            ]);
87
88 1
            return $envelope;
89
        }
90
91
        // no messages
92 1
        return null;
93
    }
94
95 2
    public function peek(): ?Envelope
96
    {
97 2
        $this->resolveQueueUrl();
98 2
        $result = $this->queryForOneMessage();
99
100 2
        if ($result->get('Messages') && count($result->get('Messages'))) {
101 1
            $message = $result->get('Messages')[0];
102 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
103
104 1
            return $envelope;
105
        }
106
107
        // no messages
108 1
        return null;
109
    }
110
111 4
    private function resolveQueueUrl(): void
112
    {
113 4
        if (empty($this->queueUrl)) {
114 4
            $result = $this->client->getQueueUrl(['QueueName' => $this->queueName]);
115 4
            $this->queueUrl = $result->get('QueueUrl');
116 4
            if (empty($this->queueUrl)) {
117
                throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
118
            }
119
        }
120 4
    }
121
122 2
    private function queryForOneMessage(): Result
123
    {
124 2
        return $this->client->receiveMessage([
125 2
            'MaxNumberOfMessages' => 1,
126 2
            'QueueUrl' => $this->queueUrl,
127
        ]);
128
    }
129
130 2
    private function isFifo(): bool
131
    {
132 2
        return $this->endsWith($this->queueName, '.fifo');
133
    }
134
135 2
    private function endsWith(string $haystack, string $needle): bool
136
    {
137 2
        $length = strlen($needle);
138
139 2
        return (substr($haystack, -$length) === $needle);
140
    }
141
}
142