Completed
Push — master ( 573964...a4f217 )
by Patrick
25:18
created

SqsFifoQueue::setSqs()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 5
ccs 3
cts 3
cp 1
crap 1
rs 10
1
<?php
2
3
namespace ShiftOneLabs\LaravelSqsFifoQueue;
4
5
use LogicException;
6
use Aws\Sqs\SqsClient;
7
use BadMethodCallException;
8
use InvalidArgumentException;
9
use Illuminate\Queue\SqsQueue;
10
use Illuminate\Queue\CallQueuedHandler;
11
use ShiftOneLabs\LaravelSqsFifoQueue\Support\Arr;
12
use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator;
13
14
class SqsFifoQueue extends SqsQueue
15
{
16
    /**
17
     * The message group id of the fifo pipe in the queue.
18
     *
19
     * @var string
20
     */
21
    protected $group;
22
23
    /**
24
     * The driver to generate the deduplication id for the message.
25
     *
26
     * @var string
27
     */
28
    protected $deduplicator;
29
30
    /**
     * Build a FIFO-aware SQS queue instance.
     *
     * The client, default queue and prefix are handed straight to the parent
     * constructor. The group and deduplicator are stored on this instance and
     * serve as the fallback values when a payload carries no "group" or
     * "deduplicator" entry of its own.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs  Client forwarded to the parent constructor.
     * @param  string  $default  Forwarded to the parent constructor.
     * @param  string  $prefix  Forwarded to the parent constructor.
     * @param  string  $group  Fallback message group id.
     * @param  string  $deduplicator  Fallback deduplication driver name.
     *
     * @return void
     */
    public function __construct(SqsClient $sqs, $default, $prefix = '', $group = '', $deduplicator = '')
    {
        parent::__construct($sqs, $default, $prefix);

        $this->deduplicator = $deduplicator;
        $this->group = $group;
    }
48
49
    /**
     * Replace the SQS client used by this queue.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs  The client to use from now on.
     *
     * @return \ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue  This instance, so calls can be chained.
     */
    public function setSqs(SqsClient $sqs)
    {
        $this->sqs = $sqs;

        return $this;
    }
62
63
    /**
     * Send a raw payload to the queue as a FIFO message.
     *
     * The message group id is read from the payload's "group" meta entry,
     * falling back to the group configured on this instance. A deduplication
     * id is only attached when a deduplication driver is in effect.
     *
     * @param  string  $payload  The JSON payload used as the message body.
     * @param  string|null  $queue  Queue name handed to getQueue() to build the queue URL.
     * @param  array  $options  Not read by this method.
     *
     * @return mixed  The "MessageId" entry of the send response.
     *
     * @throws InvalidArgumentException  When the deduplication driver cannot be resolved.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = [];
        $message['QueueUrl'] = $this->getQueue($queue);
        $message['MessageBody'] = $payload;
        $message['MessageGroupId'] = $this->getMeta($payload, 'group', $this->group);

        // A strict `false` means "no deduplicator configured"; any other
        // value is sent along as the deduplication id.
        $deduplicationId = $this->getDeduplicationId($payload, $queue);

        if ($deduplicationId !== false) {
            $message['MessageDeduplicationId'] = $deduplicationId;
        }

        return $this->sqs->sendMessage($message)->get('MessageId');
    }
86
87
    /**
     * Reject delayed jobs: this queue always refuses them.
     *
     * None of the arguments are used; the method throws unconditionally.
     *
     * @param  \DateTime|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed  Never returns.
     *
     * @throws BadMethodCallException  Always.
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $reason = 'FIFO queues do not support per-message delays.';

        throw new BadMethodCallException($reason);
    }
103
104
    /**
     * Resolve the deduplication id for a payload.
     *
     * The driver name comes from the payload's "deduplicator" meta entry,
     * falling back to the driver configured on this instance. The driver is
     * looked up in the container under "queue.sqs-fifo.deduplicator.{driver}".
     *
     * @param  string  $payload  The JSON payload about to be sent.
     * @param  string|null  $queue  The queue name as received by pushRaw().
     *
     * @return string|bool  The generated id, or false when no driver is set.
     *
     * @throws InvalidArgumentException  When the driver is not bound in the
     *                                   container, or does not resolve to a
     *                                   Deduplicator implementation.
     */
    protected function getDeduplicationId($payload, $queue)
    {
        $driver = $this->getMeta($payload, 'deduplicator', $this->deduplicator);

        // No driver means the message is sent without a deduplication id.
        if (empty($driver)) {
            return false;
        }

        $binding = 'queue.sqs-fifo.deduplicator.'.$driver;

        if (!$this->container->bound($binding)) {
            throw new InvalidArgumentException(sprintf('Unsupported deduplication method [%s].', $driver));
        }

        $resolved = $this->container->make($binding);

        if (!($resolved instanceof Deduplicator)) {
            throw new InvalidArgumentException(sprintf('Deduplication method [%s] must resolve to a %s implementation.', $driver, Deduplicator::class));
        }

        return $resolved->generate($payload, $queue);
    }
134
135
    /**
     * Build the payload string for a job, adding FIFO meta data when the
     * parent class has not already done so.
     *
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return string
     *
     * @throws \LogicException  When the framework offers neither
     *                          "createPayloadArray" nor "setMeta".
     * @throws \InvalidArgumentException
     * @throws \Illuminate\Queue\InvalidPayloadException
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = parent::createPayload($job, $data, $queue);

        // Nothing more to do in two cases:
        //  - the job is not an object, so it cannot carry any meta data;
        //  - the parent class has `createPayloadArray` (the Laravel 5.4
        //    payload generation rework), in which case the meta data was
        //    already merged in during the parent call above.
        if (!is_object($job) || method_exists(get_parent_class($this), 'createPayloadArray')) {
            return $payload;
        }

        // Laravel < 5.0 cannot push job instances onto the queue. Rebuild
        // the payload from the class name alone so the worker can run it.
        if (!class_exists(CallQueuedHandler::class)) {
            $payload = parent::createPayload(get_class($job), $data, $queue);
        }

        // Without `setMeta` (available on Laravel <= 5.3) there is no way
        // left to attach the meta data: the framework version is unsupported.
        if (!method_exists($this, 'setMeta')) {
            throw new LogicException('"createPayloadArray" and "setMeta" methods both missing. This version of Laravel not supported.');
        }

        return $this->appendPayload($payload, $job);
    }
181
182
    /**
     * Write the job's FIFO meta data into an existing payload (Laravel <= 5.3).
     *
     * Only the entries returned by getMetaPayload() are written: "group"
     * first, then "deduplicator".
     *
     * @param  string  $payload  The payload string to extend.
     * @param  mixed  $job  The job the meta data is read from.
     *
     * @return string  The payload with any available meta data applied.
     */
    protected function appendPayload($payload, $job)
    {
        $meta = $this->getMetaPayload($job);

        foreach (['group', 'deduplicator'] as $key) {
            if (array_key_exists($key, $meta)) {
                // `setMeta` is not declared on this class, so static analysis
                // cannot see it. createPayload() only calls this method
                // after a `method_exists($this, 'setMeta')` check.
                /** @scrutinizer ignore-call */
                $payload = $this->setMeta($payload, $key, $meta[$key]);
            }
        }

        return $payload;
    }
204
205
    /**
     * Build the payload array for a job, with the FIFO meta data merged in.
     *
     * Entries from getMetaPayload() are merged after the parent's array, so
     * they win over parent entries that share the same key.
     *
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return array
     */
    protected function createPayloadArray($job, $data = '', $queue = null)
    {
        $base = parent::createPayloadArray($job, $data, $queue);
        $meta = $this->getMetaPayload($job);

        return array_merge($base, $meta);
    }
221
222
    /**
     * Collect the FIFO meta data declared on a job.
     *
     * Reads the job's `messageGroupId` property into the "group" entry and
     * its `deduplicator` property into the "deduplicator" entry. Properties
     * that are unset or null produce no entry.
     *
     * @param  mixed  $job  The job; anything that is not an object yields no meta data.
     *
     * @return array  Zero, one or both of the "group" and "deduplicator" keys.
     */
    protected function getMetaPayload($job)
    {
        $meta = [];

        if (!is_object($job)) {
            return $meta;
        }

        // Payload key => job property it is read from.
        $sources = ['group' => 'messageGroupId', 'deduplicator' => 'deduplicator'];

        foreach ($sources as $key => $property) {
            $value = isset($job->{$property}) ? $job->{$property} : null;

            if ($value !== null) {
                $meta[$key] = $value;
            }
        }

        return $meta;
    }
245
246
    /**
     * Read a meta value out of a JSON payload string.
     *
     * The payload is decoded into associative arrays and the lookup is
     * delegated to this package's Arr::get() helper.
     *
     * @param  string  $payload  The JSON payload string.
     * @param  string  $key  The entry to look up.
     * @param  mixed  $default  Passed to Arr::get() as the fallback value.
     *
     * @return mixed  Whatever Arr::get() returns; presumably the stored value,
     *                or the default when the key is absent (the helper is
     *                defined elsewhere and is not visible here).
     */
    protected function getMeta($payload, $key, $default = null)
    {
        return Arr::get(json_decode($payload, true), $key, $default);
    }
261
}
262