SqsDriver::enqueue()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Spekkionu\PMG\Queue\Sqs\Driver;
4
5
use Spekkionu\PMG\Queue\Sqs\Envelope\SqsEnvelope;
6
use Aws\Sqs\SqsClient;
7
use PMG\Queue\Driver\AbstractPersistanceDriver;
8
use PMG\Queue\Exception;
9
use PMG\Queue\Serializer\Serializer;
10
use PMG\Queue\DefaultEnvelope;
11
use PMG\Queue\Envelope;
12
use PMG\Queue\Message;
13
use PMG\Queue\Exception\InvalidEnvelope;
14
15
class SqsDriver extends AbstractPersistanceDriver
16
{
17
    /**
18
     * @var SqsClient
19
     */
20
    private $client;
21
22
    /**
23
     * @var array
24
     */
25
    private $queueUrls;
26
27
    /**
28
     * @param SqsClient $client
29
     * @param Serializer $serializer
30
     * @param array $queueUrls
31
     */
32 12
    public function __construct(SqsClient $client, Serializer $serializer, array $queueUrls = [])
33
    {
34 12
        parent::__construct($serializer);
35 12
        $this->client    = $client;
36 12
        $this->queueUrls = $queueUrls;
37 12
    }
38
39
    /**
40
     * {@inheritdoc}
41
     */
42
    public static function allowedClasses()
43
    {
44
        $cls = parent::allowedClasses();
45
        $cls[] = SqsEnvelope::class;
46
        return $cls;
47
    }
48
49
    /**
50
     * @inheritDoc
51
     */
52 1
    public function enqueue(string $queueName, Message $message): Envelope
53
    {
54 1
        $queueUrl = $this->getQueueUrl($queueName);
55
56 1
        $env  = new DefaultEnvelope($message);
57 1
        $data = $this->serialize($env);
58
59 1
        $result = $this->client->sendMessage([
60 1
            'QueueUrl'    => $queueUrl,
61 1
            'MessageBody' => $data,
62
        ]);
63
64 1
        return new SqsEnvelope($result['MessageId'], $env);
65
    }
66
67
    /**
68
     * @inheritDoc
69
     */
70 2
    public function dequeue(string $queueName)
71
    {
72 2
        $queueUrl = $this->getQueueUrl($queueName);
73
74 2
        $result = $this->client->receiveMessage([
75 2
            'QueueUrl'            => $queueUrl,
76 2
            'MaxNumberOfMessages' => 1,
77
            'AttributeNames'      => ['ApproximateReceiveCount'],
78
        ]);
79
80 2
        if ( ! $result || ! $messages = $result['Messages']) {
81 1
            return null;
82
        }
83
84 1
        $message = array_shift($messages);
85
86 1
        $wrapped = $this->unserialize($message['Body']);
87
88 1
        $msg = new DefaultEnvelope($wrapped->unwrap(), $message['Attributes']['ApproximateReceiveCount']);
89 1
        $env = new SqsEnvelope($message['MessageId'], $msg, $message['ReceiptHandle']);
90
91 1
        return $env;
92
    }
93
94
    /**
95
     * @inheritDoc
96
     */
97 4
    public function ack(string $queueName, Envelope $envelope)
98
    {
99 4
        if ( ! $envelope instanceof SqsEnvelope) {
100 1
            throw new InvalidEnvelope(sprintf(
101 1
                '%s requires that envelopes be instances of "%s", got "%s"',
102 1
                __CLASS__,
103 1
                SqsEnvelope::class,
104 1
                get_class($envelope)
105
            ));
106
        }
107
108 3
        $queueUrl = $this->getQueueUrl($queueName);
109
110 3
        $this->client->deleteMessage([
111 3
            'QueueUrl'      => $queueUrl,
112 3
            'ReceiptHandle' => $envelope->getReceiptHandle(),
113
        ]);
114 3
    }
115
116
    /**
117
     * @inheritDoc
118
     */
119 1
    public function retry(string $queueName, Envelope $envelope) : Envelope
120
    {
121 1
        return $envelope->retry();
122
    }
123
124
    /**
125
     * @inheritDoc
126
     */
127 1
    public function fail(string $queueName, Envelope $envelope)
128
    {
129 1
        return $this->ack($queueName, $envelope);
130
    }
131
132
    /**
133
     * Returns queue url
134
     *
135
     * @param  string $queueName The name of the queue
136
     *
137
     * @return string            The queue url
138
     */
139 9
    public function getQueueUrl($queueName)
140
    {
141 9
        if (array_key_exists($queueName, $this->queueUrls)) {
142 7
            return $this->queueUrls[$queueName];
143
        }
144
145 2
        $result = $this->client->getQueueUrl(['QueueName' => $queueName]);
146
147 2
        if ($result && $queueUrl = $result['QueueUrl']) {
148 1
            return $this->queueUrls[$queueName] = $queueUrl;
149
        }
150
151 1
        throw new \InvalidArgumentException("Queue url for queue {$queueName} not found");
152
    }
153
154
    /**
155
     * Release a message back to a ready state. This is used by the consumer
156
     * when it skips the retry system. This may happen if the consumer receives
157
     * a signal and has to exit early.
158
     *
159
     * @param $queueName The queue from which the message came
160
     * @param $envelope The message to release, should be the same instance
161
     *        returned from `dequeue`
162
     *
163
     * @throws Exception\DriverError if something goes wrong
164
     * @return void
165
     */
166 1
    public function release(string $queueName, Envelope $envelope)
167
    {
168 1
        return $this->ack($queueName, $envelope);
169
    }
170
}
171