Test Failed
Pull Request — master (#60)
by Chad
02:04
created

AbstractQueue::ensureGetIndex()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 15
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 2
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\AbstractQueue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use MongoDB\BSON\UTCDateTime;
9
10
/**
11
 * Abstraction of mongo db collection as priority queue.
12
 *
13
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
14
 * this purpose). Using a random priority makes get() return messages in random order.
15
 */
16
abstract class AbstractQueue implements QueueInterface
17
{
18
    /**
19
     * Maximum millisecond value to use for UTCDateTime creation (PHP_INT_MAX, despite the INT32 in the name).
20
     *
21
     * @var int
22
     */
23
    const MONGO_INT32_MAX = PHP_INT_MAX;
24
25
    /**
26
     * Mongo collection that stores the queue messages.
27
     *
28
     * @var \MongoDB\Collection
29
     */
30
    protected $collection;
31
32
    /**
     * Create (if needed) the compound index that backs get().
     *
     * The index is laid out as: earliestGet, the $beforeSort payload fields, the sort fields
     * (priority, created), then the $afterSort payload fields.
     *
     * @param array $beforeSort Payload fields used in get() to index before the sort fields, in the same
     *                          format as \MongoDB\Collection::createIndex() (field name => 1 or -1)
     * @param array $afterSort  Payload fields used in get() to index after the sort fields, in the same
     *                          format as \MongoDB\Collection::createIndex() (field name => 1 or -1)
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        // General index rule: equality fields first, then sort fields, then range / further equality fields.
        $indexSpec = ['earliestGet' => 1];
        self::verifySort($beforeSort, 'beforeSort', $indexSpec);

        // Sort fields of the main query in get(). The "payload."-prefixed keys added so far cannot
        // collide with these names, so the array union appends both entries.
        $indexSpec += ['priority' => 1, 'created' => 1];
        self::verifySort($afterSort, 'afterSort', $indexSpec);

        $this->ensureIndex($indexSpec);
    }
60
61
    /**
     * Create (if needed) an index that backs count().
     *
     * Nothing is created when the generated index is a prefix of an existing one, so if you also make a
     * similar ensureGetIndex() call, make that call first.
     *
     * @param array $fields         Payload fields used in count() to index, in the same format as
     *                              \MongoDB\Collection::createIndex() (field name => 1 or -1)
     * @param bool  $includeRunning whether the index starts with the earliestGet field
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $indexSpec = $includeRunning ? ['earliestGet' => 1] : [];

        self::verifySort($fields, 'fields', $indexSpec);

        $this->ensureIndex($indexSpec);
    }
86
87
    /**
     * Fetch one message that is currently available, polling until one appears or the wait time runs out.
     *
     * A fetched message has its earliestGet pushed forward by $runningResetDuration seconds, which keeps it
     * out of later get() calls until that time passes.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
     *
     * @return array|null the message or null if one is not found
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(
        array $query,
        int $runningResetDuration,
        int $waitDurationInMillis = 3000,
        int $pollDurationInMillis = 200
    ) {
        $nowInMillis = (int)(microtime(true) * 1000);
        $filter = $this->buildPayloadQuery(
            ['earliestGet' => ['$lte' => new UTCDateTime($nowInMillis)]],
            $query
        );

        $resetInMillis = $this->calcuateResetTimeStamp($runningResetDuration);
        $claim = ['$set' => ['earliestGet' => new UTCDateTime($resetInMillis)]];

        // The deadline is a float, so a very large wait duration cannot overflow an int.
        $deadline = microtime(true) + ($waitDurationInMillis / 1000.0);
        $pauseInMicros = $this->calculateSleepTime($pollDurationInMillis);

        do {
            $message = [];
            if ($this->tryFindOneAndUpdate($filter, $claim, $message)) {
                return $message;
            }

            // Only sleep when another poll will follow.
            $timedOut = microtime(true) >= $deadline;
            if (!$timedOut) {
                usleep($pauseInMicros);
            }
        } while (!$timedOut);

        return null;
    }
139
140
    /**
     * Add a caller-supplied payload query to a base query, prefixing every payload field with "payload.".
     *
     * @param array $initialQuery base query the payload conditions are added to
     * @param array $payloadQuery field => condition pairs to store under "payload.<field>"
     *
     * @return array the combined query
     *
     * @throws \InvalidArgumentException key in $payloadQuery was not a string
     */
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
    {
        $combined = $initialQuery;

        foreach (array_keys($payloadQuery) as $field) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $combined['payload.' . $field] = $payloadQuery[$field];
        }

        return $combined;
    }
152
153
    /**
     * Convert the poll duration to microseconds for usleep().
     *
     * @param int $pollDurationInMillis millisecond duration between polls; negative values are treated as 0
     *
     * @return int the duration in microseconds, capped at PHP_INT_MAX
     */
    private function calculateSleepTime(int $pollDurationInMillis) : int
    {
        $micros = max($pollDurationInMillis, 0) * 1000;

        // The multiplication yields a float when it overflows; the operand is never negative here,
        // so a non-int result can only mean "too large".
        if (is_int($micros)) {
            return $micros;
        }

        return PHP_INT_MAX;
    }
160
161
    /**
     * Compute the millisecond timestamp at which a message fetched now becomes available again.
     *
     * @param int $runningResetDuration seconds from now until the message resets
     *
     * @return int millisecond timestamp clamped to the range 0..MONGO_INT32_MAX
     */
    private function calcuateResetTimeStamp(int $runningResetDuration) : int
    {
        $seconds = time() + $runningResetDuration;

        // An overflowing int addition produces a float; replace it with the matching extreme.
        if (!is_int($seconds)) {
            $seconds = self::MONGO_INT32_MAX;
            if ($runningResetDuration <= 0) {
                $seconds = 0;
            }
        }

        $millis = max(0, $seconds * 1000);

        return min($millis, self::MONGO_INT32_MAX);
    }
171
172
    /**
     * Find the first message matching $query (ordered by priority, then created), apply $update to it and,
     * on success, load it into $queueMessage.
     *
     * @param array $query         filter selecting the candidate messages
     * @param array $update        update document applied to the selected message
     * @param array &$queueMessage receives ['id' => <document _id>] plus the payload fields when a message is found
     *
     * @return bool true when a message was found and loaded, false otherwise
     */
    private function tryFindOneAndUpdate(array $query, array $update, array &$queueMessage) : bool
    {
        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];

        // Use the $query parameter: the previous code referenced an undefined $completeQuery here,
        // so the caller's filter was never applied.
        $id = $this->getIdFromMessage(
            $this->collection->findOneAndUpdate($query, $update, $findOneAndUpdateOptions)
        );

        if ($id === null) {
            return false;
        }

        // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
        $message = $this->collection->findOne(['_id' => $id], $findOneOptions);

        // The document may have been removed between the two calls; report "nothing found" instead of
        // failing on the array union below.
        if (!is_array($message) || !is_array($message['payload'] ?? null)) {
            return false;
        }

        // id on the left of the union operator so a possible id in the payload doesn't wipe out the generated one
        $queueMessage = ['id' => $id] + $message['payload'];

        return true;
    }
191
192
    /**
     * Extract the "_id" of a document given either as an array or as an object.
     *
     * @param mixed $message the document, or any other value when none was found
     *
     * @return mixed the "_id" value, or null when $message is neither array nor object or has no "_id"
     */
    private function getIdFromMessage($message)
    {
        if (is_array($message)) {
            return $message['_id'] ?? null;
        }

        if (is_object($message)) {
            return $message->_id ?? null;
        }

        return null;
    }
204
205
    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count only messages whose earliestGet is in the future, false to count
     *                           only messages whose earliestGet is now or in the past, null to count all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $filter = [];

        if ($running !== null) {
            $now = new UTCDateTime((int)(microtime(true) * 1000));
            $filter['earliestGet'] = $running ? ['$gt' => $now] : ['$lte' => $now];
        }

        return $this->collection->count($this->buildPayloadQuery($filter, $query));
    }
228
229
    /**
     * Acknowledge a message was processed and remove it from the queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    final public function ack(array $message)
    {
        $id = $message['id'] ?? null;

        if (!($id instanceof \MongoDB\BSON\ObjectID)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        $this->collection->deleteOne(['_id' => $id]);
    }
251
252
    /**
     * Acknowledge a message and send a new one to the queue in a single update operation.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function ackSend(
        array $message,
        array $payload,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $id = $message['id'] ?? null;

        if (!($id instanceof \MongoDB\BSON\ObjectID)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        // Convert to milliseconds and clamp to the range 0..MONGO_INT32_MAX.
        $earliestGetInMillis = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $fields = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGetInMillis),
            'priority' => $priority,
        ];

        if ($newTimestamp) {
            $fields['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        // Upsert: if no document matches, the original was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY),
        // so the update simply acts as a send.
        $this->collection->updateOne(['_id' => $id], ['$set' => $fields], ['upsert' => true]);
    }
303
304
    /**
     * Requeue a message. Same as ackSend() with the message's own fields (minus "id") as the new payload.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException priority is NaN
     */
    final public function requeue(
        array $message,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        // Everything except the generated id is the payload to store again.
        $payload = array_diff_key($message, ['id' => null]);

        $this->ackSend($message, $payload, $earliestGet, $priority, $newTimestamp);
    }
327
328
    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
    {
        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        // Convert to milliseconds and clamp to the range 0..MONGO_INT32_MAX.
        $earliestGetInMillis = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $this->collection->insertOne([
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGetInMillis),
            'priority' => $priority,
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
        ]);
    }
358
359
    /**
     * Ensure an index with the given specification exists, creating it under a generated name if needed.
     * Does nothing when $index is a prefix of an existing index.
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        for ($attempt = 0; $attempt < 5; ++$attempt) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }
384
385
    /**
     * Tell whether $index is a prefix of (or identical to) the key of any index already on the collection.
     *
     * The comparison is strict: field names, directions and field order must all match.
     *
     * @param array $index index specification in same format as \MongoDB\Collection::createIndex()
     *
     * @return bool true when an existing index already starts with $index
     */
    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        $fieldCount = count($index);

        foreach ($this->collection->listIndexes() as $existing) {
            $prefix = array_slice($existing['key'], 0, $fieldCount, true);
            if ($prefix === $index) {
                return true;
            }
        }

        return false;
    }
397
398
    /**
     * Try to create the index under a generated name, retrying with one character fewer each time.
     *
     * @param array $index index specification in same format as \MongoDB\Collection::createIndex()
     *
     * @return bool true as soon as the index exists, false when every name was tried without success
     */
    private function tryCreateIndex(array $index) : bool
    {
        $name = uniqid();

        while (strlen($name) > 0) {
            if ($this->tryCreateNamedIndex($index, $name)) {
                return true;
            }

            // Drop the last character and try again with the shorter name.
            $name = substr($name, 0, -1);
        }

        return false;
    }
408
409
    /**
     * Attempt to create the index under the given name and report whether the specification now exists.
     *
     * @param array  $index index specification in same format as \MongoDB\Collection::createIndex()
     * @param string $name  name to request for the index
     *
     * @return bool true when an index with exactly this specification exists after the attempt
     */
    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        // Creating an index with the same name and a different spec does nothing, and so does creating one
        // with the same spec and a different name. So any generated name will do: make the call, then check
        // whether the right spec exists and go with whatever name it has.
        $options = ['name' => $name, 'background' => true];

        try {
            $this->collection->createIndex($index, $options);
        } catch (\MongoDB\Exception\Exception $ignored) {
            // this happens when the name was too long, let the caller continue
        }

        return $this->indexExists($index);
    }
423
424
    /**
     * Tell whether the collection has an index whose key is exactly $index.
     *
     * @param array $index index specification in same format as \MongoDB\Collection::createIndex()
     *
     * @return bool true when an index with an identical key exists
     */
    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $existing) {
            if ($index === $existing['key']) {
                return true;
            }
        }

        return false;
    }
434
435
    /**
     * Helper method to validate keys and values for the given sort array and append them to an index.
     *
     * Each valid entry is added to $completeFields under the key "payload.<field>".
     *
     * @param array  $sort            The proposed sort for a mongo index (field name => 1 or -1).
     * @param string $label           The name of the variable given to the public ensureXIndex method,
     *                                used in the exception messages.
     * @param array  &$completeFields The index array being built; receives the payload.-prefixed fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1 for ascending and descending
     */
    private static function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }
460
}
461