Completed
Pull Request — master (#117)
by
unknown
11:23
created

AwsProvider   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 582
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 96.48%

Importance

Changes 0
Metric Value
wmc 50
lcom 1
cbo 11
dl 0
loc 582
ccs 219
cts 227
cp 0.9648
rs 8.6206
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A getProvider() 0 4 1
B create() 0 27 3
A __construct() 0 12 3
B destroy() 0 33 5
A publish() 0 58 4
B receive() 0 40 6
A delete() 0 19 2
B queueExists() 0 27 5
B createQueue() 0 32 2
A createSqsPolicy() 0 18 1
B topicExists() 0 33 5
A createTopic() 0 17 2
A getTopicSubscriptions() 0 8 1
B subscribeToTopic() 0 27 3
A unsubscribeFromTopic() 0 23 3
B onNotification() 0 24 3
A onMessageReceived() 0 11 1

How to fix   Complexity   

Complex Class

Complex classes like AwsProvider often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AwsProvider, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Aws\Sns\Exception\SnsException;
26
use Aws\Sns\SnsClient;
27
use Aws\Sqs\Exception\SqsException;
28
use Aws\Sqs\SqsClient;
29
30
use Doctrine\Common\Cache\Cache;
31
use Monolog\Logger;
32
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
33
use Uecode\Bundle\QPushBundle\Event\Events;
34
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
35
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
36
use Uecode\Bundle\QPushBundle\Message\Message;
37
38
/**
39
 * @author Keith Kirk <[email protected]>
40
 */
41
class AwsProvider extends AbstractProvider
42
{
43
    /**
44
     * Aws SQS Client
45
     *
46
     * @var SqsClient
47
     */
48
    private $sqs;
49
50
    /**
51
     * Aws SNS Client
52
     *
53
     * @var SnsClient
54
     */
55
    private $sns;
56
57
    /**
58
     * SQS Queue URL
59
     *
60
     * @var string
61
     */
62
    private $queueUrl;
63
64
    /**
65
     * SNS Topic ARN
66
     *
67
     * @var string
68
     */
69
    private $topicArn;
70
71 15
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
72
    {
73 15
        $this->name     = $name;
74 15
        $this->options  = $options;
75 15
        $this->cache    = $cache;
76 15
        $this->logger   = $logger;
77
78
        // get() method used for sdk v2, create methods for v3
79 15
        $useGet = method_exists($client, 'get');
80 15
        $this->sqs = $useGet ? $client->get('Sqs') : $client->createSqs();
81 15
        $this->sns = $useGet ? $client->get('Sns') : $client->createSns();
82 15
    }
83
84 1
    public function getProvider()
85
    {
86 1
        return "AWS";
87
    }
88
89
    /**
90
     * Builds the configured queues
91
     *
92
     * If a Queue name is passed and configured, this method will build only that
93
     * Queue.
94
     *
95
     * All Create methods are idempotent, if the resource exists, the current ARN
96
     * will be returned
97
     *
98
     */
99 1
    public function create()
100
    {
101 1
        $this->createQueue();
102
103 1
        if ($this->options['push_notifications']) {
104
           // Create the SNS Topic
105 1
           $this->createTopic();
106
107
           // Add the SQS Queue as a Subscriber to the SNS Topic
108 1
           $this->subscribeToTopic(
109 1
               $this->topicArn,
110 1
               'sqs',
111 1
               $this->sqs->getQueueArn($this->queueUrl)
112
           );
113
114
           // Add configured Subscribers to the SNS Topic
115 1
           foreach ($this->options['subscribers'] as $subscriber) {
116 1
                $this->subscribeToTopic(
117 1
                    $this->topicArn,
118 1
                    $subscriber['protocol'],
119 1
                    $subscriber['endpoint']
120
                );
121
            }
122
        }
123
124 1
        return true;
125
    }
126
127
    /**
128
     * @return Boolean
129
     */
130 1
    public function destroy()
131
    {
132 1
        $key = $this->getNameWithPrefix() . '_url';
133 1
        $this->cache->delete($key);
134
135 1
        if ($this->queueExists()) {
136
            // Delete the SQS Queue
137 1
            $this->sqs->deleteQueue([
138 1
                'QueueUrl' => $this->queueUrl
139
            ]);
140
141 1
            $this->log(200,"SQS Queue removed", ['QueueUrl' => $this->queueUrl]);
142
        }
143
144 1
        $key = $this->getNameWithPrefix() . '_arn';
145 1
        $this->cache->delete($key);
146
147 1
        if ($this->topicExists() || !empty($this->queueUrl)) {
148
            // Delete the SNS Topic
149 1
            $topicArn = !empty($this->topicArn)
150 1
                ? $this->topicArn
151 1
                : str_replace('sqs', 'sns', $this->queueUrl)
152
            ;
153
154 1
            $this->sns->deleteTopic([
155 1
                'TopicArn' => $topicArn
156
            ]);
157
158 1
            $this->log(200,"SNS Topic removed", ['TopicArn' => $topicArn]);
159
        }
160
161 1
        return true;
162
    }
163
164
    /**
165
     * {@inheritDoc}
166
     *
167
     * This method will either use a SNS Topic to publish a queued message or
168
     * straight to SQS depending on the application configuration.
169
     *
170
     * @return string
171
     */
172 2
    public function publish(array $message, array $options = [])
173
    {
174 2
        $options      = $this->mergeOptions($options);
175 2
        $publishStart = microtime(true);
176
177
        // ensures that the SQS Queue and SNS Topic exist
178 2
        if (!$this->queueExists()) {
179
            $this->create();
180
        }
181
182 2
        if ($options['push_notifications']) {
183
184 1
            if (!$this->topicExists()) {
185
                $this->create();
186
            }
187
188
            $message    = [
189 1
                'default'   => $this->getNameWithPrefix(),
190 1
                'sqs'       => json_encode($message),
191 1
                'http'      => $this->getNameWithPrefix(),
192 1
                'https'     => $this->getNameWithPrefix(),
193
            ];
194
195 1
            $result = $this->sns->publish([
196 1
                'TopicArn'          => $this->topicArn,
197 1
                'Subject'           => $this->getName(),
198 1
                'Message'           => json_encode($message),
199 1
                'MessageStructure'  => 'json'
200
            ]);
201
202
            $context = [
203 1
                'TopicArn'              => $this->topicArn,
204 1
                'MessageId'             => $result->get('MessageId'),
205 1
                'push_notifications'    => $options['push_notifications'],
206 1
                'publish_time'          => microtime(true) - $publishStart
207
            ];
208 1
            $this->log(200,"Message published to SNS", $context);
209
210 1
            return $result->get('MessageId');
211
        }
212
213 1
        $result = $this->sqs->sendMessage([
214 1
            'QueueUrl'               => $this->queueUrl,
215 1
            'MessageBody'            => json_encode($message),
216 1
            'DelaySeconds'           => $options['message_delay'],
217 1
            'MessageDeduplicationId' => hash('sha256',json_encode($message)),
218 1
            'MessageGroupId'         => $this->getNameWithPrefix()
219
        ]);
220
221
        $context = [
222 1
            'QueueUrl'              => $this->queueUrl,
223 1
            'MessageId'             => $result->get('MessageId'),
224 1
            'push_notifications'    => $options['push_notifications']
225
        ];
226 1
        $this->log(200,"Message published to SQS", $context);
227
228 1
        return $result->get('MessageId');
229
    }
230
231
    /**
232
     * {@inheritDoc}
233
     */
234 2
    public function receive(array $options = [])
235
    {
236 2
        $options = $this->mergeOptions($options);
237
238 2
        if (!$this->queueExists()) {
239
            $this->create();
240
        }
241
242 2
        $result = $this->sqs->receiveMessage([
243 2
            'QueueUrl'              => $this->queueUrl,
244 2
            'MaxNumberOfMessages'   => $options['messages_to_receive'],
245 2
            'WaitTimeSeconds'       => $options['receive_wait_time']
246
        ]);
247
248 2
        $messages = $result->get('Messages') ?: [];
249
250
        // Convert to Message Class
251 2
        foreach ($messages as &$message) {
252 2
            $id = $message['MessageId'];
253
            $metadata = [
254 2
                'ReceiptHandle' => $message['ReceiptHandle'],
255 2
                'MD5OfBody'     => $message['MD5OfBody']
256
            ];
257
258
            // When using SNS, the SQS Body is the entire SNS Message
259 2
            if(is_array($body = json_decode($message['Body'], true))
260 2
                && isset($body['Message'])
261
            ) {
262 2
                $body = json_decode($body['Message'], true);
263
            }
264
265 2
            $message = new Message($id, $body, $metadata);
266
267 2
            $context = ['MessageId' => $id];
268 2
            $this->log(200,"Message fetched from SQS Queue", $context);
269
270
        }
271
272 2
        return $messages;
273
    }
274
275
    /**
276
     * {@inheritDoc}
277
     *
278
     * @return bool
279
     */
280 2
    public function delete($id)
281
    {
282 2
        if (!$this->queueExists()) {
283
            return false;
284
        }
285
286 2
        $this->sqs->deleteMessage([
287 2
            'QueueUrl'      => $this->queueUrl,
288 2
            'ReceiptHandle' => $id
289
        ]);
290
291
        $context = [
292 2
            'QueueUrl'      => $this->queueUrl,
293 2
            'ReceiptHandle' => $id
294
        ];
295 2
        $this->log(200,"Message deleted from SQS Queue", $context);
296
297 2
        return true;
298
    }
299
300
    /**
301
     * Return the Queue Url
302
     *
303
     * This method relies on in-memory cache and the Cache provider
304
     * to reduce the need to needlessly call the create method on an existing
305
     * Queue.
306
     *
307
     * @return boolean
308
     */
309 9
    public function queueExists()
310
    {
311 9
        if (isset($this->queueUrl)) {
312 3
            return true;
313
        }
314
315 7
        $key = $this->getNameWithPrefix() . '_url';
316 7
        if ($this->cache->contains($key)) {
317 1
            $this->queueUrl = $this->cache->fetch($key);
318
319 1
            return true;
320
        }
321
322
        try {
323 6
            $result = $this->sqs->getQueueUrl([
324 6
                'QueueName' => $this->getNameWithPrefix()
325
            ]);
326
327 6
            if ($this->queueUrl = $result->get('QueueUrl')) {
328 6
                $this->cache->save($key, $this->queueUrl);
329
330 6
                return true;
331
            }
332
        } catch (SqsException $e) {}
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
333
334
        return false;
335
    }
336
337
    /**
338
     * Creates an SQS Queue and returns the Queue Url
339
     *
340
     * The create method for SQS Queues is idempotent - if the queue already
341
     * exists, this method will return the Queue Url of the existing Queue.
342
     *
343
     * @return string
344
     */
345 3
    public function createQueue()
346
    {
347 3
        $result = $this->sqs->createQueue([
348 3
            'QueueName' => $this->getNameWithPrefix(),
349
            'Attributes'    => [
350 3
                'VisibilityTimeout'             => $this->options['message_timeout'],
351 3
                'MessageRetentionPeriod'        => $this->options['message_expiration'],
352 3
                'ReceiveMessageWaitTimeSeconds' => $this->options['receive_wait_time']
353
            ]
354
        ]);
355
356 3
        $this->queueUrl = $result->get('QueueUrl');
357
358 3
        $key = $this->getNameWithPrefix() . '_url';
359 3
        $this->cache->save($key, $this->queueUrl);
360
361 3
        $this->log(200, "Created SQS Queue", ['QueueUrl' => $this->queueUrl]);
362
363 3
        if ($this->options['push_notifications']) {
364
365 2
            $policy = $this->createSqsPolicy();
366
367 2
            $this->sqs->setQueueAttributes([
368 2
                'QueueUrl'      => $this->queueUrl,
369
                'Attributes'    => [
370 2
                    'Policy'    => $policy,
371
                ]
372
            ]);
373
374 2
            $this->log(200, "Created Updated SQS Policy");
375
        }
376 3
    }
377
378
    /**
379
     * Creates a Policy for SQS that's required to allow SNS SendMessage access
380
     *
381
     * @return string
382
     */
383 2
    public function createSqsPolicy()
384
    {
385 2
        $arn = $this->sqs->getQueueArn($this->queueUrl);
386
387 2
        return json_encode([
388 2
            'Version'   => '2008-10-17',
389 2
            'Id'        =>  sprintf('%s/SQSDefaultPolicy', $arn),
390
            'Statement' => [
391
                [
392 2
                    'Sid'       => 'SNSPermissions',
393 2
                    'Effect'    => 'Allow',
394
                    'Principal' => ['AWS' => '*'],
395 2
                    'Action'    => 'SQS:SendMessage',
396 2
                    'Resource'  => $arn
397
                ]
398
            ]
399
        ]);
400
    }
401
402
    /**
403
     * Checks to see if a Topic exists
404
     *
405
     * This method relies on in-memory cache and the Cache provider
406
     * to reduce the need to needlessly call the create method on an existing
407
     * Topic.
408
     *
409
     * @return boolean
410
     */
411 3
    public function topicExists()
412
    {
413 3
        if (isset($this->topicArn)) {
414 1
            return true;
415
        }
416
417 3
        $key = $this->getNameWithPrefix() . '_arn';
418 3
        if ($this->cache->contains($key)) {
419 1
            $this->topicArn = $this->cache->fetch($key);
420
421 1
            return true;
422
        }
423
424 3
        if (!empty($this->queueUrl)) {
425 2
            $queueArn = $this->sqs->getQueueArn($this->queueUrl);
426 2
            $topicArn = str_replace('sqs', 'sns', $queueArn);
427
428
            try {
429 2
                $this->sns->getTopicAttributes([
430 2
                    'TopicArn' => $topicArn
431
                ]);
432
            } catch (SnsException $e) {
433
                return false;
434
            }
435
436 2
            $this->topicArn = $topicArn;
437 2
            $this->cache->save($key, $this->topicArn);
438
439 2
            return true;
440
        }
441
442 1
        return false;
443
    }
444
445
    /**
446
     * Creates a SNS Topic and returns the ARN
447
     *
448
     * The create method for the SNS Topics is idempotent - if the topic already
449
     * exists, this method will return the Topic ARN of the existing Topic.
450
     *
451
     *
452
     * @return false|null
453
     */
454 2
    public function createTopic()
455
    {
456 2
        if (!$this->options['push_notifications']) {
457 1
            return false;
458
        }
459
460 2
        $result = $this->sns->createTopic([
461 2
            'Name' => $this->getNameWithPrefix()
462
        ]);
463
464 2
        $this->topicArn = $result->get('TopicArn');
465
466 2
        $key = $this->getNameWithPrefix() . '_arn';
467 2
        $this->cache->save($key, $this->topicArn);
468
469 2
        $this->log(200, "Created SNS Topic", ['TopicARN' => $this->topicArn]);
470 2
    }
471
472
    /**
473
     * Get a list of Subscriptions for the specified SNS Topic
474
     *
475
     * @param string $topicArn The SNS Topic Arn
476
     *
477
     * @return array
478
     */
479 4
    public function getTopicSubscriptions($topicArn)
480
    {
481 4
        $result = $this->sns->listSubscriptionsByTopic([
482 4
            'TopicArn' => $topicArn
483
        ]);
484
485 4
        return $result->get('Subscriptions');
486
    }
487
488
    /**
489
     * Subscribes an endpoint to a SNS Topic
490
     *
491
     * @param string $topicArn The ARN of the Topic
492
     * @param string $protocol The protocol of the Endpoint
493
     * @param string $endpoint The Endpoint of the Subscriber
494
     *
495
     * @return string
496
     */
497 2
    public function subscribeToTopic($topicArn, $protocol, $endpoint)
498
    {
499
        // Check against the current Topic Subscriptions
500 2
        $subscriptions = $this->getTopicSubscriptions($topicArn);
501 2
        foreach ($subscriptions as $subscription) {
502 2
            if ($endpoint === $subscription['Endpoint']) {
503 2
                return $subscription['SubscriptionArn'];
504
            }
505
        }
506
507 1
        $result = $this->sns->subscribe([
508 1
            'TopicArn' => $topicArn,
509 1
            'Protocol' => $protocol,
510 1
            'Endpoint' => $endpoint
511
        ]);
512
513 1
        $arn = $result->get('SubscriptionArn');
514
515
        $context = [
516 1
            'Endpoint' => $endpoint,
517 1
            'Protocol' => $protocol,
518 1
            'SubscriptionArn' => $arn
519
        ];
520 1
        $this->log(200, "Endpoint Subscribed to SNS Topic", $context);
521
522 1
        return $arn;
523
    }
524
525
    /**
526
     * Unsubscribes an endpoint from a SNS Topic
527
     *
528
     * The method will return TRUE on success, or FALSE if the Endpoint did not
529
     * have a Subscription on the SNS Topic
530
     *
531
     * @param string $topicArn The ARN of the Topic
532
     * @param string $protocol The protocol of the Endpoint
533
     * @param string $endpoint The Endpoint of the Subscriber
534
     *
535
     * @return Boolean
536
     */
537 1
    public function unsubscribeFromTopic($topicArn, $protocol, $endpoint)
538
    {
539
        // Check against the current Topic Subscriptions
540 1
        $subscriptions = $this->getTopicSubscriptions($topicArn);
541 1
        foreach ($subscriptions as $subscription) {
542 1
            if ($endpoint === $subscription['Endpoint']) {
543 1
                $this->sns->unsubscribe([
544 1
                    'SubscriptionArn' => $subscription['SubscriptionArn']
545
                ]);
546
547
                $context = [
548 1
                    'Endpoint' => $endpoint,
549 1
                    'Protocol' => $protocol,
550 1
                    'SubscriptionArn' => $subscription['SubscriptionArn']
551
                ];
552 1
                $this->log(200,"Endpoint unsubscribed from SNS Topic", $context);
553
554 1
                return true;
555
            }
556
        }
557
558 1
        return false;
559
    }
560
561
    /**
562
     * Handles SNS Notifications
563
     *
564
     * For Subscription notifications, this method will automatically confirm
565
     * the Subscription request
566
     *
567
     * For Message notifications, this method polls the queue and dispatches
568
     * the `{queue}.message_received` event for each message retrieved
569
     *
570
     * @param NotificationEvent $event The Notification Event
571
     * @param string $eventName Name of the event
572
     * @param EventDispatcherInterface $dispatcher
573
     * @return bool|void
574
     */
575 2
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
576
    {
577 2
        if (NotificationEvent::TYPE_SUBSCRIPTION == $event->getType()) {
578 1
            $topicArn   = $event->getNotification()->getMetadata()->get('TopicArn');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
579 1
            $token      = $event->getNotification()->getMetadata()->get('Token');
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
580
581 1
            $this->sns->confirmSubscription([
582 1
                'TopicArn'  => $topicArn,
583 1
                'Token'     => $token
584
            ]);
585
586 1
            $context = ['TopicArn' => $topicArn];
587 1
            $this->log(200,"Subscription to SNS Confirmed", $context);
588
589 1
            return;
590
        }
591
592 1
        $messages = $this->receive();
593 1
        foreach ($messages as $message) {
594
595 1
            $messageEvent = new MessageEvent($this->name, $message);
596 1
            $dispatcher->dispatch(Events::Message($this->name), $messageEvent);
597
        }
598 1
    }
599
600
    /**
601
     * Removes the message from queue after all other listeners have fired
602
     *
603
     * If an earlier listener has erred or stopped propagation, this method
604
     * will not fire and the Queued Message should become visible in queue again.
605
     *
606
     * Stops Event Propagation after removing the Message
607
     *
608
     * @param MessageEvent $event The SQS Message Event
609
     * @return bool|void
610
     */
611 1
    public function onMessageReceived(MessageEvent $event)
612
    {
613
        $receiptHandle = $event
614 1
            ->getMessage()
615 1
            ->getMetadata()
616 1
            ->get('ReceiptHandle');
617
618 1
        $this->delete($receiptHandle);
619
620 1
        $event->stopPropagation();
621 1
    }
622
}
623