Completed
Pull Request — master (#136)
by
unknown
02:54
created

AwsProvider::subscribeToTopic()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 27
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 27
ccs 0
cts 22
cp 0
rs 8.8571
cc 3
eloc 16
nc 3
nop 3
crap 12
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\Exception\SnsException;
26
use Aws\Sns\SnsClient;
27
use Aws\Sqs\Exception\SqsException;
28
use Aws\Sqs\SqsClient;
29
30
use Doctrine\Common\Cache\Cache;
31
use Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
44
     * Aws SQS Client
45
     *
46
     * @var SqsClient
47
     */
48
    private $sqs;
49
50
    /**
51
     * Aws SNS Client
52
     *
53
     * @var SnsClient
54
     */
55
    private $sns;
56
57
    /**
58
     * SQS Queue URL
59
     *
60
     * @var string
61
     */
62
    private $queueUrl;
63
64
    /**
65
     * SNS Topic ARN
66
     *
67
     * @var string
68
     */
69
    private $topicArn;
70
71
    /**
     * Stores the queue configuration and resolves the SQS and SNS clients.
     *
     * The AWS client is duck-typed: when it exposes a get() method (SDK v2)
     * the service clients are fetched through it, otherwise the createSqs()
     * and createSns() factories (SDK v3) are used.
     *
     * @param string $name    Name of the queue
     * @param array  $options Options for this queue
     * @param mixed  $client  AWS SDK entry point (v2 or v3)
     * @param Cache  $cache   Cache used for the Queue Url and Topic ARN
     * @param Logger $logger  Logger instance
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->name    = $name;
        $this->options = $options;
        $this->cache   = $cache;
        $this->logger  = $logger;

        if (method_exists($client, 'get')) {
            // SDK v2 style service lookup
            $this->sqs = $client->get('Sqs');
            $this->sns = $client->get('Sns');
        } else {
            // SDK v3 style factories
            $this->sqs = $client->createSqs();
            $this->sns = $client->createSns();
        }
    }
90
91
    /**
     * Identifies the service backing this provider.
     *
     * @return string Always the literal 'AWS'
     */
    public function getProvider()
    {
        $provider = 'AWS';

        return $provider;
    }
98
99
    /**
     * Builds the resources configured for this queue
     *
     * The SQS Queue is created first (createQueue() is a no-op when only push
     * notifications are used). When push notifications are enabled, the SNS
     * Topic is created, the Queue is subscribed to it (unless running in
     * push-notifications-only mode) and every configured subscriber is
     * subscribed as well.
     *
     * @return bool Always true
     */
    public function create()
    {
        $this->createQueue();

        if (!$this->options['push_notifications']) {
            return true;
        }

        // Create the SNS Topic
        $this->createTopic();

        if (!$this->options['push_notifications_only']) {
            // Add the SQS Queue as a Subscriber to the SNS Topic
            $queueArn = $this->sqs->getQueueArn($this->queueUrl);
            $this->subscribeToTopic($this->topicArn, 'sqs', $queueArn);
        }

        // Add configured Subscribers to the SNS Topic
        foreach ($this->options['subscribers'] as $subscriber) {
            $this->subscribeToTopic($this->topicArn, $subscriber['protocol'], $subscriber['endpoint']);
        }

        return true;
    }
139
140
    /**
     * Removes the SQS Queue and the SNS Topic along with their cache entries
     *
     * The cached Queue Url / Topic ARN are dropped before the existence
     * checks so that a stale cache entry alone cannot satisfy them, and once
     * more after the resource is deleted because queueExists() and
     * topicExists() store whatever they resolve back into the cache.
     *
     * When the Topic ARN is not known, it is derived from the Queue ARN the
     * same way topicExists() does. If no ARN can be determined at all (no
     * known Topic and no Queue Url) the Topic deletion is skipped instead of
     * sending an empty ARN to SNS.
     *
     * @return Boolean Always true
     */
    public function destroy()
    {
        $urlKey = $this->getNameWithPrefix() . '_url';
        $this->cache->delete($urlKey);

        if ($this->queueExists()) {
            // Delete the SQS Queue
            $this->sqs->deleteQueue([
                'QueueUrl' => $this->queueUrl
            ]);

            // queueExists() may have re-saved the url while resolving it
            $this->cache->delete($urlKey);

            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
        }

        $arnKey = $this->getNameWithPrefix() . '_arn';
        $this->cache->delete($arnKey);

        if ($this->options['push_notifications_only'] || $this->topicExists() || !empty($this->queueUrl)) {
            $topicArn = $this->topicArn;

            if (empty($topicArn) && !empty($this->queueUrl)) {
                // Derive the ARN from the Queue ARN, not from the Queue Url:
                // a rewritten url is not a valid Topic ARN
                $topicArn = str_replace('sqs', 'sns', $this->sqs->getQueueArn($this->queueUrl));
            }

            if (!empty($topicArn)) {
                // Delete the SNS Topic
                $this->sns->deleteTopic([
                    'TopicArn' => $topicArn
                ]);

                // topicExists() may have re-saved the arn while resolving it
                $this->cache->delete($arnKey);

                $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
            }
        }

        return true;
    }
176
177
    /**
     * {@inheritDoc}
     *
     * Depending on the `push_notifications` option the message is either
     * published to the SNS Topic or sent straight to the SQS Queue. Missing
     * resources are created on the fly before publishing.
     *
     * For FIFO queues sent through SQS, a deduplication id is taken from the
     * options or, unless content based deduplication is enabled, generated
     * from a hash of the message; the group id defaults to the prefixed
     * queue name.
     *
     * @param array $message The message to publish
     * @param array $options Per-call overrides of the queue options
     *
     * @return string The MessageId returned by SNS or SQS
     */
    public function publish(array $message, array $options = [])
    {
        $merged = $this->mergeOptions($options);

        // These keys are copied over explicitly when the caller supplied them
        foreach (['message_deduplication_id', 'message_group_id'] as $fifoKey) {
            if (isset($options[$fifoKey])) {
                $merged[$fifoKey] = $options[$fifoKey];
            }
        }

        $options   = $merged;
        $startedAt = microtime(true);

        // ensures that the SQS Queue and SNS Topic exist
        if (!$this->options['push_notifications_only'] && !$this->queueExists()) {
            $this->create();
        }

        if ($options['push_notifications']) {
            if (!$this->topicExists()) {
                $this->create();
            }

            if ($this->options['push_notifications_only']) {
                // Without a queue, every protocol receives the full message
                $encoded = json_encode($message);
                $payload = [
                    'default'  => $encoded,
                    'http'     => $encoded,
                    'https'    => $encoded,
                ];
            } else {
                // Only SQS receives the message; other protocols get the queue name
                $payload = [
                    'default'  => $this->getNameWithPrefix(),
                    'sqs'      => json_encode($message),
                    'http'     => $this->getNameWithPrefix(),
                    'https'    => $this->getNameWithPrefix(),
                ];
            }

            $snsResult = $this->sns->publish([
                'TopicArn'         => $this->topicArn,
                'Subject'          => $this->getName(),
                'Message'          => json_encode($payload),
                'MessageStructure' => 'json'
            ]);

            $messageId = $snsResult->get('MessageId');

            $this->log(200,"Message published to SNS", [
                'TopicArn'           => $this->topicArn,
                'MessageId'          => $messageId,
                'push_notifications' => $options['push_notifications'],
                'publish_time'       => microtime(true) - $startedAt
            ]);

            return $messageId;
        }

        $isFifo    = $this->isQueueFIFO();
        $arguments = [
            'QueueUrl'      => $this->queueUrl,
            'MessageBody'   => json_encode($message),
            'DelaySeconds'  => $options['message_delay']
        ];

        if ($isFifo) {
            if (isset($options['message_deduplication_id'])) {
                // Always use user supplied dedup id
                $arguments['MessageDeduplicationId'] = $options['message_deduplication_id'];
            } elseif ($options['content_based_deduplication'] !== true) {
                // If none is supplied and option "content_based_deduplication" is not set, generate default
                $arguments['MessageDeduplicationId'] = hash('sha256', json_encode($message));
            }

            $arguments['MessageGroupId'] = isset($options['message_group_id'])
                ? $options['message_group_id']
                : $this->getNameWithPrefix();
        }

        $sqsResult = $this->sqs->sendMessage($arguments);
        $messageId = $sqsResult->get('MessageId');

        $context = [
            'QueueUrl'              => $this->queueUrl,
            'MessageId'             => $messageId,
            'push_notifications'    => $options['push_notifications'],
            'fifo'                  => $options['fifo']
        ];

        if ($isFifo) {
            if (isset($arguments['MessageDeduplicationId'])) {
                $context['message_deduplication_id'] = $arguments['MessageDeduplicationId'];
            }
            $context['message_group_id'] = $arguments['MessageGroupId'];
        }

        $this->log(200,"Message published to SQS", $context);

        return $messageId;
    }
289
290
    /**
     * {@inheritDoc}
     *
     * Polls the SQS Queue (creating it first when it does not exist yet) and
     * converts every raw SQS message into a Message instance. The Receipt
     * Handle and the MD5 of the body are kept as message metadata.
     *
     * @param array $options Per-call overrides of the queue options
     *
     * @return Message[] The received messages, empty when none were returned
     */
    public function receive(array $options = [])
    {
        $options = $this->mergeOptions($options);

        if (!$this->queueExists()) {
            $this->create();
        }

        $result = $this->sqs->receiveMessage([
            'QueueUrl'              => $this->queueUrl,
            'MaxNumberOfMessages'   => $options['messages_to_receive'],
            'WaitTimeSeconds'       => $options['receive_wait_time']
        ]);

        // A new list is built instead of rewriting the result through a
        // by-reference foreach, which left a dangling reference to the last
        // element behind.
        $received = [];
        foreach ($result->get('Messages') ?: [] as $raw) {
            $id       = $raw['MessageId'];
            $metadata = [
                'ReceiptHandle' => $raw['ReceiptHandle'],
                'MD5OfBody'     => $raw['MD5OfBody']
            ];

            // When using SNS, the SQS Body is the entire SNS Message
            $body = json_decode($raw['Body'], true);
            if (is_array($body) && isset($body['Message'])) {
                $body = json_decode($body['Message'], true);
            }

            $received[] = new Message($id, $body, $metadata);

            $this->log(200,"Message fetched from SQS Queue", ['MessageId' => $id]);
        }

        return $received;
    }
333
334
    /**
     * {@inheritDoc}
     *
     * In push-notifications-only mode there is no queue to delete from, so
     * the call succeeds without contacting SQS.
     *
     * @param string $id The Receipt Handle of the message to delete
     *
     * @return bool False when the Queue does not exist, true otherwise
     */
    public function delete($id)
    {
        if ($this->options['push_notifications_only']) {
            return true;
        }

        if (!$this->queueExists()) {
            return false;
        }

        // The same pair is used as the request and as the log context
        $target = [
            'QueueUrl'      => $this->queueUrl,
            'ReceiptHandle' => $id
        ];

        $this->sqs->deleteMessage($target);
        $this->log(200,"Message deleted from SQS Queue", $target);

        return true;
    }
362
363
    /**
     * Checks whether the SQS Queue exists and remembers its Queue Url
     *
     * The lookup order is: the Url already held by this instance, the Cache
     * provider, and finally SQS itself. A Url resolved through SQS is stored
     * in the cache.
     *
     * @return boolean
     */
    public function queueExists()
    {
        if (isset($this->queueUrl)) {
            return true;
        }

        $cacheKey = $this->getNameWithPrefix() . '_url';
        if ($this->cache->contains($cacheKey)) {
            $this->queueUrl = $this->cache->fetch($cacheKey);

            return true;
        }

        try {
            $response = $this->sqs->getQueueUrl([
                'QueueName' => $this->getNameWithPrefix()
            ]);

            $this->queueUrl = $response->get('QueueUrl');
            if (null === $this->queueUrl) {
                return false;
            }

            $this->cache->save($cacheKey, $this->queueUrl);

            return true;
        } catch (SqsException $e) {
            // Deliberately not rethrown: a failed lookup is reported to the
            // caller as "the queue does not exist"
            return false;
        }
    }
400
401
    /**
     * Creates the SQS Queue and stores its Queue Url
     *
     * Nothing is done in push-notifications-only mode. Otherwise the Queue is
     * created with the configured timeout, retention and wait time (plus the
     * FIFO attributes for FIFO queues) and the resulting Queue Url is kept on
     * the instance and in the cache. When push notifications are enabled the
     * Queue policy built by createSqsPolicy() is applied as well.
     *
     * The create method for SQS Queues is idempotent - if the queue already
     * exists, the Queue Url of the existing Queue is returned by SQS.
     *
     * @return void
     */
    public function createQueue()
    {
        if ($this->options['push_notifications_only']) {
            return;
        }

        $attributes = [
            'VisibilityTimeout'             => $this->options['message_timeout'],
            'MessageRetentionPeriod'        => $this->options['message_expiration'],
            'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
        ];

        if ($this->isQueueFIFO()) {
            // These attribute values are sent as the strings 'true' / 'false'
            $contentBased = $this->options['content_based_deduplication'] === true;

            $attributes['FifoQueue']                 = 'true';
            $attributes['ContentBasedDeduplication'] = $contentBased ? 'true' : 'false';
        }

        $response = $this->sqs->createQueue([
            'QueueName'  => $this->getNameWithPrefix(),
            'Attributes' => $attributes
        ]);

        $this->queueUrl = $response->get('QueueUrl');
        $this->cache->save($this->getNameWithPrefix() . '_url', $this->queueUrl);

        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);

        if (!$this->options['push_notifications']) {
            return;
        }

        // Attach the policy created by createSqsPolicy() to the Queue
        $this->sqs->setQueueAttributes([
            'QueueUrl'      => $this->queueUrl,
            'Attributes'    => [
                'Policy'    => $this->createSqsPolicy(),
            ]
        ]);

        $this->log(200, "Created Updated SQS Policy");
    }
447
448
    /**
     * Builds the SQS Queue policy that is required to allow SNS SendMessage access
     *
     * NOTE(review): the statement allows SQS:SendMessage for every AWS
     * principal ('*') without a condition - confirm whether it should be
     * limited to the SNS Topic.
     *
     * @return string The JSON encoded policy document
     */
    public function createSqsPolicy()
    {
        $queueArn = $this->sqs->getQueueArn($this->queueUrl);

        $statement = [
            'Sid'       => 'SNSPermissions',
            'Effect'    => 'Allow',
            'Principal' => ['AWS' => '*'],
            'Action'    => 'SQS:SendMessage',
            'Resource'  => $queueArn
        ];

        $policy = [
            'Version'   => '2008-10-17',
            'Id'        => $queueArn . '/SQSDefaultPolicy',
            'Statement' => [$statement]
        ];

        return json_encode($policy);
    }
471
472
    /**
     * Checks whether the SNS Topic exists and remembers its ARN
     *
     * The lookup order is: the ARN already held by this instance, the Cache
     * provider, and finally SNS. The SNS lookup needs a known Queue Url: the
     * Topic ARN is derived from the Queue ARN by replacing 'sqs' with 'sns'
     * and then verified by requesting the Topic attributes.
     *
     * @return boolean
     */
    public function topicExists()
    {
        if (isset($this->topicArn)) {
            return true;
        }

        $cacheKey = $this->getNameWithPrefix() . '_arn';
        if ($this->cache->contains($cacheKey)) {
            $this->topicArn = $this->cache->fetch($cacheKey);

            return true;
        }

        // Without a Queue Url there is nothing to derive the ARN from
        if (empty($this->queueUrl)) {
            return false;
        }

        $candidateArn = str_replace('sqs', 'sns', $this->sqs->getQueueArn($this->queueUrl));

        try {
            $this->sns->getTopicAttributes([
                'TopicArn' => $candidateArn
            ]);
        } catch (SnsException $e) {
            // A failed attribute lookup is reported as "no such topic"
            return false;
        }

        $this->topicArn = $candidateArn;
        $this->cache->save($cacheKey, $this->topicArn);

        return true;
    }
514
515
    /**
     * Creates the SNS Topic and stores its ARN
     *
     * The Topic is named after the prefixed queue name with every '.'
     * replaced by '-'. The resulting ARN is kept on the instance and cached
     * under the prefixed queue name plus '_arn' - the same key topicExists()
     * reads and destroy() clears - rather than under the sanitised Topic
     * name, which differs whenever the queue name contains a '.'.
     *
     * The create method for the SNS Topics is idempotent - if the topic already
     * exists, the Topic ARN of the existing Topic is returned by SNS.
     *
     * @return bool False when push notifications are disabled, true otherwise
     */
    public function createTopic()
    {
        if (!$this->options['push_notifications']) {
            return false;
        }

        $topicName = str_replace('.', '-', $this->getNameWithPrefix());
        $result    = $this->sns->createTopic([
            'Name' => $topicName
        ]);

        $this->topicArn = $result->get('TopicArn');

        // Must match the cache key used by topicExists() and destroy()
        $key = $this->getNameWithPrefix() . '_arn';
        $this->cache->save($key, $this->topicArn);

        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);

        return true;
    }
544
545
    /**
     * Get the list of Subscriptions for the specified SNS Topic
     *
     * SNS returns the Subscriptions in pages; every page is fetched by
     * following the NextToken of the previous response so the complete list
     * is returned. A Topic without Subscriptions yields an empty array.
     *
     * @param string $topicArn The SNS Topic Arn
     *
     * @return array
     */
    public function getTopicSubscriptions($topicArn)
    {
        $subscriptions = [];
        $request       = ['TopicArn' => $topicArn];

        do {
            $result = $this->sns->listSubscriptionsByTopic($request);

            // The key may be missing from the response; treat that as an empty page
            $page          = $result->get('Subscriptions') ?: [];
            $subscriptions = array_merge($subscriptions, $page);

            $nextToken            = $result->get('NextToken');
            $request['NextToken'] = $nextToken;
        } while (!empty($nextToken));

        return $subscriptions;
    }
560
561
    /**
     * Subscribes an endpoint to a SNS Topic
     *
     * The current Subscriptions of the Topic are checked first: when the
     * endpoint is already subscribed with the requested protocol, the ARN of
     * that Subscription is returned and no new Subscription is requested. A
     * Subscription entry that carries no protocol is matched on its endpoint
     * alone.
     *
     * @param string $topicArn The ARN of the Topic
     * @param string $protocol The protocol of the Endpoint
     * @param string $endpoint The Endpoint of the Subscriber
     *
     * @return string The Subscription ARN
     */
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
    {
        // Check against the current Topic Subscriptions
        $subscriptions = $this->getTopicSubscriptions($topicArn) ?: [];
        foreach ($subscriptions as $subscription) {
            // The same endpoint may be subscribed under another protocol
            $sameProtocol = !isset($subscription['Protocol']) || $protocol === $subscription['Protocol'];

            if ($sameProtocol && $endpoint === $subscription['Endpoint']) {
                return $subscription['SubscriptionArn'];
            }
        }

        $result = $this->sns->subscribe([
            'TopicArn' => $topicArn,
            'Protocol' => $protocol,
            'Endpoint' => $endpoint
        ]);

        $arn = $result->get('SubscriptionArn');

        $context = [
            'Endpoint' => $endpoint,
            'Protocol' => $protocol,
            'SubscriptionArn' => $arn
        ];
        $this->log(200, "Endpoint Subscribed to SNS Topic", $context);

        return $arn;
    }
597
598
    /**
     * Unsubscribes an endpoint from a SNS Topic
     *
     * Only the first Subscription matching both the endpoint and the protocol
     * is removed, so a Subscription of the same endpoint under a different
     * protocol is left untouched. A Subscription entry that carries no
     * protocol is matched on its endpoint alone.
     *
     * The method will return TRUE on success, or FALSE if the Endpoint did not
     * have a Subscription on the SNS Topic
     *
     * @param string $topicArn The ARN of the Topic
     * @param string $protocol The protocol of the Endpoint
     * @param string $endpoint The Endpoint of the Subscriber
     *
     * @return Boolean
     */
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
    {
        // Check against the current Topic Subscriptions
        $subscriptions = $this->getTopicSubscriptions($topicArn) ?: [];
        foreach ($subscriptions as $subscription) {
            $sameProtocol = !isset($subscription['Protocol']) || $protocol === $subscription['Protocol'];

            if (!$sameProtocol || $endpoint !== $subscription['Endpoint']) {
                continue;
            }

            $this->sns->unsubscribe([
                'SubscriptionArn' => $subscription['SubscriptionArn']
            ]);

            $context = [
                'Endpoint' => $endpoint,
                'Protocol' => $protocol,
                'SubscriptionArn' => $subscription['SubscriptionArn']
            ];
            $this->log(200,"Endpoint unsubscribed from SNS Topic", $context);

            return true;
        }

        return false;
    }
633
634
    /**
     * Handles SNS Notifications
     *
     * Subscription notifications are confirmed automatically with the
     * TopicArn and Token found in the notification metadata.
     *
     * Message notifications dispatch the message event of this queue: in
     * push-notifications-only mode once, for a Message built from the
     * notification itself; otherwise once for each message polled from the
     * queue.
     *
     * NOTE(review): static analysis infers $event->getNotification() to be an
     * array, on which getMetadata(), getId() and getBody() could not be
     * called - confirm the declared return type of
     * NotificationEvent::getNotification().
     *
     * @param NotificationEvent        $event      The Notification Event
     * @param string                   $eventName  Name of the event
     * @param EventDispatcherInterface $dispatcher The dispatcher used for the message events
     *
     * @return void
     */
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
    {
        if (NotificationEvent::TYPE_SUBSCRIPTION == $event->getType()) {
            $metadata = $event->getNotification()->getMetadata();
            $topicArn = $metadata->get('TopicArn');
            $token    = $metadata->get('Token');

            $this->sns->confirmSubscription([
                'TopicArn'  => $topicArn,
                'Token'     => $token
            ]);

            $this->log(200,"Subscription to SNS Confirmed", ['TopicArn' => $topicArn]);

            return;
        }

        if ($this->options['push_notifications_only']) {
            // There is no queue to poll: the notification is the message
            $notification = $event->getNotification();
            $messages     = [
                new Message(
                    $notification->getId(),
                    $notification->getBody(),
                    (array)$notification->getMetadata()
                )
            ];
        } else {
            $messages = $this->receive();
        }

        foreach ($messages as $message) {
            $dispatcher->dispatch(Events::Message($this->name), new MessageEvent($this->name, $message));
        }
    }
680
681
    /**
     * Removes the message from queue after all other listeners have fired
     *
     * If an earlier listener has erred or stopped propagation, this method
     * will not fire and the Queued Message should become visible in queue again.
     *
     * The message is deleted by the Receipt Handle found in its metadata;
     * Event Propagation is stopped afterwards.
     *
     * @param MessageEvent $event The SQS Message Event
     *
     * @return void
     */
    public function onMessageReceived(MessageEvent $event)
    {
        $metadata = $event->getMessage()->getMetadata();

        $this->delete($metadata->get('ReceiptHandle'));

        $event->stopPropagation();
    }
704
705
    /**
     * Tells whether this queue is configured as a FIFO queue
     *
     * @return bool True only when the `fifo` option is strictly boolean true
     */
    private function isQueueFIFO()
    {
        return true === $this->options['fifo'];
    }
712
}
713