Passed
Push — master ( 6b59e9...433163 )
by Patrick
25:50
created

SqsFifoQueue::getRestrictedValue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 3
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 6
ccs 4
cts 4
cp 1
crap 1
rs 10
1
<?php
2
3
namespace ShiftOneLabs\LaravelSqsFifoQueue;
4
5
use LogicException;
6
use Aws\Sqs\SqsClient;
7
use ReflectionProperty;
8
use BadMethodCallException;
9
use InvalidArgumentException;
10
use Illuminate\Queue\SqsQueue;
11
use Illuminate\Mail\SendQueuedMailable;
0 ignored issues
show
Bug introduced by
The type Illuminate\Mail\SendQueuedMailable was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
12
use Illuminate\Queue\CallQueuedHandler;
13
use ShiftOneLabs\LaravelSqsFifoQueue\Support\Arr;
14
use ShiftOneLabs\LaravelSqsFifoQueue\Support\Str;
15
use Illuminate\Notifications\SendQueuedNotifications;
0 ignored issues
show
Bug introduced by
The type Illuminate\Notifications\SendQueuedNotifications was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
16
use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;
17
18
class SqsFifoQueue extends SqsQueue
19
{
20
    /**
21
     * The queue name suffix.
22
     *
23
     * @var string
24
     */
25
    protected $suffix;
26
27
    /**
28
     * The message group id of the fifo pipe in the queue.
29
     *
30
     * @var string
31
     */
32
    protected $group;
33
34
    /**
35
     * The driver to generate the deduplication id for the message.
36
     *
37
     * @var string
38
     */
39
    protected $deduplicator;
40
41
    /**
42
     * The flag to check if this queue is setup for delay.
43
     *
44
     * @var bool
45
     */
46
    protected $allowDelay;
47
48
    /**
     * Build a queue connection for an Amazon SQS FIFO queue.
     *
     * The client, default queue and prefix are handed to the parent SQS
     * queue; the remaining arguments are FIFO specific and stored here.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs  The client used to send messages.
     * @param  string  $default  The queue used when no queue is specified.
     * @param  string  $prefix  The prefix placed in front of queue names.
     * @param  string  $suffix  The suffix added to queue names, ahead of ".fifo".
     * @param  string  $group  The default message group id.
     * @param  string  $deduplicator  The default deduplication driver name.
     * @param  bool  $allowDelay  Whether "later" pushes the job instead of throwing.
     *
     * @return void
     */
    public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '', $group = '', $deduplicator = '', $allowDelay = false)
    {
        parent::__construct($sqs, $default, $prefix);

        // FIFO specific settings. These are deliberately assigned only
        // after the parent constructor has finished running.
        $this->allowDelay = $allowDelay;
        $this->deduplicator = $deduplicator;
        $this->group = $group;
        $this->suffix = $suffix;
    }
70
71
    /**
     * Replace the SQS client used by this queue.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs  The client to use from now on.
     *
     * @return \ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue  This queue, to allow chaining.
     */
    public function setSqs(SqsClient $sqs)
    {
        $this->sqs = $sqs;

        // Fluent setter: hand back the same instance.
        return $this;
    }
84
85
    /**
     * Send an already encoded payload to the FIFO queue.
     *
     * The message group id is read from the payload's "group" meta entry,
     * falling back to the group configured on this queue. A deduplication
     * id is attached only when a deduplication driver produces one.
     *
     * @param  string  $payload  The JSON encoded job payload.
     * @param  string|null  $queue  The target queue, or null for the default.
     * @param  array  $options  Not used by this implementation.
     *
     * @return mixed  The "MessageId" entry of the SQS response.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = [];
        $message['QueueUrl'] = $this->getQueue($queue);
        $message['MessageBody'] = $payload;
        $message['MessageGroupId'] = $this->getMeta($payload, 'group', $this->group);

        // A strict false means "no deduplication id"; anything else is sent.
        $deduplication = $this->getDeduplicationId($payload, $queue);

        if ($deduplication !== false) {
            $message['MessageDeduplicationId'] = $deduplication;
        }

        return $this->sqs->sendMessage($message)->get('MessageId');
    }
108
109
    /**
     * Push a new job onto the queue after a delay.
     *
     * Per-message delays are not available on SQS FIFO queues, although the
     * queue itself may be configured with a delay. When this connection is
     * flagged as allowing delay, the job is pushed as a normal message;
     * otherwise the call is rejected.
     *
     * @param  \DateTime|int  $delay  Accepted for interface compatibility; not forwarded.
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed
     *
     * @throws BadMethodCallException  When delays are not allowed on this queue.
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        if (!$this->allowDelay) {
            throw new BadMethodCallException('FIFO queues do not support per-message delays.');
        }

        // The requested delay is not passed along; the job is simply pushed.
        return $this->push($job, $data, $queue);
    }
133
134
    /**
     * Resolve the full queue name or URL, falling back to the default queue.
     *
     * Laravel 7.x introduced a queue suffix, mainly for Laravel Vapor. FIFO
     * queue names must end in ".fifo", so the configured suffix has to be
     * inserted in front of that ending rather than appended after it.
     *
     * The suffix config is honored on older Laravel versions as well, as
     * long as the version supports a queue prefix.
     *
     * @param  string|null  $queue  The queue name or URL, or null for the default.
     *
     * @return string
     */
    public function getQueue($queue)
    {
        $name = $queue ?: $this->default;

        // Prefix support arrived in Laravel 5.1. Without a prefix property
        // there is no suffix support either, so the name is used untouched.
        if (!property_exists($this, 'prefix')) {
            return $name;
        }

        // Drop the ".fifo" ending so the configured suffix can go before it.
        $name = Str::beforeLast($name, '.fifo');

        // Full URLs are left alone; plain names get the prefix and suffix.
        if (filter_var($name, FILTER_VALIDATE_URL) === false) {
            $name = rtrim($this->prefix, '/').'/'.Str::finish($name, $this->suffix);
        }

        // Restore the mandatory ".fifo" ending.
        return $name.'.fifo';
    }
166
167
    /**
     * Generate the deduplication id for a payload.
     *
     * The driver name comes from the payload's "deduplicator" meta entry,
     * falling back to the driver configured on this queue. The driver is
     * resolved from the container under "queue.sqs-fifo.deduplicator.{name}".
     *
     * @param  string  $payload
     * @param  string  $queue
     *
     * @return string|bool  The generated id, or false when no driver is set.
     *
     * @throws InvalidArgumentException  When the driver is not bound, or does
     *                                   not resolve to a Deduplicator.
     */
    protected function getDeduplicationId($payload, $queue)
    {
        $driver = $this->getMeta($payload, 'deduplicator', $this->deduplicator);

        // An empty driver means no deduplication id should be sent.
        if (empty($driver)) {
            return false;
        }

        $key = 'queue.sqs-fifo.deduplicator.'.$driver;

        if (!$this->container->bound($key)) {
            throw new InvalidArgumentException(sprintf('Unsupported deduplication method [%s].', $driver));
        }

        $deduplicator = $this->container->make($key);

        if (!$deduplicator instanceof Deduplicator) {
            throw new InvalidArgumentException(sprintf('Deduplication method [%s] must resolve to a %s implementation.', $driver, Deduplicator::class));
        }

        return $deduplicator->generate($payload, $queue);
    }
197
198
    /**
     * Create a payload string from the given job and data.
     *
     * For object jobs, the FIFO meta data (message group and deduplicator)
     * must end up in the payload. How that happens depends on the Laravel
     * version, which is detected from the methods the framework provides.
     *
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return string
     *
     * @throws \LogicException  When the framework offers neither way of adding meta data.
     * @throws \InvalidArgumentException
     * @throws \Illuminate\Queue\InvalidPayloadException
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = parent::createPayload($job, $data, $queue);

        // Only job instances can carry FIFO meta data.
        if (!is_object($job)) {
            return $payload;
        }

        // Laravel 5.4 reworked payload generation. If the framework class
        // has the `createPayloadArray` method, it has already been called
        // through the parent call to the "createPayload" method.
        //
        // The check is made against the parent of this class (__CLASS__),
        // not the parent of the runtime class. For a subclass of this queue
        // the runtime parent is this class itself, which always declares
        // `createPayloadArray`, and the meta data would be skipped on
        // framework versions that never call it.
        if (method_exists(get_parent_class(__CLASS__), 'createPayloadArray')) {
            return $payload;
        }

        // Laravel < 5.0 doesn't support pushing job instances onto the queue.
        // We must regenerate the payload using just the class name, instead
        // of the job instance, so the queue worker can handle the job.
        if (!class_exists(CallQueuedHandler::class)) {
            $payload = parent::createPayload(get_class($job), $data, $queue);
        }

        // Laravel <= 5.3 has the `setMeta` method. This is the method
        // used to add meta data to the payload generated by the
        // parent call to `createPayload` above.
        if (method_exists($this, 'setMeta')) {
            return $this->appendPayload($payload, $job);
        }

        // If neither of the above methods exist, we must be on a version
        // of Laravel that is not currently supported.
        throw new LogicException('"createPayloadArray" and "setMeta" methods both missing. This version of Laravel not supported.');
    }
244
245
    /**
     * Add the FIFO meta data to an encoded payload on Laravel <= 5.3.
     *
     * @param  string  $payload  The encoded payload to extend.
     * @param  mixed  $job  The job the meta data is read from.
     *
     * @return string  The payload, with any available meta entries set.
     */
    protected function appendPayload($payload, $job)
    {
        $meta = $this->getMetaPayload($job);

        // `setMeta` is inherited from the framework on Laravel <= 5.3 only,
        // so static analysis against newer versions reports it as missing.
        // The caller verifies the method exists before calling this method.
        foreach (['group', 'deduplicator'] as $key) {
            if (array_key_exists($key, $meta)) {
                $payload = $this->setMeta($payload, $key, $meta[$key]);
            }
        }

        return $payload;
    }
267
268
    /**
     * Build the payload array for a job, including the FIFO meta data.
     *
     * The meta entries are merged over the array built by the parent, so a
     * meta key replaces a parent entry with the same key.
     *
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return array
     */
    protected function createPayloadArray($job, $data = '', $queue = null)
    {
        $payload = parent::createPayloadArray($job, $data, $queue);
        $meta = $this->getMetaPayload($job);

        return array_merge($payload, $meta);
    }
284
285
    /**
     * Collect the FIFO meta data defined on a job.
     *
     * Queued notifications and mailables are wrapped in a framework job, so
     * the meta data is read from the wrapped notification or mailable.
     *
     * @param  mixed  $job
     *
     * @return array  May contain "group" and "deduplicator"; entries that
     *                are not set on the job are left out.
     */
    protected function getMetaPayload($job)
    {
        if (!is_object($job)) {
            return [];
        }

        // By default the job itself carries the meta data.
        $queueable = $job;

        // The notification and mailable properties were not made public
        // until 5.4.12. To support 5.3.0 - 5.4.11, we use reflection.
        if ($job instanceof SendQueuedNotifications) {
            $queueable = $this->getRestrictedValue($job, 'notification');
        } elseif ($job instanceof SendQueuedMailable) {
            $queueable = $this->getRestrictedValue($job, 'mailable');
        }

        $meta = [
            'group' => isset($queueable->messageGroupId) ? $queueable->messageGroupId : null,
            'deduplicator' => isset($queueable->deduplicator) ? $queueable->deduplicator : null,
        ];

        // Drop the entries that were not provided; other values are kept,
        // including falsy ones such as an empty string.
        foreach ($meta as $name => $value) {
            if ($value === null) {
                unset($meta[$name]);
            }
        }

        return $meta;
    }
320
321
    /**
     * Read a meta entry from an encoded payload.
     *
     * @param  string  $payload  The JSON encoded payload.
     * @param  string  $key  The entry to look up.
     * @param  mixed  $default  The value to use when the entry is missing.
     *
     * @return mixed
     */
    protected function getMeta($payload, $key, $default = null)
    {
        // Decode to an associative array and let the array helper do the lookup.
        return Arr::get(json_decode($payload, true), $key, $default);
    }
336
337
    /**
     * Use reflection to get the value of a restricted (private/protected)
     * property on an object.
     *
     * @param  object  $object  The object to read from.
     * @param  string  $property  The name of the property to read.
     *
     * @return mixed
     *
     * @throws \ReflectionException  When the property does not exist on the object.
     */
    protected function getRestrictedValue($object, $property)
    {
        $reflectionProperty = new ReflectionProperty($object, $property);

        // Before PHP 8.1, restricted properties must be made accessible
        // explicitly. From 8.1 on the call has no effect, and it is
        // deprecated as of PHP 8.5, so it is only made where needed.
        if (PHP_VERSION_ID < 80100) {
            $reflectionProperty->setAccessible(true);
        }

        return $reflectionProperty->getValue($object);
    }
353
}
354