Passed
Pull Request — master (#63)
by Chad
03:56 queued 01:17
created

Queue::getIdFromMessage()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 12
rs 8.8571
c 0
b 0
f 0
cc 5
eloc 6
nc 5
nop 1
1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
use MongoDB\BSON\UTCDateTime;
9
10
/**
11
 * Abstraction of mongo db collection as priority queue.
12
 *
13
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
14
 * this purpose). Using a random priority achieves a random get() order.
15
 */
16
final class Queue implements QueueInterface
17
{
18
    /**
19
     * Maximum millisecond value to use for UTCDateTime creation. Despite the name, the value is PHP_INT_MAX.
20
     *
21
     * @var int
22
     */
23
    const MONGO_INT32_MAX = PHP_INT_MAX;
24
25
    /**
26
     * mongo collection to use for queue.
27
     *
28
     * @var \MongoDB\Collection
29
     */
30
    private $collection;
31
32
    /**
     * Construct queue.
     *
     * Either wraps the given collection as is, or connects to the given url and selects the named database
     * and collection.
     *
     * @param \MongoDB\Collection|string $collectionOrUrl A \MongoDB\Collection instance or the mongo connection url.
     * @param string $db the mongo db name, must be a string when $collectionOrUrl is a url
     * @param string $collection the collection name to use for the queue, must be a string when
     *                           $collectionOrUrl is a url
     *
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
     */
    public function __construct($collectionOrUrl, $db = null, $collection = null)
    {
        if ($collectionOrUrl instanceof \MongoDB\Collection) {
            $this->collection = $collectionOrUrl;
            return;
        }

        //checked in declaration order so the first offending argument is the one reported
        $mustBeStrings = ['collectionOrUrl' => $collectionOrUrl, 'db' => $db, 'collection' => $collection];
        foreach ($mustBeStrings as $argumentName => $argumentValue) {
            if (!is_string($argumentValue)) {
                throw new \InvalidArgumentException("\${$argumentName} was not a string");
            }
        }

        //root documents, sub documents and arrays are all mapped to plain php arrays
        $typeMap = ['root' => 'array', 'document' => 'array', 'array' => 'array'];
        $client = new \MongoDB\Client($collectionOrUrl, [], ['typeMap' => $typeMap]);
        $this->collection = $client->selectDatabase($db)->selectCollection($collection);
    }
64
65
    /**
     * Ensure an index for the get() method.
     *
     * The index is built as: earliestGet, the $beforeSort fields, the priority and created sort fields,
     * then the $afterSort fields. All given fields are indexed under the "payload." prefix.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $indexSpec = ['earliestGet' => 1];
        self::verifySort($beforeSort, 'beforeSort', $indexSpec);

        //sort fields of the main query in get(); verified fields always carry the "payload." prefix
        //so they can never collide with these two keys
        $indexSpec += ['priority' => 1, 'created' => 1];
        self::verifySort($afterSort, 'afterSort', $indexSpec);

        $this->ensureIndex($indexSpec);
    }
93
94
    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the running field (earliestGet) at the start of the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException $includeRunning was not a boolean
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    public function ensureCountIndex(array $fields, $includeRunning)
    {
        if ($includeRunning !== true && $includeRunning !== false) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        //earliestGet leads the index when requested, the payload fields follow in the given order
        $indexSpec = $includeRunning ? ['earliestGet' => 1] : [];
        self::verifySort($fields, 'fields', $indexSpec);

        $this->ensureIndex($indexSpec);
    }
124
125
    /**
     * Get a non running message from the queue.
     *
     * Polls the collection until a matching message whose earliestGet has passed is found or the wait
     * duration has elapsed. A found message has its earliestGet moved forward by $runningResetDuration
     * so it is not handed out again until then.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls. Negative values are treated as 0.
     *
     * @return array|null the message payload with the generated id under the "id" key, or null if one is not found
     *
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
     *                                   an int
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
    {
        if (!is_int($runningResetDuration)) {
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
        }

        if (!is_int($waitDurationInMillis)) {
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
        }

        if (!is_int($pollDurationInMillis)) {
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
        }

        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        //caller fields are matched against the stored payload
        $payloadQuery = [];
        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $payloadQuery["payload.{$key}"] = $value;
        }

        $findOneAndUpdateOptions = ['sort' => ['priority' => 1, 'created' => 1]];
        $findOneOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            //both time values are recomputed on every poll: a fixed "now" would hide messages that become
            //available while waiting, and the reset deadline should count from when the message is taken
            $now = new UTCDateTime((int)(microtime(true) * 1000));
            $completeQuery = ['earliestGet' => ['$lte' => $now]] + $payloadQuery;
            $resetAt = new UTCDateTime($this->calculateResetTimestamp($runningResetDuration));
            $update = ['$set' => ['earliestGet' => $resetAt]];

            $id = $this->getIdFromMessage(
                $this->collection->findOneAndUpdate($completeQuery, $update, $findOneAndUpdateOptions)
            );
            if ($id !== null) {
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
                $message = $this->collection->findOne(['_id' => $id], $findOneOptions);
                //the document may have been removed between the two calls; keep polling instead of failing
                if (is_array($message) && isset($message['payload']) && is_array($message['payload'])) {
                    //id on left of union operator so a possible id in payload doesnt wipe out the generated one
                    return ['id' => $id] + $message['payload'];
                }
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd

    /**
     * Calculate the millisecond timestamp at which a message taken now becomes available again.
     *
     * @param int $runningResetDuration second duration the message can stay unacked
     *
     * @return int|float millisecond value between 0 and MONGO_INT32_MAX
     */
    private function calculateResetTimestamp($runningResetDuration)
    {
        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        return min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
    }
214
215
    /**
     * Extract the _id of a document returned by the collection.
     *
     * @param mixed $message the document as an array or an object, or any other value when nothing was found
     *
     * @return mixed the value of the _id key or property, or null when $message is neither an array nor
     *               an object or carries no _id
     */
    private function getIdFromMessage($message)
    {
        if (is_array($message) && array_key_exists('_id', $message)) {
            return $message['_id'];
        }

        if (is_object($message) && isset($message->_id)) {
            return $message->_id;
        }

        return null;
    }
227
228
    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count only running messages, false to count only non running messages,
     *                           null to count all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function count(array $query, $running = null)
    {
        $filterByRunning = is_bool($running);
        if (!$filterByRunning && $running !== null) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        $totalQuery = [];

        if ($filterByRunning) {
            //a running message has an earliestGet after now, a non running one at or before now
            $now = new UTCDateTime((int)(microtime(true) * 1000));
            $totalQuery['earliestGet'] = $running ? ['$gt' => $now] : ['$lte' => $now];
        }

        //caller fields are matched against the stored payload
        foreach ($query as $field => $criteria) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $totalQuery['payload.' . $field] = $criteria;
        }

        return $this->collection->count($totalQuery);
    }
263
264
    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    public function ack(array $message)
    {
        $id = array_key_exists('id', $message) ? $message['id'] : null;

        if (!($id instanceof \MongoDB\BSON\ObjectID)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        //the queue document is keyed by the id handed out from get()
        $this->collection->deleteOne(['_id' => $id]);
    }
286
287
    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * The document of the acked message is overwritten in place with the new payload, earliestGet and
     * priority through a single upserting update.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        $id = array_key_exists('id', $message) ? $message['id'] : null;
        if (!($id instanceof \MongoDB\BSON\ObjectID)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        if (!is_bool($newTimestamp)) {
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
        }

        //seconds to milliseconds, kept between 0 and MONGO_INT32_MAX
        $earliestGetInMillis = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $replacement = [
            'payload' => $payload,
            'earliestGet' => new UTCDateTime($earliestGetInMillis),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $replacement['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $replacement], ['upsert' => true]);
    }
348
349
    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        //the payload to send again is the message without the id that get() added to it
        $payload = array_diff_key($message, ['id' => true]);

        $this->ackSend($message, $payload, $earliestGet, $priority, $newTimestamp);
    }
371
372
    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     */
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //seconds to milliseconds, kept between 0 and MONGO_INT32_MAX
        $earliestGetInMillis = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);

        $this->collection->insertOne(
            [
                'payload' => $payload,
                'earliestGet' => new UTCDateTime($earliestGetInMillis),
                'priority' => $priority,
                'created' => new UTCDateTime((int)(microtime(true) * 1000)),
            ]
        );
    }
411
412
    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is empty or is a prefix of an existing index.
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        //an empty specification has nothing to index. Without this guard a collection that lists no indexes
        //would fall through to createIndex() with an empty key, which the check after creation can never match
        if (empty($index)) {
            return;
        }

        //if $index is a prefix of any existing index we are good
        $fieldCount = count($index);
        foreach ($this->collection->listIndexes() as $existingIndex) {
            if (array_slice($existingIndex['key'], 0, $fieldCount, true) === $index) {
                return;
            }
        }

        for ($attempt = 0; $attempt < 5; ++$attempt) {
            //each pass starts from a fresh generated name and retries with one character less every time
            for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so we use any generated name, and then find the right spec after we have called,
                //and just go with that name.
                try {
                    $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
                } catch (\MongoDB\Exception\Exception $e) {
                    //this happens when the name was too long, let continue
                    //NOTE(review): only exceptions implementing \MongoDB\Exception\Exception are swallowed here;
                    //confirm that is the type actually raised for a rejected name
                }

                foreach ($this->collection->listIndexes() as $existingIndex) {
                    if ($existingIndex['key'] === $index) {
                        return;
                    }
                }
            }
        }

        //excluded from coverage reports
        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }
455
456
    /**
     * Helper method to validate keys and values for the given sort array
     *
     * Every validated field is appended to $completeFields under its "payload." prefixed name. Fields
     * appended before an invalid entry is met stay in $completeFields.
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort was not 1 or -1
     */
    private static function verifySort(array $sort, $label, &$completeFields)
    {
        foreach ($sort as $field => $direction) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            //strict check so only the integers 1 and -1 are accepted
            if (!in_array($direction, [1, -1], true)) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields['payload.' . $field] = $direction;
        }
    }
481
}
482