Test Setup Failed
Push — master ( f4c1cb...842434 )
by Jonathan
04:58 queued 14s
created

SqsDriver::allowedClasses()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Spekkionu\PMG\Queue\Sqs\Driver;
4
5
use Spekkionu\PMG\Queue\Sqs\Envelope\SqsEnvelope;
6
use Aws\Sqs\SqsClient;
7
use PMG\Queue\Driver\AbstractPersistanceDriver;
8
use PMG\Queue\Exception;
9
use PMG\Queue\Serializer\Serializer;
10
use PMG\Queue\DefaultEnvelope;
11
use PMG\Queue\Envelope;
12
use PMG\Queue\Message;
13
use PMG\Queue\Exception\InvalidEnvelope;
14
15
class SqsDriver extends AbstractPersistanceDriver
{
    /**
     * Client used for every SQS API call issued by this driver.
     *
     * @var SqsClient
     */
    private $client;

    /**
     * Queue URLs keyed by queue name. Seeded through the constructor and
     * extended by getQueueUrl() whenever a URL has to be looked up.
     *
     * @var array
     */
    private $queueUrls;

    /**
     * @param SqsClient  $client     Client used to talk to SQS
     * @param Serializer $serializer Serializer handed to the parent driver
     * @param array      $queueUrls  Optional map of queue name => queue url
     */
    public function __construct(SqsClient $client, Serializer $serializer, array $queueUrls = [])
    {
        parent::__construct($serializer);
        $this->client    = $client;
        $this->queueUrls = $queueUrls;
    }

    /**
     * Returns the parent's allowed class list with SqsEnvelope appended.
     *
     * {@inheritdoc}
     */
    public static function allowedClasses()
    {
        $cls   = parent::allowedClasses();
        $cls[] = SqsEnvelope::class;

        return $cls;
    }

    /**
     * Wraps the message in a DefaultEnvelope, serializes it and sends it to
     * the queue. The returned SqsEnvelope carries the message id reported by
     * SQS.
     *
     * @inheritDoc
     */
    public function enqueue(string $queueName, Message $message): Envelope
    {
        $queueUrl = $this->getQueueUrl($queueName);

        $env  = new DefaultEnvelope($message);
        $data = $this->serialize($env);

        $result = $this->client->sendMessage([
            'QueueUrl'    => $queueUrl,
            'MessageBody' => $data,
        ]);

        return new SqsEnvelope($result['MessageId'], $env);
    }

    /**
     * Receives at most one message from the queue.
     *
     * @inheritDoc
     *
     * @return SqsEnvelope|null Null when the response contains no messages
     */
    public function dequeue(string $queueName)
    {
        $queueUrl = $this->getQueueUrl($queueName);

        $result = $this->client->receiveMessage([
            'QueueUrl'            => $queueUrl,
            'MaxNumberOfMessages' => 1,
            'AttributeNames'      => ['ApproximateReceiveCount'],
        ]);

        $messages = $result ? $result['Messages'] : null;
        if ( ! $messages) {
            return null;
        }

        $message = array_shift($messages);
        $wrapped = $this->unserialize($message['Body']);

        // SQS reports attribute values as strings; cast so the attempt count
        // handed to the envelope is always an integer.
        $attempts = (int) $message['Attributes']['ApproximateReceiveCount'];

        $msg = new DefaultEnvelope($wrapped->unwrap(), $attempts);

        return new SqsEnvelope($message['MessageId'], $msg, $message['ReceiptHandle']);
    }

    /**
     * Deletes the message from the queue using the envelope's receipt handle.
     *
     * @inheritDoc
     *
     * @throws InvalidEnvelope if the envelope is not an SqsEnvelope
     */
    public function ack(string $queueName, Envelope $envelope)
    {
        $this->assertSqsEnvelope($envelope);

        $queueUrl = $this->getQueueUrl($queueName);

        $this->client->deleteMessage([
            'QueueUrl'      => $queueUrl,
            'ReceiptHandle' => $envelope->getReceiptHandle(),
        ]);
    }

    /**
     * Returns the envelope produced by `$envelope->retry()`. No SQS request is
     * made here.
     *
     * NOTE(review): presumably the message is redelivered by SQS once its
     * visibility timeout lapses -- confirm against the queue configuration.
     *
     * @inheritDoc
     */
    public function retry(string $queueName, Envelope $envelope) : Envelope
    {
        return $envelope->retry();
    }

    /**
     * Failed messages are handled exactly like acknowledged ones: they are
     * deleted from the queue.
     *
     * @inheritDoc
     */
    public function fail(string $queueName, Envelope $envelope)
    {
        return $this->ack($queueName, $envelope);
    }

    /**
     * Returns queue url
     *
     * Known urls are served from the local map; otherwise the url is requested
     * from SQS and remembered for subsequent calls.
     *
     * @param  string $queueName The name of the queue
     *
     * @throws \InvalidArgumentException if the lookup yields no url
     * @return string            The queue url
     */
    public function getQueueUrl($queueName)
    {
        if (array_key_exists($queueName, $this->queueUrls)) {
            return $this->queueUrls[$queueName];
        }

        $result   = $this->client->getQueueUrl(['QueueName' => $queueName]);
        $queueUrl = $result ? $result['QueueUrl'] : null;

        if ( ! $queueUrl) {
            throw new \InvalidArgumentException("Queue url for queue {$queueName} not found");
        }

        return $this->queueUrls[$queueName] = $queueUrl;
    }

    /**
     * Release a message back to a ready state. This is used by the consumer
     * when it skips the retry system. This may happen if the consumer receives
     * a signal and has to exit early.
     *
     * The message is NOT deleted: its visibility timeout is set to zero so it
     * can be received again right away.
     *
     * @param string   $queueName The queue from which the message came
     * @param Envelope $envelope  The message to release, should be the same
     *        instance returned from `dequeue`
     *
     * @throws InvalidEnvelope if the envelope is not an SqsEnvelope
     * @return void
     */
    public function release(string $queueName, Envelope $envelope)
    {
        $this->assertSqsEnvelope($envelope);

        $queueUrl = $this->getQueueUrl($queueName);

        $this->client->changeMessageVisibility([
            'QueueUrl'          => $queueUrl,
            'ReceiptHandle'     => $envelope->getReceiptHandle(),
            'VisibilityTimeout' => 0,
        ]);
    }

    /**
     * Guards operations that need a receipt handle, which only SqsEnvelope
     * exposes.
     *
     * @param Envelope $envelope The envelope to check
     *
     * @throws InvalidEnvelope if the envelope is not an SqsEnvelope
     * @return void
     */
    private function assertSqsEnvelope(Envelope $envelope)
    {
        if ($envelope instanceof SqsEnvelope) {
            return;
        }

        throw new InvalidEnvelope(sprintf(
            '%s requires that envelopes be instances of "%s", got "%s"',
            __CLASS__,
            SqsEnvelope::class,
            get_class($envelope)
        ));
    }
}
171