1 | <?php |
||
15 | class SqsDriver extends AbstractPersistanceDriver |
||
16 | { |
||
17 | /** |
||
18 | * @var SqsClient |
||
19 | */ |
||
20 | private $client; |
||
21 | |||
22 | /** |
||
23 | * @var array |
||
24 | */ |
||
25 | private $queueUrls; |
||
26 | |||
/**
 * Builds the driver around an SQS client.
 *
 * @param SqsClient  $client     Client stored on the instance; the methods of this
 *                               class call sendMessage()/deleteMessage() on it.
 * @param Serializer $serializer Handed straight to the parent constructor.
 * @param array      $queueUrls  Queue URLs known up front; empty by default
 *                               (presumably keyed by queue name -- confirm
 *                               against getQueueUrl()).
 */
public function __construct(SqsClient $client, Serializer $serializer, array $queueUrls = [])
{
    // The parent owns the serializer, so it is initialised first.
    parent::__construct($serializer);

    $this->queueUrls = $queueUrls;
    $this->client = $client;
}
|
38 | |||
/**
 * {@inheritdoc}
 *
 * Returns the parent's list of allowed classes with SqsEnvelope appended.
 */
public static function allowedClasses()
{
    $allowed = parent::allowedClasses();

    // Append rather than merge so the keys of the parent's list are left untouched.
    $allowed[] = SqsEnvelope::class;

    return $allowed;
}
||
48 | |||
/**
 * @inheritDoc
 *
 * Wraps the message in a DefaultEnvelope, serializes that envelope and sends
 * the result as the message body to the queue's URL. The returned SqsEnvelope
 * is built from the `MessageId` of the send result and the wrapped envelope.
 */
public function enqueue(string $queueName, Message $message): Envelope
{
    // Resolve the URL before building the envelope, same order as before.
    $url = $this->getQueueUrl($queueName);
    $wrapped = new DefaultEnvelope($message);

    $response = $this->client->sendMessage([
        'QueueUrl' => $url,
        'MessageBody' => $this->serialize($wrapped),
    ]);

    return new SqsEnvelope($response['MessageId'], $wrapped);
}
||
66 | |||
67 | /** |
||
68 | * @inheritDoc |
||
69 | */ |
||
70 | 2 | public function dequeue(string $queueName) |
|
93 | |||
/**
 * @inheritDoc
 *
 * Deletes the message from the queue using the envelope's receipt handle.
 *
 * @throws InvalidEnvelope if the envelope is not an SqsEnvelope
 */
public function ack(string $queueName, Envelope $envelope)
{
    if ($envelope instanceof SqsEnvelope) {
        $this->client->deleteMessage([
            'QueueUrl' => $this->getQueueUrl($queueName),
            'ReceiptHandle' => $envelope->getReceiptHandle(),
        ]);

        return;
    }

    // Only SqsEnvelope exposes the receipt handle the delete call needs.
    $reason = sprintf(
        '%s requires that envelopes be instances of "%s", got "%s"',
        __CLASS__,
        SqsEnvelope::class,
        get_class($envelope)
    );

    throw new InvalidEnvelope($reason);
}
|
115 | |||
/**
 * @inheritDoc
 *
 * Delegates entirely to the envelope's own retry(); this method makes no
 * call on the SQS client and does not use $queueName.
 */
public function retry(string $queueName, Envelope $envelope) : Envelope
{
    $retried = $envelope->retry();

    return $retried;
}
||
123 | |||
124 | /** |
||
125 | * @inheritDoc |
||
126 | */ |
||
127 | 1 | public function fail(string $queueName, Envelope $envelope) |
|
131 | |||
132 | /** |
||
133 | * Returns queue url |
||
134 | * |
||
135 | * @param string $queueName The name of the queue |
||
136 | * |
||
137 | * @return string The queue url |
||
138 | */ |
||
139 | 9 | public function getQueueUrl($queueName) |
|
153 | |||
154 | /** |
||
155 | * Release a message back to a ready state. This is used by the consumer |
||
156 | * when it skips the retry system. This may happen if the consumer receives |
||
157 | * a signal and has to exit early. |
||
158 | * |
||
159 | * @param $queueName The queue from which the message came |
||
160 | * @param $envelope The message to release, should be the same instance |
||
161 | * returned from `dequeue` |
||
162 | * |
||
163 | * @throws Exception\DriverError if something goes wrong |
||
164 | * @return void |
||
165 | */ |
||
166 | 1 | public function release(string $queueName, Envelope $envelope) |
|
170 | } |
||
171 |