Sqs::fetchJob()   A
last analyzed

Complexity

Conditions 3
Paths 2

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 0
cts 13
cp 0
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 9
nc 2
nop 0
crap 12
1
<?php
2
3
/**
4
 * @author    Flipbox Factory
5
 * @copyright Copyright (c) 2017, Flipbox Digital
6
 * @link      https://github.com/flipbox/queue/releases/latest
7
 * @license   https://github.com/flipbox/queue/blob/master/LICENSE
8
 */
9
10
namespace flipbox\queue\queues;
11
12
use Aws\Sqs\SqsClient;
13
use flipbox\queue\jobs\JobInterface;
14
use yii\helpers\ArrayHelper;
15
16
/**
17
 * @author Flipbox Factory <[email protected]>
18
 * @since 1.0.0
19
 */
20
class Sqs extends AbstractQueue
21
{
22
23
    /**
24
     * The SQS url.
25
     *
26
     * @var string
27
     */
28
    public $url;
29
30
    /**
31
     * The config for SqsClient.
32
     *
33
     * This will be used for SqsClient::factory($config);
34
     * @var array
35
     */
36
    public $config = [];
37
38
    /**
39
     * Due to ability of the queue message to be visible automatically after
40
     * a certain of time, this is not required.
41
     *
42
     * @inheritdoc
43
     */
44
    public $releaseOnFailure = false;
45
46
    /**
47
     * Stores the SQS client.
48
     * @var \Aws\Sqs\SqsClient
49
     */
50
    private $client;
51
52
    /**
53
     * @inheritdoc
54
     */
55
    public function init()
56
    {
57
        parent::init();
58
        $this->client = new SqsClient($this->config);
59
    }
60
61
    /**
62
     * @inheritdoc
63
     */
64
    public function fetchJob()
65
    {
66
        $message = $this->client->receiveMessage([
67
            'QueueUrl' => $this->url,
68
            'AttributeNames' => ['ApproximateReceiveCount'],
69
            'MaxNumberOfMessages' => 1,
70
        ]);
71
        if (isset($message['Messages']) && count($message['Messages']) > 0) {
72
            return $this->createJobFromMessage($message['Messages'][0]);
73
        } else {
74
            return false;
75
        }
76
    }
77
78
    /**
79
     * Create job from SQS message.
80
     *
81
     * @param array $message
82
     * @return JobInterface
83
     */
84
    private function createJobFromMessage($message)
85
    {
86
        $job = $this->deserialize($message['Body']);
87
        $job->setHeader('ReceiptHandle', $message['ReceiptHandle']);
88
        $job->setId($message['MessageId']);
89
        return $job;
90
    }
91
92
    /**
93
     * @param JobInterface $job
94
     * @param array $options
95
     * @return array
96
     */
97
    protected function mergePostOptions(JobInterface $job, array $options = [])
98
    {
99
        if (!isset($options['DelaySeconds']) && ($delay = $job->getOptions()->getDelay())) {
100
            $options['DelaySeconds'] = $delay;
101
        }
102
103
        return $options;
104
    }
105
106
    /**
107
     * @inheritdoc
108
     */
109
    protected function postJob(JobInterface $job, array $options = []): bool
110
    {
111
        $baseSettings =[
112
            'QueueUrl' => $this->url,
113
            'MessageBody' => $this->serialize($job),
114
        ];
115
116
        $settings = array_merge(
117
            //Merge any options found together
118
            $this->mergePostOptions($job, $options),
119
            //Base Settings should overwrite any attempt to change the url or message body.
120
            //Put this array last in the merge
121
            $baseSettings
122
        );
123
124
        $model = $this->client->sendMessage($settings);
125
        if ($model !== null) {
126
            $job->setId($model['MessageId']);
127
            return true;
128
        }
129
        return false;
130
    }
131
132
    /**
133
     * @inheritdoc
134
     */
135
    public function deleteJob(JobInterface $job): bool
136
    {
137
        $receiptHandle = $job->getHeader('ReceiptHandle');
138
139
        if (!empty($receiptHandle)) {
140
            $response = $this->client->deleteMessage([
141
                'QueueUrl' => $this->url,
142
                'ReceiptHandle' => $receiptHandle,
143
            ]);
144
145
            return $response !== null;
146
        }
147
148
        return false;
149
    }
150
151
    /**
152
     * @inheritdoc
153
     */
154
    public function releaseJob(JobInterface $job): bool
155
    {
156
        $receiptHandle = $job->getHeader('ReceiptHandle');
157
158
        if (!empty($receiptHandle)) {
159
            $response = $this->client->changeMessageVisibility([
160
                'QueueUrl' => $this->url,
161
                'ReceiptHandle' => $receiptHandle,
162
                'VisibilityTimeout' => 0,
163
            ]);
164
165
            return $response !== null;
166
        }
167
168
        return false;
169
    }
170
171
    /**
172
     * Returns the SQS client used.
173
     *
174
     * @return \Aws\Sqs\SqsClient
175
     */
176
    public function getClient()
177
    {
178
        return $this->client;
179
    }
180
181
    /**
182
     * @inheritdoc
183
     */
184
    public function getSize(): int
185
    {
186
        $response = $this->getClient()->getQueueAttributes([
187
            'QueueUrl' => $this->url,
188
            'AttributeNames' => [
189
                'ApproximateNumberOfMessages'
190
            ]
191
        ]);
192
        $attributes = $response->get('Attributes');
193
        return ArrayHelper::getValue(
194
            $attributes,
195
            'ApproximateNumberOfMessages',
196
            0
197
        );
198
    }
199
200
    /**
201
     * @inheritdoc
202
     */
203
    public function purge(): bool
204
    {
205
        $response = $this->getClient()->getQueueAttributes([
206
            'QueueUrl' => $this->url,
207
        ]);
208
        return $response !== null;
209
    }
210
}
211