Completed
Pull Request — master (#15)
by Alex
08:30
created

QueueReader   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 110
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 0
Metric Value
wmc 11
lcom 1
cbo 4
dl 0
loc 110
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A read() 0 11 2
A purge() 0 4 1
A consume() 0 15 3
A initialize() 0 9 2
A createQueue() 0 7 2
1
<?php
2
3
namespace Cmp\Queues\Infrastructure\AWS\v20121105\Queue;
4
5
use Aws\Sqs\SqsClient;
6
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
7
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
8
use Exception;
9
use Psr\Log\LoggerInterface;
10
11
class QueueReader implements DomainQueueReader
12
{
13
    /**
14
     * @var SqsClient
15
     */
16
    protected $client;
17
18
    /**
19
     * @var string
20
     */
21
    protected $queueUrl;
22
23
    /**
24
     * @var string
25
     */
26
    protected $queueName;
27
28
    /**
29
     * @var MessageHandler
30
     */
31
    protected $messageHandler;
32
33
    /**
34
     * @var LoggerInterface
35
     */
36
    protected $logger;
37
38
    /**
39
     * @param SqsClient       $client
40
     * @param string          $queueName
41
     * @param MessageHandler  $messageHandler
42
     * @param LoggerInterface $logger
43
     */
44
    public function __construct(SqsClient $client, $queueName, MessageHandler $messageHandler, LoggerInterface $logger)
45
    {
46
        $this->client = $client;
47
        $this->queueName = $queueName;
48
        $this->logger = $logger;
49
        $this->messageHandler = $messageHandler;
50
    }
51
52
    /**
53
     * @param callable $callback
54
     * @param int      $timeout
55
     *
56
     * @throws ReaderException
57
     */
58
    public function read(callable $callback, $timeout=0)
59
    {
60
        $this->initialize();
61
        $this->messageHandler->setCallback($callback);
62
63
        try {
64
            $this->consume($timeout);
65
        } catch(\Exception $e) {
66
            throw new ReaderException("Error occurred while reading", 0, $e);
67
        }
68
    }
69
70
    /**
71
     * Deletes all messages from the queue
72
     */
73
    public function purge()
74
    {
75
        $this->client->purgeQueue($this->queueUrl);
0 ignored issues
show
Documentation introduced by
$this->queueUrl is of type string, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
76
    }
77
78
    /**
79
     * @param int $timeout
80
     */
81
    protected function consume($timeout)
82
    {
83
        $result = $this->client->receiveMessage([
84
            'QueueUrl' => $this->queueUrl,
85
            'MessageAttributeNames' => ['All'],
86
            'MaxNumberOfMessages' => 10,
87
            'WaitTimeSeconds' => $timeout,
88
        ]);
89
90
        $messages = isset($result['Messages']) ? $result['Messages'] : [];
91
        foreach ($messages as $message) {
92
            $this->messageHandler->handleMessage($message);
93
            $this->client->deleteMessage(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle']]);
94
        }
95
    }
96
97
    /**
98
     * @throws ReaderException
99
     */
100
    protected function initialize()
101
    {
102
        try {
103
            $this->createQueue();
104
        } catch (Exception $e) {
105
            $this->logger->error('Error trying to create queue', ['exception' => $e]);
106
            throw new ReaderException('Error initializing queue reader', 0, $e);
107
        }
108
    }
109
110
    /**
111
     * Creates the queue in SQS, nothing will happen if the queue already exists
112
     */
113
    protected function createQueue()
114
    {
115
        if (!isset($this->queueUrl)) {
116
            $result = $this->client->createQueue(array('QueueName' => $this->queueName));
117
            $this->queueUrl = $result['QueueUrl'];
118
        }
119
    }
120
}
121