Completed
Pull Request — master (#43)
by Chad
06:24 queued 03:04
created

Queue::send()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 27
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 0
loc 27
rs 8.5806
cc 4
eloc 15
nc 4
nop 3
1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
/**
9
 * Abstraction of mongo db collection as priority queue.
10
 *
11
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
12
 * this purpose). Using a random priority makes get() return messages in random order.
13
 */
14
final class Queue implements QueueInterface
15
{
16
    const MONGO_INT32_MAX = 2147483647;//2147483648 can overflow in php mongo without using the MongoInt64
17
18
    /**
19
     * mongo collection to use for queue.
20
     *
21
     * @var \MongoCollection
22
     */
23
    private $collection;
24
25
    /**
     * Construct queue.
     *
     * Either wraps an existing collection, or connects with the given url and selects the db and collection
     * by name.
     *
     * @param \MongoCollection|string $collectionOrUrl A MongoCollection instance or the mongo connection url.
     * @param string $db the mongo db name
     * @param string $collection the collection name to use for the queue
     *
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
     */
    public function __construct($collectionOrUrl, $db = null, $collection = null)
    {
        //an already built collection needs no further arguments
        if ($collectionOrUrl instanceof \MongoCollection) {
            $this->collection = $collectionOrUrl;
            return;
        }

        //checked in declaration order so the first offending argument is the one reported
        $stringArguments = [
            'collectionOrUrl' => $collectionOrUrl,
            'db' => $db,
            'collection' => $collection,
        ];

        foreach ($stringArguments as $argumentName => $argumentValue) {
            if (!is_string($argumentValue)) {
                throw new \InvalidArgumentException("\${$argumentName} was not a string");
            }
        }

        $client = new \MongoClient($collectionOrUrl);
        $this->collection = $client->selectDB($db)->selectCollection($collection);
    }
57
58
    /**
     * Ensure an index for the get() method.
     *
     * The index is laid out as: resetTimestamp, the $beforeSort fields, priority, created, the $afterSort
     * fields and finally earliestGet.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoCollection::ensureIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoCollection::ensureIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $indexSpec = ['resetTimestamp' => 1];
        self::verifySort($beforeSort, 'beforeSort', $indexSpec);

        //the sort fields used by get()
        $indexSpec += ['priority' => 1, 'created' => 1];
        self::verifySort($afterSort, 'afterSort', $indexSpec);

        $indexSpec['earliestGet'] = 1;

        //for the main query in get()
        $this->ensureIndex($indexSpec);
    }
88
89
    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoCollection::ensureIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException $includeRunning was not a boolean
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    public function ensureCountIndex(array $fields, $includeRunning)
    {
        if (is_bool($includeRunning) === false) {
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
        }

        //resetTimestamp leads the index only when the running state is part of the count query
        $indexSpec = $includeRunning ? ['resetTimestamp' => 1] : [];

        self::verifySort($fields, 'fields', $indexSpec);

        $this->ensureIndex($indexSpec);
    }
119
120
    /**
     * Get a non running message from the queue.
     *
     * Polls the collection until a message matches or the wait duration elapses. A matched message has its
     * resetTimestamp pushed $runningResetDuration seconds into the future so it is not handed out again until
     * then.
     *
     * @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
     *
     * @return array|null the message or null if one is not found
     *
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
     *                                   an int
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
    {
        if (!is_int($runningResetDuration)) {
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
        }

        if (!is_int($waitDurationInMillis)) {
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
        }

        if (!is_int($pollDurationInMillis)) {
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
        }

        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        //only messages whose reset time has passed are considered not running
        $completeQuery = ['resetTimestamp' => ['$lte' => new \MongoDate()]];
        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $completeQuery["payload.{$key}"] = $value;
        }

        $completeQuery['earliestGet'] = ['$lte' => new \MongoDate()];

        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        //Ensure $resetTimestamp is between 0 and MONGO_INT32_MAX. On 64 bit php the sum above stays an int
        //well past MONGO_INT32_MAX, so the float check alone does not keep it in range.
        $resetTimestamp = min(max(0, $resetTimestamp), self::MONGO_INT32_MAX);

        $update = ['$set' => ['resetTimestamp' => new \MongoDate($resetTimestamp)]];
        $fields = ['payload' => 1];
        $options = ['sort' => ['priority' => 1, 'created' => 1]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            $message = $this->collection->findAndModify($completeQuery, $update, $fields, $options);
            //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
            //older mongo extension
            if ($message !== null && array_key_exists('_id', $message)) {
                //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
                return ['id' => $message['_id']] + $message['payload'];
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd
207
208
    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoCollection::find() where top level fields do not contain operators.
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count only running messages, false to count only non running messages,
     *                           null to count all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    public function count(array $query, $running = null)
    {
        if (!is_bool($running) && $running !== null) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        $criteria = [];

        //a message is running while its resetTimestamp is still in the future
        if ($running === true) {
            $criteria['resetTimestamp'] = ['$gt' => new \MongoDate()];
        } elseif ($running === false) {
            $criteria['resetTimestamp'] = ['$lte' => new \MongoDate()];
        }

        foreach ($query as $field => $condition) {
            if (!is_string($field)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            //caller fields live under the payload sub document
            $criteria['payload.' . $field] = $condition;
        }

        return $this->collection->count($criteria);
    }
243
244
    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     */
    public function ack(array $message)
    {
        $id = array_key_exists('id', $message) ? $message['id'] : null;

        if ($id instanceof \MongoId) {
            $this->collection->remove(['_id' => $id]);
            return;
        }

        throw new \InvalidArgumentException('$message does not have a field "id" that is a MongoId');
    }
266
267
    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * The document of the acknowledged message is overwritten in place with the new payload.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoCollection::insert()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        $id = array_key_exists('id', $message) ? $message['id'] : null;
        if (!($id instanceof \MongoId)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a MongoId');
        }

        if (!is_int($earliestGet)) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (!is_float($priority)) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        if (!is_bool($newTimestamp)) {
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = max(0, min($earliestGet, self::MONGO_INT32_MAX));

        $replacement = [
            'payload' => $payload,
            'resetTimestamp' => new \MongoDate(),
            'earliestGet' => new \MongoDate($earliestGet),
            'priority' => $priority,
        ];

        //without a new timestamp the existing created field of the document is left untouched
        if ($newTimestamp === true) {
            $replacement['created'] = new \MongoDate();
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->update(['_id' => $id], ['$set' => $replacement], ['upsert' => true]);
    }
329
330
    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException priority is NaN
     * @throws \InvalidArgumentException $newTimestamp was not a bool
     */
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
    {
        //the payload is everything in the message except the id added by get()
        $payload = array_diff_key($message, ['id' => null]);

        $this->ackSend($message, $payload, $earliestGet, $priority, $newTimestamp);
    }
352
353
    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way as \MongoCollection::insert()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $earliestGet was not an int
     * @throws \InvalidArgumentException $priority was not a float
     * @throws \InvalidArgumentException $priority is NaN
     */
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
    {
        if (is_int($earliestGet) === false) {
            throw new \InvalidArgumentException('$earliestGet was not an int');
        }

        if (is_float($priority) === false) {
            throw new \InvalidArgumentException('$priority was not a float');
        }

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
        $earliestGet = max(0, min($earliestGet, self::MONGO_INT32_MAX));

        //resetTimestamp of now means the message is immediately considered not running
        $this->collection->insert(
            [
                'payload' => $payload,
                'resetTimestamp' => new \MongoDate(),
                'earliestGet' => new \MongoDate($earliestGet),
                'priority' => $priority,
                'created' => new \MongoDate(),
            ]
        );
    }
393
394
    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoCollection::ensureIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        $width = count($index);

        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->getIndexInfo() as $existing) {
            if (array_slice($existing['key'], 0, $width, true) === $index) {
                return;
            }
        }

        $attempt = 0;
        while ($attempt < 5) {
            //each attempt starts from a fresh generated name and shortens it one character at a time
            $name = uniqid();
            while (strlen($name) > 0) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so we use any generated name, and then find the right spec after we have called,
                //and just go with that name.
                try {
                    $this->collection->ensureIndex($index, ['name' => $name, 'background' => true]);
                } catch (\MongoException $e) {
                    //this happens when the name was too long, let continue
                }

                foreach ($this->collection->getIndexInfo() as $existing) {
                    if ($existing['key'] === $index) {
                        return;
                    }
                }

                $name = substr($name, 0, -1);
            }

            ++$attempt;
        }

        throw new \Exception('couldnt create index after 5 attempts');
    }
436
437
    /**
     * Helper method to validate keys and values for the given sort array and append them to the index fields.
     *
     * Each valid entry is written into $completeFields under its key prefixed with "payload.".
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1 for ascending and descending
     */
    private static function verifySort(array $sort, $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            //strict comparison on purpose: only the ints 1 and -1 are accepted
            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }
462
}
463