Completed
Push — master ( eef2d6...f1adda )
by Keith
03:34
created

IronMqProvider::queueInfo()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.4286
cc 2
eloc 6
nc 2
nop 0
crap 2
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use IronMQ\IronMQ;
26
use Doctrine\Common\Cache\Cache;
27
use Symfony\Bridge\Monolog\Logger;
28
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
29
use Uecode\Bundle\QPushBundle\Event\Events;
30
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
31
use Uecode\Bundle\QPushBundle\Event\NotificationEvent;
32
use Uecode\Bundle\QPushBundle\Message\Message;
33
34
/**
35
 * @author Keith Kirk <[email protected]>
36
 */
37
class IronMqProvider extends AbstractProvider
38
{
39
    /**
40
     * IronMQ Client
41
     *
42
     * @var IronMQ
43
     */
44
    private $ironmq;
45
46
    /**
47
     * IronMQ Queue
48
     *
49
     * @var stdObject
50
     */
51
    private $queue;
52
53 10 View Code Duplication
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
54
    {
55 10
        $this->name     = $name;
56 10
        $this->options  = $options;
57 10
        $this->ironmq   = $client;
58 10
        $this->cache    = $cache;
59 10
        $this->logger   = $logger;
60 10
    }
61
62 1
    public function getProvider()
63
    {
64 1
        return "IronMQ";
65
    }
66
67
    /**
68
     * {@inheritDoc}
69
     */
70 4
    public function create()
71
    {
72 4
        if ($this->options['push_notifications']) {
73
            $params = [
74 3
                'type' => $this->options['push_type'],
75
                'push'      => [
76 3
                    'rate_limit'    => $this->options['rate_limit'],
77 3
                    'retries'       => $this->options['notification_retries'],
78 3
                    'retries_delay' => $this->options['notification_retries_delay'],
79 3
                    'subscribers'   => []
80 3
                ]
81 3
            ];
82
83 3
            foreach ($this->options['subscribers'] as $subscriber) {
84 3
                if ($subscriber['protocol'] == "email") {
85 1
                    throw new \InvalidArgumentException(
86
                        'IronMQ only supports `http` or `https` subscribers!'
87 1
                    );
88
                }
89
90 3
                $params['push']['subscribers'][] = ['url' => $subscriber['endpoint']];
91 3
            }
92
93 3
        } else {
94 1
            $params = ['push_type' => 'pull'];
95
        }
96
97 4
        $result = $this->ironmq->createQueue($this->getNameWithPrefix(), $params);
98 4
        $this->queue = $result;
99
100 4
        $key = $this->getNameWithPrefix();
101 4
        $this->cache->save($key, json_encode($this->queue));
102
103 4
        $this->log(200, "Queue has been created.", $params);
104
105 4
        return true;
106
    }
107
108
    /**
109
     * {@inheritDoc}
110
     */
111 1
    public function destroy()
112
    {
113
        // Catch `queue not found` exceptions, throw the rest.
114
        try {
115 1
            $this->ironmq->deleteQueue($this->getNameWithPrefix());
116 1
        } catch ( \Exception $e) {
117 1 View Code Duplication
            if (false !== strpos($e->getMessage(), "Queue not found")) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
118 1
                $this->log(400, "Queue did not exist");
119 1
            } else {
120 1
                throw $e;
121
            }
122
        }
123
124 1
        $key = $this->getNameWithPrefix();
125 1
        $this->cache->delete($key);
126
127 1
        $this->log(200, "Queue has been destroyed.");
128
129 1
        return true;
130
    }
131
132
    /**
133
     * {@inheritDoc}
134
     *
135
     * @return int
136
     */
137 1
    public function publish(array $message, array $options = [])
138
    {
139 1
        $options      = $this->mergeOptions($options);
140 1
        $publishStart = microtime(true);
141
142 1
        if (!$this->queueExists()) {
143 1
            $this->create();
144 1
        }
145
146 1
        $result = $this->ironmq->postMessage(
147 1
            $this->getNameWithPrefix(),
148 1
            json_encode($message + ['_qpush_queue' => $this->name]),
149
            [
150 1
                'timeout'       => $options['message_timeout'],
151 1
                'delay'         => $options['message_delay'],
152 1
                'expires_in'    => $options['message_expiration']
153 1
            ]
154 1
        );
155
156
        $context = [
157 1
            'message_id'    => $result->id,
158 1
            'publish_time'  => microtime(true) - $publishStart
159 1
        ];
160 1
        $this->log(200, "Message has been published.", $context);
161
162 1
        return $result->id;
163
    }
164
165
    /**
166
     * {@inheritDoc}
167
     */
168 1
    public function receive(array $options = [])
169
    {
170 1
        $options = $this->mergeOptions($options);
171
172 1
        if (!$this->queueExists()) {
173 1
            $this->create();
174 1
        }
175
176 1
        $messages = $this->ironmq->getMessages(
0 ignored issues
show
Deprecated Code introduced by
The method IronMQ\IronMQ::getMessages() has been deprecated with message: Use reserveMessages instead

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
177 1
            $this->getNameWithPrefix(),
178 1
            $options['messages_to_receive'],
179 1
            $options['message_timeout'],
180 1
            $options['receive_wait_time']
181 1
        );
182
183 1
        if (!is_array($messages)) {
184
            $this->log(200, "No messages found in queue.");
185
186
            return [];
187
        }
188
189
        // Convert to Message Class
190 1
        foreach ($messages as &$message) {
191 1
            $id         = $message->id;
192 1
            $body       = json_decode($message->body, true);
193
            $metadata   = [
194 1
                'timeout'           => $message->timeout,
195 1
                'reserved_count'    => $message->reserved_count,
196 1
                'push_status'       => $message->push_status
197 1
            ];
198
199 1
            unset($body['_qpush_queue']);
200
201 1
            $message = new Message($id, json_encode($body), $metadata);
202
203 1
            $this->log(200, "Message has been received.", ['message_id' => $id]);
204 1
        }
205
206 1
        return $messages;
207
    }
208
209
    /**
210
     * {@inheritDoc}
211
     */
212 2
    public function delete($id)
213
    {
214
        try {
215 2
            $result = $this->ironmq->deleteMessage($this->getNameWithPrefix(), $id);
0 ignored issues
show
Unused Code introduced by
$result is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
216 2
            $this->log(200, "Message deleted.", ['message_id' => $id]);
217 2
        } catch ( \Exception $e) {
218 1 View Code Duplication
            if (false !== strpos($e->getMessage(), "Queue not found")) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
219 1
                $this->log(400, "Queue did not exist");
220 1
            } else {
221 1
                throw $e;
222
            }
223
        }
224
225 2
        return true;
226
    }
227
228
    /**
229
     * Checks whether or not the Queue exsits
230
     *
231
     * This method relies on in-memory cache and the Cache provider
232
     * to reduce the need to needlessly call the create method on an existing
233
     * Queue.
234
     *
235
     * @return Boolean
236
     */
237 4
    public function queueExists()
238
    {
239 4
        if (isset($this->queue)) {
240 2
            return true;
241
        }
242
243 4
        $key = $this->getNameWithPrefix();
244 4
        if ($this->cache->contains($key)) {
245 1
            $this->queue = json_decode($this->cache->fetch($key));
246
247 1
            return true;
248
        }
249
250 4
        return false;
251
    }
252
253
    /**
254
     * Polls the Queue on Notification from IronMQ
255
     *
256
     * Dispatches the `{queue}.message_received` event
257
     *
258
     * @param NotificationEvent $event The Notification Event
259
     * @param string $eventName Name of the event
260
     * @param EventDispatcherInterface $dispatcher
261
     * @return void
262
     */
263 1
    public function onNotification(NotificationEvent $event, $eventName, EventDispatcherInterface $dispatcher)
264
    {
265 1
        $message = new Message(
266 1
            $event->getNotification()->getId(),
0 ignored issues
show
Bug introduced by
The method getId cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
267 1
            $event->getNotification()->getBody(),
0 ignored issues
show
Bug introduced by
The method getBody cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
268 1
            $event->getNotification()->getMetadata()->toArray()
0 ignored issues
show
Bug introduced by
The method getMetadata cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
269 1
        );
270
271 1
        $this->log(
272 1
            200,
273 1
            "Message has been received from Push Notification.",
274 1
            ['message_id' => $event->getNotification()->getId()]
0 ignored issues
show
Bug introduced by
The method getId cannot be called on $event->getNotification() (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
275 1
        );
276
277 1
        $messageEvent = new MessageEvent($this->name, $message);
278
279 1
        $dispatcher->dispatch(
280 1
            Events::Message($this->name),
281
            $messageEvent
282 1
        );
283 1
    }
284
285
    /**
286
     * Removes the message from queue after all other listeners have fired
287
     *
288
     * If an earlier listener has errored or stopped propigation, this method
289
     * will not fire and the Queued Message should become visible in queue again.
290
     *
291
     * Stops Event Propagation after removing the Message
292
     *
293
     * @param MessageEvent $event The SQS Message Event
294
     * @return void
295
     */
296 1
    public function onMessageReceived(MessageEvent $event)
297
    {
298 1
        $metadata = $event->getMessage()->getMetadata();
299
300 1
        if (!$metadata->containsKey('iron-subscriber-message-id')) {
301 1
            $id = $event->getMessage()->getId();
302 1
            $this->delete($id);
303 1
        }
304
305 1
        $event->stopPropagation();
306 1
    }
307
308
    /**
309
     * Get queue info
310
     *
311
     * This allows to get queue size. Allowing to know if processing is finished or not
312
     *
313
     * @return stdObject|null
314
     */
315 1
    public function queueInfo()
316
    {
317 1
        if ($this->queueExists()) {
318 1
            $key = $this->getNameWithPrefix();
319 1
            $this->queue = $this->ironmq->getQueue($key);
320
321 1
            return $this->queue;
322
        }
323
324 1
        return null;
325
    }
326
327
    /**
328
     * Publishes multiple message at once
329
     *
330
     * @param array $messages
331
     * @param array $options
332
     *
333
     * @return array
334
     */
335
    public function publishMessages(array $messages, array $options = [])
336
    {
337
        $options      = $this->mergeOptions($options);
338
        $publishStart = microtime(true);
339
340
        if (!$this->queueExists()) {
341
            $this->create();
342
        }
343
344
        $encodedMessages = [];
345
        foreach ($messages as $message) {
346
            $encodedMessages[] = json_encode($message + ['_qpush_queue' => $this->name]);
347
        }
348
349
        $result = $this->ironmq->postMessages(
350
            $this->getNameWithPrefix(),
351
            $encodedMessages,
352
            [
353
                'timeout'       => $options['message_timeout'],
354
                'delay'         => $options['message_delay'],
355
                'expires_in'    => $options['message_expiration']
356
            ]
357
        );
358
359
        $context = [
360
            'message_ids'    => $result->ids,
361
            'publish_time'  => microtime(true) - $publishStart
362
        ];
363
        $this->log(200, "Messages have been published.", $context);
364
365
        return $result->ids;
366
    }
367
}
368