1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ShiftOneLabs\LaravelSqsFifoQueue; |
4
|
|
|
|
5
|
|
|
use LogicException; |
6
|
|
|
use Aws\Sqs\SqsClient; |
7
|
|
|
use BadMethodCallException; |
8
|
|
|
use InvalidArgumentException; |
9
|
|
|
use Illuminate\Queue\SqsQueue; |
10
|
|
|
use ShiftOneLabs\LaravelSqsFifoQueue\Contracts\Queue\Deduplicator; |
11
|
|
|
|
12
|
|
|
class SqsFifoQueue extends SqsQueue
{
    /**
     * The message group id of the fifo pipe in the queue.
     *
     * Used as the default "MessageGroupId" when the payload does not carry
     * its own "group" meta value.
     *
     * @var string
     */
    protected $group;

    /**
     * The driver to generate the deduplication id for the message.
     *
     * Used as the default driver name when the payload does not carry its
     * own "deduplicator" meta value.
     *
     * @var string
     */
    protected $deduplicator;

    /**
     * Create a new Amazon SQS queue instance.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs
     * @param  string  $default
     * @param  string  $prefix
     * @param  string  $group
     * @param  string  $deduplicator
     *
     * @return void
     */
    public function __construct(SqsClient $sqs, $default, $prefix = '', $group = '', $deduplicator = '')
    {
        parent::__construct($sqs, $default, $prefix);

        $this->group = $group;
        $this->deduplicator = $deduplicator;
    }

    /**
     * Set the underlying SQS instance.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs
     *
     * @return \ShiftOneLabs\LaravelSqsFifoQueue\SqsFifoQueue
     */
    public function setSqs(SqsClient $sqs)
    {
        $this->sqs = $sqs;

        return $this;
    }

    /**
     * Push a raw payload onto the queue.
     *
     * The message group id is read from the payload's "group" meta value,
     * falling back to the group configured on this queue. A deduplication
     * id is only attached when a deduplicator driver is in effect.
     *
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options  Not used by this implementation.
     *
     * @return mixed  The "MessageId" value of the SQS response.
     *
     * @throws \InvalidArgumentException
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = [
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'MessageGroupId' => $this->getMeta($payload, 'group', $this->group),
        ];

        // A strict comparison is required: only a literal `false` means
        // "do not send a deduplication id".
        if (($deduplication = $this->getDeduplicationId($payload, $queue)) !== false) {
            $message['MessageDeduplicationId'] = $deduplication;
        }

        $response = $this->sqs->sendMessage($message);

        return $response->get('MessageId');
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * Always throws: this queue does not implement delayed jobs.
     *
     * @param  \DateTime|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed
     *
     * @throws \BadMethodCallException
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        throw new BadMethodCallException('FIFO queues do not support per-message delays.');
    }

    /**
     * Get the deduplication id for the given driver.
     *
     * The driver name is read from the payload's "deduplicator" meta value,
     * falling back to the driver configured on this queue. The driver is
     * resolved from the container under "queue.sqs-fifo.deduplicator.{driver}".
     *
     * @param  string  $payload
     * @param  string|null  $queue
     *
     * @return string|bool  The generated id, or false when no driver is set.
     *
     * @throws \InvalidArgumentException
     */
    protected function getDeduplicationId($payload, $queue)
    {
        $driver = $this->getMeta($payload, 'deduplicator', $this->deduplicator);

        // An empty driver means no deduplication id is generated here.
        if (empty($driver)) {
            return false;
        }

        if ($this->container->bound($key = 'queue.sqs-fifo.deduplicator.'.$driver)) {
            $deduplicator = $this->container->make($key);

            if ($deduplicator instanceof Deduplicator) {
                return $deduplicator->generate($payload, $queue);
            }

            throw new InvalidArgumentException(sprintf('Deduplication method [%s] must resolve to a %s implementation.', $driver, Deduplicator::class));
        }

        throw new InvalidArgumentException(sprintf('Unsupported deduplication method [%s].', $driver));
    }

    /**
     * Create a payload string from the given job and data.
     *
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return string
     *
     * @throws \LogicException
     * @throws \InvalidArgumentException
     * @throws \Illuminate\Queue\InvalidPayloadException
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = parent::createPayload($job, $data, $queue);

        // Only object jobs can carry the meta data properties.
        if (!is_object($job)) {
            return $payload;
        }

        // Laravel 5.4 reworked payload generation. If the parent class has
        // the `createPayloadArray` method, it has already been called
        // through the parent call to the "createPayload" method.
        //
        // The parent is resolved from this class (__CLASS__), not from the
        // runtime class of $this. For a subclass of this queue,
        // get_parent_class($this) would return this class, which always
        // defines `createPayloadArray`, and the check would be meaningless.
        if (method_exists(get_parent_class(__CLASS__), 'createPayloadArray')) {
            return $payload;
        }

        // Laravel <= 5.3 has the `setMeta` method. This is the method
        // used to add meta data to the payload generated by the
        // parent call to `createPayload` above.
        if (method_exists($this, 'setMeta')) {
            return $this->appendPayload($payload, $job);
        }

        // If neither of the above methods exist, we must be on a version
        // of Laravel that is not currently supported.
        throw new LogicException('"createPayloadArray" and "setMeta" methods both missing. This version of Laravel not supported.');
    }

    /**
     * Append meta data to the payload for Laravel <= 5.3.
     *
     * @param  string  $payload
     * @param  mixed  $job
     *
     * @return string
     */
    protected function appendPayload($payload, $job)
    {
        $meta = $this->getMetaPayload($job);

        if (array_key_exists('group', $meta)) {
            $payload = $this->setMeta($payload, 'group', $meta['group']);
        }

        if (array_key_exists('deduplicator', $meta)) {
            $payload = $this->setMeta($payload, 'deduplicator', $meta['deduplicator']);
        }

        return $payload;
    }

    /**
     * Create a payload array from the given job and data.
     *
     * The job's meta data is merged over the parent's payload array, so the
     * "group" and "deduplicator" keys from the job take precedence.
     *
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return array
     */
    protected function createPayloadArray($job, $data = '', $queue = null)
    {
        return array_merge(
            parent::createPayloadArray($job, $data, $queue),
            $this->getMetaPayload($job)
        );
    }

    /**
     * Get the meta data to add to the payload.
     *
     * Reads the `messageGroupId` and `deduplicator` properties of an object
     * job. Properties that are unset or null are left out of the result.
     *
     * @param  mixed  $job
     *
     * @return array
     */
    protected function getMetaPayload($job)
    {
        if (!is_object($job)) {
            return [];
        }

        return array_filter(
            [
                'group' => isset($job->messageGroupId) ? $job->messageGroupId : null,
                'deduplicator' => isset($job->deduplicator) ? $job->deduplicator : null,
            ],
            // Filter on null only, so values such as an empty string are kept.
            function ($value) {
                return $value !== null;
            }
        );
    }

    /**
     * Get additional meta from a payload string.
     *
     * @param  string  $payload  JSON encoded payload.
     * @param  string  $key
     * @param  mixed  $default
     *
     * @return mixed
     */
    protected function getMeta($payload, $key, $default = null)
    {
        $payload = json_decode($payload, true);

        return array_get($payload, $key, $default);
    }
}
254
|
|
|
|
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is WordPress.
In this case you can add the `@ignore` PhpDoc annotation to the duplicate definition and it will be ignored.