Completed
Pull Request — master (#108)
by
unknown
04:43
created

AwsProvider::destroy()   B

Complexity

Conditions 5
Paths 6

Size

Total Lines 33
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5

Importance

Changes 2
Bugs 1 Features 1
Metric Value
c 2
b 1
f 1
dl 0
loc 33
ccs 17
cts 17
cp 1
rs 8.439
cc 5
eloc 17
nc 6
nop 0
crap 5
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\Exception\NotFoundException;
26
use Aws\Sns\SnsClient;
27
use Aws\Sqs\Exception\SqsException;
28
use Aws\Sqs\SqsClient;
29
30
use Doctrine\Common\Cache\Cache;
31
use Symfony\Bridge\Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
44
     * Aws SQS Client
45
     *
46
     * @var SqsClient
47
     */
48
    private $sqs;
49
50
    /**
51
     * Aws SNS Client
52
     *
53
     * @var SnsClient
54
     */
55
    private $sns;
56
57
    /**
58
     * SQS Queue URL
59
     *
60
     * @var string
61
     */
62
    private $queueUrl;
63
64
    /**
65
     * SNS Topic ARN
66
     *
67
     * @var string
68
     */
69
    private $topicArn;
70
71
    /**
72
     * Flag whether queues should automatically be setup if not available.
73
     *
74
     * @var boolean
75
     */
76
    private $queueAutoSetup = true;
77
78 15
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
79
    {
80 15
        $this->name     = $name;
81 15
        $this->options  = $options;
82 15
        $this->cache    = $cache;
83 15
        $this->logger   = $logger;
84
85
        // get() method used for sdk v2, create methods for v3
86 15
        $useGet = method_exists($client, 'get');
87 15
        $this->sqs = $useGet ? $client->get('Sqs') : $client->createSqs();
88 15
        $this->sns = $useGet ? $client->get('Sns') : $client->createSns();
89
90 15
        if (isset($options['disable_queue_auto_setup']) && $options['disable_queue_auto_setup']) {
91
            $this->queueAutoSetup = false;
92
        }
93 15
    }
94
95 1
    public function getProvider()
96
    {
97 1
        return "AWS";
98
    }
99
100
    /**
101
     * Builds the configured queues
102
     *
103
     * If a Queue name is passed and configured, this method will build only that
104
     * Queue.
105
     *
106
     * All Create methods are idempotent, if the resource exists, the current ARN
107
     * will be returned
108
     *
109
     */
110 1
    public function create()
111
    {
112 1
        $this->createQueue();
113
114 1
        if ($this->options['push_notifications']) {
115
           // Create the SNS Topic
116 1
           $this->createTopic();
117
118
           // Add the SQS Queue as a Subscriber to the SNS Topic
119 1
           $this->subscribeToTopic(
120 1
               $this->topicArn,
121 1
               'sqs',
122 1
               $this->sqs->getQueueArn($this->queueUrl)
123
           );
124
125
           // Add configured Subscribers to the SNS Topic
126 1
           foreach ($this->options['subscribers'] as $subscriber) {
127 1
                $this->subscribeToTopic(
128 1
                    $this->topicArn,
129 1
                    $subscriber['protocol'],
130 1
                    $subscriber['endpoint']
131
                );
132
            }
133
        }
134
135 1
        return true;
136
    }
137
138
    /**
139
     * @return Boolean
140
     */
141 1
    public function destroy()
142
    {
143 1
        $key = $this->getNameWithPrefix() . '_url';
144 1
        $this->cache->delete($key);
145
146 1
        if ($this->queueExists()) {
147
            // Delete the SQS Queue
148 1
            $this->sqs->deleteQueue([
149 1
                'QueueUrl' => $this->queueUrl
150
            ]);
151
152 1
            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
153
        }
154
155 1
        $key = $this->getNameWithPrefix() . '_arn';
156 1
        $this->cache->delete($key);
157
158 1
        if ($this->topicExists() || !empty($this->queueUrl)) {
159
            // Delete the SNS Topic
160 1
            $topicArn = !empty($this->topicArn)
161 1
                ? $this->topicArn
162 1
                : str_replace('sqs', 'sns', $this->queueUrl)
163
            ;
164
165 1
            $this->sns->deleteTopic([
166 1
                'TopicArn' => $topicArn
167
            ]);
168
169 1
            $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
170
        }
171
172 1
        return true;
173
    }
174
175
    /**
176
     * {@inheritDoc}
177
     *
178
     * This method will either use a SNS Topic to publish a queued message or
179
     * straight to SQS depending on the application configuration.
180
     *
181
     * @return string
182
     */
183 2
    public function publish(array $message, array $options = [])
184
    {
185 2
        $options      = $this->mergeOptions($options);
186 2
        $publishStart = microtime(true);
187
188
        // ensures that the SQS Queue and SNS Topic exist
189 2
        if (!$this->queueExists() && $this->queueAutoSetup) {
190
            $this->create();
191
        }
192
193 2
        if ($options['push_notifications']) {
194
195 1
            if (!$this->topicExists()) {
196
                $this->create();
197
            }
198
199
            $message    = [
200 1
                'default'   => $this->getNameWithPrefix(),
201 1
                'sqs'       => json_encode($message),
202 1
                'http'      => $this->getNameWithPrefix(),
203 1
                'https'     => $this->getNameWithPrefix(),
204
            ];
205
206 1
            $result = $this->sns->publish([
207 1
                'TopicArn'          => $this->topicArn,
208 1
                'Subject'           => $this->getName(),
209 1
                'Message'           => json_encode($message),
210 1
                'MessageStructure'  => 'json'
211
            ]);
212
213
            $context = [
214 1
                'TopicArn'              => $this->topicArn,
215 1
                'MessageId'             => $result->get('MessageId'),
216 1
                'push_notifications'    => $options['push_notifications'],
217 1
                'publish_time'          => microtime(true) - $publishStart
218
            ];
219 1
            $this->log(200,"Message published to SNS", $context);
220
221 1
            return $result->get('MessageId');
222
        }
223
224 1
        $result = $this->sqs->sendMessage([
225 1
            'QueueUrl'      => $this->queueUrl,
226 1
            'MessageBody'   => json_encode($message),
227 1
            'DelaySeconds'  => $options['message_delay']
228
        ]);
229
230
        $context = [
231 1
            'QueueUrl'              => $this->queueUrl,
232 1
            'MessageId'             => $result->get('MessageId'),
233 1
            'push_notifications'    => $options['push_notifications']
234
        ];
235 1
        $this->log(200,"Message published to SQS", $context);
236
237 1
        return $result->get('MessageId');
238
    }
239
240
    /**
241
     * {@inheritDoc}
242
     */
243 2
    public function receive(array $options = [])
244
    {
245 2
        $options = $this->mergeOptions($options);
246
247 2
        if (!$this->queueExists() && $this->queueAutoSetup) {
248
            $this->create();
249
        }
250
251 2
        $result = $this->sqs->receiveMessage([
252 2
            'QueueUrl'              => $this->queueUrl,
253 2
            'MaxNumberOfMessages'   => $options['messages_to_receive'],
254 2
            'WaitTimeSeconds'       => $options['receive_wait_time']
255
        ]);
256
257 2
        $messages = $result->get('Messages') ?: [];
258
259
        // Convert to Message Class
260 2
        foreach ($messages as &$message) {
261 2
            $id = $message['MessageId'];
262
            $metadata = [
263 2
                'ReceiptHandle' => $message['ReceiptHandle'],
264 2
                'MD5OfBody'     => $message['MD5OfBody']
265
            ];
266
267
            // When using SNS, the SQS Body is the entire SNS Message
268 2
            if(is_array($body = json_decode($message['Body'], true))
269 2
                && isset($body['Message'])
270
            ) {
271 2
                $body = json_decode($body['Message'], true);
272
            }
273
274 2
            $message = new Message($id, $body, $metadata);
275
276 2
            $context = ['MessageId' => $id];
277 2
            $this->log(200,"Message fetched from SQS Queue", $context);
278
279
        }
280
281 2
        return $messages;
282
    }
283
284
    /**
285
     * {@inheritDoc}
286
     *
287
     * @return bool
288
     */
289 2
    public function delete($id)
290
    {
291 2
        if (!$this->queueExists()) {
292
            return false;
293
        }
294
295 2
        $this->sqs->deleteMessage([
296 2
            'QueueUrl'      => $this->queueUrl,
297 2
            'ReceiptHandle' => $id
298
        ]);
299
300
        $context = [
301 2
            'QueueUrl'      => $this->queueUrl,
302 2
            'ReceiptHandle' => $id
303
        ];
304 2
        $this->log(200,"Message deleted from SQS Queue", $context);
305
306 2
        return true;
307
    }
308
309
    /**
310
     * Return the Queue Url
311
     *
312
     * This method relies on in-memory cache and the Cache provider
313
     * to reduce the need to needlessly call the create method on an existing
314
     * Queue.
315
     *
316
     * @return boolean
317
     */
318 9
    public function queueExists()
319
    {
320 9
        if (isset($this->queueUrl)) {
321 3
            return true;
322
        }
323
324 7
        $key = $this->getNameWithPrefix() . '_url';
325 7
        if ($this->cache->contains($key)) {
326 1
            $this->queueUrl = $this->cache->fetch($key);
327
328 1
            return true;
329
        }
330
331
        try {
332 6
            $result = $this->sqs->getQueueUrl([
333 6
                'QueueName' => $this->getNameWithPrefix()
334
            ]);
335
336 6
            if ($this->queueUrl = $result->get('QueueUrl')) {
337 6
                $this->cache->save($key, $this->queueUrl);
338
339 6
                return true;
340
            }
341
        } catch (SqsException $e) {}
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
342
343
        return false;
344
    }
345
346
    /**
347
     * Creates an SQS Queue and returns the Queue Url
348
     *
349
     * The create method for SQS Queues is idempotent - if the queue already
350
     * exists, this method will return the Queue Url of the existing Queue.
351
     *
352
     * @return string
353
     */
354 3
    public function createQueue()
355
    {
356 3
        $result = $this->sqs->createQueue([
357 3
            'QueueName' => $this->getNameWithPrefix(),
358
            'Attributes'    => [
359 3
                'VisibilityTimeout'             => $this->options['message_timeout'],
360 3
                'MessageRetentionPeriod'        => $this->options['message_expiration'],
361 3
                'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
362
            ]
363
        ]);
364
365 3
        $this->queueUrl = $result->get('QueueUrl');
366
367 3
        $key = $this->getNameWithPrefix() . '_url';
368 3
        $this->cache->save($key, $this->queueUrl);
369
370 3
        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);
371
372 3
        if ($this->options['push_notifications']) {
373
374 2
            $policy = $this->createSqsPolicy();
375
376 2
            $this->sqs->setQueueAttributes([
377 2
                'QueueUrl'      => $this->queueUrl,
378
                'Attributes'    => [
379 2
                    'Policy'    => $policy,
380
                ]
381
            ]);
382
383 2
            $this->log(200, "Created Updated SQS Policy");
384
        }
385 3
    }
386
387
    /**
388
     * Creates a Policy for SQS that's required to allow SNS SendMessage access
389
     *
390
     * @return string
391
     */
392 2
    public function createSqsPolicy()
393
    {
394 2
        $arn = $this->sqs->getQueueArn($this->queueUrl);
395
396 2
        return json_encode([
397 2
            'Version'   => '2008-10-17',
398 2
            'Id'        =>  sprintf('%s/SQSDefaultPolicy', $arn),
399
            'Statement' => [
400
                [
401 2
                    'Sid'       => 'SNSPermissions',
402 2
                    'Effect'    => 'Allow',
403
                    'Principal' => ['AWS' => '*'],
404 2
                    'Action'    => 'SQS:SendMessage',
405 2
                    'Resource'  => $arn
406
                ]
407
            ]
408
        ]);
409
    }
410
411
    /**
412
     * Checks to see if a Topic exists
413
     *
414
     * This method relies on in-memory cache and the Cache provider
415
     * to reduce the need to needlessly call the create method on an existing
416
     * Topic.
417
     *
418
     * @return boolean
419
     */
420 3
    public function topicExists()
421
    {
422 3
        if (isset($this->topicArn)) {
423 1
            return true;
424
        }
425
426 3
        $key = $this->getNameWithPrefix() . '_arn';
427 3
        if ($this->cache->contains($key)) {
428 1
            $this->topicArn = $this->cache->fetch($key);
429
430 1
            return true;
431
        }
432
433 3
        if (!empty($this->queueUrl)) {
434 2
            $queueArn = $this->sqs->getQueueArn($this->queueUrl);
435 2
            $topicArn = str_replace('sqs', 'sns', $queueArn);
436
437
            try {
438 2
                $this->sns->getTopicAttributes([
439 2
                    'TopicArn' => $topicArn
440
                ]);
441
            } catch (NotFoundException $e) {
442
                return false;
443
            }
444
445 2
            $this->topicArn = $topicArn;
446 2
            $this->cache->save($key, $this->topicArn);
447
448 2
            return true;
449
        }
450
451 1
        return false;
452
    }
453
454
    /**
455
     * Creates a SNS Topic and returns the ARN
456
     *
457
     * The create method for the SNS Topics is idempotent - if the topic already
458
     * exists, this method will return the Topic ARN of the existing Topic.
459
     *
460
     *
461
     * @return false|null
462
     */
463 2
    public function createTopic()
464
    {
465 2
        if (!$this->options['push_notifications']) {
466 1
            return false;
467
        }
468
469 2
        $result = $this->sns->createTopic([
470 2
            'Name' => $this->getNameWithPrefix()
471
        ]);
472
473 2
        $this->topicArn = $result->get('TopicArn');
474
475 2
        $key = $this->getNameWithPrefix() . '_arn';
476 2
        $this->cache->save($key, $this->topicArn);
477
478 2
        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);
479 2
    }
480
481
    /**
482
     * Get a list of Subscriptions for the specified SNS Topic
483
     *
484
     * @param string $topicArn The SNS Topic Arn
485
     *
486
     * @return array
487
     */
488 4
    public function getTopicSubscriptions($topicArn)
489
    {
490 4
        $result = $this->sns->listSubscriptionsByTopic([
491 4
            'TopicArn' => $topicArn
492
        ]);
493
494 4
        return $result->get('Subscriptions');
495
    }
496
497
    /**
498
     * Subscribes an endpoint to a SNS Topic
499
     *
500
     * @param string $topicArn The ARN of the Topic
501
     * @param string $protocol The protocol of the Endpoint
502
     * @param string $endpoint The Endpoint of the Subscriber
503
     *
504
     * @return string
505
     */
506 2
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
507
    {
508
        // Check against the current Topic Subscriptions
509 2
        $subscriptions = $this->getTopicSubscriptions($topicArn);
510 2
        foreach ($subscriptions as $subscription) {
511 2
            if ($endpoint === $subscription['Endpoint']) {
512 2
                return $subscription['SubscriptionArn'];
513
            }
514
        }
515
516 1
        $result = $this->sns->subscribe([
517 1
            'TopicArn' => $topicArn,
518 1
            'Protocol' => $protocol,
519 1
            'Endpoint' => $endpoint
520
        ]);
521
522 1
        $arn = $result->get('SubscriptionArn');
523
524
        $context = [
525 1
            'Endpoint' => $endpoint,
526 1
            'Protocol' => $protocol,
527 1
            'SubscriptionArn' => $arn
528
        ];
529 1
        $this->log(200, "Endpoint Subscribed to SNS Topic", $context);
530
531 1
        return $arn;
532
    }
533
534
    /**
535
     * Unsubscribes an endpoint from a SNS Topic
536
     *
537
     * The method will return TRUE on success, or FALSE if the Endpoint did not
538
     * have a Subscription on the SNS Topic
539
     *
540
     * @param string $topicArn The ARN of the Topic
541
     * @param string $protocol The protocol of the Endpoint
542
     * @param string $endpoint The Endpoint of the Subscriber
543
     *
544
     * @return Boolean
545
     */
546 1
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
547
    {
548
        // Check against the current Topic Subscriptions
549 1
        $subscriptions = $this->getTopicSubscriptions($topicArn);
550 1
        foreach ($subscriptions as $subscription) {
551 1
            if ($endpoint === $subscription['Endpoint']) {
552 1
                $this->sns->unsubscribe([
553 1
                    'SubscriptionArn' => $subscription['SubscriptionArn']
554
                ]);
555
556
                $context = [
557 1
                    'Endpoint' => $endpoint,
558 1
                    'Protocol' => $protocol,
559 1
                    'SubscriptionArn' => $subscription['SubscriptionArn']
560
                ];
561 1
                $this->log(200,"Endpoint unsubscribed from SNS Topic", $context);
562
563 1
                return true;
564
            }
565
        }
566
567 1
        return false;
568
    }
569
570
    /**
571
     * Handles SNS Notifications
572
     *
573
     * For Subscription notifications, this method will automatically confirm
574
     * the Subscription request
575
     *
576
     * For Message notifications, this method polls the queue and dispatches
577
     * the `{queue}.message_received` event for each message retrieved
578
     *
579
     * @param NotificationEvent $event The Notification Event
580
     * @param string $eventName Name of the event
581
     * @param EventDispatcherInterface $dispatcher
582
     * @return bool|void
583
     */
584 2
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
585
    {
586 2
        if (NotificationEvent::TYPE_SUBSCRIPTION == $event->getType()) {
587 1
            $topicArn   = $event->getNotification()->getMetadata()->get('TopicArn');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
588 1
            $token      = $event->getNotification()->getMetadata()->get('Token');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
589
590 1
            $this->sns->confirmSubscription([
591 1
                'TopicArn'  => $topicArn,
592 1
                'Token'     => $token
593
            ]);
594
595 1
            $context = ['TopicArn' => $topicArn];
596 1
            $this->log(200,"Subscription to SNS Confirmed", $context);
597
598 1
            return;
599
        }
600
601 1
        $messages = $this->receive();
602 1
        foreach ($messages as $message) {
603
604 1
            $messageEvent = new MessageEvent($this->name, $message);
605 1
            $dispatcher->dispatch(Events::Message($this->name), $messageEvent);
606
        }
607 1
    }
608
609
    /**
610
     * Removes the message from queue after all other listeners have fired
611
     *
612
     * If an earlier listener has erred or stopped propagation, this method
613
     * will not fire and the Queued Message should become visible in queue again.
614
     *
615
     * Stops Event Propagation after removing the Message
616
     *
617
     * @param MessageEvent $event The SQS Message Event
618
     * @return bool|void
619
     */
620 1
    public function onMessageReceived(MessageEvent $event)
621
    {
622
        $receiptHandle = $event
623 1
            ->getMessage()
624 1
            ->getMetadata()
625 1
            ->get('ReceiptHandle');
626
627 1
        $this->delete($receiptHandle);
628
629 1
        $event->stopPropagation();
630 1
    }
631
}
632