Completed
Pull Request — master (#60)
by Chad
02:34
created

AbstractQueue::indexExists()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 3
nop 1
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\Queue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use MongoDB\BSON\UTCDateTime;
9
10
/**
11
 * Abstraction of mongo db collection as priority queue.
12
 *
13
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
14
 * this purpose).  Using a random priority achieves random get()
15
 */
16
abstract class AbstractQueue
17
{
18
    /**
19
     * Maximum millisecond value to use for UTCDateTime creation.
20
     *
21
     * @var integer
22
     */
23
    const MONGO_INT32_MAX = PHP_INT_MAX;
24
25
    /**
26
     * mongo collection to use for queue.
27
     *
28
     * @var \MongoDB\Collection
29
     */
30
    protected $collection;
31
32
    /**
33
     * Ensure an index for the get() method.
34
     *
35
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
36
     *                          as \MongoDB\Collection::ensureIndex()
37
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
38
     *                          \MongoDB\Collection::ensureIndex()
39
     *
40
     * @return void
41
     *
42
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
43
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
44
     */
45
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
46
    {
47
        //using general rule: equality, sort, range or more equality tests in that order for index
48
        $completeFields = ['earliestGet' => 1];
49
50
        self::verifySort($beforeSort, 'beforeSort', $completeFields);
51
52
        $completeFields['priority'] = 1;
53
        $completeFields['created'] = 1;
54
55
        self::verifySort($afterSort, 'afterSort', $completeFields);
56
57
        //for the main query in get()
58
        $this->ensureIndex($completeFields);
59
    }
60
61
    /**
62
     * Ensure an index for the count() method.
63
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
64
     * call it first.
65
     *
66
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
67
     * @param bool $includeRunning whether to include the running field in the index
68
     *
69
     * @return void
70
     *
71
     * @throws \InvalidArgumentException key in $fields was not a string
72
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
73
     */
74
    final public function ensureCountIndex(array $fields, bool $includeRunning)
75
    {
76
        $completeFields = [];
77
78
        if ($includeRunning) {
79
            $completeFields['earliestGet'] = 1;
80
        }
81
82
        self::verifySort($fields, 'fields', $completeFields);
83
84
        $this->ensureIndex($completeFields);
85
    }
86
87
    /**
88
     * Get a non running message from the queue.
89
     *
90
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
91
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
92
     *                     invalid {$and: [{...}, {...}]}
93
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
94
     *                                  retreived again.
95
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
96
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
97
     *
98
     * @return array|null the message or null if one is not found
99
     *
100
     * @throws \InvalidArgumentException key in $query was not a string
101
     */
102
    final public function get(
103
        array $query,
104
        int $runningResetDuration,
105
        int $waitDurationInMillis = 3000,
106
        int $pollDurationInMillis = 200
107
    ) {
108
109
        $completeQuery = $this->buildPayloadQuery(
110
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
111
            $query
112
        );
113
114
        $resetTimestamp = $this->calcuateResetTimestamp($runningResetDuration);
115
116
        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
117
118
        //ints overflow to floats, should be fine
119
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
120
        $sleepTime = $this->calculateSleepTime($pollDurationInMillis);
121
122
        while (true) {
123
            $message = [];
124
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $message)) {
125
                return $message;
126
            }
127
128
            if (microtime(true) >= $end) {
129
                return null;
130
            }
131
132
            usleep($sleepTime);
133
        }
134
135
        //ignore since always return from the function from the while loop
136
        //@codeCoverageIgnoreStart
137
    }
138
    //@codeCoverageIgnoreEnd
139
140
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
141
    {
142
        foreach ($payloadQuery as $key => $value) {
143
            if (!is_string($key)) {
144
                throw new \InvalidArgumentException('key in $query was not a string');
145
            }
146
147
            $initialQuery["payload.{$key}"] = $value;
148
        }
149
150
        return $initialQuery;
151
    }
152
153
    private function calculateSleepTime(int $pollDurationInMillis) : int
154
    {
155
        $pollDurationInMillis = max($pollDurationInMillis, 0);
156
        $sleepTime = $pollDurationInMillis * 1000;
157
        //ints overflow to floats and already checked $pollDurationInMillis was positive
158
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
159
    }
160
161
    private function calcuateResetTimeStamp(int $runningResetDuration) : int
162
    {
163
        $resetTimestamp = time() + $runningResetDuration;
164
        //ints overflow to floats
165
        if (!is_int($resetTimestamp)) {
166
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
167
        }
168
169
        return min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
170
    }
171
172
    private function tryFindOneAndUpdate(array $query, array $update, array &$queueMessage) : bool
173
    {
174
        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
175
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];
176
177
        $id = $this->getIdFromMessage(
178
            $this->collection->findOneAndUpdate($query, $update, $findOneAndUpdateOptions)
179
        );
180
181
        if ($id !== null) {
182
            // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
183
            $message = $this->collection->findOne(['_id' => $id], $findOneOptions);
184
            //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
185
            $queueMessage = ['id' => $id] + $message['payload'];
186
            return true;
187
        }
188
189
        return false;
190
    }
191
192
    private function getIdFromMessage($message)
193
    {
194
        if (is_array($message)) {
195
            return array_key_exists('_id', $message) ? $message['_id'] : null;
196
        }
197
198
        if (is_object($message)) {
199
            return isset($message->_id) ? $message->_id : null;
200
        }
201
202
        return null;
203
    }
204
205
    /**
206
     * Count queue messages.
207
     *
208
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
209
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
210
     *                     invalid {$and: [{...}, {...}]}
211
     * @param bool|null $running query a running message or not or all
212
     *
213
     * @return int the count
214
     *
215
     * @throws \InvalidArgumentException key in $query was not a string
216
     */
217
    final public function count(array $query, bool $running = null) : int
218
    {
219
        $totalQuery = [];
220
221
        if ($running === true || $running === false) {
222
            $key = $running ? '$gt' : '$lte';
223
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
224
        }
225
226
        return $this->collection->count($this->buildPayloadQuery($totalQuery, $query));
227
    }
228
229
    /**
230
     * Acknowledge a message was processed and remove from queue.
231
     *
232
     * @param array $message message received from get()
233
     *
234
     * @return void
235
     *
236
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
237
     */
238
    final public function ack(array $message)
239
    {
240
        $id = null;
241
        if (array_key_exists('id', $message)) {
242
            $id = $message['id'];
243
        }
244
245
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
246
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
247
        }
248
249
        $this->collection->deleteOne(['_id' => $id]);
250
    }
251
252
    /**
253
     * Atomically acknowledge and send a message to the queue.
254
     *
255
     * @param array $message the message to ack received from get()
256
     * @param array $payload the data to store in the message to send. Data is handled same way
257
     *                       as \MongoDB\Collection::insertOne()
258
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
259
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
260
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
261
     *
262
     * @return void
263
     *
264
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
265
     * @throws \InvalidArgumentException $priority is NaN
266
     */
267
    final public function ackSend(
268
        array $message,
269
        array $payload,
270
        int $earliestGet = 0,
271
        float $priority = 0.0,
272
        bool $newTimestamp = true
273
    ) {
274
        $id = null;
275
        if (array_key_exists('id', $message)) {
276
            $id = $message['id'];
277
        }
278
279
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
280
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
281
        }
282
283
        if (is_nan($priority)) {
284
            throw new \InvalidArgumentException('$priority was NaN');
285
        }
286
287
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
288
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
289
290
        $toSet = [
291
            'payload' => $payload,
292
            'earliestGet' => new UTCDateTime($earliestGet),
293
            'priority' => $priority,
294
        ];
295
        if ($newTimestamp) {
296
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
297
        }
298
299
        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
300
        //so we can just send
301
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
302
    }
303
304
    /**
305
     * Requeue message to the queue. Same as ackSend() with the same message.
306
     *
307
     * @param array $message message received from get().
308
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
309
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
310
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
311
     *
312
     * @return void
313
     *
314
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
315
     * @throws \InvalidArgumentException priority is NaN
316
     */
317
    final public function requeue(
318
        array $message,
319
        int $earliestGet = 0,
320
        float $priority = 0.0,
321
        bool $newTimestamp = true
322
    ) {
323
        $forRequeue = $message;
324
        unset($forRequeue['id']);
325
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
326
    }
327
328
    /**
329
     * Send a message to the queue.
330
     *
331
     * @param array $payload the data to store in the message. Data is handled same way
332
     *                       as \MongoDB\Collection::insertOne()
333
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
334
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
335
     *
336
     * @return void
337
     *
338
     * @throws \InvalidArgumentException $priority is NaN
339
     */
340
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
341
    {
342
        if (is_nan($priority)) {
343
            throw new \InvalidArgumentException('$priority was NaN');
344
        }
345
346
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
347
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
348
349
        $message = [
350
            'payload' => $payload,
351
            'earliestGet' => new UTCDateTime($earliestGet),
352
            'priority' => $priority,
353
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
354
        ];
355
356
        $this->collection->insertOne($message);
357
    }
358
359
    /**
360
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
361
     * Will not create index if $index is a prefix of an existing index
362
     *
363
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
364
     *
365
     * @return void
366
     *
367
     * @throws \Exception couldnt create index after 5 attempts
368
     */
369
    final private function ensureIndex(array $index)
370
    {
371
        if ($this->isIndexIncludedInExistingIndex($index)) {
372
            return;
373
        }
374
375
        for ($i = 0; $i < 5; ++$i) {
376
            if ($this->tryCreateIndex($index)) {
377
                return;
378
            }
379
        }
380
381
        throw new \Exception('couldnt create index after 5 attempts');
382
        //@codeCoverageIgnoreEnd
383
    }
384
385
    private function isIndexIncludedInExistingIndex(array $index) : bool
386
    {
387
        //if $index is a prefix of any existing index we are good
388
        foreach ($this->collection->listIndexes() as $existingIndex) {
389
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
390
            if ($slice === $index) {
391
                return true;
392
            }
393
        }
394
395
        return false;
396
    }
397
398
    private function tryCreateIndex(array $index) : bool
399
    {
400
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
401
            if ($this->tryCreateNamedIndex($index, $name)) {
402
                return true;
403
            }
404
        }
405
406
        return false;
407
    }
408
409
    private function tryCreateNamedIndex(array $index, string $name) : bool
410
    {
411
        //creating an index with same name and different spec does nothing.
412
        //creating an index with same spec and different name does nothing.
413
        //so we use any generated name, and then find the right spec after we have called,
414
        //and just go with that name.
415
        try {
416
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
417
        } catch (\MongoDB\Exception\Exception $e) {
418
            //this happens when the name was too long, let continue
419
        }
420
421
        return $this->indexExists($index);
422
    }
423
424
    private function indexExists(array $index) : bool
425
    {
426
        foreach ($this->collection->listIndexes() as $existingIndex) {
427
            if ($existingIndex['key'] === $index) {
428
                return true;
429
            }
430
        }
431
432
        return false;
433
    }
434
435
    /**
436
     * Helper method to validate keys and values for the given sort array
437
     *
438
     * @param array  $sort             The proposed sort for a mongo index.
439
     * @param string $label            The name of the variable given to the public ensureXIndex method.
440
     * @param array  &$completedFields The final index array with payload. prefix added to fields.
441
     *
442
     * @return void
443
     */
444
    final private static function verifySort(array $sort, string $label, array &$completeFields)
445
    {
446
        foreach ($sort as $key => $value) {
447
            if (!is_string($key)) {
448
                throw new \InvalidArgumentException("key in \${$label} was not a string");
449
            }
450
451
            if ($value !== 1 && $value !== -1) {
452
                throw new \InvalidArgumentException(
453
                    "value of \${$label} is not 1 or -1 for ascending and descending"
454
                );
455
            }
456
457
            $completeFields["payload.{$key}"] = $value;
458
        }
459
    }
460
}
461