Failed Conditions
Pull Request — master (#51)
by Chad
02:56
created

src/Queue.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
use MongoDB\BSON\UTCDateTime;
9
10
/**
11
 * Abstraction of mongo db collection as priority queue.
12
 *
13
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
14
 * this purpose).  Using a random priority achieves random get()
15
 */
16
final class Queue implements QueueInterface
17
{
18
    /**
19
     * Maximum millisecond value to use for UTCDateTime creation.
20
     *
21
     * @var integer
22
     */
23
    const MONGO_INT32_MAX = PHP_INT_MAX;
24
25
    /**
26
     * mongo collection to use for queue.
27
     *
28
     * @var \MongoDB\Collection
29
     */
30
    private $collection;
31
32
    /**
33
     * Construct queue.
34
     *
35
     * @param \MongoDB\Collection|string $collectionOrUrl A MongoCollection instance or the mongo connection url.
36
     * @param string $db the mongo db name
37
     * @param string $collection the collection name to use for the queue
38
     *
39
     * @throws \InvalidArgumentException $collectionOrUrl, $db or $collection was not a string
40
     */
41
    public function __construct($collectionOrUrl, $db = null, $collection = null)
42
    {
43
        if ($collectionOrUrl instanceof \MongoDB\Collection) {
44
            $this->collection = $collectionOrUrl;
45
            return;
46
        }
47
48
        if (!is_string($collectionOrUrl)) {
49
            throw new \InvalidArgumentException('$collectionOrUrl was not a string');
50
        }
51
52
        if (!is_string($db)) {
53
            throw new \InvalidArgumentException('$db was not a string');
54
        }
55
56
        if (!is_string($collection)) {
57
            throw new \InvalidArgumentException('$collection was not a string');
58
        }
59
60
        $mongo = new \MongoDB\Client($collectionOrUrl, [], ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]);
61
        $mongoDb = $mongo->selectDatabase($db);
62
        $this->collection = $mongoDb->selectCollection($collection);
63
    }
64
65
    /**
66
     * Ensure an index for the get() method.
67
     *
68
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
69
     *                          as \MongoDB\Collection::ensureIndex()
70
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
71
     *                          \MongoDB\Collection::ensureIndex()
72
     *
73
     * @return void
74
     *
75
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
76
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
77
     */
78
    public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
79
    {
80
        //using general rule: equality, sort, range or more equality tests in that order for index
81
        $completeFields = ['earliestGet' => 1];
82
83
        self::verifySort($beforeSort, 'beforeSort', $completeFields);
84
85
        $completeFields['priority'] = 1;
86
        $completeFields['created'] = 1;
87
88
        self::verifySort($afterSort, 'afterSort', $completeFields);
89
90
        //for the main query in get()
91
        $this->ensureIndex($completeFields);
92
    }
93
94
    /**
95
     * Ensure an index for the count() method.
96
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
97
     * call it first.
98
     *
99
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
100
     * @param bool $includeRunning whether to include the running field in the index
101
     *
102
     * @return void
103
     *
104
     * @throws \InvalidArgumentException $includeRunning was not a boolean
105
     * @throws \InvalidArgumentException key in $fields was not a string
106
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
107
     */
108
    public function ensureCountIndex(array $fields, $includeRunning)
109
    {
110
        if (!is_bool($includeRunning)) {
111
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
112
        }
113
114
        $completeFields = [];
115
116
        if ($includeRunning) {
117
            $completeFields['earliestGet'] = 1;
118
        }
119
120
        self::verifySort($fields, 'fields', $completeFields);
121
122
        $this->ensureIndex($completeFields);
123
    }
124
125
    /**
126
     * Get a non running message from the queue.
127
     *
128
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
129
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
130
     *                     invalid {$and: [{...}, {...}]}
131
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
132
     *                                  retreived again.
133
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
134
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
135
     *
136
     * @return array|null the message or null if one is not found
137
     *
138
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
139
     *                                   an int
140
     * @throws \InvalidArgumentException key in $query was not a string
141
     */
142
    public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
143
    {
144
        if (!is_int($runningResetDuration)) {
145
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
146
        }
147
148
        if (!is_int($waitDurationInMillis)) {
149
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
150
        }
151
152
        if (!is_int($pollDurationInMillis)) {
153
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
154
        }
155
156
        if ($pollDurationInMillis < 0) {
157
            $pollDurationInMillis = 0;
158
        }
159
160
        $completeQuery['earliestGet'] = ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))];
0 ignored issues
show
Coding Style Comprehensibility introduced by
$completeQuery was never initialized. Although not strictly required by PHP, it is generally a good practice to add $completeQuery = array(); before regardless.

Adding an explicit array definition is generally preferable to implicit array definition as it guarantees a stable state of the code.

Let’s take a look at an example:

foreach ($collection as $item) {
    $myArray['foo'] = $item->getFoo();

    if ($item->hasBar()) {
        $myArray['bar'] = $item->getBar();
    }

    // do something with $myArray
}

As you can see in this example, the array $myArray is initialized the first time when the foreach loop is entered. You can also see that the value of the bar key is only written conditionally; thus, its value might result from a previous iteration.

This might or might not be intended. To make your intention clear, your code more readible and to avoid accidental bugs, we recommend to add an explicit initialization $myArray = array() either outside or inside the foreach loop.

Loading history...
161 View Code Duplication
        foreach ($query as $key => $value) {
162
            if (!is_string($key)) {
163
                throw new \InvalidArgumentException('key in $query was not a string');
164
            }
165
166
            $completeQuery["payload.{$key}"] = $value;
167
        }
168
169
        $resetTimestamp = time() + $runningResetDuration;
170
        //ints overflow to floats
171
        if (!is_int($resetTimestamp)) {
172
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
173
        }
174
175
        $resetTimestamp = min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
176
177
        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
178
        $options = ['sort' => ['priority' => 1, 'created' => 1]];
179
180
        //ints overflow to floats, should be fine
181
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
182
183
        $sleepTime = $pollDurationInMillis * 1000;
184
        //ints overflow to floats and already checked $pollDurationInMillis was positive
185
        if (!is_int($sleepTime)) {
186
            //ignore since testing a giant sleep takes too long
187
            //@codeCoverageIgnoreStart
188
            $sleepTime = PHP_INT_MAX;
189
        }   //@codeCoverageIgnoreEnd
190
191
        while (true) {
192
            $message = $this->collection->findOneAndUpdate($completeQuery, $update, $options);
193
            //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
194
            //older mongo extension
195
            if ($message !== null && array_key_exists('_id', $message)) {
196
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
197
                $message = $this->collection->findOne(['_id' => $message->_id]);
198
                //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
199
                return ['id' => $message['_id']] + (array)$message['payload'];
200
            }
201
202
            if (microtime(true) >= $end) {
203
                return null;
204
            }
205
206
            usleep($sleepTime);
207
        }
208
209
        //ignore since always return from the function from the while loop
210
        //@codeCoverageIgnoreStart
211
    }
212
    //@codeCoverageIgnoreEnd
213
214
    /**
215
     * Count queue messages.
216
     *
217
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
218
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
219
     * @param bool|null $running query a running message or not or all
220
     *
221
     * @return int the count
222
     *
223
     * @throws \InvalidArgumentException $running was not null and not a bool
224
     * @throws \InvalidArgumentException key in $query was not a string
225
     */
226
    public function count(array $query, $running = null)
227
    {
228
        if ($running !== null && !is_bool($running)) {
229
            throw new \InvalidArgumentException('$running was not null and not a bool');
230
        }
231
232
        $totalQuery = [];
233
234
        if ($running === true || $running === false) {
235
            $key = $running ? '$gt' : '$lte';
236
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
237
        }
238
239 View Code Duplication
        foreach ($query as $key => $value) {
240
            if (!is_string($key)) {
241
                throw new \InvalidArgumentException('key in $query was not a string');
242
            }
243
244
            $totalQuery["payload.{$key}"] = $value;
245
        }
246
247
        return $this->collection->count($totalQuery);
248
    }
249
250
    /**
251
     * Acknowledge a message was processed and remove from queue.
252
     *
253
     * @param array $message message received from get()
254
     *
255
     * @return void
256
     *
257
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
258
     */
259
    public function ack(array $message)
260
    {
261
        $id = null;
262
        if (array_key_exists('id', $message)) {
263
            $id = $message['id'];
264
        }
265
266
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
267
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
268
        }
269
270
        $this->collection->deleteOne(['_id' => $id]);
271
    }
272
273
    /**
274
     * Atomically acknowledge and send a message to the queue.
275
     *
276
     * @param array $message the message to ack received from get()
277
     * @param array $payload the data to store in the message to send. Data is handled same way
278
     *                       as \MongoDB\Collection::insertOne()
279
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
280
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
281
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
282
     *
283
     * @return void
284
     *
285
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
286
     * @throws \InvalidArgumentException $earliestGet was not an int
287
     * @throws \InvalidArgumentException $priority was not a float
288
     * @throws \InvalidArgumentException $priority is NaN
289
     * @throws \InvalidArgumentException $newTimestamp was not a bool
290
     */
291
    public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
292
    {
293
        $id = null;
294
        if (array_key_exists('id', $message)) {
295
            $id = $message['id'];
296
        }
297
298
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
299
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
300
        }
301
302
        if (!is_int($earliestGet)) {
303
            throw new \InvalidArgumentException('$earliestGet was not an int');
304
        }
305
306
        if (!is_float($priority)) {
307
            throw new \InvalidArgumentException('$priority was not a float');
308
        }
309
310
        if (is_nan($priority)) {
311
            throw new \InvalidArgumentException('$priority was NaN');
312
        }
313
314
        if ($newTimestamp !== true && $newTimestamp !== false) {
315
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
316
        }
317
318
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
319
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
320
321
        $toSet = [
322
            'payload' => $payload,
323
            'earliestGet' => new UTCDateTime($earliestGet),
324
            'priority' => $priority,
325
        ];
326
        if ($newTimestamp) {
327
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
328
        }
329
330
        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
331
        //so we can just send
332
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
333
    }
334
335
    /**
336
     * Requeue message to the queue. Same as ackSend() with the same message.
337
     *
338
     * @param array $message message received from get().
339
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
340
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
341
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
342
     *
343
     * @return void
344
     *
345
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
346
     * @throws \InvalidArgumentException $earliestGet was not an int
347
     * @throws \InvalidArgumentException $priority was not a float
348
     * @throws \InvalidArgumentException priority is NaN
349
     * @throws \InvalidArgumentException $newTimestamp was not a bool
350
     */
351
    public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
352
    {
353
        $forRequeue = $message;
354
        unset($forRequeue['id']);
355
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
356
    }
357
358
    /**
359
     * Send a message to the queue.
360
     *
361
     * @param array $payload the data to store in the message. Data is handled same way as \MongoDB\Collection::insertOne()
362
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
363
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
364
     *
365
     * @return void
366
     *
367
     * @throws \InvalidArgumentException $earliestGet was not an int
368
     * @throws \InvalidArgumentException $priority was not a float
369
     * @throws \InvalidArgumentException $priority is NaN
370
     */
371
    public function send(array $payload, $earliestGet = 0, $priority = 0.0)
372
    {
373
        if (!is_int($earliestGet)) {
374
            throw new \InvalidArgumentException('$earliestGet was not an int');
375
        }
376
377
        if (!is_float($priority)) {
378
            throw new \InvalidArgumentException('$priority was not a float');
379
        }
380
381
        if (is_nan($priority)) {
382
            throw new \InvalidArgumentException('$priority was NaN');
383
        }
384
385
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
386
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
387
388
        $message = [
389
            'payload' => $payload,
390
            'earliestGet' => new UTCDateTime($earliestGet),
391
            'priority' => $priority,
392
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
393
        ];
394
395
        $this->collection->insertOne($message);
396
    }
397
398
    /**
399
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
400
     * Will not create index if $index is a prefix of an existing index
401
     *
402
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
403
     *
404
     * @return void
405
     *
406
     * @throws \Exception couldnt create index after 5 attempts
407
     */
408
    private function ensureIndex(array $index)
409
    {
410
        //if $index is a prefix of any existing index we are good
411
        foreach ($this->collection->listIndexes() as $existingIndex) {
412
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
413
            if ($slice === $index) {
414
                return;
415
            }
416
        }
417
418
        for ($i = 0; $i < 5; ++$i) {
419
            for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
420
                //creating an index with same name and different spec does nothing.
421
                //creating an index with same spec and different name does nothing.
422
                //so we use any generated name, and then find the right spec after we have called,
423
                //and just go with that name.
424
                try {
425
                    $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
426
                } catch (\MongoDB\Exception\Exception $e) {
427
                    //this happens when the name was too long, let continue
428
                }
429
430
                foreach ($this->collection->listIndexes() as $existingIndex) {
431
                    if ($existingIndex['key'] === $index) {
432
                        return;
433
                    }
434
                }
435
            }
436
        }
437
438
        throw new \Exception('couldnt create index after 5 attempts');
439
        //@codeCoverageIgnoreEnd
440
    }
441
442
    /**
443
     * Helper method to validate keys and values for the given sort array
444
     *
445
     * @param array  $sort             The proposed sort for a mongo index.
446
     * @param string $label            The name of the variable given to the public ensureXIndex method.
447
     * @param array  &$completedFields The final index array with payload. prefix added to fields.
448
     *
449
     * @return void
450
     */
451
    private static function verifySort(array $sort, $label, &$completeFields)
452
    {
453
        foreach ($sort as $key => $value) {
454
            if (!is_string($key)) {
455
                throw new \InvalidArgumentException("key in \${$label} was not a string");
456
            }
457
458
            if ($value !== 1 && $value !== -1) {
459
                throw new \InvalidArgumentException(
460
                    "value of \${$label} is not 1 or -1 for ascending and descending"
461
                );
462
            }
463
464
            $completeFields["payload.{$key}"] = $value;
465
        }
466
    }
467
}
468