GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#318)
by
unknown
07:21
created

SqsDriver   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 226
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 0
Metric Value
wmc 28
lcom 1
cbo 4
dl 0
loc 226
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A listQueues() 0 19 4
A createQueue() 0 15 3
A isFifoQueue() 0 4 1
A endsWith() 0 9 2
A countMessages() 0 15 2
A pushMessage() 0 16 2
B popMessage() 0 24 5
A acknowledgeMessage() 0 9 1
A peekQueue() 0 4 1
A removeQueue() 0 8 1
A info() 0 6 1
A resolveUrl() 0 14 4
1
<?php
2
3
namespace Bernard\Driver;
4
5
use Aws\Sqs\Exception\SqsException;
6
use Aws\Sqs\SqsClient;
7
8
/**
9
 * Implements a Driver for use with AWS SQS client API:
10
 * @link http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html
11
 *
12
 * @package Bernard
13
 */
14
class SqsDriver extends AbstractPrefetchDriver
15
{
16
    const AWS_SQS_FIFO_SUFFIX = '.fifo';
17
18
    protected $sqs;
19
    protected $queueUrls;
20
21
    /**
     * Wires the driver to an SQS client.
     *
     * @param SqsClient $sqs       client used for every SQS API call
     * @param array     $queueUrls already known queue URLs, indexed by queue name
     * @param int|null  $prefetch  prefetch setting handed to the parent driver
     */
    public function __construct(SqsClient $sqs, array $queueUrls = [], $prefetch = null)
    {
        parent::__construct($prefetch);

        $this->queueUrls = $queueUrls;
        $this->sqs = $sqs;
    }
33
34
    /**
     * {@inheritdoc}
     *
     * Merges the queue URLs reported by SQS into the local name => URL map
     * and returns the names of all queues known afterwards.
     */
    public function listQueues()
    {
        $remoteUrls = $this->sqs->listQueues()->get('QueueUrls');

        if ($remoteUrls) {
            foreach ($remoteUrls as $remoteUrl) {
                // URLs that are already registered (under any name) stay untouched.
                if (in_array($remoteUrl, $this->queueUrls)) {
                    continue;
                }

                // The queue name is the last path segment of its URL.
                $segments = explode('/', $remoteUrl);
                $this->queueUrls[end($segments)] = $remoteUrl;
            }
        }

        return array_keys($this->queueUrls);
    }
56
57
    /**
     * {@inheritdoc}
     *
     * Creates the queue on SQS unless its URL can already be resolved. A name
     * ending with the FIFO suffix produces a FIFO queue, any other name a
     * standard queue. The URL of a newly created queue is remembered.
     *
     * @param string $queueName
     *
     * @link http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#createqueue
     */
    public function createQueue($queueName)
    {
        try {
            $this->resolveUrl($queueName);

            // The queue is already known, nothing to create.
            return;
        } catch (\InvalidArgumentException $e) {
            // Unknown queue: carry on and create it below.
        }

        $parameters = ['QueueName' => $queueName];

        // SQS rejects the "FifoQueue" attribute on standard queues, even with
        // the value "false", so it is only sent when a FIFO queue is wanted.
        if ($this->isFifoQueue($queueName)) {
            $parameters['Attributes'] = ['FifoQueue' => 'true'];
        }

        $result = $this->sqs->createQueue($parameters);

        $this->queueUrls[$queueName] = $result['QueueUrl'];
    }
77
78
    /**
     * Tells whether a queue name denotes a FIFO queue, i.e. whether it ends
     * with the FIFO suffix constant.
     *
     * @param string $queueName
     *
     * @return bool
     */
    private function isFifoQueue($queueName)
    {
        $fifoSuffix = self::AWS_SQS_FIFO_SUFFIX;

        return $this->endsWith($queueName, $fifoSuffix);
    }
87
88
    /**
     * Checks whether a string ends with the given suffix.
     *
     * @param string $haystack string to inspect
     * @param string $needle   expected suffix
     *
     * @return bool true when $haystack ends with $needle; always true for an empty $needle
     */
    private function endsWith($haystack, $needle)
    {
        $suffixLength = strlen($needle);

        // The empty string is treated as a suffix of everything; otherwise
        // compare the tail of the haystack with the needle.
        return $suffixLength === 0 || substr($haystack, -$suffixLength) === $needle;
    }
103
104
    /**
     * {@inheritdoc}
     *
     * Reads the "ApproximateNumberOfMessages" queue attribute; yields 0 when
     * the response does not contain it.
     */
    public function countMessages($queueName)
    {
        $attribute = 'ApproximateNumberOfMessages';

        $result = $this->sqs->getQueueAttributes([
            'QueueUrl' => $this->resolveUrl($queueName),
            'AttributeNames' => [$attribute],
        ]);

        return isset($result['Attributes'][$attribute])
            ? (int) $result['Attributes'][$attribute]
            : 0;
    }
122
123
    /**
     * {@inheritdoc}
     *
     * Sends the message body to the queue. For FIFO queues a message group id
     * (this method's name) and a deduplication id (md5 of the body) are added.
     */
    public function pushMessage($queueName, $message)
    {
        $parameters = [
            'QueueUrl' => $this->resolveUrl($queueName),
            'MessageBody' => $message,
        ];

        if ($this->isFifoQueue($queueName)) {
            $parameters += [
                'MessageGroupId' => __METHOD__,
                'MessageDeduplicationId' => md5($message),
            ];
        }

        $this->sqs->sendMessage($parameters);
    }
142
143
    /**
     * {@inheritdoc}
     *
     * Serves a cached message when one is available. Otherwise receives up to
     * `prefetch` messages from SQS, waiting at most $duration seconds, caches
     * them as [body, receipt handle] pairs and returns one from the cache.
     * Returns [null, null] when nothing was received.
     */
    public function popMessage($queueName, $duration = 5)
    {
        $cached = $this->cache->pop($queueName);
        if ($cached) {
            return $cached;
        }

        $result = $this->sqs->receiveMessage([
            'QueueUrl' => $this->resolveUrl($queueName),
            'MaxNumberOfMessages' => $this->prefetch,
            'WaitTimeSeconds' => $duration,
        ]);

        $received = $result ? $result->get('Messages') : null;
        if (!$received) {
            return [null, null];
        }

        foreach ($received as $entry) {
            $this->cache->push($queueName, [$entry['Body'], $entry['ReceiptHandle']]);
        }

        return $this->cache->pop($queueName);
    }
170
171
    /**
     * {@inheritdoc}
     *
     * Acknowledges a message by deleting it from the queue through its
     * receipt handle.
     */
    public function acknowledgeMessage($queueName, $receipt)
    {
        $request = [
            'QueueUrl' => $this->resolveUrl($queueName),
            'ReceiptHandle' => $receipt,
        ];

        $this->sqs->deleteMessage($request);
    }
183
184
    /**
     * {@inheritdoc}
     *
     * Peeking is not implemented by this driver: the arguments are ignored
     * and the result is always an empty array.
     */
    public function peekQueue($queueName, $index = 0, $limit = 20)
    {
        $messages = [];

        return $messages;
    }
191
192
    /**
     * {@inheritdoc}
     *
     * Deletes the queue on SQS and forgets its locally stored URL, so that
     * later lookups do not reuse the URL of a queue that no longer exists.
     *
     * @link http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#deletequeue
     */
    public function removeQueue($queueName)
    {
        $this->sqs->deleteQueue([
            'QueueUrl' => $this->resolveUrl($queueName),
        ]);

        // Without this the stale URL would still be returned by resolveUrl(),
        // listed by listQueues() and would make createQueue() skip creation.
        unset($this->queueUrls[$queueName]);
    }
205
206
    /**
     * {@inheritdoc}
     *
     * Reports the configured prefetch value under the "prefetch" key.
     */
    public function info()
    {
        $details = ['prefetch' => $this->prefetch];

        return $details;
    }
215
216
    /**
     * AWS works with queue URLs rather than queue names. Returns the URL for
     * the given queue name, asking SQS for it (and remembering the answer)
     * when it is not stored locally yet.
     *
     * @param string $queueName
     *
     * @return mixed the queue URL
     *
     * @throws \InvalidArgumentException when no URL could be obtained for the name
     * @throws SqsException
     */
    protected function resolveUrl($queueName)
    {
        if (!isset($this->queueUrls[$queueName])) {
            $result = $this->sqs->getQueueUrl(['QueueName' => $queueName]);
            $queueUrl = $result ? $result->get('QueueUrl') : null;

            if (!$queueUrl) {
                throw new \InvalidArgumentException('Queue "' . $queueName .'" cannot be resolved to an url.');
            }

            $this->queueUrls[$queueName] = $queueUrl;
        }

        return $this->queueUrls[$queueName];
    }
239
}
240