Test Failed
Pull Request — master (#63)
by Chad
04:40
created

Queue   C

Complexity

Total Complexity 61

Size/Duplication

Total Lines 452
Duplicated Lines 3.1 %

Coupling/Cohesion

Components 1
Dependencies 3

Importance

Changes 0
Metric Value
wmc 61
lcom 1
cbo 3
dl 14
loc 452
rs 6.018
c 0
b 0
f 0

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
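In the Queue class shown further down, for instance, send() and ackSend() repeat the same $earliestGet and $priority checks and the same clamping expression. A minimal sketch of pulling that logic into one place could look like the following. The class name MessageArguments and its method are hypothetical and not part of the library.

```php
<?php

/**
 * Hypothetical helper (not part of the original library) that holds the
 * argument checks duplicated in Queue::send() and Queue::ackSend().
 */
final class MessageArguments
{
    /**
     * Validate the arguments and return $earliestGet in milliseconds,
     * clamped to the range 0..$max, mirroring the expression used in Queue.
     */
    public static function earliestGetMillis($earliestGet, $priority, $max = PHP_INT_MAX)
    {
        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        return min(max(0, $earliestGet * 1000), $max);
    }
}
```

Both methods could then call this helper once instead of carrying their own copies of the three checks.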

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like Queue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share the same prefix or suffix. You can also look at the cohesion graph to spot any unconnected or weakly connected components.
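The prefix/suffix heuristic can be sketched in a few lines. The function below is a hypothetical helper, not something the analysis tool provides: it groups method names by their last camelCase word and keeps only the groups shared by two or more methods.

```php
<?php

/**
 * Group method names by their trailing camelCase word.
 * Hypothetical helper for illustration only.
 */
function groupBySuffix(array $methodNames)
{
    $groups = [];
    foreach ($methodNames as $name) {
        // Last camelCase word, e.g. "ensureGetIndex" -> "Index".
        // Single-word names such as "send" use the whole name ("Send").
        $suffix = preg_match('/[A-Z][a-z0-9]*$/', $name, $match) === 1 ? $match[0] : ucfirst($name);
        $groups[$suffix][] = $name;
    }

    // Keep only suffixes shared by at least two methods.
    return array_filter($groups, function (array $names) {
        return count($names) > 1;
    });
}

// Method names taken from the Queue class in this report.
$candidates = groupBySuffix([
    'ensureGetIndex', 'ensureCountIndex', 'get', 'getIdFromMessage', 'count',
    'ack', 'ackSend', 'requeue', 'send', 'ensureIndex', 'verifySort',
]);
```

For Queue this singles out the three "Index" methods as one candidate component and the two "Send" methods as another. A name-based grouping is only a hint and should be checked against what the methods actually touch.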

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
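As a sketch of Extract Class applied to Queue, the index-related logic (ensureGetIndex(), ensureCountIndex() and verifySort()) could move into a class of its own that only builds field lists, leaving Queue to pass the result to the collection. The class name IndexFields and its method names are assumptions made for this example.

```php
<?php

/**
 * Hypothetical class extracted from Queue: it builds index field lists only.
 * The name IndexFields and its methods are illustrative assumptions.
 */
final class IndexFields
{
    /** Field list for the get() index: earliestGet, caller fields, sort fields, caller fields. */
    public static function forGet(array $beforeSort = [], array $afterSort = [])
    {
        $fields = ['earliestGet' => 1];
        $fields = self::append($beforeSort, 'beforeSort', $fields);
        $fields['priority'] = 1;
        $fields['created'] = 1;

        return self::append($afterSort, 'afterSort', $fields);
    }

    /** Field list for the count() index, optionally led by earliestGet. */
    public static function forCount(array $fields, $includeRunning)
    {
        if (!is_bool($includeRunning)) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        return self::append($fields, 'fields', $includeRunning ? ['earliestGet' => 1] : []);
    }

    /** Validate $sort and append its keys, prefixed with "payload.", to $fields. */
    private static function append(array $sort, $label, array $fields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException("value of \${$label} is not 1 or -1 for ascending and descending");
            }

            $fields["payload.{$key}"] = $value;
        }

        return $fields;
    }
}
```

Because the extracted class has no database dependency, it can be unit tested without a MongoDB connection.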

While breaking up the class, it is a good idea to analyze how other classes use Queue, and based on these observations, apply Extract Interface, too.
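Extract Interface can be sketched as follows, assuming that some callers only enqueue messages while others only consume them. The interface names MessageProducer and MessageConsumer, the InMemoryQueue stand-in and the publishGreeting() function are all hypothetical; the stand-in exists only to make the example runnable and leaves out most of what the real Queue does.

```php
<?php

/** Hypothetical narrow interface for code that only enqueues messages. */
interface MessageProducer
{
    public function send(array $payload, $earliestGet = 0, $priority = 0.0);
}

/** Hypothetical narrow interface for code that only consumes messages. */
interface MessageConsumer
{
    public function get(array $query, $runningResetDuration);

    public function ack(array $message);
}

/**
 * In-memory stand-in used only to make this sketch runnable. It ignores
 * $query, $earliestGet and $runningResetDuration, unlike the real Queue.
 */
final class InMemoryQueue implements MessageProducer, MessageConsumer
{
    private $messages = [];
    private $nextId = 1;

    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        $this->messages[$this->nextId++] = ['payload' => $payload, 'priority' => $priority];
    }

    public function get(array $query, $runningResetDuration)
    {
        $bestId = null;
        foreach ($this->messages as $id => $message) {
            // Strict comparison keeps the earliest message when priorities tie.
            if ($bestId === null || $message['priority'] < $this->messages[$bestId]['priority']) {
                $bestId = $id;
            }
        }

        return $bestId === null ? null : ['id' => $bestId] + $this->messages[$bestId]['payload'];
    }

    public function ack(array $message)
    {
        unset($this->messages[$message['id']]);
    }
}

/** A producer that depends on the narrow interface rather than on Queue. */
function publishGreeting(MessageProducer $producer)
{
    $producer->send(['greeting' => 'hello'], 0, 0.0);
}
```

Code written against MessageProducer cannot call get() or ack() by accident, and it can be tested with a stand-in like the one above.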

<?php
/**
 * Defines the DominionEnterprises\Mongo\Queue class.
 */

namespace DominionEnterprises\Mongo;

use MongoDB\BSON\UTCDateTime;

/**
 * Abstraction of mongo db collection as priority queue.
 *
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
 * this purpose).  Using a random priority achieves random get()
 */
final class Queue implements QueueInterface
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    private $collection;

    /**
     * Construct queue.
     *
     * @param \MongoDB\Collection|string $collectionOrUrl A \MongoDB\Collection instance or the mongo connection url.
     * @param string $db the mongo db name
     * @param string $collection the collection name to use for the queue
     *
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
     */
    public function __construct($collectionOrUrl, $db = null, $collection = null)
    {
        if ($collectionOrUrl instanceof \MongoDB\Collection) {
            $this->collection = $collectionOrUrl;
            return;
        }

        if (!is_string($collectionOrUrl)) {
            throw new \InvalidArgumentException('$collectionOrUrl was not a string');
        }

        if (!is_string($db)) {
            throw new \InvalidArgumentException('$db was not a string');
        }

        if (!is_string($collection)) {
            throw new \InvalidArgumentException('$collection was not a string');
        }

        $mongo = new \MongoDB\Client($collectionOrUrl, [], ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]);
        $mongoDb = $mongo->selectDatabase($db);
        $this->collection = $mongoDb->selectCollection($collection);
    }

    /**
     * Ensure an index for the get() method.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        self::verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        self::verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException $includeRunning was not a boolean
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    public function ensureCountIndex(array $fields, $includeRunning)
    {
        if (!is_bool($includeRunning)) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        self::verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }
124
125
    /**
126
     * Get a non running message from the queue.
127
     *
128
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
129
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
130
     *                     invalid {$and: [{...}, {...}]}
131
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
132
     *                                  retreived again.
133
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
134
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
135
     *
136
     * @return array|null the message or null if one is not found
137
     *
138
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
139
     *                                   an int
140
     * @throws \InvalidArgumentException key in $query was not a string
141
     */
142
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
143
    {
144
        if (!is_int($runningResetDuration)) {
145
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
146
        }
147
148
        if (!is_int($waitDurationInMillis)) {
149
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
150
        }
151
152
        if (!is_int($pollDurationInMillis)) {
153
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
154
        }
155
156
        if ($pollDurationInMillis < 0) {
157
            $pollDurationInMillis = 0;
158
        }
159
160
        $completeQuery = ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]];
161
        foreach ($query as $key => $value) {
162
            if (!is_string($key)) {
163
                throw new \InvalidArgumentException('key in $query was not a string');
164
            }
165
166
            $completeQuery["payload.{$key}"] = $value;
167
        }
168
169
        $resetTimestamp = time() + $runningResetDuration;
170
        //ints overflow to floats
171
        if (!is_int($resetTimestamp)) {
172
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
173
        }
174
175
        $resetTimestamp = min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
176
177
        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
178
        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
        // Review note (bug): this statement originally ended with ',' instead of ';' and the file did not parse
        // ("Syntax error, unexpected ','").
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            $id = $this->getIdFromMessage(
                $this->collection->findOneAndUpdate($completeQuery, $update, $findOneAndUpdateOptions)
            );
            if ($id !== null) {
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
                $message = $this->collection->findOne(['_id' => $id], $findOneOptions);
                //id on left of union operator so a possible id in payload doesn't wipe out the generated one
                return ['id' => $id] + $message['payload'];
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd

    private function getIdFromMessage($message)
    {
        if (is_array($message)) {
            return array_key_exists('_id', $message) ? $message['_id'] : null;
        }

        if (is_object($message)) {
            return isset($message->_id) ? $message->_id : null;
        }

        return null;
    }

    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
     * @param bool|null $running query a running message or not or all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function count(array $query, $running = null)
    {
        if ($running !== null && !is_bool($running)) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        $totalQuery = [];

        if ($running === true || $running === false) {
            $key = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
        }

        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $totalQuery["payload.{$key}"] = $value;
        }

        return $this->collection->count($totalQuery);
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    public function ack(array $message)
    {
        $id = null;
        if (array_key_exists('id', $message)) {
            $id = $message['id'];
        }

        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        $this->collection->deleteOne(['_id' => $id]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        $id = null;
        if (array_key_exists('id', $message)) {
            $id = $message['id'];
        }

        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        if ($newTimestamp !== true && $newTimestamp !== false) {
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $toSet = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGet),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
    }

    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        $forRequeue = $message;
        unset($forRequeue['id']);
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
    }

    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     */
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $message = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGet),
            'priority' => $priority,
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
        ];

        $this->collection->insertOne($message);
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->listIndexes() as $existingIndex) {
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
            if ($slice === $index) {
                return;
            }
        }

        for ($i = 0; $i < 5; ++$i) {
            for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so we use any generated name, and then find the right spec after we have called,
                //and just go with that name.
                try {
                    $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
                } catch (\MongoDB\Exception\Exception $e) {
                    //this happens when the name was too long, let continue
                }

                foreach ($this->collection->listIndexes() as $existingIndex) {
                    if ($existingIndex['key'] === $index) {
                        return;
                    }
                }
            }
        }

        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }

    /**
     * Helper method to validate keys and values for the given sort array
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with the "payload." prefix added to fields.
     *
     * @return void
     */
    private static function verifySort(array $sort, $label, &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }
}