Completed
Pull Request — master (#118)
by
unknown
04:11
created

AwsProvider::createTopic()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 17
ccs 10
cts 10
cp 1
rs 9.4285
cc 2
eloc 9
nc 2
nop 0
crap 2
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\Exception\SnsException;
26
use Aws\Sns\SnsClient;
27
use Aws\Sqs\Exception\SqsException;
28
use Aws\Sqs\SqsClient;
29
30
use Doctrine\Common\Cache\Cache;
31
use Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
44
     * Aws SQS Client
45
     *
46
     * @var SqsClient
47
     */
48
    private $sqs;
49
50
    /**
51
     * Aws SNS Client
52
     *
53
     * @var SnsClient
54
     */
55
    private $sns;
56
57
    /**
58
     * SQS Queue URL
59
     *
60
     * @var string
61
     */
62
    private $queueUrl;
63
64
    /**
65
     * SNS Topic ARN
66
     *
67
     * @var string
68
     */
69
    private $topicArn;
70
71 16
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
72
    {
73 16
        $this->name     = $name;
74 16
        $this->options  = $options;
75 16
        $this->cache    = $cache;
76 16
        $this->logger   = $logger;
77
78
        // get() method used for sdk v2, create methods for v3
79 16
        $useGet = method_exists($client, 'get');
80 16
        $this->sqs = $useGet ? $client->get('Sqs') : $client->createSqs();
81 16
        $this->sns = $useGet ? $client->get('Sns') : $client->createSns();
82 16
    }
83
84 1
    public function getProvider()
85
    {
86 1
        return "AWS";
87
    }
88
89
    /**
90
     * Builds the configured queues
91
     *
92
     * If a Queue name is passed and configured, this method will build only that
93
     * Queue.
94
     *
95
     * All Create methods are idempotent, if the resource exists, the current ARN
96
     * will be returned
97
     *
98
     */
99 1
    public function create()
100
    {
101 1
        $this->createQueue();
102
103 1
        if ($this->options['push_notifications']) {
104
           // Create the SNS Topic
105 1
           $this->createTopic();
106
107
           // Add the SQS Queue as a Subscriber to the SNS Topic
108 1
           $this->subscribeToTopic(
109 1
               $this->topicArn,
110 1
               'sqs',
111 1
               $this->sqs->getQueueArn($this->queueUrl)
112
           );
113
114
           // Add configured Subscribers to the SNS Topic
115 1
           foreach ($this->options['subscribers'] as $subscriber) {
116 1
                $this->subscribeToTopic(
117 1
                    $this->topicArn,
118 1
                    $subscriber['protocol'],
119 1
                    $subscriber['endpoint']
120
                );
121
            }
122
        }
123
124 1
        return true;
125
    }
126
127
    /**
128
     * @return Boolean
129
     */
130 1
    public function destroy()
131
    {
132 1
        $key = $this->getNameWithPrefix() . '_url';
133 1
        $this->cache->delete($key);
134
135 1
        if ($this->queueExists()) {
136
            // Delete the SQS Queue
137 1
            $this->sqs->deleteQueue([
138 1
                'QueueUrl' => $this->queueUrl
139
            ]);
140
141 1
            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
142
        }
143
144 1
        $key = $this->getNameWithPrefix() . '_arn';
145 1
        $this->cache->delete($key);
146
147 1
        if ($this->topicExists() || !empty($this->queueUrl)) {
148
            // Delete the SNS Topic
149 1
            $topicArn = !empty($this->topicArn)
150 1
                ? $this->topicArn
151 1
                : str_replace('sqs', 'sns', $this->queueUrl)
152
            ;
153
154 1
            $this->sns->deleteTopic([
155 1
                'TopicArn' => $topicArn
156
            ]);
157
158 1
            $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
159
        }
160
161 1
        return true;
162
    }
163
164
    /**
165
     * {@inheritDoc}
166
     *
167
     * This method will either use a SNS Topic to publish a queued message or
168
     * straight to SQS depending on the application configuration.
169
     *
170
     * @return string
171
     */
172 3
    public function publish(array $message, array $options = [])
173
    {
174
        // There are two 'extra' options that can be sent to this method that are no in the AwsProvider configuration
175
        // as they don't make sense in the context of global configuration
176
177 3
        $extraOptions = $this->getExtraOptions($options, ['group_id', 'deduplication_id']);
178
179 3
        $options      = $this->mergeOptions($options, $extraOptions);
180 3
        $publishStart = microtime(true);
181
182
        // ensures that the SQS Queue and SNS Topic exist
183 3
        if (!$this->queueExists()) {
184
            $this->create();
185
        }
186
187 3
        if ($options['push_notifications']) {
188
189 1
            if (!$this->topicExists()) {
190
                $this->create();
191
            }
192
193
            $message    = [
194 1
                'default'   => $this->getNameWithPrefix(),
195 1
                'sqs'       => json_encode($message),
196 1
                'http'      => $this->getNameWithPrefix(),
197 1
                'https'     => $this->getNameWithPrefix(),
198
            ];
199
200 1
            $result = $this->sns->publish([
201 1
                'TopicArn'          => $this->topicArn,
202 1
                'Subject'           => $this->getName(),
203 1
                'Message'           => json_encode($message),
204 1
                'MessageStructure'  => 'json'
205
            ]);
206
207
            $context = [
208 1
                'TopicArn'              => $this->topicArn,
209 1
                'MessageId'             => $result->get('MessageId'),
210 1
                'push_notifications'    => $options['push_notifications'],
211 1
                'publish_time'          => microtime(true) - $publishStart
212
            ];
213 1
            $this->log(200,"Message published to SNS", $context);
214
215 1
            return $result->get('MessageId');
216
        }
217
218
        // To make this work with AWS FIFO queues we also need to add a MessageId and a MessageDeduplicationId.
219
        // Sadly (although I guess it's one less API call) the only way to determine the queue type is to look for
220
        // the .fifo suffix on the QueueName
221
        // See Note: http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html
222
223
        $messageParameters = [
224 2
            'QueueUrl'      => $this->queueUrl,
225 2
            'MessageBody'   => json_encode($message),
226 2
            'DelaySeconds'  => $options['message_delay']
227
        ];
228
229 2
        if (preg_match('/\.fifo$/', $this->getNameWithPrefix())) {
230 1
            $messageParameters['MessageGroupId'] = array_key_exists('group_id', $options) ? $options['group_id'] : 'qpush-group';
231 1
            $messageParameters['MessageDeduplicationId'] = array_key_exists('deduplication_id', $options) ? $options['deduplication_id'] : md5($messageParameters['MessageBody']);
232
        }
233
234 2
        $result = $this->sqs->sendMessage($messageParameters);
235
236
        $context = [
237 2
            'QueueUrl'              => $this->queueUrl,
238 2
            'MessageId'             => $result->get('MessageId'),
239 2
            'push_notifications'    => $options['push_notifications']
240
        ];
241 2
        $this->log(200,"Message published to SQS", $context);
242
243 2
        return $result->get('MessageId');
244
    }
245
246
    /**
247
     * {@inheritDoc}
248
     */
249 2
    public function receive(array $options = [])
250
    {
251 2
        $options = $this->mergeOptions($options);
252
253 2
        if (!$this->queueExists()) {
254
            $this->create();
255
        }
256
257 2
        $result = $this->sqs->receiveMessage([
258 2
            'QueueUrl'              => $this->queueUrl,
259 2
            'MaxNumberOfMessages'   => $options['messages_to_receive'],
260 2
            'WaitTimeSeconds'       => $options['receive_wait_time']
261
        ]);
262
263 2
        $messages = $result->get('Messages') ?: [];
264
265
        // Convert to Message Class
266 2
        foreach ($messages as &$message) {
267 2
            $id = $message['MessageId'];
268
            $metadata = [
269 2
                'ReceiptHandle' => $message['ReceiptHandle'],
270 2
                'MD5OfBody'     => $message['MD5OfBody']
271
            ];
272
273
            // When using SNS, the SQS Body is the entire SNS Message
274 2
            if(is_array($body = json_decode($message['Body'], true))
275 2
                && isset($body['Message'])
276
            ) {
277 2
                $body = json_decode($body['Message'], true);
278
            }
279
280 2
            $message = new Message($id, $body, $metadata);
281
282 2
            $context = ['MessageId' => $id];
283 2
            $this->log(200,"Message fetched from SQS Queue", $context);
284
285
        }
286
287 2
        return $messages;
288
    }
289
290
    /**
291
     * {@inheritDoc}
292
     *
293
     * @return bool
294
     */
295 2
    public function delete($id)
296
    {
297 2
        if (!$this->queueExists()) {
298
            return false;
299
        }
300
301 2
        $this->sqs->deleteMessage([
302 2
            'QueueUrl'      => $this->queueUrl,
303 2
            'ReceiptHandle' => $id
304
        ]);
305
306
        $context = [
307 2
            'QueueUrl'      => $this->queueUrl,
308 2
            'ReceiptHandle' => $id
309
        ];
310 2
        $this->log(200,"Message deleted from SQS Queue", $context);
311
312 2
        return true;
313
    }
314
315
    /**
316
     * Return the Queue Url
317
     *
318
     * This method relies on in-memory cache and the Cache provider
319
     * to reduce the need to needlessly call the create method on an existing
320
     * Queue.
321
     *
322
     * @return boolean
323
     */
324 9
    public function queueExists()
325
    {
326 9
        if (isset($this->queueUrl)) {
327 3
            return true;
328
        }
329
330 7
        $key = $this->getNameWithPrefix() . '_url';
331 7
        if ($this->cache->contains($key)) {
332 1
            $this->queueUrl = $this->cache->fetch($key);
333
334 1
            return true;
335
        }
336
337
        try {
338 6
            $result = $this->sqs->getQueueUrl([
339 6
                'QueueName' => $this->getNameWithPrefix()
340
            ]);
341
342 6
            if ($this->queueUrl = $result->get('QueueUrl')) {
343 6
                $this->cache->save($key, $this->queueUrl);
344
345 6
                return true;
346
            }
347
        } catch (SqsException $e) {}
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
348
349
        return false;
350
    }
351
352
    /**
353
     * Creates an SQS Queue and returns the Queue Url
354
     *
355
     * The create method for SQS Queues is idempotent - if the queue already
356
     * exists, this method will return the Queue Url of the existing Queue.
357
     *
358
     * @return string
359
     */
360 3
    public function createQueue()
361
    {
362 3
        $result = $this->sqs->createQueue([
363 3
            'QueueName' => $this->getNameWithPrefix(),
364
            'Attributes'    => [
365 3
                'VisibilityTimeout'             => $this->options['message_timeout'],
366 3
                'MessageRetentionPeriod'        => $this->options['message_expiration'],
367 3
                'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
368
            ]
369
        ]);
370
371 3
        $this->queueUrl = $result->get('QueueUrl');
372
373 3
        $key = $this->getNameWithPrefix() . '_url';
374 3
        $this->cache->save($key, $this->queueUrl);
375
376 3
        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);
377
378 3
        if ($this->options['push_notifications']) {
379
380 2
            $policy = $this->createSqsPolicy();
381
382 2
            $this->sqs->setQueueAttributes([
383 2
                'QueueUrl'      => $this->queueUrl,
384
                'Attributes'    => [
385 2
                    'Policy'    => $policy,
386
                ]
387
            ]);
388
389 2
            $this->log(200, "Created Updated SQS Policy");
390
        }
391 3
    }
392
393
    /**
394
     * Creates a Policy for SQS that's required to allow SNS SendMessage access
395
     *
396
     * @return string
397
     */
398 2
    public function createSqsPolicy()
399
    {
400 2
        $arn = $this->sqs->getQueueArn($this->queueUrl);
401
402 2
        return json_encode([
403 2
            'Version'   => '2008-10-17',
404 2
            'Id'        =>  sprintf('%s/SQSDefaultPolicy', $arn),
405
            'Statement' => [
406
                [
407 2
                    'Sid'       => 'SNSPermissions',
408 2
                    'Effect'    => 'Allow',
409
                    'Principal' => ['AWS' => '*'],
410 2
                    'Action'    => 'SQS:SendMessage',
411 2
                    'Resource'  => $arn
412
                ]
413
            ]
414
        ]);
415
    }
416
417
    /**
418
     * Checks to see if a Topic exists
419
     *
420
     * This method relies on in-memory cache and the Cache provider
421
     * to reduce the need to needlessly call the create method on an existing
422
     * Topic.
423
     *
424
     * @return boolean
425
     */
426 3
    public function topicExists()
427
    {
428 3
        if (isset($this->topicArn)) {
429 1
            return true;
430
        }
431
432 3
        $key = $this->getNameWithPrefix() . '_arn';
433 3
        if ($this->cache->contains($key)) {
434 1
            $this->topicArn = $this->cache->fetch($key);
435
436 1
            return true;
437
        }
438
439 3
        if (!empty($this->queueUrl)) {
440 2
            $queueArn = $this->sqs->getQueueArn($this->queueUrl);
441 2
            $topicArn = str_replace('sqs', 'sns', $queueArn);
442
443
            try {
444 2
                $this->sns->getTopicAttributes([
445 2
                    'TopicArn' => $topicArn
446
                ]);
447
            } catch (SnsException $e) {
448
                return false;
449
            }
450
451 2
            $this->topicArn = $topicArn;
452 2
            $this->cache->save($key, $this->topicArn);
453
454 2
            return true;
455
        }
456
457 1
        return false;
458
    }
459
460
    /**
461
     * Creates a SNS Topic and returns the ARN
462
     *
463
     * The create method for the SNS Topics is idempotent - if the topic already
464
     * exists, this method will return the Topic ARN of the existing Topic.
465
     *
466
     *
467
     * @return false|null
468
     */
469 2
    public function createTopic()
470
    {
471 2
        if (!$this->options['push_notifications']) {
472 1
            return false;
473
        }
474
475 2
        $result = $this->sns->createTopic([
476 2
            'Name' => $this->getNameWithPrefix()
477
        ]);
478
479 2
        $this->topicArn = $result->get('TopicArn');
480
481 2
        $key = $this->getNameWithPrefix() . '_arn';
482 2
        $this->cache->save($key, $this->topicArn);
483
484 2
        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);
485 2
    }
486
487
    /**
488
     * Get a list of Subscriptions for the specified SNS Topic
489
     *
490
     * @param string $topicArn The SNS Topic Arn
491
     *
492
     * @return array
493
     */
494 4
    public function getTopicSubscriptions($topicArn)
495
    {
496 4
        $result = $this->sns->listSubscriptionsByTopic([
497 4
            'TopicArn' => $topicArn
498
        ]);
499
500 4
        return $result->get('Subscriptions');
501
    }
502
503
    /**
504
     * Subscribes an endpoint to a SNS Topic
505
     *
506
     * @param string $topicArn The ARN of the Topic
507
     * @param string $protocol The protocol of the Endpoint
508
     * @param string $endpoint The Endpoint of the Subscriber
509
     *
510
     * @return string
511
     */
512 2
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
513
    {
514
        // Check against the current Topic Subscriptions
515 2
        $subscriptions = $this->getTopicSubscriptions($topicArn);
516 2
        foreach ($subscriptions as $subscription) {
517 2
            if ($endpoint === $subscription['Endpoint']) {
518 2
                return $subscription['SubscriptionArn'];
519
            }
520
        }
521
522 1
        $result = $this->sns->subscribe([
523 1
            'TopicArn' => $topicArn,
524 1
            'Protocol' => $protocol,
525 1
            'Endpoint' => $endpoint
526
        ]);
527
528 1
        $arn = $result->get('SubscriptionArn');
529
530
        $context = [
531 1
            'Endpoint' => $endpoint,
532 1
            'Protocol' => $protocol,
533 1
            'SubscriptionArn' => $arn
534
        ];
535 1
        $this->log(200, "Endpoint Subscribed to SNS Topic", $context);
536
537 1
        return $arn;
538
    }
539
540
    /**
541
     * Unsubscribes an endpoint from a SNS Topic
542
     *
543
     * The method will return TRUE on success, or FALSE if the Endpoint did not
544
     * have a Subscription on the SNS Topic
545
     *
546
     * @param string $topicArn The ARN of the Topic
547
     * @param string $protocol The protocol of the Endpoint
548
     * @param string $endpoint The Endpoint of the Subscriber
549
     *
550
     * @return Boolean
551
     */
552 1
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
553
    {
554
        // Check against the current Topic Subscriptions
555 1
        $subscriptions = $this->getTopicSubscriptions($topicArn);
556 1
        foreach ($subscriptions as $subscription) {
557 1
            if ($endpoint === $subscription['Endpoint']) {
558 1
                $this->sns->unsubscribe([
559 1
                    'SubscriptionArn' => $subscription['SubscriptionArn']
560
                ]);
561
562
                $context = [
563 1
                    'Endpoint' => $endpoint,
564 1
                    'Protocol' => $protocol,
565 1
                    'SubscriptionArn' => $subscription['SubscriptionArn']
566
                ];
567 1
                $this->log(200,"Endpoint unsubscribed from SNS Topic", $context);
568
569 1
                return true;
570
            }
571
        }
572
573 1
        return false;
574
    }
575
576
    /**
577
     * Handles SNS Notifications
578
     *
579
     * For Subscription notifications, this method will automatically confirm
580
     * the Subscription request
581
     *
582
     * For Message notifications, this method polls the queue and dispatches
583
     * the `{queue}.message_received` event for each message retrieved
584
     *
585
     * @param NotificationEvent $event The Notification Event
586
     * @param string $eventName Name of the event
587
     * @param EventDispatcherInterface $dispatcher
588
     * @return bool|void
589
     */
590 2
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
591
    {
592 2
        if (NotificationEvent::TYPE_SUBSCRIPTION == $event->getType()) {
593 1
            $topicArn   = $event->getNotification()->getMetadata()->get('TopicArn');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
594 1
            $token      = $event->getNotification()->getMetadata()->get('Token');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
595
596 1
            $this->sns->confirmSubscription([
597 1
                'TopicArn'  => $topicArn,
598 1
                'Token'     => $token
599
            ]);
600
601 1
            $context = ['TopicArn' => $topicArn];
602 1
            $this->log(200,"Subscription to SNS Confirmed", $context);
603
604 1
            return;
605
        }
606
607 1
        $messages = $this->receive();
608 1
        foreach ($messages as $message) {
609
610 1
            $messageEvent = new MessageEvent($this->name, $message);
611 1
            $dispatcher->dispatch(Events::Message($this->name), $messageEvent);
612
        }
613 1
    }
614
615
    /**
616
     * Removes the message from queue after all other listeners have fired
617
     *
618
     * If an earlier listener has erred or stopped propagation, this method
619
     * will not fire and the Queued Message should become visible in queue again.
620
     *
621
     * Stops Event Propagation after removing the Message
622
     *
623
     * @param MessageEvent $event The SQS Message Event
624
     * @return bool|void
625
     */
626 1
    public function onMessageReceived(MessageEvent $event)
627
    {
628
        $receiptHandle = $event
629 1
            ->getMessage()
630 1
            ->getMetadata()
631 1
            ->get('ReceiptHandle');
632
633 1
        $this->delete($receiptHandle);
634
635 1
        $event->stopPropagation();
636 1
    }
637
638
    /**
639
     * @param array $options
640
     * @param array $keys
641
     * @return array
642
     */
643 3
    private function getExtraOptions(array $options, array $keys)
644
    {
645 3
        return array_intersect_key($options, array_flip($keys));
646
    }
647
}
648