AwsProvider::createQueue()   B
last analyzed

Complexity

Conditions 4
Paths 6

Size

Total Lines 38
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 4.0016

Importance

Changes 0
Metric Value
dl 0
loc 38
rs 8.5806
c 0
b 0
f 0
ccs 20
cts 21
cp 0.9524
cc 4
eloc 22
nc 6
nop 0
crap 4.0016
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\Exception\SnsException;
26
use Aws\Sns\SnsClient;
27
use Aws\Sqs\Exception\SqsException;
28
use Aws\Sqs\SqsClient;
29
30
use Doctrine\Common\Cache\Cache;
31
use Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
     * Client used for every SQS call made by this provider.
     *
     * @var SqsClient
     */
    private $sqs;

    /**
     * Client used for every SNS call made by this provider.
     *
     * @var SnsClient
     */
    private $sns;

    /**
     * URL of the SQS Queue; populated by queueExists() or createQueue().
     *
     * @var string
     */
    private $queueUrl;

    /**
     * ARN of the SNS Topic; populated by topicExists() or createTopic().
     *
     * @var string
     */
    private $topicArn;
70
71
    /**
     * Stores the queue configuration and resolves the SQS and SNS clients.
     *
     * @param string $name    Name of the queue
     * @param array  $options Options for this queue
     * @param mixed  $client  AWS SDK entry point exposing either get() (SDK v2)
     *                        or createSqs()/createSns() (SDK v3)
     * @param Cache  $cache   Cache used to remember the Queue Url and Topic ARN
     * @param Logger $logger  Logger instance kept on the provider
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->name    = $name;
        $this->options = $options;
        $this->cache   = $cache;
        $this->logger  = $logger;

        // SDK v2 exposes a get() locator; SDK v3 exposes create*() factories
        if (method_exists($client, 'get')) {
            $this->sqs = $client->get('Sqs');
            $this->sns = $client->get('Sns');

            return;
        }

        $this->sqs = $client->createSqs();
        $this->sns = $client->createSns();
    }
90
91
    /**
     * Identifies the backing service of this provider.
     *
     * @return string Always the literal 'AWS'
     */
    public function getProvider()
    {
        return 'AWS';
    }
98
99
    /**
     * Builds the SQS Queue and, when push notifications are enabled, the SNS
     * Topic together with its subscriptions.
     *
     * The Queue is always created. With the `push_notifications` option on,
     * the Topic is created as well, the Queue is subscribed to it, and every
     * entry of the `subscribers` option is subscribed afterwards.
     *
     * @return bool Always true
     */
    public function create()
    {
        $this->createQueue();

        if (!$this->options['push_notifications']) {
            return true;
        }

        $this->createTopic();

        // The Queue itself is the first Subscriber of the Topic
        $queueArn = $this->sqs->getQueueArn($this->queueUrl);
        $this->subscribeToTopic($this->topicArn, 'sqs', $queueArn);

        // Followed by every Subscriber listed in the configuration
        foreach ($this->options['subscribers'] as $subscriber) {
            $this->subscribeToTopic($this->topicArn, $subscriber['protocol'], $subscriber['endpoint']);
        }

        return true;
    }
137
138
    /**
     * Deletes the SQS Queue and the SNS Topic and forgets everything that was
     * remembered about them.
     *
     * The cached Queue Url and Topic ARN are dropped before the existence
     * checks so that those checks are not answered from the cache, and dropped
     * again afterwards because queueExists() and topicExists() write the
     * values back to the cache when they find the resource.
     *
     * @return Boolean Always true
     */
    public function destroy()
    {
        $urlKey = $this->getNameWithPrefix() . '_url';
        $arnKey = $this->getNameWithPrefix() . '_arn';

        $this->cache->delete($urlKey);

        if ($this->queueExists()) {
            // Delete the SQS Queue
            $this->sqs->deleteQueue([
                'QueueUrl' => $this->queueUrl
            ]);

            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
        }

        $this->cache->delete($arnKey);

        if ($this->topicExists() || !empty($this->queueUrl)) {
            // Delete the SNS Topic. Without a known Topic ARN, derive one from
            // the Queue ARN the same way topicExists() does; the Queue Url is
            // not an ARN and must not be used as the source of the derivation.
            $topicArn = !empty($this->topicArn)
                ? $this->topicArn
                : str_replace('sqs', 'sns', $this->sqs->getQueueArn($this->queueUrl))
            ;

            $this->sns->deleteTopic([
                'TopicArn' => $topicArn
            ]);

            $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
        }

        // The existence checks above may have re-populated the cache
        $this->cache->delete($urlKey);
        $this->cache->delete($arnKey);

        // Keep queueExists()/topicExists() from reporting the deleted resources
        $this->queueUrl = null;
        $this->topicArn = null;

        return true;
    }
174
175
    /**
     * {@inheritDoc}
     *
     * With the `push_notifications` option enabled the message is published
     * to the SNS Topic; otherwise it is sent directly to the SQS Queue. The
     * Queue (and the Topic, when needed) is created first if it is not known
     * to exist.
     *
     * @param array $message The message payload, JSON encoded before sending
     * @param array $options Per-call options merged over the queue options
     *
     * @return string The MessageId reported by SNS or SQS
     */
    public function publish(array $message, array $options = [])
    {
        $settings = $this->mergeOptions($options);

        // Caller supplied FIFO identifiers are carried over explicitly
        foreach (['message_deduplication_id', 'message_group_id'] as $passThrough) {
            if (isset($options[$passThrough])) {
                $settings[$passThrough] = $options[$passThrough];
            }
        }

        $startedAt = microtime(true);

        // Make sure the SQS Queue exists before anything is sent
        if (!$this->queueExists()) {
            $this->create();
        }

        if ($settings['push_notifications']) {
            // Make sure the SNS Topic exists as well
            if (!$this->topicExists()) {
                $this->create();
            }

            // One entry per protocol: only SQS receives the actual payload
            $envelope = [
                'default' => $this->getNameWithPrefix(),
                'sqs'     => json_encode($message),
                'http'    => $this->getNameWithPrefix(),
                'https'   => $this->getNameWithPrefix(),
            ];

            $snsResult = $this->sns->publish([
                'TopicArn'         => $this->topicArn,
                'Subject'          => $this->getName(),
                'Message'          => json_encode($envelope),
                'MessageStructure' => 'json',
            ]);

            $snsMessageId = $snsResult->get('MessageId');

            $this->log(200, "Message published to SNS", [
                'TopicArn'           => $this->topicArn,
                'MessageId'          => $snsMessageId,
                'push_notifications' => $settings['push_notifications'],
                'publish_time'       => microtime(true) - $startedAt,
            ]);

            return $snsMessageId;
        }

        $payload = [
            'QueueUrl'     => $this->queueUrl,
            'MessageBody'  => json_encode($message),
            'DelaySeconds' => $settings['message_delay'],
        ];

        $fifo = $this->isQueueFIFO();

        if ($fifo) {
            if (isset($settings['message_deduplication_id'])) {
                // A user supplied deduplication id always wins
                $payload['MessageDeduplicationId'] = $settings['message_deduplication_id'];
            } elseif ($settings['content_based_deduplication'] !== true) {
                // No id given and content based deduplication is off: hash the body
                $payload['MessageDeduplicationId'] = hash('sha256', json_encode($message));
            }

            // The group id defaults to the prefixed queue name
            $groupId = $this->getNameWithPrefix();
            if (isset($settings['message_group_id'])) {
                $groupId = $settings['message_group_id'];
            }
            $payload['MessageGroupId'] = $groupId;
        }

        $sqsResult    = $this->sqs->sendMessage($payload);
        $sqsMessageId = $sqsResult->get('MessageId');

        $context = [
            'QueueUrl'           => $this->queueUrl,
            'MessageId'          => $sqsMessageId,
            'push_notifications' => $settings['push_notifications'],
            'fifo'               => $settings['fifo'],
        ];

        if ($fifo) {
            if (isset($payload['MessageDeduplicationId'])) {
                $context['message_deduplication_id'] = $payload['MessageDeduplicationId'];
            }
            $context['message_group_id'] = $payload['MessageGroupId'];
        }

        $this->log(200, "Message published to SQS", $context);

        return $sqsMessageId;
    }
276
277
    /**
     * {@inheritDoc}
     *
     * Polls the SQS Queue (creating it first when it is not known to exist)
     * and wraps every received entry in a Message object carrying the
     * ReceiptHandle and MD5OfBody as metadata.
     *
     * @param array $options Per-call options merged over the queue options
     *
     * @return array List of Message objects, empty when nothing was received
     */
    public function receive(array $options = [])
    {
        $settings = $this->mergeOptions($options);

        if (!$this->queueExists()) {
            $this->create();
        }

        $response = $this->sqs->receiveMessage([
            'QueueUrl'            => $this->queueUrl,
            'MaxNumberOfMessages' => $settings['messages_to_receive'],
            'WaitTimeSeconds'     => $settings['receive_wait_time'],
        ]);

        $messages = $response->get('Messages') ?: [];

        // Swap every raw SQS entry for its Message object, position by position
        foreach ($messages as $index => $raw) {
            $id       = $raw['MessageId'];
            $metadata = [
                'ReceiptHandle' => $raw['ReceiptHandle'],
                'MD5OfBody'     => $raw['MD5OfBody'],
            ];

            $body = json_decode($raw['Body'], true);

            // When delivered through SNS, the SQS Body is the whole SNS
            // Message and the payload sits one level deeper
            if (is_array($body) && isset($body['Message'])) {
                $body = json_decode($body['Message'], true);
            }

            $messages[$index] = new Message($id, $body, $metadata);

            $this->log(200, "Message fetched from SQS Queue", ['MessageId' => $id]);
        }

        return $messages;
    }
320
321
    /**
     * {@inheritDoc}
     *
     * Removes a single message from the SQS Queue.
     *
     * @param string $id The ReceiptHandle of the message to remove
     *
     * @return bool False when the Queue does not exist, true once the delete
     *              request has been issued
     */
    public function delete($id)
    {
        if ($this->queueExists()) {
            $target = [
                'QueueUrl'      => $this->queueUrl,
                'ReceiptHandle' => $id,
            ];

            $this->sqs->deleteMessage($target);
            $this->log(200, "Message deleted from SQS Queue", $target);

            return true;
        }

        return false;
    }
345
346
    /**
     * Checks whether the SQS Queue exists and remembers its Queue Url
     *
     * The lookup goes through three stages: the Url already held by this
     * instance, the Cache provider, and finally a getQueueUrl call against
     * SQS. A Url found by SQS is stored both on the instance and in the cache
     * to reduce the need to needlessly call the create method on an existing
     * Queue.
     *
     * @return boolean True when a Queue Url is known, false otherwise
     */
    public function queueExists()
    {
        if (isset($this->queueUrl)) {
            return true;
        }

        $key = $this->getNameWithPrefix() . '_url';
        if ($this->cache->contains($key)) {
            $this->queueUrl = $this->cache->fetch($key);

            return true;
        }

        try {
            $result = $this->sqs->getQueueUrl([
                'QueueName' => $this->getNameWithPrefix()
            ]);

            $this->queueUrl = $result->get('QueueUrl');
            if ($this->queueUrl !== null) {
                $this->cache->save($key, $this->queueUrl);

                return true;
            }
        } catch (SqsException $e) {
            // Deliberately not rethrown: a failed lookup is reported as
            // "does not exist" so that callers can react to the false return
            // value. The failure is logged so that it does not go unnoticed.
            $this->log(200, "SQS Queue lookup failed", [
                'QueueName' => $this->getNameWithPrefix(),
                'error'     => $e->getMessage()
            ]);
        }

        return false;
    }
383
384
    /**
     * Creates the SQS Queue and remembers its Queue Url
     *
     * The Queue Url returned by SQS is stored on the instance and in the
     * cache. When the `push_notifications` option is enabled, the Queue also
     * receives the policy built by createSqsPolicy().
     *
     * The create method for SQS Queues is idempotent - if the queue already
     * exists, the Queue Url of the existing Queue is returned by SQS.
     *
     * @return void
     */
    public function createQueue()
    {
        $attributes = [
            'VisibilityTimeout'             => $this->options['message_timeout'],
            'MessageRetentionPeriod'        => $this->options['message_expiration'],
            'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time'],
        ];

        if ($this->isQueueFIFO()) {
            // SQS expects these flags as the strings 'true' / 'false'
            $contentBased = $this->options['content_based_deduplication'] === true;

            $attributes['FifoQueue']                 = 'true';
            $attributes['ContentBasedDeduplication'] = $contentBased ? 'true' : 'false';
        }

        $response = $this->sqs->createQueue([
            'QueueName'  => $this->getNameWithPrefix(),
            'Attributes' => $attributes,
        ]);

        $this->queueUrl = $response->get('QueueUrl');
        $this->cache->save($this->getNameWithPrefix() . '_url', $this->queueUrl);

        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);

        if (!$this->options['push_notifications']) {
            return;
        }

        // SNS needs permission to deliver into the Queue
        $this->sqs->setQueueAttributes([
            'QueueUrl'   => $this->queueUrl,
            'Attributes' => [
                'Policy' => $this->createSqsPolicy(),
            ],
        ]);

        $this->log(200, "Created Updated SQS Policy");
    }
428
429
    /**
     * Builds the SQS policy document that grants SendMessage on the Queue,
     * which is required for SNS to deliver into it
     *
     * NOTE(review): the statement allows every AWS principal ('*') and has no
     * condition restricting the sender - confirm this is intended.
     *
     * @return string The policy as a JSON encoded string
     */
    public function createSqsPolicy()
    {
        $queueArn = $this->sqs->getQueueArn($this->queueUrl);

        $statement = [
            'Sid'       => 'SNSPermissions',
            'Effect'    => 'Allow',
            'Principal' => ['AWS' => '*'],
            'Action'    => 'SQS:SendMessage',
            'Resource'  => $queueArn,
        ];

        $policy = [
            'Version'   => '2008-10-17',
            'Id'        => $queueArn . '/SQSDefaultPolicy',
            'Statement' => [$statement],
        ];

        return json_encode($policy);
    }
452
453
    /**
     * Checks whether the SNS Topic exists and remembers its ARN
     *
     * The lookup goes through three stages: the ARN already held by this
     * instance, the Cache provider, and finally - only when a Queue Url is
     * known - a getTopicAttributes call for an ARN derived from the Queue ARN
     * by replacing 'sqs' with 'sns'.
     *
     * @return boolean True when a Topic ARN is known, false otherwise
     */
    public function topicExists()
    {
        if (isset($this->topicArn)) {
            return true;
        }

        $key = $this->getNameWithPrefix() . '_arn';
        if ($this->cache->contains($key)) {
            $this->topicArn = $this->cache->fetch($key);

            return true;
        }

        // Without a Queue Url there is nothing to derive the Topic ARN from
        if (empty($this->queueUrl)) {
            return false;
        }

        $candidateArn = str_replace('sqs', 'sns', $this->sqs->getQueueArn($this->queueUrl));

        try {
            $this->sns->getTopicAttributes(['TopicArn' => $candidateArn]);
        } catch (SnsException $e) {
            // SNS rejected the derived ARN: treat the Topic as missing
            return false;
        }

        $this->topicArn = $candidateArn;
        $this->cache->save($key, $this->topicArn);

        return true;
    }
495
496
    /**
     * Creates the SNS Topic and remembers its ARN
     *
     * Nothing is created unless the `push_notifications` option is enabled.
     * The Topic is named after the prefixed queue name with every '.' replaced
     * by '-'. The ARN returned by SNS is stored on the instance and in the
     * cache, under the same `<prefixed name>_arn` key that topicExists() reads
     * and destroy() deletes.
     *
     * The create method for the SNS Topics is idempotent - if the topic already
     * exists, the Topic ARN of the existing Topic is returned by SNS.
     *
     * @return bool False when push notifications are disabled, true otherwise
     */
    public function createTopic()
    {
        if (!$this->options['push_notifications']) {
            return false;
        }

        $queueName = $this->getNameWithPrefix();

        // Dots are not valid in SNS Topic names
        $result = $this->sns->createTopic([
            'Name' => str_replace('.', '-', $queueName)
        ]);

        $this->topicArn = $result->get('TopicArn');

        // Keyed by the unmodified prefixed name so that the cache lookups made
        // elsewhere in this class find the entry even when the name has a dot
        $key = $queueName . '_arn';
        $this->cache->save($key, $this->topicArn);

        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);

        return true;
    }
525
526
    /**
     * Get the complete list of Subscriptions for the specified SNS Topic
     *
     * SNS returns the Subscriptions in pages; every page is requested by
     * following the NextToken of the previous response until none is given.
     *
     * @param string $topicArn The SNS Topic Arn
     *
     * @return array All Subscriptions of the Topic, empty when there are none
     */
    public function getTopicSubscriptions($topicArn)
    {
        $subscriptions = [];
        $request       = ['TopicArn' => $topicArn];

        do {
            $result = $this->sns->listSubscriptionsByTopic($request);

            // A response without Subscriptions must not break the merge
            $page = $result->get('Subscriptions');
            if (is_array($page)) {
                $subscriptions = array_merge($subscriptions, $page);
            }

            $nextToken            = $result->get('NextToken');
            $request['NextToken'] = $nextToken;
        } while (!empty($nextToken));

        return $subscriptions;
    }
541
542
    /**
     * Subscribes an endpoint to a SNS Topic
     *
     * When the endpoint already appears among the Subscriptions of the Topic,
     * its existing Subscription ARN is returned and no request is made.
     *
     * @param string $topicArn The ARN of the Topic
     * @param string $protocol The protocol of the Endpoint
     * @param string $endpoint The Endpoint of the Subscriber
     *
     * @return string The Subscription ARN
     */
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
    {
        // Reuse a Subscription that is already registered for the endpoint
        foreach ($this->getTopicSubscriptions($topicArn) as $existing) {
            if ($existing['Endpoint'] === $endpoint) {
                return $existing['SubscriptionArn'];
            }
        }

        $response = $this->sns->subscribe([
            'TopicArn' => $topicArn,
            'Protocol' => $protocol,
            'Endpoint' => $endpoint,
        ]);

        $subscriptionArn = $response->get('SubscriptionArn');

        $this->log(200, "Endpoint Subscribed to SNS Topic", [
            'Endpoint'        => $endpoint,
            'Protocol'        => $protocol,
            'SubscriptionArn' => $subscriptionArn,
        ]);

        return $subscriptionArn;
    }
578
579
    /**
     * Unsubscribes an endpoint from a SNS Topic
     *
     * Only the first Subscription whose Endpoint matches is removed.
     *
     * @param string $topicArn The ARN of the Topic
     * @param string $protocol The protocol of the Endpoint (used for logging)
     * @param string $endpoint The Endpoint of the Subscriber
     *
     * @return Boolean TRUE on success, FALSE if the Endpoint did not have a
     *                 Subscription on the SNS Topic
     */
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
    {
        foreach ($this->getTopicSubscriptions($topicArn) as $candidate) {
            if ($endpoint !== $candidate['Endpoint']) {
                continue;
            }

            $this->sns->unsubscribe([
                'SubscriptionArn' => $candidate['SubscriptionArn'],
            ]);

            $this->log(200, "Endpoint unsubscribed from SNS Topic", [
                'Endpoint'        => $endpoint,
                'Protocol'        => $protocol,
                'SubscriptionArn' => $candidate['SubscriptionArn'],
            ]);

            return true;
        }

        return false;
    }
614
615
    /**
     * Handles SNS Notifications
     *
     * Subscription notifications are answered by confirming the Subscription
     * with the TopicArn and Token found in the notification metadata.
     *
     * Any other notification triggers a poll of the queue; an event named by
     * Events::Message() is dispatched for each message retrieved.
     *
     * @param NotificationEvent        $event      The Notification Event
     * @param string                   $eventName  Name of the event
     * @param EventDispatcherInterface $dispatcher Dispatcher for message events
     *
     * @return void
     */
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
    {
        if (NotificationEvent::TYPE_SUBSCRIPTION != $event->getType()) {
            // Message notification: drain what the queue currently offers
            foreach ($this->receive() as $message) {
                $messageEvent = new MessageEvent($this->name, $message);
                $dispatcher->dispatch(Events::Message($this->name), $messageEvent);
            }

            return;
        }

        // Subscription notification: confirm it with the supplied token
        $topicArn = $event->getNotification()->getMetadata()->get('TopicArn');
        $token    = $event->getNotification()->getMetadata()->get('Token');

        $this->sns->confirmSubscription([
            'TopicArn' => $topicArn,
            'Token'    => $token,
        ]);

        $this->log(200, "Subscription to SNS Confirmed", ['TopicArn' => $topicArn]);
    }
654
655
    /**
     * Removes the message from queue after all other listeners have fired
     *
     * The ReceiptHandle stored in the message metadata is passed to delete(),
     * after which propagation of the event is stopped.
     *
     * If an earlier listener has erred or stopped propagation, this method
     * will not fire and the Queued Message should become visible in queue again.
     *
     * @param MessageEvent $event The SQS Message Event
     *
     * @return void
     */
    public function onMessageReceived(MessageEvent $event)
    {
        $metadata      = $event->getMessage()->getMetadata();
        $receiptHandle = $metadata->get('ReceiptHandle');

        $this->delete($receiptHandle);
        $event->stopPropagation();
    }
678
679
    /**
     * Tells whether the queue is configured as a FIFO queue.
     *
     * @return bool True only when the `fifo` option is strictly boolean true
     */
    private function isQueueFIFO()
    {
        return true === $this->options['fifo'];
    }
686
}
687