Passed
Pull Request — master (#60)
by Chad
02:08
created

AbstractQueue   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 417
Duplicated Lines 3.36 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
wmc 51
lcom 1
cbo 1
dl 14
loc 417
rs 8.3206
c 0
b 0
f 0

14 Methods

Rating  Name                              Duplication  Size  Complexity
A       ensureGetIndex()                  0            15    1
A       ensureCountIndex()                0            12    2
C       get()                             7            65    11
B       count()                           7            19    6
A       ack()                             0            13    3
B       ackSend()                         0            36    5
A       requeue()                         0            10    1
A       send()                            0            18    2
A       ensureIndex()                     0            15    4
A       isIndexIncludedInExistingIndex()  0            12    3
A       tryCreateIndex()                  0            10    3
A       tryCreateNamedIndex()             0            14    2
A       indexExists()                     0            10    3
B       verifySort()                      0            16    5

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
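
The duplication reported for get() and count() is the loop that validates query keys and prefixes them with "payload.". A minimal sketch of pulling that loop into one place follows; the function name buildPayloadQuery and its signature are hypothetical and not part of the library.

```php
<?php

/**
 * Hypothetical helper (illustrative only): validates the keys of $query and
 * merges them into $base with the "payload." prefix.
 */
function buildPayloadQuery(array $query, array $base = []): array
{
    foreach ($query as $key => $value) {
        if (!is_string($key)) {
            throw new \InvalidArgumentException('key in $query was not a string');
        }

        $base["payload.{$key}"] = $value;
    }

    return $base;
}

// get() would pass its earliestGet condition as $base; count() would pass
// either an empty array or its own earliestGet condition.
$completeQuery = buildPayloadQuery(
    ['a' => ['$gt' => 1], 'b.c' => 3],
    ['earliestGet' => ['$lte' => 0]]
);
```

With a helper like this, each of the two flagged loops could shrink to a single call, and the key check would live in one place.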

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like AbstractQueue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractQueue, and based on these observations, apply Extract Interface, too.
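
In AbstractQueue, the methods that share the "Index" suffix (ensureIndex(), isIndexIncludedInExistingIndex(), tryCreateIndex(), tryCreateNamedIndex(), indexExists()) look like one such component. The sketch below shows what an extracted class for the two lookup checks might look like. The class name IndexMatcher and its methods are hypothetical, and it works on plain key arrays rather than a live collection so that it runs without MongoDB.

```php
<?php

/**
 * Hypothetical extracted component (not part of the library).
 * Holds the key specifications of the existing indexes as plain arrays.
 */
final class IndexMatcher
{
    /** @var array[] key specifications of the existing indexes */
    private $existingKeys;

    public function __construct(array $existingKeys)
    {
        $this->existingKeys = $existingKeys;
    }

    /** True when $index is a leading prefix of any existing index. */
    public function isPrefixOfExisting(array $index): bool
    {
        foreach ($this->existingKeys as $existingKey) {
            if (array_slice($existingKey, 0, count($index), true) === $index) {
                return true;
            }
        }

        return false;
    }

    /** True when an existing index has exactly this key specification. */
    public function exists(array $index): bool
    {
        return in_array($index, $this->existingKeys, true);
    }
}

$matcher = new IndexMatcher([
    ['_id' => 1],
    ['earliestGet' => 1, 'priority' => 1, 'created' => 1],
]);
```

The queue class would then keep only the retry loop and delegate the comparisons, which is one way to lower its method count and complexity score.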

<?php
/**
 * Defines the TraderInteractive\Mongo\AbstractQueue class.
 */

namespace TraderInteractive\Mongo;

use MongoDB\BSON\UTCDateTime;

/**
 * Abstraction of mongo db collection as priority queue.
 *
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
 * this purpose). Using a random priority achieves random get().
 */
abstract class AbstractQueue implements QueueInterface
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    protected $collection;

    /**
     * Ensure an index for the get() method.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        self::verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        self::verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        self::verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }

    /**
     * Get a non-running message from the queue.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
     *
     * @return array|null the message or null if one is not found
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(
        array $query,
        int $runningResetDuration,
        int $waitDurationInMillis = 3000,
        int $pollDurationInMillis = 200
    ) {
        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        $completeQuery = ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]];
        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $completeQuery["payload.{$key}"] = $value;
        }

        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        $resetTimestamp = min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);

        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
        $options = [
            'sort' => ['priority' => 1, 'created' => 1],
            'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
        ];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was non-negative
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            $message = $this->collection->findOneAndUpdate($completeQuery, $update, $options);
            //checking if _id exists because findAndModify doesn't seem to return null when it can't match the
            //query on older mongo extension
            if ($message !== null && array_key_exists('_id', $message)) {
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
                $message = $this->collection->findOne(['_id' => $message['_id']]);
                //id on left of union operator so a possible id in payload doesn't wipe out the generated one
                return ['id' => $message['_id']] + (array)$message['payload'];
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd

    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count running messages, false to count non-running messages, null for all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $totalQuery = [];

        if ($running === true || $running === false) {
            $key = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
        }

        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $totalQuery["payload.{$key}"] = $value;
        }

        return $this->collection->count($totalQuery);
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    final public function ack(array $message)
    {
        $id = null;
        if (array_key_exists('id', $message)) {
            $id = $message['id'];
        }

        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        $this->collection->deleteOne(['_id' => $id]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function ackSend(
        array $message,
        array $payload,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $id = null;
        if (array_key_exists('id', $message)) {
            $id = $message['id'];
        }

        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $toSet = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGet),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
    }

    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function requeue(
        array $message,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $forRequeue = $message;
        unset($forRequeue['id']);
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
    }

    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
    {
        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $message = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGet),
            'priority' => $priority,
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
        ];

        $this->collection->insertOne($message);
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index.
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldn't create index after 5 attempts
     */
    final private function ensureIndex(array $index)
    {
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        for ($i = 0; $i < 5; ++$i) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }

    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->listIndexes() as $existingIndex) {
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
            if ($slice === $index) {
                return true;
            }
        }

        return false;
    }

    private function tryCreateIndex(array $index) : bool
    {
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
            if ($this->tryCreateNamedIndex($index, $name)) {
                return true;
            }
        }

        return false;
    }

    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        //creating an index with same name and different spec does nothing.
        //creating an index with same spec and different name does nothing.
        //so we use any generated name, and then find the right spec after we have called,
        //and just go with that name.
        try {
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
        } catch (\MongoDB\Exception\Exception $e) {
            //this happens when the name was too long, so let the caller continue with a shorter one
        }

        return $this->indexExists($index);
    }

    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $existingIndex) {
            if ($existingIndex['key'] === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Helper method to validate keys and values for the given sort array.
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with the "payload." prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1 for ascending and descending
     */
    final private static function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }
}
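
The clamp that send() and ackSend() apply to $earliestGet relies on PHP turning an overflowing integer multiplication into a float, which min() and max() then bring back into range. The sketch below isolates that step so it can be tried without MongoDB. The function name clampToMillis is hypothetical, and PHP_INT_MAX stands in for the class constant MONGO_INT32_MAX, which the listing defines as the same value.

```php
<?php

/**
 * Hypothetical standalone version of the $earliestGet clamp (illustrative only).
 * Converts seconds to milliseconds and keeps the result within [0, PHP_INT_MAX].
 */
function clampToMillis(int $seconds): int
{
    // int * int becomes a float when it overflows; max() and min() still
    // compare it correctly against the integer bounds.
    $millis = min(max(0, $seconds * 1000), PHP_INT_MAX);

    return (int)$millis;
}

$normal = clampToMillis(2);
$negative = clampToMillis(-5);
$overflow = clampToMillis(PHP_INT_MAX);
```

Negative timestamps collapse to 0, so such a message becomes available immediately, while an overflowing value is pinned to the largest representable millisecond count.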