Completed
Pull Request — master (#60)
by Chad
01:21
created

AbstractQueue   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 396
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Importance

Changes 0
Metric Value
wmc 43
lcom 1
cbo 2
dl 0
loc 396
rs 8.3157
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A ensureGetIndex() 0 15 1
A ensureCountIndex() 0 12 2
B get() 0 26 4
A count() 0 11 4
A ack() 0 4 1
A requeue() 0 11 1
A send() 0 12 1
A ensureIndex() 0 15 4
A buildPayloadQuery() 0 12 3
A calculateSleepTime() 0 7 2
A calculateEarliestGet() 0 6 1
A tryFindOneAndUpdate() 0 16 2
A isIndexIncludedInExistingIndex() 0 12 3
A tryCreateIndex() 0 10 3
A tryCreateNamedIndex() 0 14 2
A indexExists() 0 10 3
A verifySort() 0 12 3
A throwIfTrue() 0 10 2
A calculateEndTime() 0 5 1

How to fix   Complexity   

Complex Class

Complex classes like AbstractQueue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractQueue, and based on these observations, apply Extract Interface, too.

1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\AbstractQueue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use ArrayObject;
9
use MongoDB\BSON\ObjectID;
10
use MongoDB\BSON\UTCDateTime;
11
use MongoDB\Operation\FindOneAndUpdate;
12
13
/**
14
 * Abstraction of mongo db collection as priority queue.
15
 *
16
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
17
 * this purpose).  Using a random priority achieves random get()
18
 */
19
abstract class AbstractQueue
20
{
21
    /**
22
     * Upper bound, in milliseconds, used to clamp values handed to UTCDateTime.
     * Despite the name, the value is PHP_INT_MAX rather than the 32-bit integer maximum.
23
     *
24
     * @var int
25
     */
26
    const MONGO_INT32_MAX = PHP_INT_MAX;
27
28
    /**
29
     * Mongo collection that stores the queue messages.
     * It is not assigned anywhere in this class; presumably concrete subclasses set it (confirm).
30
     *
31
     * @var \MongoDB\Collection
32
     */
33
    protected $collection;
34
35
    /**
     * Options passed to findOneAndUpdate() when a message is claimed: lowest priority value first, ties broken
     * by oldest created time; documents come back as plain arrays in their post-update state.
     *
36
     * @var array
37
     */
38
    const FIND_ONE_AND_UPDATE_OPTIONS = [
39
        'sort' => ['priority' => 1, 'created' => 1],
40
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
41
        'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
42
    ];
43
44
    /**
     * Default for the 'maxNumberOfMessages' option of get().
     *
45
     * @var int
46
     */
47
    const DEFAULT_MAX_NUMBER_OF_MESSAGES = 1;
48
49
    /**
     * Default for the 'runningResetDuration' option of get().
     * NOTE(review): get() documents this option as milliseconds; confirm every consumer uses that unit.
     *
50
     * @var int
51
     */
52
    const DEFAULT_RUNNING_RESET_DURATION = 600000;
53
54
    /**
     * Default for the 'waitDurationInMillis' option of get(), in milliseconds.
     *
55
     * @var int
56
     */
57
    const DEFAULT_WAIT_DURATION_IN_MILLISECONDS = 3000;
58
59
    /**
     * Default for the 'pollDurationInMillis' option of get(), in milliseconds.
     *
60
     * @var int
61
     */
62
    const DEFAULT_POLL_DURATION_IN_MILLISECONDS = 200;
63
64
    /**
     * Defaults merged into the $options argument of get() for every key the caller left out.
     *
65
     * @var array
66
     */
67
    const DEFAULT_GET_OPTIONS = [
68
       'maxNumberOfMessages' => self::DEFAULT_MAX_NUMBER_OF_MESSAGES,
69
       'runningResetDuration' => self::DEFAULT_RUNNING_RESET_DURATION,
70
       'waitDurationInMillis' => self::DEFAULT_WAIT_DURATION_IN_MILLISECONDS,
71
       'pollDurationInMillis' => self::DEFAULT_POLL_DURATION_IN_MILLISECONDS,
72
    ];
73
74
    /**
     * Make sure a compound index exists that backs the query issued by get().
     *
     * The index is laid out as: earliestGet, the fields of $beforeSort, priority, created, then the fields of
     * $afterSort. Every key of $beforeSort and $afterSort is prefixed with "payload." before it is indexed.
     *
     * @param array $beforeSort Payload fields to index ahead of the sort fields, as field => 1|-1 pairs in the
     *                          same format as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Payload fields to index behind the sort fields, as field => 1|-1 pairs in the
     *                          same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        // General index rule: equality fields first, then the sort fields, then range or further equality fields.
        $indexSpec = ['earliestGet' => 1];
        $this->verifySort($beforeSort, 'beforeSort', $indexSpec);

        // The fields get() sorts on sit between the two caller supplied groups.
        foreach (['priority', 'created'] as $sortField) {
            $indexSpec[$sortField] = 1;
        }

        $this->verifySort($afterSort, 'afterSort', $indexSpec);

        // Index for the main query in get().
        $this->ensureIndex($indexSpec);
    }
102
103
    /**
     * Make sure an index exists that backs the query issued by count().
     *
     * Nothing is created when the generated index is a prefix of an existing one, so if you also make a similar
     * ensureGetIndex() call, make that call first.
     *
     * @param array $fields         Payload fields used by the count() call, as field => 1|-1 pairs in the same
     *                              format as \MongoDB\Collection::createIndex()
     * @param bool  $includeRunning whether the index should start with the field that tracks the running state
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        // The running state is derived from earliestGet, so that is the field to lead with.
        $indexSpec = $includeRunning ? ['earliestGet' => 1] : [];

        $this->verifySort($fields, 'fields', $indexSpec);
        $this->ensureIndex($indexSpec);
    }
128
129
    /**
     * Get non running messages from the queue.
     *
     * Messages are claimed one at a time until maxNumberOfMessages have been collected. When no message can be
     * claimed the call returns whatever it already holds; if it holds nothing it polls again every
     * pollDurationInMillis until waitDurationInMillis has elapsed.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param array $options Associative array of get options.
     *                           runningResetDuration => integer
     *                               The duration (in milliseconds) that the received messages are hidden from
     *                               subsequent retrieve requests after being retrieved by a get() request.
     *                           waitDurationInMillis => integer
     *                               The duration (in milliseconds) for which the call will wait for a message to
     *                               arrive in the queue before returning. If a message is available, the call will
     *                               return sooner than waitDurationInMillis.
     *                           pollDurationInMillis => integer
     *                               The millisecond duration to wait between polls.
     *                           maxNumberOfMessages => integer
     *                               The maximum number of messages to return with get(). All of the messages are not
     *                               necessarily returned.
     *
     * @return array Array of messages.
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(array $query, array $options = []) : array
    {
        // The earliestGet cutoff is fixed when the call starts and reused for every poll.
        // NOTE(review): a message whose earliestGet falls after this instant is not matched while waiting - confirm
        // that is acceptable for messages sent during the wait.
        $completeQuery = $this->buildPayloadQuery(
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
            $query
        );

        // Array union keeps every option the caller supplied and fills in the rest from the defaults.
        $options += self::DEFAULT_GET_OPTIONS;
        $update = ['$set' => ['earliestGet' => $this->calculateEarliestGet($options['runningResetDuration'])]];
        $end = $this->calculateEndTime($options['waitDurationInMillis']);
        $sleepTime = $this->calculateSleepTime($options['pollDurationInMillis']);
        $messages = new ArrayObject();
        while (count($messages) < $options['maxNumberOfMessages']) {
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $messages)) {
                continue;
            }

            // Nothing claimable right now: hand back what was collected, or give up once the wait is over.
            if (count($messages) > 0 || microtime(true) >= $end) {
                break;
            }

            // Still empty handed and inside the wait window, so pause and poll again.
            usleep($sleepTime);
        }

        return $messages->getArrayCopy();
    }
179
180
    /**
     * Count the messages in the queue that match a payload query.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running true counts only running messages, false only non running ones, null counts both
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $stateFilter = [];

        if ($running !== null) {
            // A message is running while its earliestGet still lies in the future.
            $now = new UTCDateTime((int)(microtime(true) * 1000));
            $stateFilter['earliestGet'] = $running ? ['$gt' => $now] : ['$lte' => $now];
        }

        $filter = $this->buildPayloadQuery($stateFilter, $query);

        return $this->collection->count($filter);
    }
203
204
    /**
     * Acknowledge that a message was processed by deleting it from the queue collection.
     *
     * @param Message $message message received from get()
     *
     * @return void
     */
    final public function ack(Message $message)
    {
        // Messages are stored under their id, so that alone identifies the document to remove.
        $filter = ['_id' => $message->getId()];
        $this->collection->deleteOne($filter);
    }
215
216
    /**
     * Put a message back on the queue with a single upsert.
     *
     * The document stored under the message id has its payload, earliestGet and priority overwritten from the
     * given message and its created time reset to now. The document is inserted when it no longer exists.
     *
     * @param Message $message message received from get().
     *
     * @return void
     */
    final public function requeue(Message $message)
    {
        $fields = [];
        $fields['payload'] = $message->getPayload();
        $fields['earliestGet'] = $message->getEarliestGet();
        $fields['priority'] = $message->getPriority();
        $fields['created'] = new UTCDateTime();

        $filter = ['_id' => $message->getId()];

        $this->collection->updateOne($filter, ['$set' => $fields], ['upsert' => true]);
    }
234
235
    /**
     * Add a message to the queue by inserting it as a new document.
     *
     * The document is stored under the message id and its created time is set to now.
     *
     * @param Message $message The message to send.
     *
     * @return void
     */
    final public function send(Message $message)
    {
        $this->collection->insertOne([
            '_id' => $message->getId(),
            'payload' => $message->getPayload(),
            'earliestGet' => $message->getEarliestGet(),
            'priority' => $message->getPriority(),
            'created' => new UTCDateTime(),
        ]);
    }
254
255
    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        // An existing index that starts with the same fields already serves these queries.
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        // Each attempt tries a fresh generated name, so retry a handful of times before giving up.
        for ($i = 0; $i < 5; ++$i) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }
280
281
    /**
     * Merge a caller supplied payload query into a base query.
     *
     * @param array $initialQuery Query conditions to start from.
     * @param array $payloadQuery Conditions on payload fields; every key is stored prefixed with "payload.".
     *
     * @return array $initialQuery extended with the prefixed payload conditions.
     *
     * @throws \InvalidArgumentException key in $payloadQuery was not a string
     */
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
    {
        $combined = $initialQuery;

        foreach (array_keys($payloadQuery) as $field) {
            if (is_string($field) === false) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $combined['payload.' . $field] = $payloadQuery[$field];
        }

        return $combined;
    }
293
294
    /**
     * Convert the poll interval from milliseconds to the microseconds expected by usleep().
     *
     * @param int $pollDurationInMillis Poll interval in milliseconds; negative values are treated as 0.
     *
     * @return int The interval in microseconds, capped at PHP_INT_MAX.
     */
    private function calculateSleepTime(int $pollDurationInMillis) : int
    {
        $micros = ($pollDurationInMillis > 0 ? $pollDurationInMillis : 0) * 1000;

        // An integer product that overflows turns into a float; the operand is never negative at this point.
        if (is_int($micros)) {
            return $micros;
        }

        return PHP_INT_MAX;
    }
301
302
    /**
     * Compute the time at which a message claimed now becomes visible to get() again.
     *
     * The duration is interpreted as milliseconds, matching the documentation of the runningResetDuration
     * option of get().
     *
     * @param int $runningResetDuration Milliseconds, counted from now, to hide the message for.
     *
     * @return UTCDateTime now plus the duration, clamped to the range 0..self::MONGO_INT32_MAX milliseconds.
     */
    private function calculateEarliestGet(int $runningResetDuration) : UTCDateTime
    {
        $nowInMillis = (int)(microtime(true) * 1000);
        $resetTimestamp = $nowInMillis + $runningResetDuration;

        // An integer sum that overflows turns into a float; only the upper bound can be exceeded because
        // the current time is positive.
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = self::MONGO_INT32_MAX;
        }

        return new UTCDateTime(max(0, $resetTimestamp));
    }
308
309
    /**
     * Try to claim one message and add it to the collected messages.
     *
     * @param array       $query    Filter selecting the messages that may be claimed.
     * @param array       $update   Update applied to the claimed document.
     * @param ArrayObject $messages Collection that receives the claimed message.
     *
     * @return bool true when a message was claimed and appended, false when no document matched.
     */
    private function tryFindOneAndUpdate(array $query, array $update, ArrayObject $messages) : bool
    {
        $claimed = $this->collection->findOneAndUpdate($query, $update, self::FIND_ONE_AND_UPDATE_OPTIONS);

        if ($claimed !== null) {
            $messages[] = new Message(
                $claimed['_id'],
                $claimed['payload'],
                $claimed['earliestGet'],
                $claimed['priority']
            );

            return true;
        }

        return false;
    }
325
326
    /**
     * Tell whether an existing index of the collection starts with exactly the given fields.
     *
     * @param array $index Index specification as field => direction pairs.
     *
     * @return bool true when $index is a prefix of (or equal to) the key of an existing index.
     */
    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        $width = count($index);

        foreach ($this->collection->listIndexes() as $existingIndex) {
            // Compare the leading fields of the existing key, keeping their names, against the wanted spec.
            $prefix = array_slice($existingIndex['key'], 0, $width, true);
            if ($prefix !== $index) {
                continue;
            }

            return true;
        }

        return false;
    }
338
339
    /**
     * Try to create the index under a generated name, shortening the name after every failed attempt.
     *
     * @param array $index Index specification as field => direction pairs.
     *
     * @return bool true as soon as an attempt succeeds, false when every prefix of the name failed.
     */
    private function tryCreateIndex(array $index) : bool
    {
        $fullName = uniqid();

        // Walk from the full generated name down to its first character.
        for ($length = strlen($fullName); $length > 0; --$length) {
            if ($this->tryCreateNamedIndex($index, substr($fullName, 0, $length))) {
                return true;
            }
        }

        return false;
    }
349
350
    /**
     * Ask the collection to create the index under the given name and report whether the index exists afterwards.
     *
     * @param array  $index Index specification as field => direction pairs.
     * @param string $name  Name to request for the index.
     *
     * @return bool true when an index with exactly this specification exists after the attempt.
     */
    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        $indexOptions = ['name' => $name, 'background' => true];

        // Creating an index with a known name but another spec does nothing, and neither does creating a
        // known spec under another name. So any generated name will do: make the call, then check whether
        // the wanted spec is present and accept whatever name it ended up with.
        try {
            $this->collection->createIndex($index, $indexOptions);
        } catch (\MongoDB\Exception\Exception $e) {
            // Raised when the name was too long; the check below reports the failure to the caller.
        }

        return $this->indexExists($index);
    }
364
365
    /**
     * Tell whether the collection has an index whose key is identical to the given specification.
     *
     * @param array $index Index specification as field => direction pairs.
     *
     * @return bool true when an index with the same fields, directions and order exists.
     */
    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $candidate) {
            $sameKey = $candidate['key'] === $index;
            if ($sameKey) {
                return true;
            }
        }

        return false;
    }
375
376
    /**
     * Helper method to validate keys and values for the given sort array
     *
     * Every validated entry is appended to $completeFields with "payload." prefixed to its key. Entries that
     * precede an invalid one have already been appended when the exception is thrown.
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1 for ascending and descending
     */
    private function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            $this->throwIfTrue(!is_string($key), "key in \${$label} was not a string");
            $this->throwIfTrue(
                $value !== 1 && $value !== -1,
                "value of \${$label} is not 1 or -1 for ascending and descending"
            );

            $completeFields["payload.{$key}"] = $value;
        }
    }
397
398
    /**
     * Throw an exception of the requested class when the condition holds.
     *
     * @param bool   $condition      Whether to throw.
     * @param string $message        Message handed to the exception constructor.
     * @param string $exceptionClass Fully qualified name of the exception class to instantiate.
     *
     * @return void
     */
    private function throwIfTrue(
        bool $condition,
        string $message,
        string $exceptionClass = '\\InvalidArgumentException'
    ) {
        if ($condition !== true) {
            return;
        }

        // The class is only known by name, so the instance is built through reflection.
        throw (new \ReflectionClass($exceptionClass))->newInstance($message);
    }
408
409
    /**
     * Compute the moment at which get() should stop waiting for messages.
     *
     * The result is compared against microtime(true), so it keeps its fractional seconds. Declaring an int
     * return type here would truncate the deadline and shorten the wait by up to a second.
     *
     * @param int $waitDurationInMillis How long to wait, in milliseconds.
     *
     * @return float Unix timestamp in seconds, with sub-second precision.
     */
    private function calculateEndTime(int $waitDurationInMillis) : float
    {
        return microtime(true) + ($waitDurationInMillis / 1000.0);
    }
414
}
415