Completed
Pull Request — master (#44)
by Chad
02:13
created

Queue::verifyQuery()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 10
rs 9.4286
cc 3
eloc 5
nc 3
nop 2
1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
/**
9
 * Abstraction of mongo db collection as priority queue.
10
 *
11
 * Tied priorities are ordered by creation time, so a single priority can be used for normal first-in-first-out
 * queuing (the default arguments exist for this purpose). Using a random priority makes get() return messages in
 * random order.
13
 */
14
final class Queue implements QueueInterface
15
{
16
    const MONGO_INT32_MAX = 2147483647;//2147483648 can overflow in php mongo without using the MongoInt64
17
18
    /**
19
     * mongo collection to use for queue.
20
     *
21
     * @var \MongoCollection
22
     */
23
    private $collection;
24
25
    /**
     * Create a queue on top of a mongo collection.
     *
     * A \MongoCollection instance is used directly, in which case $db and $collection are ignored. Otherwise all
     * three arguments must be strings and the collection is looked up through a new \MongoClient.
     *
     * @param \MongoCollection|string $collectionOrUrl A MongoCollection instance or the mongo connection url.
     * @param string $db the mongo db name
     * @param string $collection the collection name to use for the queue
     *
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
     */
    public function __construct($collectionOrUrl, $db = null, $collection = null)
    {
        if ($collectionOrUrl instanceof \MongoCollection) {
            $resolved = $collectionOrUrl;
        } else {
            //checked in declaration order so the first offending argument is the one reported
            $mustBeStrings = ['collectionOrUrl' => $collectionOrUrl, 'db' => $db, 'collection' => $collection];
            foreach ($mustBeStrings as $argumentName => $argumentValue) {
                if (!is_string($argumentValue)) {
                    throw new \InvalidArgumentException("\${$argumentName} was not a string");
                }
            }

            $client = new \MongoClient($collectionOrUrl);
            $resolved = $client->selectDB($db)->selectCollection($collection);
        }

        $this->collection = $resolved;
    }
57
58
    /**
     * Ensure the indexes used by the get() method exist.
     *
     * Two indexes are ensured: one for the main get() query (running, $beforeSort fields, priority, created,
     * $afterSort fields, earliestGet) and one for the stuck message reset (running, resetTimestamp).
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoCollection::ensureIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoCollection::ensureIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //general rule for index field order: equality tests, then sort, then range or further equality tests
        $getIndex = ['running' => 1];
        self::verifySort($beforeSort, 'beforeSort', $getIndex);

        //the sort fields of get()
        foreach (['priority', 'created'] as $sortField) {
            $getIndex[$sortField] = 1;
        }

        self::verifySort($afterSort, 'afterSort', $getIndex);

        //the range test of get()
        $getIndex['earliestGet'] = 1;

        //for the main query in get()
        $this->ensureIndex($getIndex);

        //for the stuck messages query in get()
        $stuckIndex = ['running' => 1, 'resetTimestamp' => 1];
        $this->ensureIndex($stuckIndex);
    }
91
92
    /**
     * Ensure an index for the count() method.
     *
     * Nothing is created when the generated index is a prefix of an existing one, so if you have a similar
     * ensureGetIndex() call, make it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoCollection::ensureIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException $includeRunning was not a boolean
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    public function ensureCountIndex(array $fields, $includeRunning)
    {
        if (!is_bool($includeRunning)) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        //the running flag, when requested, leads the index
        $countIndex = $includeRunning ? ['running' => 1] : [];

        self::verifySort($fields, 'fields', $countIndex);

        $this->ensureIndex($countIndex);
    }
122
123
    /**
     * Get a non running message from the queue.
     *
     * Messages flagged as running whose resetTimestamp has passed are first flagged as not running again. The
     * collection is then polled until a matching message is found or the wait duration has elapsed. A found
     * message is flagged as running and given a new resetTimestamp.
     *
     * @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again. The resulting timestamp is kept between 0 and
     *                                  MONGO_INT32_MAX.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls. Negative values are treated as 0.
     *
     * @return array|null the message payload with the message id added under "id", or null if one is not found
     *
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
     *                                   an int
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
    {
        if (!is_int($runningResetDuration)) {
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
        }

        if (!is_int($waitDurationInMillis)) {
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
        }

        if (!is_int($pollDurationInMillis)) {
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
        }

        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        //reset stuck messages
        $this->collection->update(
            ['running' => true, 'resetTimestamp' => ['$lte' => new \MongoDate()]],
            ['$set' => ['running' => false]],
            ['multiple' => true]
        );

        //the earliestGet condition is added inside the polling loop below
        $completeQuery = ['running' => false];
        self::verifyQuery($query, $completeQuery);

        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        //a sum that stayed an int can still be outside the range the class treats as safe for \MongoDate (see
        //MONGO_INT32_MAX), so keep it in the same 0..MONGO_INT32_MAX window send() and ackSend() use
        if ($resetTimestamp < 0) {
            $resetTimestamp = 0;
        } elseif ($resetTimestamp > self::MONGO_INT32_MAX) {
            $resetTimestamp = self::MONGO_INT32_MAX;
        }

        $update = ['$set' => ['resetTimestamp' => new \MongoDate($resetTimestamp), 'running' => true]];
        $fields = ['payload' => 1];
        $options = ['sort' => ['priority' => 1, 'created' => 1]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            //refreshed on every poll so a message whose earliestGet passes while we wait can still be returned
            $completeQuery['earliestGet'] = ['$lte' => new \MongoDate()];

            $message = $this->collection->findAndModify($completeQuery, $update, $fields, $options);
            //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
            //older mongo extension
            if ($message !== null && array_key_exists('_id', $message)) {
                //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
                return ['id' => $message['_id']] + $message['payload'];
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd
210
211
    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count running messages only, false to count non running messages only,
     *                           null to count both
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function count(array $query, $running = null)
    {
        if (!is_bool($running) && $running !== null) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        //only constrain on the running flag when the caller asked for a specific state
        $filter = $running === null ? [] : ['running' => $running];

        self::verifyQuery($query, $filter);

        return $this->collection->count($filter);
    }
239
240
    /**
     * Acknowledge a message was processed and remove it from the queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     */
    public function ack(array $message)
    {
        //a missing id and a null id are treated alike: neither is a MongoId
        $id = isset($message['id']) ? $message['id'] : null;

        if ($id instanceof \MongoId) {
            $this->collection->remove(['_id' => $id]);
            return;
        }

        throw new \InvalidArgumentException('$message does not have a field "id" that is a MongoId');
    }
262
263
    /**
     * Atomically acknowledge a message and send a new one to the queue.
     *
     * The document of the acknowledged message is overwritten in place with the new payload, using an upsert.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoCollection::insert()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved. Kept between 0 and
     *                         MONGO_INT32_MAX.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        //a missing id and a null id are treated alike: neither is a MongoId
        $id = isset($message['id']) ? $message['id'] : null;
        if (!($id instanceof \MongoId)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a MongoId');
        }

        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        if (!($newTimestamp === true || $newTimestamp === false)) {
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        if ($earliestGet < 0) {
            $earliestGet = 0;
        } elseif ($earliestGet > self::MONGO_INT32_MAX) {
            $earliestGet = self::MONGO_INT32_MAX;
        }

        $replacement = [
            'payload' => $payload,
            'running' => false,
            'resetTimestamp' => new \MongoDate(self::MONGO_INT32_MAX),
            'earliestGet' => new \MongoDate($earliestGet),
            'priority' => $priority,
        ];

        //leaving "created" out of the $set keeps the timestamp already stored on the document
        if ($newTimestamp) {
            $replacement['created'] = new \MongoDate();
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->update(['_id' => $id], ['$set' => $replacement], ['upsert' => true]);
    }
326
327
    /**
     * Requeue a message to the queue. Same as ackSend() with the message itself, minus its "id" field, as payload.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        //everything except the id added by get() goes back in as the payload
        $payload = array_diff_key($message, ['id' => null]);

        $this->ackSend($message, $payload, $earliestGet, $priority, $newTimestamp);
    }
349
350
    /**
     * Send a message to the queue.
     *
     * The message is inserted as not running, with a resetTimestamp of MONGO_INT32_MAX and the current time as its
     * created timestamp.
     *
     * @param array $payload the data to store in the message. Data is handled same way as \MongoCollection::insert()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved. Kept between 0 and
     *                         MONGO_INT32_MAX.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     */
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = max(0, min($earliestGet, self::MONGO_INT32_MAX));

        $document = [
            'payload' => $payload,
            'running' => false,
            'resetTimestamp' => new \MongoDate(self::MONGO_INT32_MAX),
            'earliestGet' => new \MongoDate($earliestGet),
            'priority' => $priority,
            'created' => new \MongoDate(),
        ];

        $this->collection->insert($document);
    }
391
392
    /**
     * Ensure an index of the given specification exists, whatever name it ends up with.
     *
     * Nothing is created when $index is a prefix of an existing index. Otherwise creation is attempted with
     * generated names, shortening the name one character at a time, until an index with exactly the requested
     * specification shows up in the collection's index info.
     *
     * @param array $index index to create in same format as \MongoCollection::ensureIndex()
     *
     * @return void
     *
     * @throws \Exception couldn't create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        $fieldCount = count($index);

        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->getIndexInfo() as $existing) {
            if (array_slice($existing['key'], 0, $fieldCount, true) === $index) {
                return;
            }
        }

        $attempt = 0;
        while ($attempt < 5) {
            $candidateName = uniqid();

            while (strlen($candidateName) > 0) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so we use any generated name, and then find the right spec after we have called,
                //and just go with that name.
                try {
                    $this->collection->ensureIndex($index, ['name' => $candidateName, 'background' => true]);
                } catch (\MongoException $e) {
                    //this happens when the name was too long, let continue
                }

                foreach ($this->collection->getIndexInfo() as $existing) {
                    if ($existing['key'] === $index) {
                        return;
                    }
                }

                //retry with the name shortened by one character
                $candidateName = substr($candidateName, 0, -1);
            }

            ++$attempt;
        }

        throw new \Exception('couldnt create index after 5 attempts');
    }
434
435
    /**
     * Helper method to validate keys and values for the given sort array and merge them into an index specification.
     *
     * Every key of $sort is prefixed with "payload." and written, with its direction, into $completeFields. Entries
     * that precede an invalid one have already been written when the exception is thrown.
     *
     * @param array  $sort            The proposed sort for a mongo index, as field name => 1 or -1.
     * @param string $label           The name of the variable given to the public ensureXIndex method, used in the
     *                                exception messages.
     * @param array  &$completeFields The index array being built; receives the "payload." prefixed fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1 for ascending and descending
     */
    private static function verifySort(array $sort, $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            //strict comparison: only the ints 1 and -1 are accepted as directions
            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }
460
461
    /**
     * Helper method to validate a user query and merge it into the query sent to mongo.
     *
     * Every key of $query is prefixed with "payload." and written, with its value, into $completeQuery. Entries
     * that precede an invalid key have already been written when the exception is thrown.
     *
     * @param array $query The user provided query.
     * @param array &$completeQuery The query being built; receives the "payload." prefixed conditions.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    private static function verifyQuery(array $query, array &$completeQuery)
    {
        foreach ($query as $field => $condition) {
            if (is_string($field)) {
                $completeQuery['payload.' . $field] = $condition;
                continue;
            }

            throw new \InvalidArgumentException('key in $query was not a string');
        }
    }
479
}
480