Completed
Pull Request — master (#60)
by Chad
01:27
created

AbstractQueue   C

Complexity

Total Complexity 57

Size/Duplication

Total Lines 452
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
wmc 57
lcom 1
cbo 1
dl 0
loc 452
rs 6.433
c 0
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A ensureGetIndex() 0 15 1
A ensureCountIndex() 0 12 2
B get() 0 36 4
A buildPayloadQuery() 0 12 3
A calculateSleepTime() 0 7 2
A calcuateResetTimeStamp() 0 10 3
A tryFindOneAndUpdate() 0 19 2
B getIdFromMessage() 0 12 5
A count() 0 11 4
A ack() 0 13 3
B ackSend() 0 25 2
A requeue() 0 10 1
A send() 0 18 2
A ensureIndex() 0 15 4
A isIndexIncludedInExistingIndex() 0 12 3
A tryCreateIndex() 0 10 3
A tryCreateNamedIndex() 0 14 2
A indexExists() 0 10 3
B verifySort() 0 16 5
A throwIfTrue() 0 10 2
A getEarliestGetAsUTCDateTime() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like AbstractQueue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractQueue, and based on these observations, apply Extract Interface, too.

1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\Queue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use MongoDB\BSON\ObjectID;
9
use MongoDB\BSON\UTCDateTime;
10
11
/**
12
 * Abstraction of mongo db collection as priority queue.
13
 *
14
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
15
 * this purpose).  Using a random priority achieves random get()
16
 */
17
abstract class AbstractQueue
18
{
19
    /**
20
     * Maximum millisecond value to use for UTCDateTime creation.
21
     *
22
     * @var integer
23
     */
24
    const MONGO_INT32_MAX = PHP_INT_MAX;
25
26
    /**
27
     * mongo collection to use for queue.
28
     *
29
     * @var \MongoDB\Collection
30
     */
31
    protected $collection;
32
33
    /**
34
     * Ensure an index for the get() method.
35
     *
36
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
37
     *                          as \MongoDB\Collection::ensureIndex()
38
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
39
     *                          \MongoDB\Collection::ensureIndex()
40
     *
41
     * @return void
42
     *
43
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
44
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
45
     */
46
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
47
    {
48
        //using general rule: equality, sort, range or more equality tests in that order for index
49
        $completeFields = ['earliestGet' => 1];
50
51
        self::verifySort($beforeSort, 'beforeSort', $completeFields);
52
53
        $completeFields['priority'] = 1;
54
        $completeFields['created'] = 1;
55
56
        self::verifySort($afterSort, 'afterSort', $completeFields);
57
58
        //for the main query in get()
59
        $this->ensureIndex($completeFields);
60
    }
61
62
    /**
63
     * Ensure an index for the count() method.
64
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
65
     * call it first.
66
     *
67
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
68
     * @param bool $includeRunning whether to include the running field in the index
69
     *
70
     * @return void
71
     *
72
     * @throws \InvalidArgumentException key in $fields was not a string
73
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
74
     */
75
    final public function ensureCountIndex(array $fields, bool $includeRunning)
76
    {
77
        $completeFields = [];
78
79
        if ($includeRunning) {
80
            $completeFields['earliestGet'] = 1;
81
        }
82
83
        self::verifySort($fields, 'fields', $completeFields);
84
85
        $this->ensureIndex($completeFields);
86
    }
87
88
    /**
89
     * Get a non running message from the queue.
90
     *
91
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
92
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
93
     *                     invalid {$and: [{...}, {...}]}
94
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
95
     *                                  retreived again.
96
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
97
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
98
     *
99
     * @return array|null the message or null if one is not found
100
     *
101
     * @throws \InvalidArgumentException key in $query was not a string
102
     */
103
    final public function get(
104
        array $query,
105
        int $runningResetDuration,
106
        int $waitDurationInMillis = 3000,
107
        int $pollDurationInMillis = 200
108
    ) {
109
110
        $completeQuery = $this->buildPayloadQuery(
111
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
112
            $query
113
        );
114
115
        $resetTimestamp = $this->calcuateResetTimestamp($runningResetDuration);
116
117
        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
118
119
        //ints overflow to floats, should be fine
120
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
121
        $sleepTime = $this->calculateSleepTime($pollDurationInMillis);
122
123
        while (true) {
124
            $message = [];
125
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $message)) {
126
                return $message;
127
            }
128
129
            if (microtime(true) >= $end) {
130
                return null;
131
            }
132
133
            usleep($sleepTime);
134
        }
135
136
        //ignore since always return from the function from the while loop
137
        //@codeCoverageIgnoreStart
138
    }
139
    //@codeCoverageIgnoreEnd
140
141
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
142
    {
143
        foreach ($payloadQuery as $key => $value) {
144
            if (!is_string($key)) {
145
                throw new \InvalidArgumentException('key in $query was not a string');
146
            }
147
148
            $initialQuery["payload.{$key}"] = $value;
149
        }
150
151
        return $initialQuery;
152
    }
153
154
    private function calculateSleepTime(int $pollDurationInMillis) : int
155
    {
156
        $pollDurationInMillis = max($pollDurationInMillis, 0);
157
        $sleepTime = $pollDurationInMillis * 1000;
158
        //ints overflow to floats and already checked $pollDurationInMillis was positive
159
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
160
    }
161
162
    private function calcuateResetTimeStamp(int $runningResetDuration) : int
163
    {
164
        $resetTimestamp = time() + $runningResetDuration;
165
        //ints overflow to floats
166
        if (!is_int($resetTimestamp)) {
167
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
168
        }
169
170
        return min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
171
    }
172
173
    private function tryFindOneAndUpdate(array $query, array $update, array &$queueMessage) : bool
174
    {
175
        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
176
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];
177
178
        $id = $this->getIdFromMessage(
179
            $this->collection->findOneAndUpdate($query, $update, $findOneAndUpdateOptions)
180
        );
181
182
        if ($id !== null) {
183
            // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
184
            $message = $this->collection->findOne(['_id' => $id], $findOneOptions);
185
            //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
186
            $queueMessage = ['id' => $id] + $message['payload'];
187
            return true;
188
        }
189
190
        return false;
191
    }
192
193
    private function getIdFromMessage($message)
194
    {
195
        if (is_array($message)) {
196
            return array_key_exists('_id', $message) ? $message['_id'] : null;
197
        }
198
199
        if (is_object($message)) {
200
            return isset($message->_id) ? $message->_id : null;
201
        }
202
203
        return null;
204
    }
205
206
    /**
207
     * Count queue messages.
208
     *
209
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
210
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
211
     *                     invalid {$and: [{...}, {...}]}
212
     * @param bool|null $running query a running message or not or all
213
     *
214
     * @return int the count
215
     *
216
     * @throws \InvalidArgumentException key in $query was not a string
217
     */
218
    final public function count(array $query, bool $running = null) : int
219
    {
220
        $totalQuery = [];
221
222
        if ($running === true || $running === false) {
223
            $key = $running ? '$gt' : '$lte';
224
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
225
        }
226
227
        return $this->collection->count($this->buildPayloadQuery($totalQuery, $query));
228
    }
229
230
    /**
231
     * Acknowledge a message was processed and remove from queue.
232
     *
233
     * @param array $message message received from get()
234
     *
235
     * @return void
236
     *
237
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
238
     */
239
    final public function ack(array $message)
240
    {
241
        $id = null;
242
        if (array_key_exists('id', $message)) {
243
            $id = $message['id'];
244
        }
245
246
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
247
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
248
        }
249
250
        $this->collection->deleteOne(['_id' => $id]);
251
    }
252
253
    /**
254
     * Atomically acknowledge and send a message to the queue.
255
     *
256
     * @param array $message the message to ack received from get()
257
     * @param array $payload the data to store in the message to send. Data is handled same way
258
     *                       as \MongoDB\Collection::insertOne()
259
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
260
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
261
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
262
     *
263
     * @return void
264
     *
265
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
266
     * @throws \InvalidArgumentException $priority is NaN
267
     */
268
    final public function ackSend(
269
        array $message,
270
        array $payload,
271
        int $earliestGet = 0,
272
        float $priority = 0.0,
273
        bool $newTimestamp = true
274
    ) {
275
        $id = $message['id'] ?? null;
276
277
        $this->throwIfTrue(!is_a($id, ObjectID::class), '$message does not have a field "id" that is a ObjectID');
278
        $this->throwIfTrue(is_nan($priority), '$priority was NaN');
279
280
        $toSet = [
281
            'payload' => $payload,
282
            'earliestGet' => $this->getEarliestGetAsUTCDateTime($earliestGet),
283
            'priority' => $priority,
284
        ];
285
        if ($newTimestamp) {
286
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
287
        }
288
289
        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
290
        //so we can just send
291
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
292
    }
293
294
    /**
295
     * Requeue message to the queue. Same as ackSend() with the same message.
296
     *
297
     * @param array $message message received from get().
298
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
299
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
300
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
301
     *
302
     * @return void
303
     *
304
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
305
     * @throws \InvalidArgumentException priority is NaN
306
     */
307
    final public function requeue(
308
        array $message,
309
        int $earliestGet = 0,
310
        float $priority = 0.0,
311
        bool $newTimestamp = true
312
    ) {
313
        $forRequeue = $message;
314
        unset($forRequeue['id']);
315
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
316
    }
317
318
    /**
319
     * Send a message to the queue.
320
     *
321
     * @param array $payload the data to store in the message. Data is handled same way
322
     *                       as \MongoDB\Collection::insertOne()
323
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
324
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
325
     *
326
     * @return void
327
     *
328
     * @throws \InvalidArgumentException $priority is NaN
329
     */
330
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
331
    {
332
        if (is_nan($priority)) {
333
            throw new \InvalidArgumentException('$priority was NaN');
334
        }
335
336
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
337
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
338
339
        $message = [
340
            'payload' => $payload,
341
            'earliestGet' => new UTCDateTime($earliestGet),
342
            'priority' => $priority,
343
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
344
        ];
345
346
        $this->collection->insertOne($message);
347
    }
348
349
    /**
350
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
351
     * Will not create index if $index is a prefix of an existing index
352
     *
353
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
354
     *
355
     * @return void
356
     *
357
     * @throws \Exception couldnt create index after 5 attempts
358
     */
359
    final private function ensureIndex(array $index)
360
    {
361
        if ($this->isIndexIncludedInExistingIndex($index)) {
362
            return;
363
        }
364
365
        for ($i = 0; $i < 5; ++$i) {
366
            if ($this->tryCreateIndex($index)) {
367
                return;
368
            }
369
        }
370
371
        throw new \Exception('couldnt create index after 5 attempts');
372
        //@codeCoverageIgnoreEnd
373
    }
374
375
    private function isIndexIncludedInExistingIndex(array $index) : bool
376
    {
377
        //if $index is a prefix of any existing index we are good
378
        foreach ($this->collection->listIndexes() as $existingIndex) {
379
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
380
            if ($slice === $index) {
381
                return true;
382
            }
383
        }
384
385
        return false;
386
    }
387
388
    private function tryCreateIndex(array $index) : bool
389
    {
390
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
391
            if ($this->tryCreateNamedIndex($index, $name)) {
392
                return true;
393
            }
394
        }
395
396
        return false;
397
    }
398
399
    private function tryCreateNamedIndex(array $index, string $name) : bool
400
    {
401
        //creating an index with same name and different spec does nothing.
402
        //creating an index with same spec and different name does nothing.
403
        //so we use any generated name, and then find the right spec after we have called,
404
        //and just go with that name.
405
        try {
406
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
407
        } catch (\MongoDB\Exception\Exception $e) {
408
            //this happens when the name was too long, let continue
409
        }
410
411
        return $this->indexExists($index);
412
    }
413
414
    private function indexExists(array $index) : bool
415
    {
416
        foreach ($this->collection->listIndexes() as $existingIndex) {
417
            if ($existingIndex['key'] === $index) {
418
                return true;
419
            }
420
        }
421
422
        return false;
423
    }
424
425
    /**
426
     * Helper method to validate keys and values for the given sort array
427
     *
428
     * @param array  $sort             The proposed sort for a mongo index.
429
     * @param string $label            The name of the variable given to the public ensureXIndex method.
430
     * @param array  &$completedFields The final index array with payload. prefix added to fields.
431
     *
432
     * @return void
433
     */
434
    final private static function verifySort(array $sort, string $label, array &$completeFields)
435
    {
436
        foreach ($sort as $key => $value) {
437
            if (!is_string($key)) {
438
                throw new \InvalidArgumentException("key in \${$label} was not a string");
439
            }
440
441
            if ($value !== 1 && $value !== -1) {
442
                throw new \InvalidArgumentException(
443
                    "value of \${$label} is not 1 or -1 for ascending and descending"
444
                );
445
            }
446
447
            $completeFields["payload.{$key}"] = $value;
448
        }
449
    }
450
451
    private function throwIfTrue(
452
        bool $condition,
453
        string $message,
454
        string $exceptionClass = '\\InvalidArgumentException'
455
    ) {
456
        if ($condition === true) {
457
            $reflectionClass = new \ReflectionClass($exceptionClass);
458
            throw $reflectionClass->newInstanceArgs([$message]);
459
        }
460
    }
461
462
    private function getEarliestGetAsUTCDateTime(int $timestamp) : UTCDateTime
463
    {
464
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
465
        $earliestGet = min(max(0, $timestamp * 1000), self::MONGO_INT32_MAX);
466
        return new UTCDateTime($earliestGet);
467
    }
468
}
469