Completed
Push — master ( a4a48b...00b680 )
by Tomas
26:02 queued 10:53
created

AmazonSqsDriver::send()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Closure;
7
use Exception;
8
use Tomaj\Hermes\Dispatcher;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
use Aws\Sqs\SqsClient;
12
13
class AmazonSqsDriver implements DriverInterface
14
{
15
    use MaxItemsTrait;
16
    use RestartTrait;
17
    use SerializerAwareTrait;
18
19
    /**
20
     * @var SqsClient
21
     */
22
    private $client;
23
24
    /**
25
     * @var string
26
     */
27
    private $queueName;
28
29
    /**
30
     * string
31
     */
32
    private $queueUrl;
33
34
    /**
35
     * integer
36
     */
37
    private $sleepInterval = 0;
38
39
    /**
40
     * Create new Amazon SQS driver.
41
     *
42
     * You have to create aws client instnace and provide it to this driver.
43
     * You can use service builder or factory method.
44
     *
45
     * <code>
46
     *  use Aws\Sqs\SqsClient;
47
     *
48
     *  $client = SqsClient::factory(array(
49
     *    'profile' => '<profile in your aws credentials file>',
50
     *    'region'  => '<region name>'
51
     *  ));
52
     * </code>
53
     *
54
     * or
55
     *
56
     * <code>
57
     * use Aws\Common\Aws;
58
     *
59
     * // Create a service builder using a configuration file
60
     * $aws = Aws::factory('/path/to/my_config.json');
61
     *
62
     * // Get the client from the builder by namespace
63
     * $client = $aws->get('Sqs');
64
     * </code>
65
     *
66
     * More examples see: https://docs.aws.amazon.com/aws-sdk-php/v2/guide/service-sqs.html
67
     *
68
     *
69
     * @see examples/sqs folder
70
     *
71
     * @param SqsClient     $client
72
     * @param string        $queueName
73
     * @param array         $queueAttributes
74
     */
75
    public function __construct(SqsClient $client, string $queueName, array $queueAttributes = [])
76
    {
77
        $this->client = $client;
78
        $this->queueName = $queueName;
79
        $this->serializer = new MessageSerializer();
80
81
        $result = $client->createQueue([
82
            'QueueName' => $queueName,
83
            'Attributes' => $queueAttributes,
84
        ]);
85
86
        $this->queueUrl = $result->get('QueueUrl');
87
    }
88
89
    /**
90
     * {@inheritdoc}
91
     */
92
    public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool
93
    {
94
        $this->client->sendMessage([
95
            'QueueUrl' => $this->queueUrl,
96
            'MessageBody' => $this->serializer->serialize($message),
97
        ]);
98
        return true;
99
    }
100
101
    public function setupPriorityQueue(string $name, int $priority): void
102
    {
103
        throw new \Exception("AmazonSQS is not supporting priority queues now");
104
    }
105
106
    /**
107
     * {@inheritdoc}
108
     */
109
    public function wait(Closure $callback, array $priorities = []): void
110
    {
111
        while (true) {
112
            $this->checkRestart();
113
            if (!$this->shouldProcessNext()) {
114
                break;
115
            }
116
117
            $result = $this->client->receiveMessage([
118
                'QueueUrl' => $this->queueUrl,
119
                'WaitTimeSeconds' => 1,
120
            ]);
121
122
            $messages = $result['Messages'];
123
124
            if ($messages) {
125
                $hermesMessages = [];
126
                foreach ($messages as $message) {
127
                    $this->client->deleteMessage([
128
                        'QueueUrl' => $this->queueUrl,
129
                        'ReceiptHandle' => $message['ReceiptHandle'],
130
                    ]);
131
                    $hermesMessages[] = $this->serializer->unserialize($message['Body']);
132
                }
133
                foreach ($hermesMessages as $hermesMessage) {
134
                    $callback($hermesMessage);
135
                    $this->incrementProcessedItems();
136
                }
137
            } else {
138
                if ($this->sleepInterval) {
139
                    $this->checkRestart();
140
                    sleep($this->sleepInterval);
141
                }
142
            }
143
        }
144
    }
145
}
146