Completed
Pull Request — master (#15)
by Alex
01:44
created

QueueReader::bindToSNS()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 32
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 32
rs 8.8571
c 0
b 0
f 0
cc 1
eloc 21
nc 1
nop 1
1
<?php
2
3
namespace Cmp\Queues\Infrastructure\AWS\v20121105\Queue;
4
5
use Aws\Sns\SnsClient;
6
use Aws\Sqs\SqsClient;
7
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
8
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
9
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
10
use Exception;
11
use Psr\Log\LoggerInterface;
12
13
class QueueReader implements DomainQueueReader
14
{
15
    /**
     * SQS client used to create, read from, clean up and purge the queue.
     *
     * @var SqsClient
     */
    protected $sqs;

    /**
     * SNS client used to create the topic and subscribe the queue to it.
     *
     * @var SnsClient
     */
    protected $sns;

    /**
     * URL of the queue, filled in by createQueue() from the SQS response.
     *
     * @var string
     */
    protected $queueUrl;

    /**
     * Receives the reader callback and is handed every consumed message.
     *
     * @var MessageHandler
     */
    protected $messageHandler;

    /**
     * Logger used to report failures while initializing the reader.
     *
     * @var LoggerInterface
     */
    protected $logger;
39
40
    /**
     * Stores the collaborators and immediately provisions the AWS resources.
     *
     * Construction is not side-effect free: it ends by calling initialize(),
     * which creates the queue and binds it to the topic.
     *
     * @param SqsClient       $sqs            Client used for the queue operations
     * @param SnsClient       $sns            Client used to create and subscribe to the topic
     * @param string          $queueName      Name of the queue to create
     * @param string          $topicName      Name of the topic the queue is bound to
     * @param MessageHandler  $messageHandler Handler that is given every consumed message
     * @param LoggerInterface $logger         Logger used to report initialization failures
     *
     * @throws ReaderException When the queue cannot be created or bound to the topic
     */
    public function __construct(
        SqsClient $sqs,
        SnsClient $sns,
        $queueName,
        $topicName,
        MessageHandler $messageHandler,
        LoggerInterface $logger
    ) {
        // Every collaborator must be in place before initialize() runs,
        // because it uses both clients and the logger.
        $this->messageHandler = $messageHandler;
        $this->logger = $logger;
        $this->sns = $sns;
        $this->sqs = $sqs;

        $this->initialize($queueName, $topicName);
    }
63
64
    /**
     * Registers the callback on the message handler and consumes one batch.
     *
     * @param callable $callback Callback passed to the message handler
     * @param int      $timeout  Wait time forwarded to consume(); 0 disables the timeout error
     *
     * @throws TimeoutReaderException When consume() reports a timeout
     * @throws ReaderException        When any other exception is raised while consuming
     */
    public function read(callable $callback, $timeout=0)
    {
        $this->messageHandler->setCallback($callback);

        try {
            $this->consume($timeout);
        } catch (Exception $failure) {
            // Timeouts are part of the reader contract and pass through untouched.
            if ($failure instanceof TimeoutReaderException) {
                throw $failure;
            }

            // Everything else is wrapped, keeping the original as the previous exception.
            throw new ReaderException("Error occurred while reading", 0, $failure);
        }
    }
83
84
    /**
     * Deletes all messages from the queue
     */
    public function purge()
    {
        // purgeQueue() is an SDK operation and expects an argument array, not
        // a bare string: the queue URL has to be passed under the 'QueueUrl' key,
        // exactly like the other SQS calls in this class do.
        $this->sqs->purgeQueue(['QueueUrl' => $this->queueUrl]);
    }
91
92
    /**
     * Receives one batch of up to 10 messages and processes each of them.
     *
     * Every message is passed to the message handler and deleted from the
     * queue afterwards; if the handler throws, the message is not deleted.
     *
     * @param int $timeout Value sent as 'WaitTimeSeconds'; when it is not 0 and
     *                     the response carries no messages, a timeout is raised
     *
     * @throws TimeoutReaderException When a non-zero timeout elapses without messages
     */
    protected function consume($timeout)
    {
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $this->queueUrl,
            'MessageAttributeNames' => ['All'],
            'MaxNumberOfMessages' => 10,
            'WaitTimeSeconds' => $timeout,
        ]);

        $hasMessages = isset($response['Messages']);

        if (!$hasMessages) {
            // An empty response is only an error when the caller asked to wait.
            if ($timeout != 0) {
                throw new TimeoutReaderException();
            }

            return;
        }

        foreach ($response['Messages'] as $received) {
            $this->messageHandler->handleMessage($received);

            // Acknowledge the message only after the handler returned normally.
            $this->sqs->deleteMessage([
                'QueueUrl' => $this->queueUrl,
                'ReceiptHandle' => $received['ReceiptHandle'],
            ]);
        }
    }
116
117
    /**
     * Creates the queue and binds it to the topic, in that order.
     *
     * Any exception raised by either step is logged and rethrown wrapped in
     * a ReaderException that keeps the original as its previous exception.
     *
     * @param string $queueName Name of the queue to create
     * @param string $topicName Name of the topic to bind the queue to
     *
     * @throws ReaderException When creating or binding fails
     */
    protected function initialize($queueName, $topicName)
    {
        try {
            $this->createQueue($queueName);
            $this->bindToSNS($topicName);
        } catch (Exception $failure) {
            $context = ['exception' => $failure];
            $this->logger->error('Error trying to create queue', $context);

            throw new ReaderException('Error initializing queue reader', 0, $failure);
        }
    }
133
134
    /**
     * Asks SQS to create the queue and remembers the URL it returns.
     *
     * Per the original note, nothing happens if the queue already exists.
     *
     * @param string $queueName Name of the queue to create
     */
    protected function createQueue($queueName)
    {
        $arguments = ['QueueName' => $queueName];
        $created = $this->sqs->createQueue($arguments);

        $this->queueUrl = $created['QueueUrl'];
    }
144
145
    /**
     * Subscribes the queue to the topic and lets the topic deliver into it.
     *
     * Steps, in order: create the topic to obtain its ARN, resolve the queue
     * ARN from the queue URL, subscribe the queue to the topic over the 'sqs'
     * protocol, then set a queue policy allowing 'sqs:SendMessage' only when
     * the source ARN equals the topic ARN.
     *
     * @param string $topicName Name of the topic to bind the queue to
     */
    protected function bindToSNS($topicName)
    {
        $topic = $this->sns->createTopic(['Name' => $topicName]);
        $topicArn = $topic->get('TopicArn');

        $queueArn = $this->sqs->getQueueArn($this->queueUrl);

        $subscription = [
            'TopicArn' => $topicArn,
            'Protocol' => 'sqs',
            'Endpoint' => $queueArn,
        ];
        $this->sns->subscribe($subscription);

        // The principal is open, so the condition is what limits senders to the topic.
        $statement = [
            'Effect' => 'Allow',
            'Principal' => '*',
            'Action' => 'sqs:SendMessage',
            'Resource' => $queueArn,
            'Condition' => [
                'ArnEquals' => [
                    'aws:SourceArn' => $topicArn,
                ],
            ],
        ];
        $policy = [
            'Version' => '2012-10-17',
            'Statement'  => $statement,
        ];

        $this->sqs->setQueueAttributes([
            'QueueUrl' => $this->queueUrl,
            'Attributes' => [
                'Policy' => json_encode($policy),
            ],
        ]);
    }
180
}
181