Test Failed
Push — master ( a78113...ae13e1 )
by Damian
03:52
created

SqsQueue::peek()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 14
rs 10
c 0
b 0
f 0
cc 3
nc 2
nop 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Envelope;
8
use Initx\Exception\IllegalStateException;
9
use Initx\Exception\NoSuchElementException;
10
use Initx\Queue;
11
use JMS\Serializer\SerializerInterface;
12
use Ramsey\Uuid\Uuid;
13
14
/**
 * Queue implementation that stores envelopes as JSON message bodies in an
 * Amazon SQS queue.
 *
 * The queue URL is looked up from the queue name on first use and cached for
 * the lifetime of the instance. Queues whose name ends in ".fifo" are sent
 * the additional FIFO-only message attributes.
 */
class SqsQueue implements Queue
{
    use HasFallbackSerializer;

    /**
     * @var SqsClient
     */
    private $client;

    /**
     * @var string
     */
    private $queueName;

    /**
     * Cached queue URL; null until resolveQueueUrl() has succeeded once.
     *
     * @var string|null
     */
    private $queueUrl;

    /**
     * Serializer used for the JSON (de)serialization of envelopes.
     *
     * NOTE(review): may be null right after assignment in the constructor;
     * fallbackSerializer() (from HasFallbackSerializer) presumably installs a
     * default in that case - confirm against the trait.
     *
     * @var SerializerInterface|null
     */
    private $serializer;

    /**
     * @param SqsClient                $client     Client used for every SQS call.
     * @param string                   $queueName  Name of the queue (not its URL).
     * @param SerializerInterface|null $serializer Optional serializer for envelopes.
     */
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
    {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $serializer;
        $this->fallbackSerializer();
    }

    /**
     * Adds the envelope to the queue, throwing when it could not be added.
     *
     * @throws IllegalStateException When offer() reports failure.
     */
    public function add(Envelope $envelope): void
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException("Could not add to queue $this->queueName");
        }
    }

    /**
     * Sends the envelope to the queue.
     *
     * @return bool True when SQS acknowledged the message with a MessageId.
     *
     * @throws IllegalStateException When the queue URL cannot be resolved.
     */
    public function offer(Envelope $envelope): bool
    {
        $this->resolveQueueUrl();

        $args = [
            'QueueUrl' => $this->queueUrl,
            'MessageBody' => $this->serializer->serialize($envelope, 'json'),
        ];

        if ($this->isFifo()) {
            // Every message shares one group; a random deduplication id
            // makes each send distinct.
            $args['MessageGroupId'] = __CLASS__;
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
        }

        $result = $this->client->sendMessage($args);

        // An Aws\Result object is always truthy, so testing the object itself
        // says nothing; the MessageId entry is what marks an accepted message.
        return $result->get('MessageId') !== null;
    }

    /**
     * Retrieves and removes the head of the queue.
     *
     * @throws NoSuchElementException When no message was received.
     */
    public function remove(): Envelope
    {
        $envelope = $this->poll();

        if ($envelope === null) {
            throw new NoSuchElementException();
        }

        return $envelope;
    }

    /**
     * Retrieves and removes the head of the queue, or returns null when no
     * message was received.
     *
     * @throws IllegalStateException When the queue URL cannot be resolved.
     */
    public function poll(): ?Envelope
    {
        $this->resolveQueueUrl();
        $message = $this->firstMessage($this->queryForOneMessage());

        if ($message === null) {
            return null;
        }

        // Deserialize before deleting so a message that fails to deserialize
        // is not removed from the queue.
        $envelope = $this->deserialize($message);

        $this->client->deleteMessage([
            'QueueUrl' => $this->queueUrl,
            'ReceiptHandle' => $message['ReceiptHandle'],
        ]);

        return $envelope;
    }

    /**
     * Retrieves, but does not remove, the head of the queue.
     *
     * @throws NoSuchElementException When no message was received.
     */
    public function element(): Envelope
    {
        $envelope = $this->peek();

        if ($envelope === null) {
            throw new NoSuchElementException();
        }

        return $envelope;
    }

    /**
     * Retrieves, but does not remove, the head of the queue, or returns null
     * when no message was received.
     *
     * @throws IllegalStateException When the queue URL cannot be resolved.
     */
    public function peek(): ?Envelope
    {
        $this->resolveQueueUrl();

        // A visibility timeout of 0 keeps the received message immediately
        // available to later reads; without it the message would stay hidden
        // for the queue's default visibility timeout even though it was only
        // peeked at.
        $message = $this->firstMessage($this->queryForOneMessage(0));

        return $message === null ? null : $this->deserialize($message);
    }

    /**
     * Looks up the queue URL for the configured queue name and caches it.
     *
     * The URL is stored only once it is known to be usable, so a failed
     * lookup is retried on the next call instead of caching an empty value.
     *
     * @throws IllegalStateException When the response carries no usable URL.
     */
    private function resolveQueueUrl(): void
    {
        if ($this->queueUrl !== null) {
            return;
        }

        $queueUrl = $this->client
            ->getQueueUrl(['QueueName' => $this->queueName])
            ->get('QueueUrl');

        if (!is_string($queueUrl) || $queueUrl === '') {
            throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
        }

        $this->queueUrl = $queueUrl;
    }

    /**
     * Asks SQS for at most one message from the resolved queue.
     *
     * @param int|null $visibilityTimeout Seconds the received message stays
     *                                    hidden; null leaves the parameter out
     *                                    of the request.
     */
    private function queryForOneMessage(?int $visibilityTimeout = null): Result
    {
        $args = [
            'MaxNumberOfMessages' => 1,
            'QueueUrl' => $this->queueUrl,
        ];

        if ($visibilityTimeout !== null) {
            $args['VisibilityTimeout'] = $visibilityTimeout;
        }

        return $this->client->receiveMessage($args);
    }

    /**
     * Returns the first entry of the result's "Messages" list, or null when
     * the list is missing or empty.
     */
    private function firstMessage(Result $result): ?array
    {
        $messages = $result->get('Messages');

        if (!is_array($messages) || $messages === []) {
            return null;
        }

        return $messages[0] ?? null;
    }

    /**
     * Turns the JSON body of a received message back into an envelope.
     *
     * @param array $message Received message; its 'Body' entry is read.
     */
    private function deserialize(array $message): Envelope
    {
        return $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
    }

    /**
     * Tells whether the configured queue name carries the ".fifo" suffix.
     */
    private function isFifo(): bool
    {
        return $this->endsWith($this->queueName, '.fifo');
    }

    /**
     * Byte-wise check that $haystack ends with $needle.
     */
    private function endsWith(string $haystack, string $needle): bool
    {
        $length = strlen($needle);

        // substr($haystack, -0) would return the whole string, so the empty
        // needle is answered explicitly: every string ends with "".
        if ($length === 0) {
            return true;
        }

        return substr($haystack, -$length) === $needle;
    }
}
165