GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 967239...06466a )
by Márk
02:52
created

Driver::pushMessage()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 2.0145

Importance

Changes 0
Metric Value
dl 0
loc 16
ccs 11
cts 13
cp 0.8462
rs 9.7333
c 0
b 0
f 0
cc 2
nc 2
nop 2
crap 2.0145
1
<?php
2
3
namespace Bernard\Driver\Sqs;
4
5
use Aws\Sqs\Exception\SqsException;
6
use Aws\Sqs\SqsClient;
7
use Bernard\Driver\AbstractPrefetchDriver;
8
9
/**
 * Bernard driver backed by Amazon SQS, using the AWS SDK SqsClient.
 *
 * SQS addresses queues by URL rather than by name, so the driver keeps a
 * queue name => queue URL map. The map is seeded through the constructor and
 * extended whenever queues are listed, created or resolved.
 *
 * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html
 */
final class Driver extends AbstractPrefetchDriver
{
    const AWS_SQS_FIFO_SUFFIX = '.fifo';
    const AWS_SQS_EXCEPTION_BAD_REQUEST = 400;
    const AWS_SQS_EXCEPTION_NOT_FOUND = 404;

    /**
     * @var SqsClient
     */
    private $sqs;

    /**
     * Known queues, indexed by queue name, holding the queue URL.
     *
     * @var array
     */
    private $queueUrls;

    /**
     * @param SqsClient $sqs       client used for every SQS call
     * @param array     $queueUrls initial queue name => queue URL map
     * @param int|null  $prefetch  forwarded unchanged to the parent constructor
     */
    public function __construct(SqsClient $sqs, array $queueUrls = [], $prefetch = null)
    {
        parent::__construct($prefetch);

        $this->sqs = $sqs;
        $this->queueUrls = $queueUrls;
    }

    /**
     * {@inheritdoc}
     *
     * Merges the queue URLs reported by SQS into the local map and returns all
     * known queue names. Only the result of a single listQueues call is used.
     */
    public function listQueues()
    {
        $result = $this->sqs->listQueues();

        if (!$queueUrls = $result->get('QueueUrls')) {
            return array_keys($this->queueUrls);
        }

        foreach ($queueUrls as $queueUrl) {
            // A URL that is already mapped (possibly under an alias supplied
            // through the constructor) keeps its existing name. Strict
            // comparison avoids matches caused by type juggling.
            if (in_array($queueUrl, $this->queueUrls, true)) {
                continue;
            }

            // The queue name is the last path segment of the queue URL.
            $segments = explode('/', $queueUrl);
            $queueName = end($segments);

            $this->queueUrls[$queueName] = $queueUrl;
        }

        return array_keys($this->queueUrls);
    }

    /**
     * {@inheritdoc}
     *
     * Does nothing when the queue can already be resolved. Queues whose name
     * ends in ".fifo" are created with the FifoQueue attribute.
     *
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#createqueue
     *
     * @throws SqsException
     */
    public function createQueue($queueName)
    {
        if ($this->queueExists($queueName)) {
            return;
        }

        $parameters = [
            'QueueName' => $queueName,
        ];

        if ($this->isFifoQueue($queueName)) {
            $parameters['Attributes'] = [
                'FifoQueue' => 'true',
            ];
        }

        $result = $this->sqs->createQueue($parameters);

        $this->queueUrls[$queueName] = $result['QueueUrl'];
    }

    /**
     * Tells whether a queue name can be resolved to a queue URL.
     *
     * @param string $queueName
     *
     * @return bool
     *
     * @throws SqsException when the lookup fails for a reason other than a
     *                      400 or 404 code on the previous exception
     */
    private function queueExists($queueName)
    {
        try {
            $this->resolveUrl($queueName);

            return true;
        } catch (\InvalidArgumentException $exception) {
            return false;
        } catch (SqsException $exception) {
            // The code inspected here is the one carried by the wrapped
            // (previous) exception, not by the SqsException itself.
            if ($previousException = $exception->getPrevious()) {
                switch ($previousException->getCode()) {
                    case self::AWS_SQS_EXCEPTION_BAD_REQUEST:
                    case self::AWS_SQS_EXCEPTION_NOT_FOUND:
                        return false;
                }
            }

            throw $exception;
        }
    }

    /**
     * Tells whether the queue name carries the FIFO suffix (".fifo").
     *
     * @param string $queueName
     *
     * @return bool
     */
    private function isFifoQueue($queueName)
    {
        return $this->endsWith($queueName, self::AWS_SQS_FIFO_SUFFIX);
    }

    /**
     * Tells whether $haystack ends with $needle. An empty needle always matches.
     *
     * @param string $haystack
     * @param string $needle
     *
     * @return bool
     */
    private function endsWith($haystack, $needle)
    {
        $length = strlen($needle);
        if ($length === 0) {
            return true;
        }

        return substr($haystack, -$length) === $needle;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the ApproximateNumberOfMessages attribute of the queue, or 0
     * when SQS does not report it.
     *
     * @throws \InvalidArgumentException when the queue name cannot be resolved
     * @throws SqsException
     */
    public function countMessages($queueName)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $result = $this->sqs->getQueueAttributes([
            'QueueUrl' => $queueUrl,
            'AttributeNames' => ['ApproximateNumberOfMessages'],
        ]);

        if (isset($result['Attributes']['ApproximateNumberOfMessages'])) {
            return (int) $result['Attributes']['ApproximateNumberOfMessages'];
        }

        return 0;
    }

    /**
     * {@inheritdoc}
     *
     * For FIFO queues every message is sent in the same message group (the
     * name of this method) and uses the MD5 hash of the body as deduplication
     * id, so two messages with identical bodies share a deduplication id.
     *
     * @throws \InvalidArgumentException when the queue name cannot be resolved
     * @throws SqsException
     */
    public function pushMessage($queueName, $message)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $parameters = [
            'QueueUrl' => $queueUrl,
            'MessageBody' => $message,
        ];

        if ($this->isFifoQueue($queueName)) {
            $parameters['MessageGroupId'] = __METHOD__;
            $parameters['MessageDeduplicationId'] = md5($message);
        }

        $this->sqs->sendMessage($parameters);
    }

    /**
     * {@inheritdoc}
     *
     * Serves a message from the prefetch cache when one is available.
     * Otherwise it asks SQS for up to $this->prefetch messages, waiting at
     * most $duration seconds, caches each one as [body, receipt handle] and
     * pops one from the cache. Returns [null, null] when SQS delivers nothing.
     *
     * @throws \InvalidArgumentException when the queue name cannot be resolved
     * @throws SqsException
     */
    public function popMessage($queueName, $duration = 5)
    {
        if ($message = $this->cache->pop($queueName)) {
            return $message;
        }

        $queueUrl = $this->resolveUrl($queueName);

        $result = $this->sqs->receiveMessage([
            'QueueUrl' => $queueUrl,
            'MaxNumberOfMessages' => $this->prefetch,
            'WaitTimeSeconds' => $duration,
        ]);

        if (!$result || !$messages = $result->get('Messages')) {
            return [null, null];
        }

        foreach ($messages as $message) {
            $this->cache->push($queueName, [$message['Body'], $message['ReceiptHandle']]);
        }

        return $this->cache->pop($queueName);
    }

    /**
     * {@inheritdoc}
     *
     * Acknowledging a message deletes it from the queue; $receipt is the
     * receipt handle returned alongside the message body by popMessage().
     *
     * @throws \InvalidArgumentException when the queue name cannot be resolved
     * @throws SqsException
     */
    public function acknowledgeMessage($queueName, $receipt)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $this->sqs->deleteMessage([
            'QueueUrl' => $queueUrl,
            'ReceiptHandle' => $receipt,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * Peeking is not implemented by this driver: all arguments are ignored
     * and an empty array is always returned.
     */
    public function peekQueue($queueName, $index = 0, $limit = 20)
    {
        return [];
    }

    /**
     * {@inheritdoc}
     *
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#deletequeue
     *
     * @throws \InvalidArgumentException when the queue name cannot be resolved
     * @throws SqsException
     */
    public function removeQueue($queueName)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $this->sqs->deleteQueue([
            'QueueUrl' => $queueUrl,
        ]);

        // Forget the cached URL. Otherwise resolveUrl() would keep returning
        // the URL of the deleted queue and createQueue() would treat the
        // queue as still existing.
        unset($this->queueUrls[$queueName]);
    }

    /**
     * {@inheritdoc}
     */
    public function info()
    {
        return [
            'prefetch' => $this->prefetch,
        ];
    }

    /**
     * AWS works with queue URLs rather than queue names. Returns the queue URL
     * for the given name, taken from the local map when available and looked
     * up through SQS (then remembered) otherwise.
     *
     * @param string $queueName
     *
     * @return mixed the queue URL
     *
     * @throws \InvalidArgumentException when SQS returns no URL for the name
     * @throws SqsException
     */
    private function resolveUrl($queueName)
    {
        if (isset($this->queueUrls[$queueName])) {
            return $this->queueUrls[$queueName];
        }

        $result = $this->sqs->getQueueUrl(['QueueName' => $queueName]);

        if ($result && $queueUrl = $result->get('QueueUrl')) {
            return $this->queueUrls[$queueName] = $queueUrl;
        }

        throw new \InvalidArgumentException('Queue "'.$queueName.'" cannot be resolved to an url.');
    }
}
277