Completed
Push — master ( e01996...810b60 )
by Alex
13s
created

QueueReader::initialize()   A

Complexity

Conditions 2
Paths 3

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 7
nc 3
nop 2
1
<?php
2
3
namespace Cmp\Queues\Infrastructure\AWS\v20121105\Queue;
4
5
use Aws\Sqs\SqsClient;
6
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
7
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
8
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
9
use Psr\Log\LoggerInterface;
10
11
class QueueReader implements DomainQueueReader
12
{
13
    /**
14
     * @var SqsClient
15
     */
16
    protected $sqs;
17
18
    /**
19
     * @var string
20
     */
21
    protected $queueUrl;
22
23
    /**
24
     * @var MessageHandler
25
     */
26
    protected $messageHandler;
27
28
    /**
29
     * @var LoggerInterface
30
     */
31
    protected $logger;
32
33
    /**
34
     * @param SqsClient       $sqs
35
     * @param string          $queueUrl
36
     * @param MessageHandler  $messageHandler
37
     * @param LoggerInterface $logger
38
     */
39
    public function __construct(
40
        SqsClient $sqs,
41
        $queueUrl,
42
        MessageHandler $messageHandler,
43
        LoggerInterface $logger
44
    ) {
45
        $this->sqs = $sqs;
46
        $this->queueUrl = $queueUrl;
47
        $this->logger = $logger;
48
        $this->messageHandler = $messageHandler;
49
    }
50
51
    /**
52
     * @param callable $callback
53
     * @param int      $timeout
54
     *
55
     * @throws TimeoutReaderException
56
     * @throws ReaderException
57
     */
58
    public function read(callable $callback, $timeout=0)
59
    {
60
        $this->messageHandler->setCallback($callback);
61
62
        try {
63
            $this->consume($timeout);
64
        } catch(TimeoutReaderException $e) {
65
            throw $e;
66
        } catch(\Exception $e) {
67
            throw new ReaderException("Error occurred while reading", 0, $e);
68
        }
69
    }
70
71
    /**
72
     * Deletes all messages from the queue
73
     */
74
    public function purge()
75
    {
76
        $this->sqs->purgeQueue($this->queueUrl);
0 ignored issues
show
Documentation introduced by
$this->queueUrl is of type string, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
77
    }
78
79
    /**
80
     * @param int $timeout
81
     *
82
     * @throws TimeoutReaderException
83
     */
84
    protected function consume($timeout)
85
    {
86
        $result = $this->sqs->receiveMessage([
87
            'QueueUrl' => $this->queueUrl,
88
            'MessageAttributeNames' => ['All'],
89
            'MaxNumberOfMessages' => 10,
90
            'WaitTimeSeconds' => $timeout,
91
        ]);
92
93
        if ($timeout != 0 && !isset($result['Messages'])) {
94
            throw new TimeoutReaderException();
95
        }
96
97
        $messages = isset($result['Messages']) ? $result['Messages'] : [];
98
        foreach ($messages as $message) {
99
            $this->messageHandler->handleMessage($message);
100
            $this->sqs->deleteMessage(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle']]);
101
        }
102
    }
103
}
104