Completed
Pull Request — master (#45)
by Chad
03:13
created

Queue::count()   C

Complexity

Conditions 8
Paths 7

Size

Total Lines 23
Code Lines 12

Duplication

Lines 7
Ratio 30.43 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 7
loc 23
rs 6.1403
cc 8
eloc 12
nc 7
nop 2
1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
/**
9
 * Abstraction of mongo db collection as priority queue.
10
 *
11
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
12
 * this purpose). Using a random priority achieves a random get() order.
13
 */
14
final class Queue implements QueueInterface
15
{
16
    const MONGO_INT32_MAX = 2147483647;//2147483648 can overflow in php mongo without using the MongoInt64
17
18
    /**
19
     * mongo collection to use for queue.
20
     *
21
     * @var \MongoCollection
22
     */
23
    private $collection;
24
25
    /**
     * Build a queue on top of a mongo collection.
     *
     * Either hand in a ready \MongoCollection (in which case $db and $collection are ignored), or hand in a
     * connection url together with the db and collection names and the collection is selected from a new client.
     *
     * @param \MongoCollection|string $collectionOrUrl A MongoCollection instance or the mongo connection url.
     * @param string $db the mongo db name
     * @param string $collection the collection name to use for the queue
     *
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
     */
    public function __construct($collectionOrUrl, $db = null, $collection = null)
    {
        if ($collectionOrUrl instanceof \MongoCollection) {
            $this->collection = $collectionOrUrl;
            return;
        }

        //checked in signature order so the first offending argument is the one that gets reported
        $mustBeStrings = ['collectionOrUrl' => $collectionOrUrl, 'db' => $db, 'collection' => $collection];
        foreach ($mustBeStrings as $argumentName => $argumentValue) {
            if (!is_string($argumentValue)) {
                throw new \InvalidArgumentException("\${$argumentName} was not a string");
            }
        }

        $client = new \MongoClient($collectionOrUrl);
        $this->collection = $client->selectDB($db)->selectCollection($collection);
    }
57
58
    /**
     * Make sure an index suited to the get() query exists.
     *
     * The index is laid out as: earliestGet, the $beforeSort payload fields, the get() sort fields
     * (priority, created), then the $afterSort payload fields.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoCollection::ensureIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoCollection::ensureIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $indexSpec = ['earliestGet' => 1];
        self::verifySort($beforeSort, 'beforeSort', $indexSpec);

        //the sort used by get(); neither key can already be present because caller supplied fields are
        //always stored under a "payload." prefix
        $indexSpec += ['priority' => 1, 'created' => 1];
        self::verifySort($afterSort, 'afterSort', $indexSpec);

        //for the main query in get()
        $this->ensureIndex($indexSpec);
    }
86
87
    /**
     * Make sure an index suited to the count() query exists.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoCollection::ensureIndex()
     * @param bool $includeRunning whether to lead the index with the earliestGet field, which is the field count()
     *                             filters on when a running state is requested
     *
     * @return void
     *
     * @throws \InvalidArgumentException $includeRunning was not a boolean
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    public function ensureCountIndex(array $fields, $includeRunning)
    {
        if (!is_bool($includeRunning)) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        $indexSpec = $includeRunning ? ['earliestGet' => 1] : [];
        self::verifySort($fields, 'fields', $indexSpec);

        $this->ensureIndex($indexSpec);
    }
117
118
    /**
     * Get a non running message from the queue.
     *
     * Polls the collection until a message matches or until $waitDurationInMillis has elapsed. The matched message
     * has its earliestGet moved $runningResetDuration seconds into the future (clamped between 0 and
     * MONGO_INT32_MAX), so this query will not match it again before then unless earliestGet is rewritten.
     *
     * @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}. Each key is matched against the message payload.
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls. Negative values are treated as 0.
     *
     * @return array|null the message payload with an added "id" field holding the document _id, or null if no
     *                    message was found in time
     *
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
     *                                   an int
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
    {
        if (!is_int($runningResetDuration)) {
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
        }

        if (!is_int($waitDurationInMillis)) {
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
        }

        if (!is_int($pollDurationInMillis)) {
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
        }

        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        //only messages whose earliestGet has already passed are eligible
        $completeQuery = ['earliestGet' => ['$lte' => new \MongoDate()]];
        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $completeQuery["payload.{$key}"] = $value;
        }

        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        //on 64 bit builds the sum stays an int even when it is past MONGO_INT32_MAX, so the overflow check above
        //never fires there; clamp the same way send() and ackSend() clamp $earliestGet
        $resetTimestamp = min(max(0, $resetTimestamp), self::MONGO_INT32_MAX);

        $update = ['$set' => ['earliestGet' => new \MongoDate($resetTimestamp)]];
        $fields = ['payload' => 1];
        $options = ['sort' => ['priority' => 1, 'created' => 1]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            $message = $this->collection->findAndModify($completeQuery, $update, $fields, $options);
            //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
            //older mongo extension
            if ($message !== null && array_key_exists('_id', $message)) {
                //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
                return ['id' => $message['_id']] + $message['payload'];
            }

            $now = microtime(true);
            if ($now >= $end) {
                return null;
            }

            //never sleep past the deadline, otherwise a long poll duration makes the call overshoot the wait
            //duration. $remainingMicros is positive here, and is only cast when it is below $sleepTime so the
            //cast always stays inside the int range
            $remainingMicros = ($end - $now) * 1000000;
            usleep($remainingMicros < $sleepTime ? (int)$remainingMicros : $sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd
203
204
    /**
     * Count queue messages.
     *
     * Running messages are matched as earliestGet later than now, non running messages as earliestGet at or
     * before now. Passing null for $running applies no earliestGet filter at all.
     *
     * @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
     * @param bool|null $running query a running message or not or all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function count(array $query, $running = null)
    {
        if (!is_null($running) && !is_bool($running)) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        $criteria = [];

        if (is_bool($running)) {
            $comparison = $running ? '$gt' : '$lte';
            $criteria['earliestGet'] = [$comparison => new \MongoDate()];
        }

        //caller fields always address the stored payload, never the queue bookkeeping fields
        foreach ($query as $field => $expected) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $criteria['payload.' . $field] = $expected;
        }

        return $this->collection->count($criteria);
    }
239
240
    /**
     * Acknowledge that a message was processed by removing its document from the collection.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     */
    public function ack(array $message)
    {
        if (isset($message['id']) && $message['id'] instanceof \MongoId) {
            $this->collection->remove(['_id' => $message['id']]);
            return;
        }

        throw new \InvalidArgumentException('$message does not have a field "id" that is a MongoId');
    }
262
263
    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * The document identified by the message id is rewritten in a single upsert with the new payload,
     * earliestGet and priority, so the old message is replaced rather than removed and re-inserted.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoCollection::insert()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved. Clamped between 0 and
     *                         MONGO_INT32_MAX.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        $id = array_key_exists('id', $message) ? $message['id'] : null;
        if (!($id instanceof \MongoId)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a MongoId');
        }

        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        if (!is_bool($newTimestamp)) {
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $availableAt = max(0, min($earliestGet, self::MONGO_INT32_MAX));

        $replacement = [
            'payload' => $payload,
            'earliestGet' => new \MongoDate($availableAt),
            'priority' => $priority,
        ];

        //leaving "created" out of the $set keeps whatever timestamp the stored document already has
        if ($newTimestamp) {
            $replacement['created'] = new \MongoDate();
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->update(['_id' => $id], ['$set' => $replacement], ['upsert' => true]);
    }
324
325
    /**
     * Requeue message to the queue. Same as ackSend() with the message itself (minus its "id" field) as payload.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        //everything except the "id" field that get() added is the payload to store again
        $payload = array_diff_key($message, ['id' => true]);

        $this->ackSend($message, $payload, $earliestGet, $priority, $newTimestamp);
    }
347
348
    /**
     * Send a message to the queue by inserting a new document.
     *
     * @param array $payload the data to store in the message. Data is handled same way as \MongoCollection::insert()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved. Clamped between 0 and
     *                         MONGO_INT32_MAX.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     */
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $availableAt = max(0, min($earliestGet, self::MONGO_INT32_MAX));

        //kept in a variable, exactly as before, rather than passed to insert() as a literal
        $document = [
            'payload' => $payload,
            'earliestGet' => new \MongoDate($availableAt),
            'priority' => $priority,
            'created' => new \MongoDate(),
        ];

        $this->collection->insert($document);
    }
387
388
    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * Up to 5 rounds are made. Each round starts from a fresh uniqid() name and retries with the name shortened
     * by one character at a time until an index with exactly the requested specification is found.
     *
     * @param array $index index to create in same format as \MongoCollection::ensureIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        //if $index is a prefix of any existing index we are good
        $fieldCount = count($index);
        foreach ($this->collection->getIndexInfo() as $existing) {
            if (array_slice($existing['key'], 0, $fieldCount, true) === $index) {
                return;
            }
        }

        for ($attempt = 0; $attempt < 5; ++$attempt) {
            $candidateName = uniqid();

            while (strlen($candidateName) > 0) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so we use any generated name, and then find the right spec after we have called,
                //and just go with that name.
                try {
                    $this->collection->ensureIndex($index, ['name' => $candidateName, 'background' => true]);
                } catch (\MongoException $e) {
                    //this happens when the name was too long, let continue
                }

                foreach ($this->collection->getIndexInfo() as $existing) {
                    if ($existing['key'] === $index) {
                        return;
                    }
                }

                //retry with a name one character shorter
                $candidateName = substr($candidateName, 0, -1);
            }
        }

        throw new \Exception('couldnt create index after 5 attempts');
    }
430
431
    /**
     * Helper method to validate keys and values for the given sort array and append them to an index specification.
     *
     * Entries are validated and appended one at a time, so $completeFields may already hold the earlier entries
     * when an exception is thrown for a later one.
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort was not 1 or -1
     */
    private static function verifySort(array $sort, $label, &$completeFields)
    {
        foreach ($sort as $field => $direction) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException('key in $' . $label . ' was not a string');
            }

            //strict comparison on purpose: only the ints 1 and -1 are accepted
            if (!in_array($direction, [1, -1], true)) {
                throw new \InvalidArgumentException(
                    'value of $' . $label . ' is not 1 or -1 for ascending and descending'
                );
            }

            $completeFields['payload.' . $field] = $direction;
        }
    }
456
}
457