Passed
Push — staging ( 95ce59...f4a45a )
by Patrick
02:34
created

SqsFifoQueue::createPayload()   A

Complexity

Conditions 5
Paths 6

Size

Total Lines 32
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 5.0144

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 10
c 1
b 0
f 0
nc 6
nop 3
dl 0
loc 32
ccs 11
cts 12
cp 0.9167
crap 5.0144
rs 9.6111

1 Method

Rating   Name   Duplication   Size   Complexity  
A SqsFifoQueue::getMeta() 0 5 1
1
<?php
2
3
namespace ShiftOneLabs\LaravelSqsFifoQueue;
4
5
use Aws\Sqs\SqsClient;
6
use BadMethodCallException;
7
use Illuminate\Support\Arr;
8
use InvalidArgumentException;
9
use Illuminate\Queue\SqsQueue;
10
use Illuminate\Mail\SendQueuedMailable;
11
use Illuminate\Notifications\SendQueuedNotifications;
12
use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;
13
14
class SqsFifoQueue extends SqsQueue
15
{
16
    /**
17
     * The queue name suffix.
18
     *
19
     * This property was made protected in Laravel 10x. The redefinition
20
     * here can be removed when support for < Laravel 10x is dropped.
21
     *
22
     * @var string
23
     */
24
    protected $suffix;
25
26
    /**
27
     * The message group id of the fifo pipe in the queue.
28
     *
29
     * @var string
30
     */
31
    protected $group;
32
33
    /**
34
     * The driver to generate the deduplication id for the message.
35
     *
36
     * @var string
37
     */
38
    protected $deduplicator;
39
40
    /**
41
     * The flag to check if this queue is setup for delay.
42
     *
43
     * @var bool
44
     */
45
    protected $allowDelay;
46
47
    /**
48
     * Create a new Amazon SQS queue instance.
49
     *
50
     * @param  \Aws\Sqs\SqsClient  $sqs
51
     * @param  string  $default
52
     * @param  string  $prefix
53
     * @param  string  $suffix
54
     * @param  bool  $dispatchAfterCommit
55
     * @param  string  $group
56
     * @param  string  $deduplicator
57
     * @param  bool  $allowDelay
58
     *
59
     * @return void
60
     */
61 2740
    public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '', $dispatchAfterCommit = false, $group = '', $deduplicator = '', $allowDelay = false)
62
    {
63 2740
        parent::__construct($sqs, $default, $prefix, $suffix, $dispatchAfterCommit);
64
65 2740
        /**
66 2740
         * The suffix property on SqsQueue was not made protected until Laravel 10x.
67 2740
         * Since it is private on the parent class, the parent constructor will
68 2740
         * not set the property on this class, so we must do it manually.
69 2740
         */
70
        $this->suffix = $suffix;
71
        $this->group = $group;
72
        $this->deduplicator = $deduplicator;
73
        $this->allowDelay = $allowDelay;
74
    }
75
76
    /**
77
     * Set the underlying SQS instance.
78 232
     *
79
     * @param  \Aws\Sqs\SqsClient  $sqs
80 232
     *
81
     * @return \ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue
82 216
     */
83
    public function setSqs(SqsClient $sqs)
84
    {
85
        $this->sqs = $sqs;
86
87
        return $this;
88
    }
89
90
    /**
91
     * Push a raw payload onto the queue.
92
     *
93
     * @param  string  $payload
94 1300
     * @param  string|null  $queue
95
     * @param  array  $options
96
     *
97 1300
     * @return mixed
98 200
     */
99
    public function pushRaw($payload, $queue = null, array $options = [])
100 1300
    {
101 578
        $message = [
102 88
            'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'MessageGroupId' => $this->getMeta($payload, 'group', $this->group),
103
        ];
104 1156
105
        if (($deduplication = $this->getDeduplicationId($payload, $queue)) !== false) {
106 1156
            $message['MessageDeduplicationId'] = $deduplication;
107
        }
108
109
        $response = $this->sqs->sendMessage($message);
110
111
        return $response->get('MessageId');
112
    }
113
114
    /**
115
     * Push a new job onto the queue after (n) seconds.
116
     *
117
     * SQS FIFO queues do not allow per-message delays, but the queue itself
118
     * can be configured to delay the message. If this queue is setup for
119
     * delayed messages, push the job to the queue instead of throwing.
120
     *
121
     * @param  \DateTime|int  $delay
122
     * @param  string  $job
123
     * @param  mixed  $data
124
     * @param  string|null  $queue
125 144
     *
126
     * @return mixed
127 144
     *
128 72
     * @throws BadMethodCallException
129
     */
130
    public function later($delay, $job, $data = '', $queue = null)
131 72
    {
132
        if ($this->allowDelay) {
133
            return $this->push($job, $data, $queue);
134
        }
135
136
        throw new BadMethodCallException('FIFO queues do not support per-message delays.');
137
    }
138
139
    /**
140
     * Get the deduplication id for the given driver.
141
     *
142
     * @param  string  $payload
143
     * @param  string  $queue
144
     *
145
     * @return string|bool
146
     *
147
     * @throws InvalidArgumentException
148 1372
     */
149
    protected function getDeduplicationId($payload, $queue)
150 1372
    {
151
        $driver = $this->getMeta($payload, 'deduplicator', $this->deduplicator);
152
153
        if (empty($driver)) {
154 1372
            return false;
155 357
        }
156
157
        if ($this->container->bound($key = 'queue.sqs-fifo.deduplicator.'.$driver)) {
158
            $deduplicator = $this->container->make($key);
159 1015
160
            if ($deduplicator instanceof Deduplicator) {
161
                return $deduplicator->generate($payload, $queue);
162 1015
            }
163 1015
164 1015
            throw new InvalidArgumentException(sprintf('Deduplication method [%s] must resolve to a %s implementation.', $driver, Deduplicator::class));
165
        }
166
167
        throw new InvalidArgumentException(sprintf('Unsupported deduplication method [%s].', $driver));
168
    }
169
170
    /**
171
     * Create a payload array from the given job and data.
172
     *
173
     * @param  string|object  $job
174
     * @param  string  $queue
175
     * @param  mixed  $data
176
     *
177 1300
     * @return array
178
     */
179 1300
    protected function createPayloadArray($job, $queue, $data = '')
180
    {
181 1300
        return array_merge(
182 506
            parent::createPayloadArray($job, $queue, $data),
183
            $this->getMetaPayload($job)
184
        );
185 794
    }
186 722
187
    /**
188 722
     * Get the meta data to add to the payload.
189 650
     *
190
     * @param  mixed  $job
191
     *
192 72
     * @return array
193
     */
194
    protected function getMetaPayload($job)
195 72
    {
196
        if (!is_object($job)) {
197
            return [];
198
        }
199
200
        if ($job instanceof SendQueuedNotifications) {
201
            $queueable = $job->notification;
202
        } elseif ($job instanceof SendQueuedMailable) {
203
            $queueable = $job->mailable;
204
        } else {
205
            $queueable = $job;
206
        }
207
208
        return array_filter(
209
            [
210
                'group' => isset($queueable->messageGroupId) ? $queueable->messageGroupId : null,
211 724
                'deduplicator' => isset($queueable->deduplicator) ? $queueable->deduplicator : null,
212
            ],
213 724
            function ($value) {
214
                return $value !== null;
215 724
            }
216 216
        );
217
    }
218
219
    /**
220
     * Get additional meta from a payload string.
221
     *
222 508
     * @param  string  $payload
223 279
     * @param  string  $key
224
     * @param  mixed  $default
225
     *
226
     * @return mixed
227
     */
228
    protected function getMeta($payload, $key, $default = null)
229 229
    {
230 70
        $payload = json_decode($payload, true);
231 20
232
        return Arr::get($payload, $key, $default);
233
    }
234
}
235