IronMqProvider::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 8
Ratio 100 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 8
loc 8
ccs 7
cts 7
cp 1
rs 9.4285
cc 1
eloc 6
nc 1
nop 5
crap 1
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use IronMQ\IronMQ;
26
use Doctrine\Common\Cache\Cache;
27
use Monolog\Logger;
28
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
29
use Uecode\Bundle\QPushBundle\Event\Events;
30
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
31
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
32
use Uecode\Bundle\QPushBundle\Message\Message;
33
34
/**
35
 * @author Keith Kirk <[email protected]>
36
 */
37
class IronMqProvider extends AbstractProvider
38
{
39
    /**
40
     * IronMQ Client
41
     *
42
     * @var IronMQ
43
     */
44
    private $ironmq;
45
46
    /**
47
     * IronMQ Queue
48
     *
49
     * @var object
50
     */
51
    private $queue;
52
53
    /**
54
     * @var Message[]
55
     */
56
    private $reservedMessages = [];
57
58 10 View Code Duplication
    /**
     * Stores the queue name, its options and the collaborating services.
     *
     * @param string $name    Name of the queue
     * @param array  $options Queue options
     * @param mixed  $client  Client stored as the `ironmq` property (not type-hinted;
     *                        presumably an IronMQ instance - confirm against callers)
     * @param Cache  $cache   Cache stored on the provider
     * @param Logger $logger  Logger stored on the provider
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->logger  = $logger;
        $this->cache   = $cache;
        $this->ironmq  = $client;
        $this->options = $options;
        $this->name    = $name;
    }
66
67 1
    /**
     * Returns the name identifying this provider.
     *
     * @return string Always the string `IronMQ`
     */
    public function getProvider()
    {
        return 'IronMQ';
    }
71
72
    /**
     * {@inheritDoc}
     *
     * Creates the queue through the IronMQ client, keeps the returned queue on
     * the provider and stores its JSON representation in the cache under the
     * prefixed queue name.
     *
     * When the `push_notifications` option is truthy the queue is created with
     * the configured push settings and subscriber URLs; otherwise it is created
     * as a `pull` queue.
     *
     * @return bool Always true
     * @throws \InvalidArgumentException When a subscriber uses the `email` protocol
     */
    public function create()
    {
        // Pull queue unless push notifications are enabled below.
        $params = ['type' => 'pull'];

        if ($this->options['push_notifications']) {
            $type = $this->options['push_type'];
            $push = [
                'rate_limit'    => $this->options['rate_limit'],
                'retries'       => $this->options['notification_retries'],
                'retries_delay' => $this->options['notification_retries_delay'],
                'subscribers'   => []
            ];

            foreach ($this->options['subscribers'] as $subscriber) {
                // Only the `email` protocol is rejected explicitly here.
                if ($subscriber['protocol'] == "email") {
                    throw new \InvalidArgumentException(
                        'IronMQ only supports `http` or `https` subscribers!'
                    );
                }

                $push['subscribers'][] = ['url' => $subscriber['endpoint']];
            }

            $params = [
                'type' => $type,
                'push' => $push
            ];
        }

        $queueName   = $this->getNameWithPrefix();
        $this->queue = $this->ironmq->createQueue($queueName, $params);

        $this->cache->save($queueName, json_encode($this->queue));
        $this->log(200, "Queue has been created.", $params);

        return true;
    }
112
113
    /**
     * {@inheritDoc}
     *
     * Deletes the queue through the IronMQ client, then drops both the
     * in-memory queue handle and the cached entry for the prefixed queue name.
     *
     * An exception whose message contains "Queue not found" is logged and
     * otherwise ignored; any other exception is rethrown.
     *
     * @return bool Always true
     * @throws \Exception Any client exception other than "Queue not found"
     */
    public function destroy()
    {
        $queueName = $this->getNameWithPrefix();

        try {
            $this->ironmq->deleteQueue($queueName);
        } catch (\Exception $e) {
            // Catch `queue not found` exceptions, throw the rest.
            if (false === strpos($e->getMessage(), "Queue not found")) {
                throw $e;
            }

            $this->log(400, "Queue did not exist");
        }

        // Reset the in-memory handle on both paths: if the queue was already
        // gone remotely, a stale handle would make queueExists() keep
        // reporting true for a queue that no longer exists.
        $this->queue = null;

        $this->cache->delete($queueName);

        $this->log(200, "Queue has been destroyed.");

        return true;
    }
137
138
    /**
     * {@inheritDoc}
     *
     * Creates the queue first when queueExists() reports it missing, then posts
     * the JSON-encoded message (with a `_qpush_queue` entry holding the queue
     * name added when the message does not already define that key) and logs
     * the message id together with the elapsed publish time.
     *
     * @param array $message The message to publish
     * @param array $options Per-call options, merged through mergeOptions()
     *
     * @return int The id of the posted message, cast to int
     */
    public function publish(array $message, array $options = [])
    {
        $options   = $this->mergeOptions($options);
        $startedAt = microtime(true);

        if (!$this->queueExists()) {
            $this->create();
        }

        $queueName   = $this->getNameWithPrefix();
        $payload     = json_encode($message + ['_qpush_queue' => $this->name]);
        $postOptions = [
            'timeout'    => $options['message_timeout'],
            'delay'      => $options['message_delay'],
            'expires_in' => $options['message_expiration']
        ];

        $result = $this->ironmq->postMessage($queueName, $payload, $postOptions);

        $this->log(200, "Message has been published.", [
            'message_id'   => $result->id,
            'publish_time' => microtime(true) - $startedAt
        ]);

        return (int) $result->id;
    }
170
171
    /**
     * {@inheritDoc}
     *
     * Creates the queue first when queueExists() reports it missing, reserves
     * messages through the IronMQ client and converts each one into a Message
     * whose metadata carries `reserved_count` and `reservation_id`. The
     * `_qpush_queue` entry is stripped from the decoded body before it is
     * re-encoded.
     *
     * The converted messages are also remembered by id in `reservedMessages`,
     * replacing whatever a previous call stored there.
     *
     * @param array $options Per-call options, merged through mergeOptions()
     *
     * @return Message[] The converted messages, or an empty array when the
     *                   client does not return an array
     */
    public function receive(array $options = [])
    {
        $options = $this->mergeOptions($options);

        if (!$this->queueExists()) {
            $this->create();
        }

        $reserved = $this->ironmq->reserveMessages(
            $this->getNameWithPrefix(),
            $options['messages_to_receive'],
            $options['message_timeout'],
            $options['receive_wait_time']
        );

        if (!is_array($reserved)) {
            $this->log(200, "No messages found in queue.");

            return [];
        }

        // Build fresh arrays instead of rewriting the client result through a
        // by-reference foreach, which would leave a dangling reference to the
        // last element behind.
        $messages     = [];
        $reservedById = [];

        foreach ($reserved as $key => $raw) {
            $id       = $raw->id;
            $body     = json_decode($raw->body, true);
            $metadata = [
                'reserved_count' => $raw->reserved_count,
                'reservation_id' => $raw->reservation_id
            ];

            // A body that decodes to a scalar string cannot take unset() on an
            // offset (fatal error), so only strip the marker from arrays.
            if (is_array($body)) {
                unset($body['_qpush_queue']);
            }

            $message = new Message($id, json_encode($body), $metadata);

            $messages[$key]                   = $message;
            $reservedById[$message->getId()]  = $message;

            $this->log(200, "Message has been received.", ['message_id' => $id]);
        }

        $this->reservedMessages = $reservedById;

        return $messages;
    }
217
218
    /**
     * {@inheritDoc}
     *
     * Deletes the message through the IronMQ client, passing along the
     * reservation id remembered for it (null when none is known).
     *
     * An exception whose message contains "Queue not found" is logged and
     * otherwise ignored; any other exception is rethrown.
     *
     * @param mixed $id Id of the message to delete
     *
     * @return bool Always true
     * @throws \Exception Any client exception other than "Queue not found"
     */
    public function delete($id)
    {
        $reservationId = $this->getReservationId($id);

        try {
            $this->ironmq->deleteMessage($this->getNameWithPrefix(), $id, $reservationId);
            $this->log(200, "Message deleted.", ['message_id' => $id]);
        } catch (\Exception $e) {
            $queueMissing = false !== strpos($e->getMessage(), "Queue not found");

            if (!$queueMissing) {
                throw $e;
            }

            $this->log(400, "Queue did not exist");
        }

        return true;
    }
238
239
    /**
     * Checks whether or not the Queue exists
     *
     * The lookup goes, in order, through the in-memory queue handle, the Cache
     * provider and finally the IronMQ client. A queue found through the client
     * is kept in memory and cached, so later calls do not needlessly hit the
     * client or call the create method on an existing Queue.
     *
     * An exception whose message contains "Queue not found" is logged and
     * reported as `false`; any other exception is rethrown.
     *
     * @return bool True when the queue is known to exist, false otherwise
     * @throws \Exception Any client exception other than "Queue not found"
     */
    public function queueExists()
    {
        if (isset($this->queue)) {
            return true;
        }

        $queueName = $this->getNameWithPrefix();

        if ($this->cache->contains($queueName)) {
            $this->queue = json_decode($this->cache->fetch($queueName));

            return true;
        }

        try {
            $this->queue = $this->ironmq->getQueue($queueName);
        } catch (\Exception $e) {
            if (false === strpos($e->getMessage(), "Queue not found")) {
                throw $e;
            }

            $this->log(400, "Queue did not exist");

            return false;
        }

        // The client answered without a usable queue: treat it as missing and
        // do not cache it, otherwise the cache branch above would report the
        // queue as existing on the next call.
        if (!isset($this->queue)) {
            return false;
        }

        $this->cache->save($queueName, json_encode($this->queue));

        // The queue was found remotely, so it exists. Previously control fell
        // through to `return false` here, which made callers re-create it.
        return true;
    }
273
274
    /**
     * Handles a Notification from IronMQ
     *
     * Wraps the notification's id, body and metadata in a Message, logs its
     * receipt and dispatches a MessageEvent under the event name returned by
     * `Events::Message()` for this queue.
     *
     * @param NotificationEvent        $event      The Notification Event
     * @param string                   $eventName  Name of the event (not used by this method)
     * @param EventDispatcherInterface $dispatcher Dispatcher used to dispatch the MessageEvent
     *
     * @return void
     */
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
    {
        $notification = $event->getNotification();
        $messageId    = $notification->getId();

        $message = new Message(
            $messageId,
            $notification->getBody(),
            $notification->getMetadata()->toArray()
        );

        $this->log(
            200,
            "Message has been received from Push Notification.",
            ['message_id' => $messageId]
        );

        $received = new MessageEvent($this->name, $message);

        $dispatcher->dispatch(Events::Message($this->name), $received);
    }
305
306
    /**
     * Removes the message from queue after all other listeners have fired
     *
     * If an earlier listener has failed or stopped propagation, this method
     * will not fire and the Queued Message should become visible in queue again.
     *
     * The delete is skipped when the message metadata contains the
     * `iron-subscriber-message-id` key (presumably set on push-delivered
     * messages - confirm against the notification handling).
     *
     * Stops Event Propagation afterwards in either case.
     *
     * @param MessageEvent $event The Message Event
     *
     * @return void
     */
    public function onMessageReceived(MessageEvent $event)
    {
        $message = $event->getMessage();

        if (!$message->getMetadata()->containsKey('iron-subscriber-message-id')) {
            $this->delete($message->getId());
        }

        $event->stopPropagation();
    }
328
329
    /**
     * Get queue info
     *
     * Fetches the queue from the IronMQ client again, so the returned data is
     * fresh, and keeps it as the in-memory queue handle. This allows to get
     * the queue size and thus to know whether processing is finished or not.
     *
     * @return mixed The queue as returned by the client, or null when
     *               queueExists() reports the queue as missing
     */
    public function queueInfo()
    {
        if (!$this->queueExists()) {
            return null;
        }

        $this->queue = $this->ironmq->getQueue($this->getNameWithPrefix());

        return $this->queue;
    }
347
348
    /**
     * Publishes multiple messages at once
     *
     * Creates the queue first when queueExists() reports it missing. Each
     * message is JSON-encoded with a `_qpush_queue` entry holding the queue
     * name (added when the message does not already define that key), and all
     * of them are posted in a single client call.
     *
     * @param array $messages The messages to publish
     * @param array $options  Per-call options, merged through mergeOptions()
     *
     * @return array The ids reported by the client for the posted messages
     */
    public function publishMessages(array $messages, array $options = [])
    {
        $options   = $this->mergeOptions($options);
        $startedAt = microtime(true);

        if (!$this->queueExists()) {
            $this->create();
        }

        $queueTag = ['_qpush_queue' => $this->name];

        // array_values() re-indexes the result so the payload list is
        // sequential regardless of the keys used in $messages.
        $encodedMessages = array_values(array_map(function ($message) use ($queueTag) {
            return json_encode($message + $queueTag);
        }, $messages));

        $postOptions = [
            'timeout'    => $options['message_timeout'],
            'delay'      => $options['message_delay'],
            'expires_in' => $options['message_expiration']
        ];

        $result = $this->ironmq->postMessages($this->getNameWithPrefix(), $encodedMessages, $postOptions);

        $this->log(200, "Messages have been published.", [
            'message_ids'  => $result->ids,
            'publish_time' => microtime(true) - $startedAt
        ]);

        return $result->ids;
    }
388
389
    /**
     * Looks up the reservation id remembered for a received message.
     *
     * @param mixed $id Id of the message, as used to key `reservedMessages`
     *
     * @return string|null The `reservation_id` from the message metadata, or
     *                     null when the message is unknown or carries none
     */
    private function getReservationId($id)
    {
        if (!array_key_exists($id, $this->reservedMessages)) {
            return null;
        }

        $metadata = $this->reservedMessages[$id]->getMetadata();

        return $metadata->containsKey('reservation_id')
            ? $metadata->get('reservation_id')
            : null;
    }
406
}
407