SqsFifoQueue   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 219
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 6
Bugs 1 Features 1
Metric Value
eloc 48
c 6
b 1
f 1
dl 0
loc 219
ccs 56
cts 56
cp 1
rs 10
wmc 18

8 Methods

Rating   Name   Duplication   Size   Complexity  
A getMeta() 0 5 1
A __construct() 0 13 1
A getMetaPayload() 0 21 6
A getDeduplicationId() 0 19 4
A createPayloadArray() 0 5 1
A setSqs() 0 5 1
A later() 0 7 2
A pushRaw() 0 13 2
1
<?php
2
3
namespace ShiftOneLabs\LaravelSqsFifoQueue;
4
5
use Aws\Sqs\SqsClient;
6
use BadMethodCallException;
7
use Illuminate\Support\Arr;
8
use InvalidArgumentException;
9
use Illuminate\Queue\SqsQueue;
10
use Illuminate\Mail\SendQueuedMailable;
11
use Illuminate\Notifications\SendQueuedNotifications;
12
use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;
13
14
class SqsFifoQueue extends SqsQueue
{
    /**
     * Suffix appended to the queue name.
     *
     * SqsQueue only exposes this property as protected from Laravel 10.x on,
     * so it is redeclared here. The redeclaration can go away once versions
     * older than Laravel 10.x are no longer supported.
     *
     * @var string
     */
    protected $suffix;

    /**
     * Default message group id, used when the payload has no "group" entry.
     *
     * @var string
     */
    protected $group;

    /**
     * Default deduplicator driver name, used when the payload has no
     * "deduplicator" entry.
     *
     * @var string
     */
    protected $deduplicator;

    /**
     * Whether later() pushes the job (true) or throws (false).
     *
     * @var bool
     */
    protected $allowDelay;

    /**
     * Build a new SQS FIFO queue instance.
     *
     * The first five arguments are forwarded unchanged to the parent
     * constructor; the remaining three configure the FIFO behaviour.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs
     * @param  string  $default
     * @param  string  $prefix
     * @param  string  $suffix
     * @param  bool  $dispatchAfterCommit
     * @param  string  $group
     * @param  string  $deduplicator
     * @param  bool  $allowDelay
     *
     * @return void
     */
    public function __construct(
        SqsClient $sqs,
        $default,
        $prefix = '',
        $suffix = '',
        $dispatchAfterCommit = false,
        $group = '',
        $deduplicator = '',
        $allowDelay = false
    ) {
        parent::__construct($sqs, $default, $prefix, $suffix, $dispatchAfterCommit);

        // Before Laravel 10.x the parent's suffix property is private, so the
        // parent constructor cannot populate the property declared on this
        // class. Assign it here explicitly.
        $this->suffix = $suffix;

        $this->allowDelay = $allowDelay;
        $this->deduplicator = $deduplicator;
        $this->group = $group;
    }

    /**
     * Replace the underlying SQS client.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs
     *
     * @return \ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue  this instance, for chaining
     */
    public function setSqs(SqsClient $sqs)
    {
        $this->sqs = $sqs;

        return $this;
    }

    /**
     * Send a raw payload to the queue and return the resulting message id.
     *
     * The message group id is read from the payload's "group" entry, falling
     * back to the group configured on this queue. A deduplication id is only
     * attached when getDeduplicationId() returns something other than false.
     *
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options  accepted for signature compatibility; not read here
     *
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = [
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'MessageGroupId' => strval($this->getMeta($payload, 'group', $this->group)),
        ];

        $deduplicationId = $this->getDeduplicationId($payload, $queue);

        if ($deduplicationId !== false) {
            $message['MessageDeduplicationId'] = strval($deduplicationId);
        }

        return $this->sqs->sendMessage($message)->get('MessageId');
    }

    /**
     * Push a new job onto the queue after (n) seconds.
     *
     * SQS FIFO queues do not allow per-message delays, but the queue itself
     * can be configured to delay messages. When this instance is flagged as
     * allowing delay, the job is pushed normally and $delay is not used;
     * otherwise the call is rejected.
     *
     * @param  \DateTime|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed
     *
     * @throws BadMethodCallException
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        if (!$this->allowDelay) {
            throw new BadMethodCallException('FIFO queues do not support per-message delays.');
        }

        return $this->push($job, $data, $queue);
    }

    /**
     * Resolve the deduplication id for a payload.
     *
     * The driver name comes from the payload's "deduplicator" entry, falling
     * back to the driver configured on this queue. An empty driver name means
     * no deduplication id is generated and false is returned.
     *
     * @param  string  $payload
     * @param  string|null  $queue
     *
     * @return string|bool
     *
     * @throws InvalidArgumentException
     */
    protected function getDeduplicationId($payload, $queue)
    {
        $driver = $this->getMeta($payload, 'deduplicator', $this->deduplicator);

        if (empty($driver)) {
            return false;
        }

        $binding = 'queue.sqs-fifo.deduplicator.'.$driver;

        if (!$this->container->bound($binding)) {
            throw new InvalidArgumentException(sprintf('Unsupported deduplication method [%s].', $driver));
        }

        $resolved = $this->container->make($binding);

        if (!($resolved instanceof Deduplicator)) {
            throw new InvalidArgumentException(sprintf('Deduplication method [%s] must resolve to a %s implementation.', $driver, Deduplicator::class));
        }

        return $resolved->generate($payload, $queue);
    }

    /**
     * Build the payload array for a job, extended with the FIFO meta data.
     *
     * Meta entries are merged after the parent's payload, so on a key
     * collision the meta value wins.
     *
     * @param  string|object  $job
     * @param  string  $queue
     * @param  mixed  $data
     *
     * @return array
     */
    protected function createPayloadArray($job, $queue, $data = '')
    {
        $basePayload = parent::createPayloadArray($job, $queue, $data);
        $metaPayload = $this->getMetaPayload($job);

        return array_merge($basePayload, $metaPayload);
    }

    /**
     * Collect the FIFO meta data ("group" and "deduplicator") for a job.
     *
     * Non-object jobs yield no meta data. For queued notifications and queued
     * mailables the values are read from the wrapped notification / mailable
     * rather than from the wrapper job itself.
     *
     * @param  mixed  $job
     *
     * @return array  only the entries whose value is not null
     */
    protected function getMetaPayload($job)
    {
        if (!is_object($job)) {
            return [];
        }

        // Start from the job itself and unwrap the known wrapper job types.
        $queueable = $job;

        if ($job instanceof SendQueuedNotifications) {
            $queueable = $job->notification;
        } elseif ($job instanceof SendQueuedMailable) {
            $queueable = $job->mailable;
        }

        $meta = [
            'group' => $queueable->messageGroupId ?? null,
            'deduplicator' => $queueable->deduplicator ?? null,
        ];

        return array_filter($meta, static function ($value) {
            return $value !== null;
        });
    }

    /**
     * Read a single meta value out of a JSON payload string.
     *
     * The payload is decoded into an associative array and the key is looked
     * up with Arr::get(), which is handed $default as its fallback.
     *
     * @param  string  $payload
     * @param  string  $key
     * @param  mixed  $default
     *
     * @return mixed
     */
    protected function getMeta($payload, $key, $default = null)
    {
        return Arr::get(json_decode($payload, true), $key, $default);
    }
}
235