1 | <?php |
||
/**
 * Amazon SQS queue driver for FIFO queues.
 *
 * Every message is sent with a MessageGroupId and a MessageDeduplicationId,
 * per-message delays are rejected, and long polling can be switched on for
 * the default queue through the "polling" and "wait_time" options.
 */
class SqsFifoQueue extends SqsQueue
{
    /**
     * Driver options. This class reads the "polling" and "wait_time" keys.
     *
     * @var array
     */
    private $options;

    /**
     * Create a new Amazon SQS queue instance.
     *
     * When the "polling" option is exactly 'long', the constructor also sets
     * the ReceiveMessageWaitTimeSeconds attribute on the default queue, which
     * means a request is sent to SQS during construction.
     *
     * @param  \Aws\Sqs\SqsClient  $sqs
     * @param  string  $default
     * @param  string  $prefix
     * @param  array  $options
     */
    public function __construct(SqsClient $sqs, $default, $prefix = '', array $options = [])
    {
        // The inherited fields are assigned directly; parent::__construct() is not called.
        $this->sqs = $sqs;
        $this->prefix = $prefix;
        $this->default = $default;
        $this->options = $options;

        // Anything other than the exact string 'long' leaves the queue attributes untouched.
        if (Arr::get($this->options, 'polling') !== 'long') {
            return;
        }

        $this->sqs->setQueueAttributes([
            'Attributes' => [
                'ReceiveMessageWaitTimeSeconds' => Arr::get($this->options, 'wait_time', 20),
            ],
            'QueueUrl' => $this->getQueue($default),
        ]);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * The message is sent with the group ID from getMessageGroupId() and the
     * deduplication ID from getMessageDeduplicationId(). The $options argument
     * is accepted for signature compatibility but is not used here.
     *
     * @see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/general-recommendations.html
     * @see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queue-recommendations.html
     *
     * @param  string  $payload
     * @param  string  $queue
     * @param  array  $options
     * @return mixed  The "MessageId" value of the SendMessage result.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $messageGroupId = $this->getMessageGroupId();
        $messageDeduplicationId = $this->getMessageDeduplicationId($payload);

        $messageId = $this->sqs->sendMessage([
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'MessageGroupId' => $messageGroupId,
            'MessageDeduplicationId' => $messageDeduplicationId,
        ])->get('MessageId');

        return $messageId;
    }

    /**
     * FIFO queues don't support per-message delays, only per-queue delays.
     *
     * Always throws; none of the arguments are used.
     *
     * @see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html
     *
     * @param  mixed  $delay
     * @param  mixed  $job
     * @param  mixed  $data
     * @param  string  $queue
     *
     * @throws \BadMethodCallException
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        throw new BadMethodCallException('FIFO queues don\'t support per-message delays, only per-queue delays');
    }

    /**
     * Pop the next job off of the queue.
     *
     * Requests at most one message and wraps the first one received in an
     * SqsJob. Returns null when the response carries no messages.
     *
     * @param  string  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        // NOTE(review): WaitTimeSeconds is sent on every receive, even when the
        // "polling" option is not 'long' -- confirm that this is intended.
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $queue,
            'AttributeNames' => ['ApproximateReceiveCount'],
            'MaxNumberOfMessages' => 1,
            'WaitTimeSeconds' => Arr::get($this->options, 'wait_time', 20),
        ]);

        if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
            // No trailing comma after the last argument: that syntax needs PHP 7.3+.
            return new SqsJob(
                $this->container,
                $this->sqs,
                $response['Messages'][0],
                $this->connectionName,
                $queue
            );
        }

        return null;
    }

    /**
     * Get the MessageGroupId to send with a message.
     *
     * Uses the current session ID, falling back to a random 40-character
     * string when the session ID is empty.
     *
     * @return string
     */
    protected function getMessageGroupId(): string
    {
        $messageGroupId = session()->getId();

        if (empty($messageGroupId)) {
            // Str::random() is what the str_random() helper (removed from the
            // framework in Laravel 6) delegated to.
            $messageGroupId = \Illuminate\Support\Str::random(40);
        }

        return $messageGroupId;
    }

    /**
     * Get the MessageDeduplicationId to send with a message.
     *
     * With "app.debug" enabled every message gets a random ID; otherwise the
     * ID is the SHA-1 hash of the payload, so identical payloads share an ID.
     *
     * @param  string  $payload
     * @return string
     */
    protected function getMessageDeduplicationId(string $payload): string
    {
        return config('app.debug')
            ? \Illuminate\Support\Str::random(40)
            : sha1($payload);
    }
}
||
126 |