GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#219)
by Jonathan
03:42
created

SqsDriver   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 170
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 11
Bugs 1 Features 1
Metric Value
wmc 22
c 11
b 1
f 1
lcom 1
cbo 4
dl 0
loc 170
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A listQueues() 0 19 4
A createQueue() 0 3 1
A countMessages() 0 15 2
A pushMessage() 0 9 1
B popMessage() 0 24 5
A acknowledgeMessage() 0 9 1
A peekQueue() 0 4 1
A removeQueue() 0 3 1
A info() 0 6 1
A resolveUrl() 0 14 4
1
<?php
2
3
namespace Bernard\Driver;
4
5
use Aws\Sqs\Exception\SqsException;
6
use Aws\Sqs\SqsClient;
7
8
/**
9
 * Implements a Driver for use with AWS SQS client API:
10
 * @link http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html
11
 *
12
 * @package Bernard
13
 */
14
class SqsDriver extends AbstractPrefetchDriver
{
    /**
     * @var SqsClient Client used for every SQS API call made by this driver.
     */
    protected $sqs;

    /**
     * @var array Map of queue name => queue URL. Seeded through the constructor
     *            and extended by listQueues() and resolveUrl().
     */
    protected $queueUrls;

    /**
     * @param SqsClient $sqs       SQS client the driver talks to
     * @param array     $queueUrls optional pre-known map of queue name => queue URL
     * @param int|null  $prefetch  forwarded unchanged to the parent constructor
     */
    public function __construct(SqsClient $sqs, array $queueUrls = [], $prefetch = null)
    {
        parent::__construct($prefetch);

        $this->sqs = $sqs;
        $this->queueUrls = $queueUrls;
    }

    /**
     * {@inheritdoc}
     *
     * Merges the queue URLs reported by SQS into the local name => URL map and
     * returns the names (the keys of that map).
     */
    public function listQueues()
    {
        $result = $this->sqs->listQueues();

        if (!$queueUrls = $result->get('QueueUrls')) {
            // Nothing reported by SQS: only the already-known queues are listed.
            return array_keys($this->queueUrls);
        }

        foreach ($queueUrls as $queueUrl) {
            // Strict comparison: a URL is only "known" when the identical string is stored.
            if (in_array($queueUrl, $this->queueUrls, true)) {
                continue;
            }

            // The queue name is the last path segment of the queue URL.
            $segments = explode('/', $queueUrl);
            $queueName = end($segments);

            $this->queueUrls[$queueName] = $queueUrl;
        }

        return array_keys($this->queueUrls);
    }

    /**
     * {@inheritdoc}
     *
     * No-op: this driver does not create queues on SQS.
     */
    public function createQueue($queueName)
    {
    }

    /**
     * {@inheritdoc}
     *
     * Reads the "ApproximateNumberOfMessages" queue attribute.
     *
     * @return int the reported count, or 0 when the attribute is missing from the response
     */
    public function countMessages($queueName)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $result = $this->sqs->getQueueAttributes([
            'QueueUrl' => $queueUrl,
            'AttributeNames' => ['ApproximateNumberOfMessages'],
        ]);

        if (isset($result['Attributes']['ApproximateNumberOfMessages'])) {
            // Cast so that both return paths hand back an integer rather than
            // the raw attribute value.
            return (int) $result['Attributes']['ApproximateNumberOfMessages'];
        }

        return 0;
    }

    /**
     * {@inheritdoc}
     *
     * Sends $message as the message body to the queue resolved from $queueName.
     */
    public function pushMessage($queueName, $message)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $this->sqs->sendMessage([
            'QueueUrl' => $queueUrl,
            'MessageBody' => $message,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * Serves messages from the local prefetch cache first. When the cache is
     * empty, one receiveMessage call is made, every received message is cached
     * as a [body, receipt handle] pair, and the first cached pair is returned.
     *
     * @return array [body, receipt handle], or [null, null] when nothing was received
     */
    public function popMessage($queueName, $duration = 5)
    {
        if ($message = $this->cache->pop($queueName)) {
            return $message;
        }

        $queueUrl = $this->resolveUrl($queueName);

        // NOTE(review): SQS documents limits for both values below; neither is
        // validated in this class - confirm that callers / the parent keep
        // them within range.
        $result = $this->sqs->receiveMessage([
            'QueueUrl' => $queueUrl,
            'MaxNumberOfMessages' => $this->prefetch,
            'WaitTimeSeconds' => $duration,
        ]);

        if (!$result || !$messages = $result->get('Messages')) {
            return [null, null];
        }

        foreach ($messages as $message) {
            $this->cache->push($queueName, [$message['Body'], $message['ReceiptHandle']]);
        }

        return $this->cache->pop($queueName);
    }

    /**
     * {@inheritdoc}
     *
     * Deletes the message identified by the receipt handle $receipt.
     */
    public function acknowledgeMessage($queueName, $receipt)
    {
        $queueUrl = $this->resolveUrl($queueName);

        $this->sqs->deleteMessage([
            'QueueUrl' => $queueUrl,
            'ReceiptHandle' => $receipt,
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * Peeking is not implemented by this driver: always returns an empty array.
     */
    public function peekQueue($queueName, $index = 0, $limit = 20)
    {
        return [];
    }

    /**
     * {@inheritdoc}
     *
     * No-op: this driver does not delete queues on SQS.
     */
    public function removeQueue($queueName)
    {
    }

    /**
     * {@inheritdoc}
     *
     * Exposes the configured prefetch value.
     */
    public function info()
    {
        return [
            'prefetch' => $this->prefetch,
        ];
    }

    /**
     * AWS works with queue URLs rather than queue names. Returns the queue URL
     * for the given name, using the local map first and asking SQS otherwise.
     * A URL obtained from SQS is stored in the local map for later calls.
     *
     * @param string $queueName
     *
     * @return mixed the queue URL
     *
     * @throws \InvalidArgumentException when no URL could be obtained for the name
     * @throws SqsException
     */
    protected function resolveUrl($queueName)
    {
        if (isset($this->queueUrls[$queueName])) {
            return $this->queueUrls[$queueName];
        }

        $result = $this->sqs->getQueueUrl(['QueueName' => $queueName]);

        if ($result && $queueUrl = $result->get('QueueUrl')) {
            return $this->queueUrls[$queueName] = $queueUrl;
        }

        throw new \InvalidArgumentException('Queue "' . $queueName .'" cannot be resolved to an url.');
    }
}
184