Passed
Push — master ( e00bfb...6b59e9 )
by Patrick
24:00
created

SqsFifoQueue::getQueue()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 17
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 7
c 0
b 0
f 0
nc 3
nop 1
dl 0
loc 17
ccs 8
cts 8
cp 1
crap 4
rs 10
1
<?php
2
3
namespace ShiftOneLabs\LaravelSqsFifoQueue;
4
5
use LogicException;
6
use Aws\Sqs\SqsClient;
7
use BadMethodCallException;
8
use InvalidArgumentException;
9
use Illuminate\Queue\SqsQueue;
10
use Illuminate\Queue\CallQueuedHandler;
11
use ShiftOneLabs\LaravelSqsFifoQueue\Support\Arr;
12
use ShiftOneLabs\LaravelSqsFifoQueue\Support\Str;
13
use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;
14
15
class SqsFifoQueue extends SqsQueue
16
{
17
    /**
18
     * The queue name suffix.
19
     *
20
     * @var string
21
     */
22
    protected $suffix;
23
24
    /**
25
     * The message group id of the fifo pipe in the queue.
26
     *
27
     * @var string
28
     */
29
    protected $group;
30
31
    /**
32
     * The driver to generate the deduplication id for the message.
33
     *
34
     * @var string
35
     */
36
    protected $deduplicator;
37
38
    /**
39
     * The flag to check if this queue is setup for delay.
40
     *
41
     * @var bool
42
     */
43
    protected $allowDelay;
44
45
    /**
46
     * Create a new Amazon SQS queue instance.
47
     *
48
     * @param  \Aws\Sqs\SqsClient  $sqs
49
     * @param  string  $default
50
     * @param  string  $prefix
51
     * @param  string  $suffix
52
     * @param  string  $group
53
     * @param  string  $deduplicator
54
     * @param  bool  $allowDelay
55
     *
56
     * @return void
57
     */
58 2800
    public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '', $group = '', $deduplicator = '', $allowDelay = false)
59
    {
60 2800
        parent::__construct($sqs, $default, $prefix);
61
62 2800
        $this->suffix = $suffix;
63 2800
        $this->group = $group;
64 2800
        $this->deduplicator = $deduplicator;
65 2800
        $this->allowDelay = $allowDelay;
66 2800
    }
67
68
    /**
69
     * Set the underlying SQS instance.
70
     *
71
     * @param  \Aws\Sqs\SqsClient  $sqs
72
     *
73
     * @return \ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue
74
     */
75 232
    public function setSqs(SqsClient $sqs)
76
    {
77 210
        $this->sqs = $sqs;
78
79 210
        return $this;
80 28
    }
81
82
    /**
83
     * Push a raw payload onto the queue.
84
     *
85
     * @param  string  $payload
86
     * @param  string|null  $queue
87
     * @param  array  $options
88
     *
89
     * @return mixed
90
     */
91 1400
    public function pushRaw($payload, $queue = null, array $options = [])
92
    {
93
        $message = [
94 1400
            'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'MessageGroupId' => $this->getMeta($payload, 'group', $this->group),
95 240
        ];
96
97 1400
        if (($deduplication = $this->getDeduplicationId($payload, $queue)) !== false) {
98 770
            $message['MessageDeduplicationId'] = $deduplication;
99 132
        }
100
101 1260
        $response = $this->sqs->sendMessage($message);
102
103 1260
        return $response->get('MessageId');
104
    }
105
106
    /**
107
     * Push a new job onto the queue after a delay.
108
     *
109
     * SQS FIFO queues do not allow per-message delays, but the queue itself
110
     * can be configured to delay the message. If this queue is setup for
111
     * delayed messages, push the job to the queue instead of throwing.
112
     *
113
     * @param  \DateTime|int  $delay
114
     * @param  string  $job
115
     * @param  mixed  $data
116
     * @param  string|null  $queue
117
     *
118
     * @return mixed
119
     *
120
     * @throws BadMethodCallException
121
     */
122 140
    public function later($delay, $job, $data = '', $queue = null)
123
    {
124 140
        if ($this->allowDelay) {
125 70
            return $this->push($job, $data, $queue);
126
        }
127
128 70
        throw new BadMethodCallException('FIFO queues do not support per-message delays.');
129
    }
130
131
    /**
132
     * Get the queue or return the default.
133
     *
134
     * Laravel 7.x added support for a suffix, mainly to support Laravel Vapor.
135
     * Since SQS FIFO queues must end in ".fifo", supporting a suffix config
136
     * on these queues must be customized to work with the existing suffix.
137
     *
138
     * Additionally, this will provide support for the suffix config for older
139
     * versions of Laravel, in case anyone wants to use it.
140
     *
141
     * @param  string|null  $queue
142
     *
143
     * @return string
144
     */
145 1470
    public function getQueue($queue)
146
    {
147 1470
        $queue = $queue ?: $this->default;
148
149
        // Prefix support was not added until Laravel 5.1. Don't support a
150
        // suffix on versions that don't even support a prefix.
151 1470
        if (!property_exists($this, 'prefix')) {
152 441
            return $queue;
153
        }
154
155
        // Strip off the .fifo suffix to prepare for the config suffix.
156 1029
        $queue = Str::beforeLast($queue, '.fifo');
157
158
        // Modify the queue name as needed and re-add the ".fifo" suffix.
159 1029
        return (filter_var($queue, FILTER_VALIDATE_URL) === false
160 1029
            ? rtrim($this->prefix, '/').'/'.Str::finish($queue, $this->suffix)
161 1029
            : $queue).'.fifo';
162
    }
163
164
    /**
165
     * Get the deduplication id for the given driver.
166
     *
167
     * @param  string  $payload
168
     * @param  string  $queue
169
     *
170
     * @return string|bool
171
     *
172
     * @throws InvalidArgumentException
173
     */
174 1400
    protected function getDeduplicationId($payload, $queue)
175
    {
176 1400
        $driver = $this->getMeta($payload, 'deduplicator', $this->deduplicator);
177
178 1400
        if (empty($driver)) {
179 420
            return false;
180
        }
181
182 980
        if ($this->container->bound($key = 'queue.sqs-fifo.deduplicator.'.$driver)) {
183 910
            $deduplicator = $this->container->make($key);
184
185 910
            if ($deduplicator instanceof Deduplicator) {
186 840
                return $deduplicator->generate($payload, $queue);
187
            }
188
189 70
            throw new InvalidArgumentException(sprintf('Deduplication method [%s] must resolve to a %s implementation.', $driver, Deduplicator::class));
190
        }
191
192 70
        throw new InvalidArgumentException(sprintf('Unsupported deduplication method [%s].', $driver));
193
    }
194
195
    /**
196
     * Create a payload string from the given job and data.
197
     *
198
     * @param  mixed  $job
199
     * @param  mixed  $data
200
     * @param  string|null  $queue
201
     *
202
     * @return string
203
     *
204
     * @throws \LogicException
205
     * @throws \InvalidArgumentException
206
     * @throws \Illuminate\Queue\InvalidPayloadException
207
     */
208 840
    protected function createPayload($job, $data = '', $queue = null)
209
    {
210 840
        $payload = parent::createPayload($job, $data, $queue);
211
212 840
        if (!is_object($job)) {
213 420
            return $payload;
214
        }
215
216
        // Laravel 5.4 reworked payload generate. If the parent class has
217
        // the `createPayloadArray` method, it has already been called
218
        // through the parent call to the "createPayload" method.
219 420
        if (method_exists(get_parent_class($this), 'createPayloadArray')) {
220 174
            return $payload;
221
        }
222
223
        // Laravel < 5.0 doesn't support pushing job instances onto the queue.
224
        // We must regenerate the payload using just the class name, instead
225
        // of the job instance, so the queue worker can handle the job.
226 246
        if (!class_exists(CallQueuedHandler::class)) {
227 84
            $payload = parent::createPayload(get_class($job), $data, $queue);
228 24
        }
229
230
        // Laravel <= 5.3 has the `setMeta` method. This is the method
231
        // used to add meta data to the payload generated by the
232
        // parent call to `createPayload` above.
233 246
        if (method_exists($this, 'setMeta')) {
234 246
            return $this->appendPayload($payload, $job);
235
        }
236
237
        // If neither of the above methods exist, we must be on a version
238
        // of Laravel that is not currently supported.
239
        throw new LogicException('"createPayloadArray" and "setMeta" methods both missing. This version of Laravel not supported.');
240
    }
241
242
    /**
243
     * Append meta data to the payload for Laravel <= 5.3.
244
     *
245
     * @param  string  $payload
246
     * @param  mixed  $job
247
     *
248
     * @return string
249
     */
250 246
    protected function appendPayload($payload, $job)
251
    {
252 246
        $meta = $this->getMetaPayload($job);
253
254 246
        if (array_key_exists('group', $meta)) {
255 123
            $payload = $this->setMeta($payload, 'group', $meta['group']);
0 ignored issues
show
Bug introduced by
The method setMeta() does not exist on ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

255
            /** @scrutinizer ignore-call */ 
256
            $payload = $this->setMeta($payload, 'group', $meta['group']);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
256 33
        }
257
258 246
        if (array_key_exists('deduplicator', $meta)) {
259 82
            $payload = $this->setMeta($payload, 'deduplicator', $meta['deduplicator']);
260 22
        }
261
262 246
        return $payload;
263
    }
264
265
    /**
266
     * Create a payload array from the given job and data.
267
     *
268
     * @param  mixed  $job
269
     * @param  mixed  $data
270
     * @param  string|null  $queue
271
     *
272
     * @return array
273
     */
274 348
    protected function createPayloadArray($job, $data = '', $queue = null)
275
    {
276 348
        return array_merge(
277 348
            parent::createPayloadArray($job, $data, $queue),
278 348
            $this->getMetaPayload($job)
279 12
        );
280
    }
281
282
    /**
283
     * Get the meta data to add to the payload.
284
     *
285
     * @param  mixed  $job
286
     *
287
     * @return array
288
     */
289 594
    protected function getMetaPayload($job)
290
    {
291 594
        if (!is_object($job)) {
292 174
            return [];
293
        }
294
295 420
        return array_filter(
296
            [
297 420
                'group' => isset($job->messageGroupId) ? $job->messageGroupId : null,
298 420
                'deduplicator' => isset($job->deduplicator) ? $job->deduplicator : null,
299 72
            ],
300 420
            function ($value) {
301 420
                return $value !== null;
302 348
            }
303 72
        );
304
    }
305
306
    /**
307
     * Get additional meta from a payload string.
308
     *
309
     * @param  string  $payload
310
     * @param  string  $key
311
     * @param  mixed  $default
312
     *
313
     * @return mixed
314
     */
315 1400
    protected function getMeta($payload, $key, $default = null)
316
    {
317 1400
        $payload = json_decode($payload, true);
318
319 1400
        return Arr::get($payload, $key, $default);
320
    }
321
}
322