Passed
Pull Request — master (#60)
by Chad
03:11
created

AbstractQueue::indexExists()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 3
nop 1
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\Queue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use MongoDB\BSON\UTCDateTime;
9
10
/**
11
 * Abstraction of mongo db collection as priority queue.
12
 *
13
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
14
 * this purpose).  Using a random priority achieves random get()
15
 */
16
abstract class AbstractQueue implements QueueInterface
17
{
18
    /**
19
     * Maximum millisecond value to use for UTCDateTime creation.
20
     *
21
     * @var integer
22
     */
23
    const MONGO_INT32_MAX = PHP_INT_MAX;
24
25
    /**
26
     * mongo collection to use for queue.
27
     *
28
     * @var \MongoDB\Collection
29
     */
30
    protected $collection;
31
32
    /**
33
     * Ensure an index for the get() method.
34
     *
35
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
36
     *                          as \MongoDB\Collection::ensureIndex()
37
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
38
     *                          \MongoDB\Collection::ensureIndex()
39
     *
40
     * @return void
41
     *
42
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
43
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
44
     */
45
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
46
    {
47
        //using general rule: equality, sort, range or more equality tests in that order for index
48
        $completeFields = ['earliestGet' => 1];
49
50
        self::verifySort($beforeSort, 'beforeSort', $completeFields);
51
52
        $completeFields['priority'] = 1;
53
        $completeFields['created'] = 1;
54
55
        self::verifySort($afterSort, 'afterSort', $completeFields);
56
57
        //for the main query in get()
58
        $this->ensureIndex($completeFields);
59
    }
60
61
    /**
62
     * Ensure an index for the count() method.
63
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
64
     * call it first.
65
     *
66
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
67
     * @param bool $includeRunning whether to include the running field in the index
68
     *
69
     * @return void
70
     *
71
     * @throws \InvalidArgumentException key in $fields was not a string
72
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
73
     */
74
    final public function ensureCountIndex(array $fields, bool $includeRunning)
75
    {
76
        $completeFields = [];
77
78
        if ($includeRunning) {
79
            $completeFields['earliestGet'] = 1;
80
        }
81
82
        self::verifySort($fields, 'fields', $completeFields);
83
84
        $this->ensureIndex($completeFields);
85
    }
86
87
    /**
88
     * Get a non running message from the queue.
89
     *
90
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
91
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
92
     *                     invalid {$and: [{...}, {...}]}
93
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
94
     *                                  retreived again.
95
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
96
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
97
     *
98
     * @return array|null the message or null if one is not found
99
     *
100
     * @throws \InvalidArgumentException key in $query was not a string
101
     */
102
    final public function get(
103
        array $query,
104
        int $runningResetDuration,
105
        int $waitDurationInMillis = 3000,
106
        int $pollDurationInMillis = 200
107
    ) {
108
109
        $completeQuery = $this->buildPayloadQuery(
110
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
111
            $query
112
        );
113
114
        $resetTimestamp = $this->calcuateResetTimestamp($runningResetDuration);
115
116
        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
117
118
        //ints overflow to floats, should be fine
119
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
120
        $sleepTime = $this->calculateSleepTime($pollDurationInMillis);
121
122
        while (true) {
123
            $message = [];
124
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $message)) {
125
                return $message;
126
            }
127
128
            if (microtime(true) >= $end) {
129
                return null;
130
            }
131
132
            usleep($sleepTime);
133
        }
134
135
        //ignore since always return from the function from the while loop
136
        //@codeCoverageIgnoreStart
137
    }
138
    //@codeCoverageIgnoreEnd
139
140
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
141
    {
142
        foreach ($payloadQuery as $key => $value) {
143
            if (!is_string($key)) {
144
                throw new \InvalidArgumentException('key in $query was not a string');
145
            }
146
147
            $initialQuery["payload.{$key}"] = $value;
148
        }
149
150
        return $initialQuery;
151
    }
152
153
    private function calculateSleepTime(int $pollDurationInMillis) : int
154
    {
155
        $pollDurationInMillis = max($pollDurationInMillis, 0);
156
        $sleepTime = $pollDurationInMillis * 1000;
157
        //ints overflow to floats and already checked $pollDurationInMillis was positive
158
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
159
    }
160
161
    private function calcuateResetTimeStamp(int $runningResetDuration) : int
162
    {
163
        $resetTimestamp = time() + $runningResetDuration;
164
        //ints overflow to floats
165
        if (!is_int($resetTimestamp)) {
166
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
167
        }
168
169
        return min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
170
    }
171
172
    private function tryFindOneAndUpdate(array $query, array $update, array &$queueMessage) : bool
173
    {
174
        $options = [
175
            'sort' => ['priority' => 1, 'created' => 1],
176
            'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
177
        ];
178
179
        $message = $this->collection->findOneAndUpdate($query, $update, $options);
180
        //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
181
        //older mongo extension
182
        if ($message !== null && array_key_exists('_id', $message)) {
183
            // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
184
            $message = $this->collection->findOne(['_id' => $message['_id']], ['typeMap' => $options['typeMap']]);
185
            //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
186
            $queueMessage = ['id' => $message['_id']] + $message['payload'];
187
            return true;
188
        }
189
190
        return false;
191
    }
192
193
    /**
194
     * Count queue messages.
195
     *
196
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
197
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
198
     *                     invalid {$and: [{...}, {...}]}
199
     * @param bool|null $running query a running message or not or all
200
     *
201
     * @return int the count
202
     *
203
     * @throws \InvalidArgumentException key in $query was not a string
204
     */
205
    final public function count(array $query, bool $running = null) : int
206
    {
207
        $totalQuery = [];
208
209
        if ($running === true || $running === false) {
210
            $key = $running ? '$gt' : '$lte';
211
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
212
        }
213
214
        return $this->collection->count($this->buildPayloadQuery($totalQuery, $query));
215
    }
216
217
    /**
218
     * Acknowledge a message was processed and remove from queue.
219
     *
220
     * @param array $message message received from get()
221
     *
222
     * @return void
223
     *
224
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
225
     */
226
    final public function ack(array $message)
227
    {
228
        $id = null;
229
        if (array_key_exists('id', $message)) {
230
            $id = $message['id'];
231
        }
232
233
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
234
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
235
        }
236
237
        $this->collection->deleteOne(['_id' => $id]);
238
    }
239
240
    /**
241
     * Atomically acknowledge and send a message to the queue.
242
     *
243
     * @param array $message the message to ack received from get()
244
     * @param array $payload the data to store in the message to send. Data is handled same way
245
     *                       as \MongoDB\Collection::insertOne()
246
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
247
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
248
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
249
     *
250
     * @return void
251
     *
252
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
253
     * @throws \InvalidArgumentException $priority is NaN
254
     */
255
    final public function ackSend(
256
        array $message,
257
        array $payload,
258
        int $earliestGet = 0,
259
        float $priority = 0.0,
260
        bool $newTimestamp = true
261
    ) {
262
        $id = null;
263
        if (array_key_exists('id', $message)) {
264
            $id = $message['id'];
265
        }
266
267
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
268
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
269
        }
270
271
        if (is_nan($priority)) {
272
            throw new \InvalidArgumentException('$priority was NaN');
273
        }
274
275
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
276
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
277
278
        $toSet = [
279
            'payload' => $payload,
280
            'earliestGet' => new UTCDateTime($earliestGet),
281
            'priority' => $priority,
282
        ];
283
        if ($newTimestamp) {
284
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
285
        }
286
287
        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
288
        //so we can just send
289
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
290
    }
291
292
    /**
293
     * Requeue message to the queue. Same as ackSend() with the same message.
294
     *
295
     * @param array $message message received from get().
296
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
297
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
298
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
299
     *
300
     * @return void
301
     *
302
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
303
     * @throws \InvalidArgumentException priority is NaN
304
     */
305
    final public function requeue(
306
        array $message,
307
        int $earliestGet = 0,
308
        float $priority = 0.0,
309
        bool $newTimestamp = true
310
    ) {
311
        $forRequeue = $message;
312
        unset($forRequeue['id']);
313
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
314
    }
315
316
    /**
317
     * Send a message to the queue.
318
     *
319
     * @param array $payload the data to store in the message. Data is handled same way
320
     *                       as \MongoDB\Collection::insertOne()
321
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
322
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
323
     *
324
     * @return void
325
     *
326
     * @throws \InvalidArgumentException $priority is NaN
327
     */
328
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
329
    {
330
        if (is_nan($priority)) {
331
            throw new \InvalidArgumentException('$priority was NaN');
332
        }
333
334
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
335
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
336
337
        $message = [
338
            'payload' => $payload,
339
            'earliestGet' => new UTCDateTime($earliestGet),
340
            'priority' => $priority,
341
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
342
        ];
343
344
        $this->collection->insertOne($message);
345
    }
346
347
    /**
348
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
349
     * Will not create index if $index is a prefix of an existing index
350
     *
351
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
352
     *
353
     * @return void
354
     *
355
     * @throws \Exception couldnt create index after 5 attempts
356
     */
357
    final private function ensureIndex(array $index)
358
    {
359
        if ($this->isIndexIncludedInExistingIndex($index)) {
360
            return;
361
        }
362
363
        for ($i = 0; $i < 5; ++$i) {
364
            if ($this->tryCreateIndex($index)) {
365
                return;
366
            }
367
        }
368
369
        throw new \Exception('couldnt create index after 5 attempts');
370
        //@codeCoverageIgnoreEnd
371
    }
372
373
    private function isIndexIncludedInExistingIndex(array $index) : bool
374
    {
375
        //if $index is a prefix of any existing index we are good
376
        foreach ($this->collection->listIndexes() as $existingIndex) {
377
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
378
            if ($slice === $index) {
379
                return true;
380
            }
381
        }
382
383
        return false;
384
    }
385
386
    private function tryCreateIndex(array $index) : bool
387
    {
388
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
389
            if ($this->tryCreateNamedIndex($index, $name)) {
390
                return true;
391
            }
392
        }
393
394
        return false;
395
    }
396
397
    private function tryCreateNamedIndex(array $index, string $name) : bool
398
    {
399
        //creating an index with same name and different spec does nothing.
400
        //creating an index with same spec and different name does nothing.
401
        //so we use any generated name, and then find the right spec after we have called,
402
        //and just go with that name.
403
        try {
404
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
405
        } catch (\MongoDB\Exception\Exception $e) {
406
            //this happens when the name was too long, let continue
407
        }
408
409
        return $this->indexExists($index);
410
    }
411
412
    private function indexExists(array $index) : bool
413
    {
414
        foreach ($this->collection->listIndexes() as $existingIndex) {
415
            if ($existingIndex['key'] === $index) {
416
                return true;
417
            }
418
        }
419
420
        return false;
421
    }
422
423
    /**
424
     * Helper method to validate keys and values for the given sort array
425
     *
426
     * @param array  $sort             The proposed sort for a mongo index.
427
     * @param string $label            The name of the variable given to the public ensureXIndex method.
428
     * @param array  &$completedFields The final index array with payload. prefix added to fields.
429
     *
430
     * @return void
431
     */
432
    final private static function verifySort(array $sort, string $label, array &$completeFields)
433
    {
434
        foreach ($sort as $key => $value) {
435
            if (!is_string($key)) {
436
                throw new \InvalidArgumentException("key in \${$label} was not a string");
437
            }
438
439
            if ($value !== 1 && $value !== -1) {
440
                throw new \InvalidArgumentException(
441
                    "value of \${$label} is not 1 or -1 for ascending and descending"
442
                );
443
            }
444
445
            $completeFields["payload.{$key}"] = $value;
446
        }
447
    }
448
}
449