Failed Conditions
Pull Request — master (#46)
by
unknown
02:04
created

Queue::get()   C

Complexity

Conditions 14
Paths 101

Size

Total Lines 77
Code Lines 36

Duplication

Lines 7
Ratio 9.09 %

Importance

Changes 7
Bugs 0 Features 0
Metric Value
c 7
b 0
f 0
dl 7
loc 77
rs 5.1909
cc 14
eloc 36
nc 101
nop 4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
/**
9
 * Abstraction of mongo db collection as priority queue.
10
 *
11
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
12
 * this purpose). Using a random priority achieves random get() ordering.
13
 */
14
final class Queue implements QueueInterface
15
{
16
    const MONGO_INT32_MAX = 2147483647;//2147483648 can overflow in php mongo without using the MongoInt64
17
18
    /**
19
     * mongo collection to use for queue.
20
     *
21
     * @var \MongoDB\Collection
22
     */
23
    private $collection;
24
25
    /**
     * Build a queue on top of an existing collection, or connect to one by url.
     *
     * When a \MongoDB\Collection is given it is used as-is and $db / $collection are not looked at.
     * Otherwise a client is created for the url, with a type map that asks for documents as php arrays.
     *
     * @param \MongoDB\Collection|string $collectionOrUrl A collection instance or the mongo connection url.
     * @param string $db the mongo db name
     * @param string $collection the collection name to use for the queue
     *
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
     */
    public function __construct($collectionOrUrl, $db = null, $collection = null)
    {
        if ($collectionOrUrl instanceof \MongoDB\Collection) {
            $this->collection = $collectionOrUrl;
            return;
        }

        //checked in declaration order so the first offending argument is the one reported
        $mustBeStrings = ['collectionOrUrl' => $collectionOrUrl, 'db' => $db, 'collection' => $collection];
        foreach ($mustBeStrings as $argumentName => $argumentValue) {
            if (!is_string($argumentValue)) {
                throw new \InvalidArgumentException("\${$argumentName} was not a string");
            }
        }

        $arrayTypeMap = ['root' => 'array', 'document' => 'array', 'array' => 'array'];
        $client = new \MongoDB\Client($collectionOrUrl, [], ['typeMap' => $arrayTypeMap]);
        $this->collection = $client->selectDatabase($db)->selectCollection($collection);
    }
57
58
    /**
     * Create the indexes that back the get() method.
     *
     * The main index is laid out as: running, the $beforeSort fields, priority, created, the $afterSort fields
     * and finally earliestGet. A second index on running + resetTimestamp serves the stuck message reset.
     *
     * @param array $beforeSort Fields in get() call to index before the sort fields, in the same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort fields, in the same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //general rule for index field order: equality, sort, then range or more equality tests
        $mainIndex = ['running' => 1];
        self::verifySort($beforeSort, 'beforeSort', $mainIndex);

        //verifySort only ever adds "payload."-prefixed keys, so the keys appended here are always new
        $mainIndex += ['priority' => 1, 'created' => 1];
        self::verifySort($afterSort, 'afterSort', $mainIndex);

        $mainIndex += ['earliestGet' => 1];

        //index for the main query in get()
        $this->ensureIndex($mainIndex);

        //index for the stuck messages query in get()
        $stuckIndex = ['running' => 1, 'resetTimestamp' => 1];
        $this->ensureIndex($stuckIndex);
    }
91
92
    /**
     * Create an index that backs the count() method.
     * Does nothing if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex
     * call, call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to put the running field at the front of the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException $includeRunning was not a boolean
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    public function ensureCountIndex(array $fields, $includeRunning)
    {
        if (is_bool($includeRunning) === false) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        //running, when requested, leads the index; the payload fields follow in the given order
        $countIndex = $includeRunning ? ['running' => 1] : [];
        self::verifySort($fields, 'fields', $countIndex);

        $this->ensureIndex($countIndex);
    }
122
123
    /**
     * Get a non running message from the queue.
     *
     * Messages that are running with an expired resetTimestamp are reset first. The queue is then polled until
     * a message matching $query is claimed (marked running) or $waitDurationInMillis has elapsed.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls. Negative values are treated as 0.
     *
     * @return array|null the message (its payload plus an "id" field) or null if one is not found
     *
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
     *                                   an int
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
    {
        if (!is_int($runningResetDuration)) {
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
        }

        if (!is_int($waitDurationInMillis)) {
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
        }

        if (!is_int($pollDurationInMillis)) {
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
        }

        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        //reset stuck messages
        $this->collection->updateMany(
            ['running' => true, 'resetTimestamp' => ['$lte' => new \MongoDB\BSON\UTCDateTime(microtime(true) * 1000)]],
            ['$set' => ['running' => false]]
        );

        //caller supplied fields are matched against the stored payload
        $completeQuery = ['running' => false];
        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $completeQuery["payload.{$key}"] = $value;
        }

        $completeQuery['earliestGet'] = ['$lte' => new \MongoDB\BSON\UTCDateTime(microtime(true) * 1000)];

        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        $update = ['$set' => ['resetTimestamp' => new \MongoDB\BSON\UTCDateTime($resetTimestamp * 1000), 'running' => true]];
        $options = ['sort' => ['priority' => 1, 'created' => 1]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            $message = $this->collection->findOneAndUpdate($completeQuery, $update, $options);

            //the claimed document may come back as an array or as an object, and an unmatched query does not
            //seem to come back as null on older mongo extensions, so look for _id in either shape
            $id = null;
            if (is_array($message)) {
                $id = isset($message['_id']) ? $message['_id'] : null;
            } elseif (is_object($message) && isset($message->_id)) {
                $id = $message->_id;
            }

            if ($id !== null) {
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
                $message = $this->collection->findOne(['_id' => $id]);
                if ($message !== null) {
                    //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
                    return ['id' => $message['_id']] + (array)$message['payload'];
                }

                //the document disappeared between the claim and the refetch, keep polling
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd
218
219
    /**
     * Count the messages in the queue that match a payload query.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count running messages, false for non running ones, null for both
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function count(array $query, $running = null)
    {
        $runningGiven = $running !== null;
        if ($runningGiven && !is_bool($running)) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        //only filter on running when the caller asked for a specific state
        $filter = $runningGiven ? ['running' => $running] : [];

        //caller supplied fields are matched against the stored payload
        foreach ($query as $field => $criteria) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $filter["payload.{$field}"] = $criteria;
        }

        return $this->collection->count($filter);
    }
253
254
    /**
     * Acknowledge that a message was processed by deleting it from the queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    public function ack(array $message)
    {
        //a missing id is treated the same as an id of the wrong type
        $id = array_key_exists('id', $message) ? $message['id'] : null;

        $isObjectId = $id instanceof \MongoDB\BSON\ObjectID;
        if (!$isObjectId) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        $this->collection->deleteOne(['_id' => $id]);
    }
276
277
    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * The document with the id of $message is overwritten in place (upserted) with the new payload and reset to
     * a non running state.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp (in seconds) the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        $id = null;
        if (array_key_exists('id', $message)) {
            $id = $message['id'];
        }

        if (!($id instanceof \MongoDB\BSON\ObjectId)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectId');
        }

        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        if ($newTimestamp !== true && $newTimestamp !== false) {
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $earliestGet), self::MONGO_INT32_MAX);

        //UTCDateTime takes milliseconds while these values are in seconds, so scale them exactly as send() does;
        //get() compares earliestGet against the current time in milliseconds
        $toSet = [
            'payload' => $payload,
            'running' => false,
            'resetTimestamp' => new \MongoDB\BSON\UTCDateTime(self::MONGO_INT32_MAX * 1000),
            'earliestGet' => new \MongoDB\BSON\UTCDateTime($earliestGet * 1000),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $toSet['created'] = new \MongoDB\BSON\UTCDateTime(microtime(true) * 1000);
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
    }
340
341
    /**
     * Put a message back on the queue. Same as ackSend() where the payload is the message itself without its id.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        //everything except the queue generated id is the payload to send again
        $payload = array_diff_key($message, ['id' => null]);

        $this->ackSend($message, $payload, $earliestGet, $priority, $newTimestamp);
    }
363
364
    /**
     * Insert a new, non running message into the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     */
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        if (is_int($earliestGet) === false) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (is_float($priority) === false) {
            throw new \InvalidArgumentException('$priority was not a float');
        } elseif (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //clamp $earliestGet into the range 0 to MONGO_INT32_MAX
        $earliestGet = max(0, min($earliestGet, self::MONGO_INT32_MAX));

        //second based values are scaled by 1000 before being handed to UTCDateTime
        $this->collection->insertOne(
            [
                'payload' => $payload,
                'running' => false,
                'resetTimestamp' => new \MongoDB\BSON\UTCDateTime(self::MONGO_INT32_MAX * 1000),
                'earliestGet' => new \MongoDB\BSON\UTCDateTime($earliestGet * 1000),
                'priority' => $priority,
                'created' => new \MongoDB\BSON\UTCDateTime(microtime(true) * 1000),
            ]
        );
    }
405
406
    /**
     * Make sure an index with the given specification exists, whatever name it ends up with.
     * No index is created when $index is a prefix of an existing index.
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        //nothing to do when $index is already a prefix of an existing index
        $width = count($index);
        foreach ($this->collection->listIndexes() as $existing) {
            if (array_slice($existing['key'], 0, $width, true) === $index) {
                return;
            }
        }

        $attempt = 0;
        while ($attempt < 5) {
            //start from a generated name and retry with one character less each time until it is empty
            $name = uniqid();
            while (strlen($name) > 0) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so any generated name will do: create, then look for the right spec and go with whatever
                //name it has.
                try {
                    $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
                } catch (\MongoDB\Exception\Exception $e) {
                    //this happens when the name was too long, let continue
                }

                foreach ($this->collection->listIndexes() as $existing) {
                    if ($existing['key'] === $index) {
                        return;
                    }
                }

                $name = substr($name, 0, -1);
            }

            ++$attempt;
        }

        throw new \Exception('couldnt create index after 5 attempts');
    }
448
449
    /**
     * Validate the keys and directions of a sort array and append them to an index specification.
     *
     * Every entry is added to $completeFields under its key prefixed with "payload.".
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The index array being built, extended in place.
     *
     * @return void
     *
     * @throws \InvalidArgumentException a key in $sort was not a string
     * @throws \InvalidArgumentException a value in $sort was not the int 1 or -1
     */
    private static function verifySort(array $sort, $label, &$completeFields)
    {
        foreach ($sort as $field => $direction) {
            if (is_string($field) === false) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            //strict comparison: only the ints 1 and -1 are accepted
            if (!in_array($direction, [1, -1], true)) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$field}"] = $direction;
        }
    }
474
}
475