Completed
Pull Request — master (#60)
by Chad
01:27
created

AbstractQueue   B

Complexity

Total Complexity 54

Size/Duplication

Total Lines 442
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
wmc 54
lcom 1
cbo 1
dl 0
loc 442
rs 7.0642
c 0
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A ensureGetIndex() 0 15 1
A ensureCountIndex() 0 12 2
B get() 0 31 3
A count() 0 11 4
A ack() 0 13 3
B ackSend() 0 25 2
A requeue() 0 10 1
A send() 0 18 2
A ensureIndex() 0 15 4
A buildPayloadQuery() 0 12 3
A calculateSleepTime() 0 7 2
A calcuateResetTimeStamp() 0 10 3
A tryFindOneAndUpdate() 0 19 2
B getIdFromMessage() 0 12 5
A isIndexIncludedInExistingIndex() 0 12 3
A tryCreateIndex() 0 10 3
A tryCreateNamedIndex() 0 14 2
A indexExists() 0 10 3
A verifySort() 0 12 3
A throwIfTrue() 0 10 2
A getEarliestGetAsUTCDateTime() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like AbstractQueue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes; in AbstractQueue, for instance, several methods share the Index suffix (ensureIndex(), tryCreateIndex(), indexExists()). You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
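
As a rough illustration of Extract Class, the index-matching logic could move into its own small type. The sketch below is only an assumption about how that might look: the class name IndexMatcher and its method names are hypothetical and are not part of the library, and it works on plain arrays of key specifications rather than on a real \MongoDB\Collection.

```php
<?php
// Hypothetical helper extracted from AbstractQueue; not part of the library.
final class IndexMatcher
{
    /** @var array[] key specifications of the existing indexes, e.g. [['_id' => 1], ...] */
    private $existingKeys;

    public function __construct(array $existingKeys)
    {
        $this->existingKeys = $existingKeys;
    }

    /** True when $index is a leading prefix of any existing index. */
    public function isPrefixOfExisting(array $index) : bool
    {
        foreach ($this->existingKeys as $existingKey) {
            if (array_slice($existingKey, 0, count($index), true) === $index) {
                return true;
            }
        }

        return false;
    }

    /** True when an existing index has exactly the same keys in the same order. */
    public function exists(array $index) : bool
    {
        return in_array($index, $this->existingKeys, true);
    }
}

// Example data: key specifications as they might be collected from listIndexes().
$matcher = new IndexMatcher([
    ['_id' => 1],
    ['earliestGet' => 1, 'priority' => 1, 'created' => 1],
]);

var_dump($matcher->isPrefixOfExisting(['earliestGet' => 1])); // bool(true)
var_dump($matcher->exists(['earliestGet' => 1]));             // bool(false)
```

With a helper along these lines, AbstractQueue would keep only the queue operations and delegate index checks, which is one way the class's method count and complexity could come down.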

While breaking up the class, it is a good idea to analyze how other classes use AbstractQueue, and based on these observations, apply Extract Interface, too.
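
A minimal sketch of what Extract Interface might produce is shown here. QueueInterface, InMemoryQueue, and drain() are hypothetical names introduced for illustration; the method signatures mirror a subset of AbstractQueue's public API, but the toy in-memory class ignores $query, $earliestGet, and the running-reset behaviour, so it is not a substitute for the MongoDB-backed queue.

```php
<?php
// Hypothetical interface; the library itself does not define this type.
interface QueueInterface
{
    public function send(array $payload, int $earliestGet = 0, float $priority = 0.0);

    public function get(array $query, int $runningResetDuration);

    public function ack(array $message);
}

// Toy in-memory stand-in, used only to show that callers can depend on the interface.
final class InMemoryQueue implements QueueInterface
{
    private $messages = [];
    private $nextId = 1;

    public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
    {
        $this->messages[$this->nextId] = ['payload' => $payload, 'priority' => $priority];
        ++$this->nextId;
    }

    public function get(array $query, int $runningResetDuration)
    {
        // Lowest priority value first; ties are broken by insertion order.
        $bestId = null;
        foreach ($this->messages as $id => $message) {
            if ($bestId === null || $message['priority'] < $this->messages[$bestId]['priority']) {
                $bestId = $id;
            }
        }

        return $bestId === null ? null : ['id' => $bestId] + $this->messages[$bestId]['payload'];
    }

    public function ack(array $message)
    {
        unset($this->messages[$message['id']]);
    }
}

// A caller that only knows about the interface.
function drain(QueueInterface $queue) : array
{
    $seen = [];
    while (($message = $queue->get([], 60)) !== null) {
        $seen[] = $message['key'];
        $queue->ack($message);
    }

    return $seen;
}

$queue = new InMemoryQueue();
$queue->send(['key' => 'low'], 0, 1.0);
$queue->send(['key' => 'high'], 0, 0.0);
$order = drain($queue); // ['high', 'low']
```

Callers written against such an interface can be tested without a database, which is the main payoff of the refactoring.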

<?php
/**
 * Defines the TraderInteractive\Mongo\AbstractQueue class.
 */

namespace TraderInteractive\Mongo;

use MongoDB\BSON\ObjectID;
use MongoDB\BSON\UTCDateTime;

/**
 * Abstraction of mongo db collection as priority queue.
 *
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
 * this purpose). Using a random priority achieves random get().
 */
abstract class AbstractQueue
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    protected $collection;

    /**
     * Ensure an index for the get() method.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        $this->verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        $this->verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        $this->verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }

    /**
     * Get a non running message from the queue.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
     *
     * @return array|null the message or null if one is not found
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(
        array $query,
        int $runningResetDuration,
        int $waitDurationInMillis = 3000,
        int $pollDurationInMillis = 200
    ) {

        $completeQuery = $this->buildPayloadQuery(
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
            $query
        );

        $resetTimestamp = $this->calcuateResetTimeStamp($runningResetDuration);

        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
        $sleepTime = $this->calculateSleepTime($pollDurationInMillis);

        do {
            $message = [];
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $message)) {
                return $message;
            }

            usleep($sleepTime);
        } while (microtime(true) < $end);

        return null;
    }

    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running query a running message or not or all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $totalQuery = [];

        if ($running === true || $running === false) {
            $key = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
        }

        return $this->collection->count($this->buildPayloadQuery($totalQuery, $query));
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    final public function ack(array $message)
    {
        $id = null;
        if (array_key_exists('id', $message)) {
            $id = $message['id'];
        }

        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        $this->collection->deleteOne(['_id' => $id]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function ackSend(
        array $message,
        array $payload,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $id = $message['id'] ?? null;

        $this->throwIfTrue(!is_a($id, ObjectID::class), '$message does not have a field "id" that is a ObjectID');
        $this->throwIfTrue(is_nan($priority), '$priority was NaN');

        $toSet = [
            'payload' => $payload,
            'earliestGet' => $this->getEarliestGetAsUTCDateTime($earliestGet),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
    }

    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function requeue(
        array $message,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $forRequeue = $message;
        unset($forRequeue['id']);
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
    }

    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
    {
        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $message = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGet),
            'priority' => $priority,
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
        ];

        $this->collection->insertOne($message);
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldn't create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        for ($i = 0; $i < 5; ++$i) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }

    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
    {
        foreach ($payloadQuery as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $initialQuery["payload.{$key}"] = $value;
        }

        return $initialQuery;
    }

    private function calculateSleepTime(int $pollDurationInMillis) : int
    {
        $pollDurationInMillis = max($pollDurationInMillis, 0);
        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
    }

    private function calcuateResetTimeStamp(int $runningResetDuration) : int
    {
        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        return min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
    }

    private function tryFindOneAndUpdate(array $query, array $update, array &$queueMessage) : bool
    {
        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];

        $id = $this->getIdFromMessage(
            $this->collection->findOneAndUpdate($query, $update, $findOneAndUpdateOptions)
        );

        if ($id !== null) {
            // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
            $message = $this->collection->findOne(['_id' => $id], $findOneOptions);
            //id on left of union operator so a possible id in payload doesn't wipe out the generated one
            $queueMessage = ['id' => $id] + $message['payload'];
            return true;
        }

        return false;
    }

    private function getIdFromMessage($message)
    {
        if (is_array($message)) {
            return array_key_exists('_id', $message) ? $message['_id'] : null;
        }

        if (is_object($message)) {
            return isset($message->_id) ? $message->_id : null;
        }

        return null;
    }

    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->listIndexes() as $existingIndex) {
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
            if ($slice === $index) {
                return true;
            }
        }

        return false;
    }

    private function tryCreateIndex(array $index) : bool
    {
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
            if ($this->tryCreateNamedIndex($index, $name)) {
                return true;
            }
        }

        return false;
    }

    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        //creating an index with same name and different spec does nothing.
        //creating an index with same spec and different name does nothing.
        //so we use any generated name, and then find the right spec after we have called,
        //and just go with that name.
        try {
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
        } catch (\MongoDB\Exception\Exception $e) {
            //this happens when the name was too long, let's continue
        }

        return $this->indexExists($index);
    }

    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $existingIndex) {
            if ($existingIndex['key'] === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Helper method to validate keys and values for the given sort array
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     */
    private function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            $this->throwIfTrue(!is_string($key), "key in \${$label} was not a string");
            $this->throwIfTrue(
                $value !== 1 && $value !== -1,
                "value of \${$label} is not 1 or -1 for ascending and descending"
            );

            $completeFields["payload.{$key}"] = $value;
        }
    }

    private function throwIfTrue(
        bool $condition,
        string $message,
        string $exceptionClass = '\\InvalidArgumentException'
    ) {
        if ($condition === true) {
            $reflectionClass = new \ReflectionClass($exceptionClass);
            throw $reflectionClass->newInstanceArgs([$message]);
        }
    }

    private function getEarliestGetAsUTCDateTime(int $timestamp) : UTCDateTime
    {
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = min(max(0, $timestamp * 1000), self::MONGO_INT32_MAX);
        return new UTCDateTime($earliestGet);
    }
}