Completed
Push — master ( c2ba48...1fd225 )
by Markus
14s queued 12s
created

QueueClient::receiveMessage()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 17
rs 9.9666
c 0
b 0
f 0
cc 2
nc 2
nop 1
1
<?php
2
3
namespace Jellyfish\QueueSQS;
4
5
use Aws\Sqs\SqsClient;
6
use Jellyfish\Queue\MessageInterface;
7
use Jellyfish\Queue\MessageMapperInterface;
8
use Jellyfish\Queue\QueueClientInterface;
9
use Jellyfish\QueueSQS\Exception\CreateQueueException;
10
11
/**
 * Queue client backed by Amazon SQS.
 *
 * Queues are addressed by name. Before a message is sent or received the queue URL is resolved,
 * and the queue is created when no URL can be resolved for it. Messages are converted to and
 * from JSON by the injected message mapper.
 */
class QueueClient implements QueueClientInterface
{
    /**
     * AWS error codes that signal "the requested queue does not exist".
     *
     * Both the legacy query-protocol code and the JSON-protocol code are listed so the check
     * does not depend on which protocol the installed SDK version speaks.
     */
    protected const QUEUE_DOES_NOT_EXIST_ERROR_CODES = [
        'AWS.SimpleQueueService.NonExistentQueue',
        'QueueDoesNotExist',
    ];

    /**
     * @var \Aws\Sqs\SqsClient
     */
    protected $sqsClient;

    /**
     * @var \Jellyfish\Queue\MessageMapperInterface
     */
    protected $messageMapper;

    /**
     * @param \Aws\Sqs\SqsClient $sqsClient Client used for all SQS calls.
     * @param \Jellyfish\Queue\MessageMapperInterface $messageMapper Converts messages to/from JSON.
     */
    public function __construct(
        SqsClient $sqsClient,
        MessageMapperInterface $messageMapper
    ) {
        $this->sqsClient = $sqsClient;
        $this->messageMapper = $messageMapper;
    }

    /**
     * Resolves the URL of the given queue, creating the queue when it cannot be resolved.
     *
     * @param string $queueName
     *
     * @return string URL of the existing or newly created queue.
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    protected function declareQueue(string $queueName): string
    {
        $queueUrl = $this->getQueueUrl($queueName);

        if ($queueUrl === null) {
            $queueUrl = $this->createQueue($queueName);
        }

        return $queueUrl;
    }

    /**
     * Looks up the URL of a queue by name.
     *
     * @param string $queueName
     *
     * @return string|null The queue URL, or null when the queue does not exist or the response
     *                     carries no usable "QueueUrl" value.
     *
     * @throws \Aws\Sqs\Exception\SqsException For any SQS error other than "queue does not exist".
     */
    protected function getQueueUrl(string $queueName): ?string
    {
        $args = [
            'QueueName' => $queueName,
        ];

        try {
            $result = $this->sqsClient->getQueueUrl($args);
        } catch (\Aws\Sqs\Exception\SqsException $exception) {
            // SQS reports an unknown queue as an error instead of an empty result. Translate
            // only that case to null so declareQueue() can create the queue; everything else
            // (permissions, throttling, networking) must still surface to the caller.
            if (!in_array($exception->getAwsErrorCode(), self::QUEUE_DOES_NOT_EXIST_ERROR_CODES, true)) {
                throw $exception;
            }

            return null;
        }

        if (!$result->hasKey('QueueUrl')) {
            return null;
        }

        $queueUrl = $result->get('QueueUrl');

        // The key may be present without holding a string; treat that like a missing URL.
        return is_string($queueUrl) ? $queueUrl : null;
    }

    /**
     * Creates a queue with the given name and returns its URL.
     *
     * @param string $queueName
     *
     * @return string URL of the created queue.
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException When the response does not
     *         contain a string "QueueUrl".
     */
    protected function createQueue(string $queueName): string
    {
        $args = [
            'QueueName' => $queueName,
        ];

        $result = $this->sqsClient->createQueue($args);
        $queueUrl = $result->hasKey('QueueUrl') ? $result->get('QueueUrl') : null;

        // Guard the declared string return type: a present-but-null (or otherwise non-string)
        // value is reported as a failed creation instead of surfacing as a TypeError.
        if (!is_string($queueUrl)) {
            throw new CreateQueueException(sprintf('Could not create queue "%s".', $queueName));
        }

        return $queueUrl;
    }

    /**
     * Receives at most one message from the given queue.
     *
     * NOTE(review): the received message is not deleted from the queue here; presumably it
     * becomes visible again later unless it is removed elsewhere - confirm this is intended.
     *
     * @param string $queueName
     *
     * @return \Jellyfish\Queue\MessageInterface|null The mapped message, or null when the
     *                                                response contains no message body.
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    public function receiveMessage(string $queueName): ?MessageInterface
    {
        $queueUrl = $this->declareQueue($queueName);

        $args = [
            'QueueUrl' => $queueUrl,
            'MaxNumberOfMessages' => 1,
        ];

        $result = $this->sqsClient->receiveMessage($args);
        // JMESPath lookup: yields null when the response has no first message.
        $messageAsJson = $result->search('Messages[0].Body');

        if ($messageAsJson === null) {
            return null;
        }

        return $this->messageMapper->fromJson($messageAsJson);
    }

    /**
     * Serializes the message to JSON and sends it to the given queue.
     *
     * @param string $queueName
     * @param \Jellyfish\Queue\MessageInterface $message
     *
     * @return \Jellyfish\Queue\QueueClientInterface This client, to allow call chaining.
     *
     * @throws \Jellyfish\QueueSQS\Exception\CreateQueueException
     */
    public function sendMessage(string $queueName, MessageInterface $message): QueueClientInterface
    {
        $queueUrl = $this->declareQueue($queueName);
        $json = $this->messageMapper->toJson($message);

        $args = [
            'QueueUrl' => $queueUrl,
            'MessageBody' => $json,
        ];

        $this->sqsClient->sendMessage($args);

        return $this;
    }
}
145