Completed
Push — master ( 38797d...534d41 )
by Nate
03:48
created

Sqs   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 167
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 14
lcom 1
cbo 3
dl 0
loc 167
ccs 0
cts 87
cp 0
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A init() 0 5 1
A fetchJob() 0 13 3
A createJobFromMessage() 0 7 1
A postJob() 0 12 2
A deleteJob() 0 15 2
A releaseJob() 0 16 2
A getClient() 0 4 1
A getSize() 0 15 1
A purge() 0 7 1
1
<?php
2
3
/**
4
 * @author    Flipbox Factory
5
 * @copyright Copyright (c) 2017, Flipbox Digital
6
 * @link      https://github.com/flipbox/queue/releases/latest
7
 * @license   https://github.com/flipbox/queue/blob/master/LICENSE
8
 */
9
10
namespace flipbox\queue\queues;
11
12
use Aws\Sqs\SqsClient;
13
use flipbox\queue\jobs\JobInterface;
14
use yii\helpers\ArrayHelper;
15
16
/**
17
 * @author Flipbox Factory <[email protected]>
18
 * @since 1.0.0
19
 */
20
class Sqs extends AbstractQueue
{

    /**
     * The SQS url.
     *
     * @var string
     */
    public $url;

    /**
     * The config for SqsClient.
     *
     * This array is passed to the SqsClient constructor in init().
     *
     * @var array
     */
    public $config = [];

    /**
     * Because a received SQS message becomes visible again automatically after
     * a certain amount of time, releasing it on failure is not required.
     *
     * @inheritdoc
     */
    public $releaseOnFailure = false;

    /**
     * Stores the SQS client.
     *
     * @var \Aws\Sqs\SqsClient
     */
    private $client;

    /**
     * Runs the parent initialisation and then builds the SQS client from $config.
     *
     * @inheritdoc
     */
    public function init()
    {
        parent::init();
        $this->client = new SqsClient($this->config);
    }

    /**
     * Requests at most one message from the queue at $url and turns it into a job.
     *
     * @inheritdoc
     * @return JobInterface|bool The job built from the received message, or
     *                           false when the response contains no messages.
     */
    public function fetchJob()
    {
        $result = $this->client->receiveMessage([
            'QueueUrl' => $this->url,
            'AttributeNames' => ['ApproximateReceiveCount'],
            'MaxNumberOfMessages' => 1,
        ]);

        // The 'Messages' key is absent (or empty) when nothing was received.
        $messages = $result['Messages'] ?? [];
        if (empty($messages)) {
            return false;
        }

        return $this->createJobFromMessage($messages[0]);
    }

    /**
     * Create job from SQS message.
     *
     * The message body is passed through deserialize(); the receipt handle is
     * stored as a job header so deleteJob()/releaseJob() can address the
     * message later, and the SQS message id becomes the job id.
     *
     * @param array $message A single entry of the 'Messages' list; must contain
     *                       the 'Body', 'ReceiptHandle' and 'MessageId' keys.
     * @return JobInterface
     */
    private function createJobFromMessage($message)
    {
        $job = $this->deserialize($message['Body']);
        $job->setHeader('ReceiptHandle', $message['ReceiptHandle']);
        $job->setId($message['MessageId']);

        return $job;
    }

    /**
     * Sends the serialized job to the queue and stores the returned message id
     * on the job.
     *
     * @inheritdoc
     * @param JobInterface $job The job to enqueue.
     * @return bool True when the client returned a response, false otherwise.
     */
    public function postJob(JobInterface $job): bool
    {
        $model = $this->client->sendMessage([
            'QueueUrl' => $this->url,
            'MessageBody' => $this->serialize($job),
        ]);

        if ($model === null) {
            return false;
        }

        $job->setId($model['MessageId']);

        return true;
    }

    /**
     * Deletes the message behind the job, addressed by its 'ReceiptHandle' header.
     *
     * @inheritdoc
     * @param JobInterface $job A job previously produced by fetchJob().
     * @return bool False when the job carries no receipt handle; otherwise
     *              whether the client returned a response.
     */
    public function deleteJob(JobInterface $job): bool
    {
        $receiptHandle = $job->getHeader('ReceiptHandle');

        // Without a receipt handle there is nothing to address on the queue.
        if (empty($receiptHandle)) {
            return false;
        }

        $response = $this->client->deleteMessage([
            'QueueUrl' => $this->url,
            'ReceiptHandle' => $receiptHandle,
        ]);

        return $response !== null;
    }

    /**
     * Releases the job back to the queue by setting the visibility timeout of
     * its message to 0.
     *
     * @inheritdoc
     * @param JobInterface $job A job previously produced by fetchJob().
     * @return bool False when the job carries no receipt handle; otherwise
     *              whether the client returned a response.
     */
    public function releaseJob(JobInterface $job): bool
    {
        $receiptHandle = $job->getHeader('ReceiptHandle');

        // Without a receipt handle there is nothing to address on the queue.
        if (empty($receiptHandle)) {
            return false;
        }

        $response = $this->client->changeMessageVisibility([
            'QueueUrl' => $this->url,
            'ReceiptHandle' => $receiptHandle,
            'VisibilityTimeout' => 0,
        ]);

        return $response !== null;
    }

    /**
     * Returns the SQS client used.
     *
     * @return \Aws\Sqs\SqsClient
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * Returns the 'ApproximateNumberOfMessages' attribute of the queue.
     *
     * @inheritdoc
     * @return int The reported message count, or 0 when the attribute is
     *             missing from the response.
     */
    public function getSize(): int
    {
        $response = $this->getClient()->getQueueAttributes([
            'QueueUrl' => $this->url,
            'AttributeNames' => [
                'ApproximateNumberOfMessages',
            ],
        ]);

        // SQS reports attribute values as strings; cast explicitly so the
        // declared int return type holds without relying on implicit coercion.
        return (int) ArrayHelper::getValue(
            $response->get('Attributes'),
            'ApproximateNumberOfMessages',
            0
        );
    }

    /**
     * Removes all messages from the queue at $url.
     *
     * @inheritdoc
     * @return bool True when the client returned a response, false otherwise.
     */
    public function purge(): bool
    {
        // Issue an actual PurgeQueue request; merely reading the queue
        // attributes (as was done before) leaves every message in place.
        $response = $this->getClient()->purgeQueue([
            'QueueUrl' => $this->url,
        ]);

        return $response !== null;
    }
}
187