Completed
Pull Request — master (#60)
by Chad
01:20
created

AbstractQueue::throwIfTrue()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 7
nc 2
nop 3
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\Queue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use MongoDB\BSON\ObjectID;
9
use MongoDB\BSON\UTCDateTime;
10
11
/**
12
 * Abstraction of mongo db collection as priority queue.
13
 *
14
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
15
 * this purpose).  Using a random priority achieves random get()
16
 */
17
abstract class AbstractQueue
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * Despite the name, the value is PHP_INT_MAX for the running platform.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    protected $collection;

    /**
     * Ensure an index for the get() method.
     *
     * The index is built as: earliestGet, the $beforeSort payload fields, priority, created,
     * then the $afterSort payload fields.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        self::verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        self::verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        self::verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }

    /**
     * Get a non running message from the queue.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
     *
     * @return array|null the message or null if one is not found
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(
        array $query,
        int $runningResetDuration,
        int $waitDurationInMillis = 3000,
        int $pollDurationInMillis = 200
    ) {
        //the availability cut-off and the reset timestamp are computed once, before polling starts
        $completeQuery = $this->buildPayloadQuery(
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
            $query
        );

        $resetTimestamp = $this->calculateResetTimestamp($runningResetDuration);

        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
        $sleepTime = $this->calculateSleepTime($pollDurationInMillis);

        while (true) {
            $message = [];
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $message)) {
                return $message;
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd

    /**
     * Merge caller supplied payload criteria into a base query, prefixing every key with "payload.".
     *
     * @param array $initialQuery base query on the queue's own fields
     * @param array $payloadQuery criteria on the message payload, keyed by payload field name
     *
     * @return array the combined query
     *
     * @throws \InvalidArgumentException key in $payloadQuery was not a string
     */
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
    {
        foreach ($payloadQuery as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $initialQuery["payload.{$key}"] = $value;
        }

        return $initialQuery;
    }

    /**
     * Convert the poll duration to microseconds for usleep(), treating negatives as 0 and capping at PHP_INT_MAX.
     *
     * @param int $pollDurationInMillis millisecond duration to wait between polls
     *
     * @return int the duration in microseconds
     */
    private function calculateSleepTime(int $pollDurationInMillis) : int
    {
        $pollDurationInMillis = max($pollDurationInMillis, 0);
        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
    }

    /**
     * Compute the millisecond timestamp at which a message handed out now becomes available again.
     *
     * @param int $runningResetDuration second duration the message can stay unacked
     *
     * @return int millisecond timestamp between 0 and MONGO_INT32_MAX
     */
    private function calculateResetTimestamp(int $runningResetDuration) : int
    {
        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        return self::secondsToMongoMillis($resetTimestamp);
    }

    /**
     * Convert a unix timestamp in seconds to milliseconds, clamped between 0 and MONGO_INT32_MAX.
     *
     * The upper bound is checked before multiplying so the result never overflows into a float.
     *
     * @param int $seconds unix timestamp in seconds
     *
     * @return int millisecond timestamp between 0 and MONGO_INT32_MAX
     */
    private static function secondsToMongoMillis(int $seconds) : int
    {
        if ($seconds <= 0) {
            return 0;
        }

        if ($seconds > intdiv(self::MONGO_INT32_MAX, 1000)) {
            return self::MONGO_INT32_MAX;
        }

        return $seconds * 1000;
    }

    /**
     * Claim the next matching message and, on success, store it in $queueMessage.
     *
     * @param array $query         the complete query to match a message
     * @param array $update        the update which marks the message as running
     * @param array &$queueMessage receives the payload plus the generated "id" when a message is claimed
     *
     * @return bool true if a message was claimed, false otherwise
     */
    private function tryFindOneAndUpdate(array $query, array $update, array &$queueMessage) : bool
    {
        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];

        $id = $this->getIdFromMessage(
            $this->collection->findOneAndUpdate($query, $update, $findOneAndUpdateOptions)
        );

        if ($id === null) {
            return false;
        }

        // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
        $message = $this->collection->findOne(['_id' => $id], $findOneOptions);
        if ($message === null) {
            //the document disappeared between the two calls, treat it as if nothing was found
            return false;
        }

        //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
        $queueMessage = ['id' => $id] + $message['payload'];
        return true;
    }

    /**
     * Extract the _id from a document given as an array or an object.
     *
     * @param mixed $message the document returned by the collection
     *
     * @return mixed the _id value or null if there is none
     */
    private function getIdFromMessage($message)
    {
        if (is_array($message)) {
            return $message['_id'] ?? null;
        }

        if (is_object($message)) {
            return $message->_id ?? null;
        }

        return null;
    }

    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running query a running message or not or all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $totalQuery = [];

        if ($running !== null) {
            $key = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
        }

        //NOTE(review): Collection::count() is deprecated in newer mongodb/mongodb releases in favour of
        //countDocuments() - confirm the minimum supported library version before switching.
        return $this->collection->count($this->buildPayloadQuery($totalQuery, $query));
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    final public function ack(array $message)
    {
        $id = $message['id'] ?? null;

        $this->throwIfTrue(!is_a($id, ObjectID::class), '$message does not have a field "id" that is a ObjectID');

        $this->collection->deleteOne(['_id' => $id]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function ackSend(
        array $message,
        array $payload,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $id = $message['id'] ?? null;

        $this->throwIfTrue(!is_a($id, ObjectID::class), '$message does not have a field "id" that is a ObjectID');
        $this->throwIfTrue(is_nan($priority), '$priority was NaN');

        $toSet = [
            'payload' => $payload,
            //clamped between 0 and MONGO_INT32_MAX
            'earliestGet' => new UTCDateTime(self::secondsToMongoMillis($earliestGet)),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
    }

    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException priority is NaN
     */
    final public function requeue(
        array $message,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        //the payload to store is the message without the generated id
        $forRequeue = $message;
        unset($forRequeue['id']);
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
    }

    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
    {
        $this->throwIfTrue(is_nan($priority), '$priority was NaN');

        $message = [
            'payload' => $payload,
            //clamped between 0 and MONGO_INT32_MAX
            'earliestGet' => new UTCDateTime(self::secondsToMongoMillis($earliestGet)),
            'priority' => $priority,
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
        ];

        $this->collection->insertOne($message);
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        for ($i = 0; $i < 5; ++$i) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }

    /**
     * Check whether $index is a prefix of (or equal to) the key of any index already on the collection.
     *
     * @param array $index the index specification to look for
     *
     * @return bool true if an existing index already covers $index
     */
    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->listIndexes() as $existingIndex) {
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
            if ($slice === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Try to create the index under a generated name, retrying with the name shortened one character at a time.
     *
     * @param array $index the index specification to create
     *
     * @return bool true if the index exists after one of the attempts
     */
    private function tryCreateIndex(array $index) : bool
    {
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
            if ($this->tryCreateNamedIndex($index, $name)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Attempt to create the index with the given name and report whether the specification now exists.
     *
     * @param array  $index the index specification to create
     * @param string $name  the name to create the index with
     *
     * @return bool true if an index with exactly this specification exists afterwards
     */
    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        //creating an index with same name and different spec does nothing.
        //creating an index with same spec and different name does nothing.
        //so we use any generated name, and then find the right spec after we have called,
        //and just go with that name.
        try {
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
        } catch (\MongoDB\Exception\Exception $e) {
            //this happens when the name was too long, let continue
        }

        return $this->indexExists($index);
    }

    /**
     * Check whether an index with exactly the given key specification exists on the collection.
     *
     * @param array $index the index specification to look for
     *
     * @return bool true if such an index exists
     */
    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $existingIndex) {
            if ($existingIndex['key'] === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Helper method to validate keys and values for the given sort array
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1
     */
    private static function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }

    /**
     * Throw an exception of the given class when the condition holds.
     *
     * @param bool   $condition      throw only if this is true
     * @param string $message        the exception message
     * @param string $exceptionClass the class of the exception to throw
     *
     * @return void
     */
    private function throwIfTrue(
        bool $condition,
        string $message,
        string $exceptionClass = '\\InvalidArgumentException'
    ) {
        if ($condition === true) {
            throw new $exceptionClass($message);
        }
    }
}
466