Completed
Push — master ( db25a3...dab7fa )
by Damian
03:54 queued 11s
created

SqsQueue::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 3
dl 0
loc 5
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Querabilis\Envelope;
8
use Initx\Querabilis\Exception\IllegalStateException;
9
use Initx\Querabilis\Queue;
10
use JMS\Serializer\SerializerInterface;
11
use Ramsey\Uuid\Uuid;
12
13
final class SqsQueue implements Queue
14
{
15
    use HasFallbackSerializer;
16
    use HasDefaultRemoveAndElement;
17
18
    /**
19
     * @var SqsClient
20
     */
21
    private $client;
22
23
    /**
24
     * @var string
25
     */
26
    private $queueName;
27
28
    /**
29
     * @var string
30
     */
31
    private $queueUrl = '';
32
33
    /**
34
     * @var SerializerInterface
35
     */
36
    private $serializer;
37
38 4
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
39
    {
40 4
        $this->client = $client;
41 4
        $this->queueName = $queueName;
42 4
        $this->serializer = $this->fallbackSerializer($serializer);
43 4
    }
44
45 2
    public function add(Envelope $envelope): bool
46
    {
47 2
        if (!$this->offer($envelope)) {
48 1
            throw new IllegalStateException("Could not add to queue $this->queueName");
49
        }
50
51 1
        return true;
52
    }
53
54 2
    public function offer(Envelope $envelope): bool
55
    {
56 2
        $this->resolveQueueUrl();
57
58 2
        $serialized = $this->serializer->serialize($envelope, 'json');
59
        $args = [
60 2
            'QueueUrl' => $this->queueUrl,
61 2
            'MessageBody' => $serialized,
62
        ];
63
64 2
        if ($this->isFifo()) {
65 1
            $args['MessageGroupId'] = self::class;
66 1
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
67
        }
68
69 2
        return (bool)$this->client->sendMessage($args);
70
    }
71
72 2
    public function poll(): ?Envelope
73
    {
74 2
        $this->resolveQueueUrl();
75 2
        $result = $this->queryForOneMessage();
76
77 2
        if ($result->get('Messages') && count($result->get('Messages'))) {
78 1
            $message = $result->get('Messages')[0];
79 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
80
81
            // remove message from queue
82 1
            $this->client->deleteMessage([
83 1
                'QueueUrl' => $this->queueUrl,
84 1
                'ReceiptHandle' => $message['ReceiptHandle'],
85
            ]);
86
87 1
            return $envelope;
88
        }
89
90
        // no messages
91 1
        return null;
92
    }
93
94 2
    public function peek(): ?Envelope
95
    {
96 2
        $this->resolveQueueUrl();
97 2
        $result = $this->queryForOneMessage();
98
99 2
        if ($result->get('Messages') && count($result->get('Messages'))) {
100 1
            $message = $result->get('Messages')[0];
101
102 1
            return $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
103
        }
104
105
        // no messages
106 1
        return null;
107
    }
108
109 4
    private function resolveQueueUrl(): void
110
    {
111 4
        if (!empty($this->queueUrl)) {
112
            return;
113
        }
114
115 4
        $result = $this->client->getQueueUrl(['QueueName' => $this->queueName]);
116 4
        $this->queueUrl = $result->get('QueueUrl');
117
118 4
        if (empty($this->queueUrl)) {
119
            throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
120
        }
121 4
    }
122
123 2
    private function queryForOneMessage(): Result
124
    {
125 2
        return $this->client->receiveMessage([
126 2
            'MaxNumberOfMessages' => 1,
127 2
            'QueueUrl' => $this->queueUrl,
128
        ]);
129
    }
130
131 2
    private function isFifo(): bool
132
    {
133 2
        return $this->endsWith($this->queueName, '.fifo');
134
    }
135
136 2
    private function endsWith(string $haystack, string $needle): bool
137
    {
138 2
        $length = strlen($needle);
139
140 2
        return substr($haystack, -$length) === $needle;
141
    }
142
}
143