QueueReader   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 114
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 0
Metric Value
wmc 15
lcom 1
cbo 7
dl 0
loc 114
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 1
A read() 0 17 5
A purge() 0 6 1
B consume() 0 21 8
1
<?php
2
3
namespace Cmp\Queues\Infrastructure\AWS\v20121105\Queue;
4
5
use Aws\Sqs\SqsClient;
6
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
7
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
8
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
9
use Cmp\Queues\Domain\Task\Exception\ParseMessageException;
10
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
11
12
use Psr\Log\LoggerInterface;
13
14
/**
 * Queue reader backed by an SQS client.
 *
 * A read receives one batch of messages from the configured queue URL, passes
 * each message to the message handler, and deletes the message from the queue
 * unless the handler returned boolean false.
 */
class QueueReader implements DomainQueueReader
{
    /**
     * Client used for the receiveMessage, deleteMessage and purgeQueue calls.
     *
     * @var SqsClient
     */
    protected $sqs;

    /**
     * Sent as the 'QueueUrl' parameter of every client call.
     *
     * @var string
     */
    protected $queueUrl;

    /**
     * Sent as 'MaxNumberOfMessages' when receiving messages.
     *
     * @var int
     */
    protected $messagesToRead;

    /**
     * Receives the read callback and is handed each received message.
     *
     * @var MessageHandler
     */
    protected $messageHandler;

    /**
     * Receives an info entry when a graceful stop is requested.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Stores the collaborators and settings; performs no calls on them.
     *
     * @param SqsClient       $sqs
     * @param string          $queueUrl
     * @param int             $messagesToRead
     * @param MessageHandler  $messageHandler
     * @param LoggerInterface $logger
     */
    public function __construct(
        SqsClient $sqs,
        $queueUrl,
        $messagesToRead,
        MessageHandler $messageHandler,
        LoggerInterface $logger
    ) {
        $this->messageHandler = $messageHandler;
        $this->logger = $logger;
        $this->sqs = $sqs;
        $this->messagesToRead = $messagesToRead;
        $this->queueUrl = $queueUrl;
    }

    /**
     * Registers the callback on the message handler and consumes one batch.
     *
     * Parse, graceful-stop and timeout exceptions are rethrown unchanged (a
     * graceful stop is logged at info level first). Any other exception is
     * wrapped in a ReaderException that keeps the original as "previous".
     *
     * @param callable $callback
     * @param int $timeout
     *
     * @throws GracefulStopException
     * @throws ParseMessageException
     * @throws ReaderException
     * @throws TimeoutReaderException
     */
    public function read(callable $callback, $timeout = 0)
    {
        $this->messageHandler->setCallback($callback);

        try {
            $this->consume($timeout);
        } catch (\Exception $failure) {
            // The checks run in the same order the exception types were
            // originally matched, so precedence between them is unchanged.
            if ($failure instanceof ParseMessageException) {
                throw $failure;
            }

            if ($failure instanceof GracefulStopException) {
                $this->logger->info("Gracefully stopping the AWS queue reader", ["exception" => $failure]);
                throw $failure;
            }

            if ($failure instanceof TimeoutReaderException) {
                throw $failure;
            }

            throw new ReaderException("Error occurred while reading", 0, $failure);
        }
    }

    /**
     * Deletes all messages from the queue
     */
    public function purge()
    {
        $request = ['QueueUrl' => $this->queueUrl];

        $this->sqs->purgeQueue($request);
    }

    /**
     * Receives one batch of messages and hands each to the message handler.
     *
     * The given timeout is sent as 'WaitTimeSeconds'. When it is non-zero and
     * the response carries no 'Messages' entry, a TimeoutReaderException is
     * thrown; with a zero timeout an empty response simply ends the call.
     *
     * @param int $timeout
     *
     * @throws ReaderException
     * @throws TimeoutReaderException
     * @throws ParseMessageException
     */
    protected function consume($timeout)
    {
        $request = [
            'QueueUrl' => $this->queueUrl,
            'MessageAttributeNames' => ['All'],
            'MaxNumberOfMessages' => $this->messagesToRead,
            'WaitTimeSeconds' => $timeout,
        ];
        $received = $this->sqs->receiveMessage($request);

        if (!isset($received['Messages'])) {
            if ($timeout != 0) {
                throw new TimeoutReaderException();
            }

            return;
        }

        $batch = $received['Messages'];
        foreach ($batch as $item) {
            $outcome = $this->messageHandler->handleMessage($item);

            // Only an explicit boolean false keeps the message on the queue;
            // null, true and any non-boolean value all lead to deletion.
            if (false !== $outcome) {
                $this->sqs->deleteMessage([
                    'QueueUrl' => $this->queueUrl,
                    'ReceiptHandle' => $item['ReceiptHandle'],
                ]);
            }
        }
    }
}
128