Completed
Push — master ( 3b7044...b8bdc6 )
by Keith
11s
created

AwsProvider::publish()   D

Complexity

Conditions 12
Paths 184

Size

Total Lines 93
Code Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 156

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 93
ccs 0
cts 71
cp 0
rs 4.6933
cc 12
eloc 54
nc 184
nop 2
crap 156

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\Exception\SnsException;
26
use Aws\Sns\SnsClient;
27
use Aws\Sqs\Exception\SqsException;
28
use Aws\Sqs\SqsClient;
29
30
use Doctrine\Common\Cache\Cache;
31
use Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
44
     * Aws SQS Client
45
     *
46
     * @var SqsClient
47
     */
48
    private $sqs;
49
50
    /**
51
     * Aws SNS Client
52
     *
53
     * @var SnsClient
54
     */
55
    private $sns;
56
57
    /**
58
     * SQS Queue URL
59
     *
60
     * @var string
61
     */
62
    private $queueUrl;
63
64
    /**
65
     * SNS Topic ARN
66
     *
67
     * @var string
68
     */
69
    private $topicArn;
70
71
    /**
72
     * @param string $name
73
     * @param array  $options
74
     * @param mixed  $client
75
     * @param Cache  $cache
76
     * @param Logger $logger
77
     */
78
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
79
    {
80
        $this->name     = $name;
81
        $this->options  = $options;
82
        $this->cache    = $cache;
83
        $this->logger   = $logger;
84
85
        // get() method used for sdk v2, create methods for v3
86
        $useGet = method_exists($client, 'get');
87
        $this->sqs = $useGet ? $client->get('Sqs') : $client->createSqs();
88
        $this->sns = $useGet ? $client->get('Sns') : $client->createSns();
89
    }
90
91
    /**
92
     * @return string
93
     */
94
    public function getProvider()
95
    {
96
        return 'AWS';
97
    }
98
99
    /**
100
     * Builds the configured queues
101
     *
102
     * If a Queue name is passed and configured, this method will build only that
103
     * Queue.
104
     *
105
     * All Create methods are idempotent, if the resource exists, the current ARN
106
     * will be returned
107
     *
108
     * @return bool
109
     */
110
    public function create()
111
    {
112
        $this->createQueue();
113
114
        if ($this->options['push_notifications']) {
115
            // Create the SNS Topic
116
            $this->createTopic();
117
118
            // Add the SQS Queue as a Subscriber to the SNS Topic
119
            $this->subscribeToTopic(
120
                $this->topicArn,
121
                'sqs',
122
                $this->sqs->getQueueArn($this->queueUrl)
123
            );
124
125
            // Add configured Subscribers to the SNS Topic
126
            foreach ($this->options['subscribers'] as $subscriber) {
127
                $this->subscribeToTopic(
128
                    $this->topicArn,
129
                    $subscriber['protocol'],
130
                    $subscriber['endpoint']
131
                );
132
            }
133
        }
134
135
        return true;
136
    }
137
138
    /**
139
     * @return Boolean
140
     */
141
    public function destroy()
142
    {
143
        $key = $this->getNameWithPrefix() . '_url';
144
        $this->cache->delete($key);
145
146
        if ($this->queueExists()) {
147
            // Delete the SQS Queue
148
            $this->sqs->deleteQueue([
149
                'QueueUrl' => $this->queueUrl
150
            ]);
151
152
            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
153
        }
154
155
        $key = $this->getNameWithPrefix() . '_arn';
156
        $this->cache->delete($key);
157
158
        if ($this->topicExists() || !empty($this->queueUrl)) {
159
            // Delete the SNS Topic
160
            $topicArn = !empty($this->topicArn)
161
                ? $this->topicArn
162
                : str_replace('sqs', 'sns', $this->queueUrl)
163
            ;
164
165
            $this->sns->deleteTopic([
166
                'TopicArn' => $topicArn
167
            ]);
168
169
            $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
170
        }
171
172
        return true;
173
    }
174
175
    /**
176
     * {@inheritDoc}
177
     *
178
     * This method will either use a SNS Topic to publish a queued message or
179
     * straight to SQS depending on the application configuration.
180
     *
181
     * @return string
182
     */
183
    public function publish(array $message, array $options = [])
184
    {
185
        $mergedOptions = $this->mergeOptions($options);
186
187
        if (isset($options['message_deduplication_id'])) {
188
            $mergedOptions['message_deduplication_id'] = $options['message_deduplication_id'];
189
        }
190
191
        if (isset($options['message_group_id'])) {
192
            $mergedOptions['message_group_id'] = $options['message_group_id'];
193
        }
194
195
        $options = $mergedOptions;
196
197
        $publishStart = microtime(true);
198
199
        // ensures that the SQS Queue and SNS Topic exist
200
        if (!$this->queueExists()) {
201
            $this->create();
202
        }
203
204
        if ($options['push_notifications']) {
205
206
            if (!$this->topicExists()) {
207
                $this->create();
208
            }
209
210
            $message    = [
211
                'default' => $this->getNameWithPrefix(),
212
                'sqs'     => json_encode($message),
213
                'http'    => $this->getNameWithPrefix(),
214
                'https'   => $this->getNameWithPrefix(),
215
            ];
216
217
            $result = $this->sns->publish([
218
                'TopicArn'         => $this->topicArn,
219
                'Subject'          => $this->getName(),
220
                'Message'          => json_encode($message),
221
                'MessageStructure' => 'json'
222
            ]);
223
224
            $context = [
225
                'TopicArn'           => $this->topicArn,
226
                'MessageId'          => $result->get('MessageId'),
227
                'push_notifications' => $options['push_notifications'],
228
                'publish_time'       => microtime(true) - $publishStart
229
            ];
230
            $this->log(200,"Message published to SNS", $context);
231
232
            return $result->get('MessageId');
233
        }
234
235
        $arguments = [
236
            'QueueUrl'      => $this->queueUrl,
237
            'MessageBody'   => json_encode($message),
238
            'DelaySeconds'  => $options['message_delay']
239
        ];
240
241
        if ($this->isQueueFIFO()) {
242
            if (isset($options['message_deduplication_id'])) {
243
                // Always use user supplied dedup id
244
                $arguments['MessageDeduplicationId'] = $options['message_deduplication_id'];
245
            } elseif ($options['content_based_deduplication'] !== true) {
246
                // If none is supplied and option "content_based_deduplication" is not set, generate default
247
                $arguments['MessageDeduplicationId'] = hash('sha256', json_encode($message));
248
            }
249
250
            $arguments['MessageGroupId'] = $this->getNameWithPrefix();
251
            if (isset($options['message_group_id'])) {
252
                $arguments['MessageGroupId'] = $options['message_group_id'];
253
            }
254
        }
255
256
        $result = $this->sqs->sendMessage($arguments);
257
258
        $context = [
259
            'QueueUrl'              => $this->queueUrl,
260
            'MessageId'             => $result->get('MessageId'),
261
            'push_notifications'    => $options['push_notifications'],
262
            'fifo'                  => $options['fifo']
263
        ];
264
265
        if ($this->isQueueFIFO()) {
266
            if (isset($arguments['MessageDeduplicationId'])) {
267
                $context['message_deduplication_id'] = $arguments['MessageDeduplicationId'];
268
            }
269
            $context['message_group_id'] = $arguments['MessageGroupId'];
270
        }
271
272
        $this->log(200,"Message published to SQS", $context);
273
274
        return $result->get('MessageId');
275
    }
276
277
    /**
278
     * {@inheritDoc}
279
     */
280
    public function receive(array $options = [])
281
    {
282
        $options = $this->mergeOptions($options);
283
284
        if (!$this->queueExists()) {
285
            $this->create();
286
        }
287
288
        $result = $this->sqs->receiveMessage([
289
            'QueueUrl'              => $this->queueUrl,
290
            'MaxNumberOfMessages'   => $options['messages_to_receive'],
291
            'WaitTimeSeconds'       => $options['receive_wait_time']
292
        ]);
293
294
        $messages = $result->get('Messages') ?: [];
295
296
        // Convert to Message Class
297
        foreach ($messages as &$message) {
298
            $id = $message['MessageId'];
299
            $metadata = [
300
                'ReceiptHandle' => $message['ReceiptHandle'],
301
                'MD5OfBody'     => $message['MD5OfBody']
302
            ];
303
304
            // When using SNS, the SQS Body is the entire SNS Message
305
            if(is_array($body = json_decode($message['Body'], true))
306
                && isset($body['Message'])
307
            ) {
308
                $body = json_decode($body['Message'], true);
309
            }
310
311
            $message = new Message($id, $body, $metadata);
312
313
            $context = ['MessageId' => $id];
314
            $this->log(200,"Message fetched from SQS Queue", $context);
315
316
        }
317
318
        return $messages;
319
    }
320
321
    /**
322
     * {@inheritDoc}
323
     *
324
     * @return bool
325
     */
326
    public function delete($id)
327
    {
328
        if (!$this->queueExists()) {
329
            return false;
330
        }
331
332
        $this->sqs->deleteMessage([
333
            'QueueUrl'      => $this->queueUrl,
334
            'ReceiptHandle' => $id
335
        ]);
336
337
        $context = [
338
            'QueueUrl'      => $this->queueUrl,
339
            'ReceiptHandle' => $id
340
        ];
341
        $this->log(200,"Message deleted from SQS Queue", $context);
342
343
        return true;
344
    }
345
346
    /**
347
     * Return the Queue Url
348
     *
349
     * This method relies on in-memory cache and the Cache provider
350
     * to reduce the need to needlessly call the create method on an existing
351
     * Queue.
352
     *
353
     * @return boolean
354
     */
355
    public function queueExists()
356
    {
357
        if (isset($this->queueUrl)) {
358
            return true;
359
        }
360
361
        $key = $this->getNameWithPrefix() . '_url';
362
        if ($this->cache->contains($key)) {
363
            $this->queueUrl = $this->cache->fetch($key);
364
365
            return true;
366
        }
367
368
        try {
369
            $result = $this->sqs->getQueueUrl([
370
                'QueueName' => $this->getNameWithPrefix()
371
            ]);
372
373
            $this->queueUrl = $result->get('QueueUrl');
374
            if ($this->queueUrl !== null) {
375
                $this->cache->save($key, $this->queueUrl);
376
377
                return true;
378
            }
379
        } catch (SqsException $e) {}
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
380
381
        return false;
382
    }
383
384
    /**
385
     * Creates an SQS Queue and returns the Queue Url
386
     *
387
     * The create method for SQS Queues is idempotent - if the queue already
388
     * exists, this method will return the Queue Url of the existing Queue.
389
     */
390
    public function createQueue()
391
    {
392
        $attributes = [
393
            'VisibilityTimeout'             => $this->options['message_timeout'],
394
            'MessageRetentionPeriod'        => $this->options['message_expiration'],
395
            'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
396
        ];
397
398
        if ($this->isQueueFIFO()) {
399
            $attributes['FifoQueue'] = 'true';
400
            $attributes['ContentBasedDeduplication'] = $this->options['content_based_deduplication'] === true
401
                ? 'true'
402
                : 'false';
403
        }
404
405
        $result = $this->sqs->createQueue(['QueueName' => $this->getNameWithPrefix(), 'Attributes' => $attributes]);
406
407
        $this->queueUrl = $result->get('QueueUrl');
408
409
        $key = $this->getNameWithPrefix() . '_url';
410
        $this->cache->save($key, $this->queueUrl);
411
412
        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);
413
414
        if ($this->options['push_notifications']) {
415
416
            $policy = $this->createSqsPolicy();
417
418
            $this->sqs->setQueueAttributes([
419
                'QueueUrl'      => $this->queueUrl,
420
                'Attributes'    => [
421
                    'Policy'    => $policy,
422
                ]
423
            ]);
424
425
            $this->log(200, "Created Updated SQS Policy");
426
        }
427
    }
428
429
    /**
430
     * Creates a Policy for SQS that's required to allow SNS SendMessage access
431
     *
432
     * @return string
433
     */
434
    public function createSqsPolicy()
435
    {
436
        $arn = $this->sqs->getQueueArn($this->queueUrl);
437
438
        return json_encode([
439
            'Version'   => '2008-10-17',
440
            'Id'        =>  sprintf('%s/SQSDefaultPolicy', $arn),
441
            'Statement' => [
442
                [
443
                    'Sid'       => 'SNSPermissions',
444
                    'Effect'    => 'Allow',
445
                    'Principal' => ['AWS' => '*'],
446
                    'Action'    => 'SQS:SendMessage',
447
                    'Resource'  => $arn
448
                ]
449
            ]
450
        ]);
451
    }
452
453
    /**
454
     * Checks to see if a Topic exists
455
     *
456
     * This method relies on in-memory cache and the Cache provider
457
     * to reduce the need to needlessly call the create method on an existing
458
     * Topic.
459
     *
460
     * @return boolean
461
     */
462
    public function topicExists()
463
    {
464
        if (isset($this->topicArn)) {
465
            return true;
466
        }
467
468
        $key = $this->getNameWithPrefix() . '_arn';
469
        if ($this->cache->contains($key)) {
470
            $this->topicArn = $this->cache->fetch($key);
471
472
            return true;
473
        }
474
475
        if (!empty($this->queueUrl)) {
476
            $queueArn = $this->sqs->getQueueArn($this->queueUrl);
477
            $topicArn = str_replace('sqs', 'sns', $queueArn);
478
479
            try {
480
                $this->sns->getTopicAttributes([
481
                    'TopicArn' => $topicArn
482
                ]);
483
            } catch (SnsException $e) {
484
                return false;
485
            }
486
487
            $this->topicArn = $topicArn;
488
            $this->cache->save($key, $this->topicArn);
489
490
            return true;
491
        }
492
493
        return false;
494
    }
495
496
    /**
497
     * Creates a SNS Topic and returns the ARN
498
     *
499
     * The create method for the SNS Topics is idempotent - if the topic already
500
     * exists, this method will return the Topic ARN of the existing Topic.
501
     *
502
     *
503
     * @return bool
504
     */
505
    public function createTopic()
506
    {
507
        if (!$this->options['push_notifications']) {
508
            return false;
509
        }
510
511
        $result = $this->sns->createTopic([
512
            'Name' => $this->getNameWithPrefix()
513
        ]);
514
515
        $this->topicArn = $result->get('TopicArn');
516
517
        $key = $this->getNameWithPrefix() . '_arn';
518
        $this->cache->save($key, $this->topicArn);
519
520
        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);
521
522
        return true;
523
    }
524
525
    /**
526
     * Get a list of Subscriptions for the specified SNS Topic
527
     *
528
     * @param string $topicArn The SNS Topic Arn
529
     *
530
     * @return array
531
     */
532
    public function getTopicSubscriptions($topicArn)
533
    {
534
        $result = $this->sns->listSubscriptionsByTopic([
535
            'TopicArn' => $topicArn
536
        ]);
537
538
        return $result->get('Subscriptions');
539
    }
540
541
    /**
542
     * Subscribes an endpoint to a SNS Topic
543
     *
544
     * @param string $topicArn The ARN of the Topic
545
     * @param string $protocol The protocol of the Endpoint
546
     * @param string $endpoint The Endpoint of the Subscriber
547
     *
548
     * @return string
549
     */
550
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
551
    {
552
        // Check against the current Topic Subscriptions
553
        $subscriptions = $this->getTopicSubscriptions($topicArn);
554
        foreach ($subscriptions as $subscription) {
555
            if ($endpoint === $subscription['Endpoint']) {
556
                return $subscription['SubscriptionArn'];
557
            }
558
        }
559
560
        $result = $this->sns->subscribe([
561
            'TopicArn' => $topicArn,
562
            'Protocol' => $protocol,
563
            'Endpoint' => $endpoint
564
        ]);
565
566
        $arn = $result->get('SubscriptionArn');
567
568
        $context = [
569
            'Endpoint' => $endpoint,
570
            'Protocol' => $protocol,
571
            'SubscriptionArn' => $arn
572
        ];
573
        $this->log(200, "Endpoint Subscribed to SNS Topic", $context);
574
575
        return $arn;
576
    }
577
578
    /**
579
     * Unsubscribes an endpoint from a SNS Topic
580
     *
581
     * The method will return TRUE on success, or FALSE if the Endpoint did not
582
     * have a Subscription on the SNS Topic
583
     *
584
     * @param string $topicArn The ARN of the Topic
585
     * @param string $protocol The protocol of the Endpoint
586
     * @param string $endpoint The Endpoint of the Subscriber
587
     *
588
     * @return Boolean
589
     */
590
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
591
    {
592
        // Check against the current Topic Subscriptions
593
        $subscriptions = $this->getTopicSubscriptions($topicArn);
594
        foreach ($subscriptions as $subscription) {
595
            if ($endpoint === $subscription['Endpoint']) {
596
                $this->sns->unsubscribe([
597
                    'SubscriptionArn' => $subscription['SubscriptionArn']
598
                ]);
599
600
                $context = [
601
                    'Endpoint' => $endpoint,
602
                    'Protocol' => $protocol,
603
                    'SubscriptionArn' => $subscription['SubscriptionArn']
604
                ];
605
                $this->log(200,"Endpoint unsubscribed from SNS Topic", $context);
606
607
                return true;
608
            }
609
        }
610
611
        return false;
612
    }
613
614
    /**
615
     * Handles SNS Notifications
616
     *
617
     * For Subscription notifications, this method will automatically confirm
618
     * the Subscription request
619
     *
620
     * For Message notifications, this method polls the queue and dispatches
621
     * the `{queue}.message_received` event for each message retrieved
622
     *
623
     * @param NotificationEvent $event The Notification Event
624
     * @param string $eventName Name of the event
625
     * @param EventDispatcherInterface $dispatcher
626
     *
627
     * @return void
628
     */
629
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
630
    {
631
        if (NotificationEvent::TYPE_SUBSCRIPTION == $event->getType()) {
632
            $topicArn   = $event->getNotification()->getMetadata()->get('TopicArn');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
633
            $token      = $event->getNotification()->getMetadata()->get('Token');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
634
635
            $this->sns->confirmSubscription([
636
                'TopicArn'  => $topicArn,
637
                'Token'     => $token
638
            ]);
639
640
            $context = ['TopicArn' => $topicArn];
641
            $this->log(200,"Subscription to SNS Confirmed", $context);
642
643
            return;
644
        }
645
646
        $messages = $this->receive();
647
        foreach ($messages as $message) {
648
649
            $messageEvent = new MessageEvent($this->name, $message);
650
            $dispatcher->dispatch(Events::Message($this->name), $messageEvent);
651
        }
652
    }
653
654
    /**
655
     * Removes the message from queue after all other listeners have fired
656
     *
657
     * If an earlier listener has erred or stopped propagation, this method
658
     * will not fire and the Queued Message should become visible in queue again.
659
     *
660
     * Stops Event Propagation after removing the Message
661
     *
662
     * @param MessageEvent $event The SQS Message Event
663
     *
664
     * @return void
665
     */
666
    public function onMessageReceived(MessageEvent $event)
667
    {
668
        $receiptHandle = $event
669
            ->getMessage()
670
            ->getMetadata()
671
            ->get('ReceiptHandle');
672
673
        $this->delete($receiptHandle);
674
675
        $event->stopPropagation();
676
    }
677
678
    /**
679
     * @return bool
680
     */
681
    private function isQueueFIFO()
682
    {
683
        return $this->options['fifo'] === true;
684
    }
685
}
686