Completed
Push — master ( ce57dd...eef2d6 )
by Keith
02:27
created

AwsProvider::topicExists()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 33
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5.0043

Importance

Changes 3
Bugs 0 Features 1
Metric Value
c 3
b 0
f 1
dl 0
loc 33
ccs 17
cts 18
cp 0.9444
rs 8.439
cc 5
eloc 19
nc 5
nop 0
crap 5.0043
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\SnsClient;
26
use Aws\Sns\Exception\NotFoundException;
27
use Aws\Sqs\SqsClient;
28
use Aws\Sqs\Exception\SqsException;
29
30
use Doctrine\Common\Cache\Cache;
31
use Symfony\Bridge\Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
44
     * Aws SQS Client
45
     *
46
     * @var SqsClient
47
     */
48
    private $sqs;
49
50
    /**
51
     * Aws SNS Client
52
     *
53
     * @var SnsClient
54
     */
55
    private $sns;
56
57
    /**
58
     * SQS Queue URL
59
     *
60
     * @var string
61
     */
62
    private $queueUrl;
63
64
    /**
65
     * SNS Topic ARN
66
     *
67
     * @var string
68
     */
69
    private $topicArn;
70
71 15
    /**
     * Stores the queue configuration and resolves the SQS and SNS clients.
     *
     * @param string $name    Name of the queue
     * @param array  $options Queue options
     * @param mixed  $client  AWS SDK entry point: exposes get() on SDK v2 and
     *                        createSqs()/createSns() on SDK v3
     * @param Cache  $cache   Cache used to remember the Queue Url and Topic ARN
     * @param Logger $logger  Logger instance
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->name    = $name;
        $this->options = $options;
        $this->cache   = $cache;
        $this->logger  = $logger;

        if (method_exists($client, 'get')) {
            // SDK v2: services are fetched by name from the client
            $this->sqs = $client->get('Sqs');
            $this->sns = $client->get('Sns');
        } else {
            // SDK v3: dedicated factory methods
            $this->sqs = $client->createSqs();
            $this->sns = $client->createSns();
        }
    }
83
84 1
    /**
     * Returns the identifier of this provider.
     *
     * @return string Always "AWS"
     */
    public function getProvider()
    {
        return 'AWS';
    }
88
89
    /**
     * Builds the SQS Queue and, when push notifications are enabled, the SNS
     * Topic together with its Subscriptions.
     *
     * With push notifications enabled the Queue itself is subscribed to the
     * Topic first, followed by every entry of the `subscribers` option.
     *
     * @return bool Always true
     */
    public function create()
    {
        $this->createQueue();

        if (!$this->options['push_notifications']) {
            return true;
        }

        // Create the SNS Topic
        $this->createTopic();

        // Add the SQS Queue as a Subscriber to the SNS Topic
        $queueArn = $this->sqs->getQueueArn($this->queueUrl);
        $this->subscribeToTopic($this->topicArn, 'sqs', $queueArn);

        // Add configured Subscribers to the SNS Topic
        foreach ($this->options['subscribers'] as $subscriber) {
            $this->subscribeToTopic(
                $this->topicArn,
                $subscriber['protocol'],
                $subscriber['endpoint']
            );
        }

        return true;
    }
126
127
    /**
     * Removes the SQS Queue and the SNS Topic and forgets their identifiers.
     *
     * The cached Queue Url and Topic ARN are dropped before the existence
     * checks so that those checks are not answered from the cache, and dropped
     * again afterwards because queueExists() and topicExists() write the
     * entries back when their lookup succeeds.
     *
     * @return Boolean Always true
     */
    public function destroy()
    {
        $urlKey = $this->getNameWithPrefix() . '_url';
        $arnKey = $this->getNameWithPrefix() . '_arn';

        $this->cache->delete($urlKey);

        if ($this->queueExists()) {
            // Delete the SQS Queue
            $this->sqs->deleteQueue([
                'QueueUrl' => $this->queueUrl
            ]);

            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
        }

        $this->cache->delete($arnKey);

        if ($this->topicExists() || !empty($this->queueUrl)) {
            // Delete the SNS Topic. When the Topic ARN is unknown it is derived
            // from the Queue ARN (not the Queue Url, which is not an ARN) by
            // swapping the service segment, the same way topicExists() does.
            // NOTE(review): SNS documents DeleteTopic as idempotent, so a
            // well-formed ARN of a missing topic should not raise - confirm.
            $topicArn = !empty($this->topicArn)
                ? $this->topicArn
                : str_replace(':sqs:', ':sns:', $this->sqs->getQueueArn($this->queueUrl))
            ;

            $this->sns->deleteTopic([
                'TopicArn' => $topicArn
            ]);

            $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
        }

        // The existence checks above may have re-populated the cache
        $this->cache->delete($urlKey);
        $this->cache->delete($arnKey);

        // Without this, queueExists()/topicExists() would keep returning true
        // on this instance for resources that were just removed
        $this->queueUrl = null;
        $this->topicArn = null;

        return true;
    }
163
164
    /**
     * {@inheritDoc}
     *
     * Publishes through the SNS Topic when the merged options enable
     * `push_notifications`, otherwise sends the message straight to the SQS
     * Queue. Missing resources are created on demand before publishing.
     *
     * @param array $message The message payload, JSON encoded before sending
     * @param array $options Per-call options merged via mergeOptions()
     *
     * @return string The MessageId reported by SNS or SQS
     */
    public function publish(array $message, array $options = [])
    {
        $options   = $this->mergeOptions($options);
        $startedAt = microtime(true);

        // ensures that the SQS Queue and SNS Topic exist
        if (!$this->queueExists()) {
            $this->create();
        }

        if (!$options['push_notifications']) {
            $response = $this->sqs->sendMessage([
                'QueueUrl'      => $this->queueUrl,
                'MessageBody'   => json_encode($message),
                'DelaySeconds'  => $options['message_delay']
            ]);

            $messageId = $response->get('MessageId');

            $this->log(200,"Message published to SQS", [
                'QueueUrl'              => $this->queueUrl,
                'MessageId'             => $messageId,
                'push_notifications'    => $options['push_notifications']
            ]);

            return $messageId;
        }

        if (!$this->topicExists()) {
            $this->create();
        }

        // One entry per protocol: only SQS subscribers get the actual payload,
        // every other protocol receives the prefixed queue name
        $payload = [
            'default'   => $this->getNameWithPrefix(),
            'sqs'       => json_encode($message),
            'http'      => $this->getNameWithPrefix(),
            'https'     => $this->getNameWithPrefix(),
        ];

        $response = $this->sns->publish([
            'TopicArn'          => $this->topicArn,
            'Subject'           => $this->getName(),
            'Message'           => json_encode($payload),
            'MessageStructure'  => 'json'
        ]);

        $messageId = $response->get('MessageId');

        $this->log(200,"Message published to SNS", [
            'TopicArn'              => $this->topicArn,
            'MessageId'             => $messageId,
            'push_notifications'    => $options['push_notifications'],
            'publish_time'          => microtime(true) - $startedAt
        ]);

        return $messageId;
    }
228
229
    /**
     * {@inheritDoc}
     *
     * Polls the SQS Queue (creating it first when it does not exist) and wraps
     * every received entry in a Message object.
     *
     * @param array $options Per-call options merged via mergeOptions()
     *
     * @return Message[] The received messages, empty when nothing was returned
     */
    public function receive(array $options = [])
    {
        $options = $this->mergeOptions($options);

        if (!$this->queueExists()) {
            $this->create();
        }

        $result = $this->sqs->receiveMessage([
            'QueueUrl'              => $this->queueUrl,
            'MaxNumberOfMessages'   => $options['messages_to_receive'],
            'WaitTimeSeconds'       => $options['receive_wait_time']
        ]);

        // Build a fresh array instead of rewriting the entries through a
        // by-reference loop variable, which would leave the last element of
        // the returned array bound to a reference
        $messages = [];
        foreach ($result->get('Messages') ?: [] as $index => $raw) {
            $id = $raw['MessageId'];
            $metadata = [
                'ReceiptHandle' => $raw['ReceiptHandle'],
                'MD5OfBody'     => $raw['MD5OfBody']
            ];

            // When using SNS, the SQS Body is the entire SNS Message
            $body = json_decode($raw['Body'], true);
            if (is_array($body) && isset($body['Message'])) {
                $body = json_decode($body['Message'], true);
            }

            $messages[$index] = new Message($id, $body, $metadata);

            $this->log(200,"Message fetched from SQS Queue", ['MessageId' => $id]);
        }

        return $messages;
    }
272
273
    /**
     * {@inheritDoc}
     *
     * @param string $id Sent to SQS as the ReceiptHandle of the message
     *
     * @return bool False when the Queue does not exist, true once the delete
     *              request has been issued
     */
    public function delete($id)
    {
        if (!$this->queueExists()) {
            return false;
        }

        // The request parameters double as the log context
        $params = [
            'QueueUrl'      => $this->queueUrl,
            'ReceiptHandle' => $id
        ];

        $this->sqs->deleteMessage($params);
        $this->log(200,"Message deleted from SQS Queue", $params);

        return true;
    }
297
298
    /**
     * Checks whether the Queue exists and remembers its Url
     *
     * The Url is looked up in memory first, then in the Cache provider and
     * only then requested from SQS; a Url obtained from SQS is written to the
     * cache.
     *
     * @return boolean
     */
    public function queueExists()
    {
        if (isset($this->queueUrl)) {
            return true;
        }

        $cacheKey = $this->getNameWithPrefix() . '_url';
        if ($this->cache->contains($cacheKey)) {
            $this->queueUrl = $this->cache->fetch($cacheKey);

            return true;
        }

        try {
            $response = $this->sqs->getQueueUrl([
                'QueueName' => $this->getNameWithPrefix()
            ]);

            $this->queueUrl = $response->get('QueueUrl');
            if ($this->queueUrl) {
                $this->cache->save($cacheKey, $this->queueUrl);

                return true;
            }
        } catch (SqsException $e) {
            // Deliberately ignored: any SQS failure during the lookup is
            // reported to the caller as "queue does not exist"
        }

        return false;
    }
334
335
    /**
     * Creates the SQS Queue and remembers its Queue Url
     *
     * The Queue Url is stored on the instance and in the cache; nothing is
     * returned. When push notifications are enabled, the policy produced by
     * createSqsPolicy() is applied to the Queue afterwards.
     *
     * @return void
     */
    public function createQueue()
    {
        $queueName = $this->getNameWithPrefix();

        $response = $this->sqs->createQueue([
            'QueueName'  => $queueName,
            'Attributes' => [
                'VisibilityTimeout'             => $this->options['message_timeout'],
                'MessageRetentionPeriod'        => $this->options['message_expiration'],
                'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
            ]
        ]);

        $this->queueUrl = $response->get('QueueUrl');
        $this->cache->save($queueName . '_url', $this->queueUrl);

        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);

        if (!$this->options['push_notifications']) {
            return;
        }

        $this->sqs->setQueueAttributes([
            'QueueUrl'   => $this->queueUrl,
            'Attributes' => [
                'Policy' => $this->createSqsPolicy(),
            ]
        ]);

        $this->log(200, "Created Updated SQS Policy");
    }
375
376
    /**
     * Builds the JSON policy that grants SQS:SendMessage on this Queue
     *
     * NOTE(review): the statement allows every AWS principal and carries no
     * Condition restricting the sender to the SNS Topic - confirm that this
     * openness is intended.
     *
     * @return string The JSON encoded policy document
     */
    public function createSqsPolicy()
    {
        $queueArn = $this->sqs->getQueueArn($this->queueUrl);

        $statement = [
            'Sid'       => 'SNSPermissions',
            'Effect'    => 'Allow',
            'Principal' => ['AWS' => '*'],
            'Action'    => 'SQS:SendMessage',
            'Resource'  => $queueArn
        ];

        $policy = [
            'Version'   => '2008-10-17',
            'Id'        => $queueArn . '/SQSDefaultPolicy',
            'Statement' => [$statement]
        ];

        return json_encode($policy);
    }
399
400
    /**
     * Checks to see if a Topic exists
     *
     * The Topic ARN is looked up in memory first, then in the Cache provider.
     * Failing both, and only when the Queue Url is known, the ARN is derived
     * from the Queue ARN and verified against SNS; a verified ARN is stored on
     * the instance and in the cache.
     *
     * @return boolean
     */
    public function topicExists()
    {
        if (isset($this->topicArn)) {
            return true;
        }

        $key = $this->getNameWithPrefix() . '_arn';
        if ($this->cache->contains($key)) {
            $this->topicArn = $this->cache->fetch($key);

            return true;
        }

        if (empty($this->queueUrl)) {
            return false;
        }

        // Swap only the service segment of the ARN. Replacing every "sqs"
        // occurrence would also rewrite queue names that contain "sqs" and
        // point at a topic that was never created.
        $queueArn = $this->sqs->getQueueArn($this->queueUrl);
        $topicArn = str_replace(':sqs:', ':sns:', $queueArn);

        try {
            $this->sns->getTopicAttributes([
                'TopicArn' => $topicArn
            ]);
        } catch (NotFoundException $e) {
            return false;
        }

        $this->topicArn = $topicArn;
        $this->cache->save($key, $this->topicArn);

        return true;
    }
442
443
    /**
     * Creates the SNS Topic and remembers its ARN
     *
     * The Topic ARN is stored on the instance and in the cache; it is not
     * returned. Nothing is created when push notifications are disabled.
     *
     * @return false|null False when push notifications are disabled, null
     *                    otherwise
     */
    public function createTopic()
    {
        if (!$this->options['push_notifications']) {
            return false;
        }

        $topicName = $this->getNameWithPrefix();

        $response = $this->sns->createTopic([
            'Name' => $topicName
        ]);

        $this->topicArn = $response->get('TopicArn');
        $this->cache->save($topicName . '_arn', $this->topicArn);

        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);
    }
469
470
    /**
     * Get a list of Subscriptions for the specified SNS Topic
     *
     * Only the Subscriptions of a single listSubscriptionsByTopic response
     * are returned; further pages are not requested.
     *
     * @param string $topicArn The SNS Topic Arn
     *
     * @return array The Subscriptions, or an empty array when the response
     *               carries none
     */
    public function getTopicSubscriptions($topicArn)
    {
        $result = $this->sns->listSubscriptionsByTopic([
            'TopicArn' => $topicArn
        ]);

        // Callers iterate the result directly, so never hand back null
        return $result->get('Subscriptions') ?: [];
    }
485
486
    /**
     * Subscribes an endpoint to a SNS Topic
     *
     * An endpoint that already appears among the Topic's Subscriptions is not
     * subscribed again; its existing Subscription ARN is returned instead.
     * Existing Subscriptions are matched on the endpoint only.
     *
     * @param string $topicArn The ARN of the Topic
     * @param string $protocol The protocol of the Endpoint
     * @param string $endpoint The Endpoint of the Subscriber
     *
     * @return string The Subscription ARN
     */
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
    {
        // Check against the current Topic Subscriptions
        foreach ($this->getTopicSubscriptions($topicArn) as $existing) {
            if ($existing['Endpoint'] === $endpoint) {
                return $existing['SubscriptionArn'];
            }
        }

        $response = $this->sns->subscribe([
            'TopicArn' => $topicArn,
            'Protocol' => $protocol,
            'Endpoint' => $endpoint
        ]);

        $subscriptionArn = $response->get('SubscriptionArn');

        $this->log(200, "Endpoint Subscribed to SNS Topic", [
            'Endpoint' => $endpoint,
            'Protocol' => $protocol,
            'SubscriptionArn' => $subscriptionArn
        ]);

        return $subscriptionArn;
    }
522
523
    /**
     * Unsubscribes an endpoint from a SNS Topic
     *
     * Only the first Subscription whose Endpoint matches is removed. The
     * protocol is used for logging only.
     *
     * @param string $topicArn The ARN of the Topic
     * @param string $protocol The protocol of the Endpoint
     * @param string $endpoint The Endpoint of the Subscriber
     *
     * @return Boolean TRUE when a Subscription was removed, FALSE if the
     *                 Endpoint did not have a Subscription on the SNS Topic
     */
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
    {
        // Check against the current Topic Subscriptions
        foreach ($this->getTopicSubscriptions($topicArn) as $existing) {
            if ($endpoint !== $existing['Endpoint']) {
                continue;
            }

            $this->sns->unsubscribe([
                'SubscriptionArn' => $existing['SubscriptionArn']
            ]);

            $this->log(200,"Endpoint unsubscribed from SNS Topic", [
                'Endpoint' => $endpoint,
                'Protocol' => $protocol,
                'SubscriptionArn' => $existing['SubscriptionArn']
            ]);

            return true;
        }

        return false;
    }
558
559
    /**
     * Handles SNS Notifications
     *
     * Subscription notifications are answered by confirming the Subscription
     * with the TopicArn and Token found in the notification metadata.
     *
     * Any other notification triggers a poll of the queue; one MessageEvent is
     * dispatched per received message under the name given by
     * Events::Message() for this queue.
     *
     * @param NotificationEvent        $event      The Notification Event
     * @param string                   $eventName  Name of the event
     * @param EventDispatcherInterface $dispatcher Dispatcher used for the message events
     *
     * @return void
     */
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
    {
        if (NotificationEvent::TYPE_SUBSCRIPTION != $event->getType()) {
            foreach ($this->receive() as $message) {
                $messageEvent = new MessageEvent($this->name, $message);
                $dispatcher->dispatch(Events::Message($this->name), $messageEvent);
            }

            return;
        }

        $metadata = $event->getNotification()->getMetadata();
        $topicArn = $metadata->get('TopicArn');

        $this->sns->confirmSubscription([
            'TopicArn'  => $topicArn,
            'Token'     => $metadata->get('Token')
        ]);

        $this->log(200,"Subscription to SNS Confirmed", ['TopicArn' => $topicArn]);
    }
597
598
    /**
     * Removes the message from queue after all other listeners have fired
     *
     * The message is deleted by the ReceiptHandle stored in its metadata.
     * Event propagation is stopped afterwards, whether or not the delete
     * call reported success.
     *
     * @param MessageEvent $event The SQS Message Event
     *
     * @return void
     */
    public function onMessageReceived(MessageEvent $event)
    {
        $metadata = $event->getMessage()->getMetadata();

        $this->delete($metadata->get('ReceiptHandle'));

        $event->stopPropagation();
    }
620
}
621