Failed Conditions
Pull Request — master (#52)
by Chad
03:40
created

AbstractQueue::ensureIndex()   C

Complexity

Conditions 8
Paths 17

Size

Total Lines 33
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 33
rs 5.3846
c 0
b 0
f 0
cc 8
eloc 14
nc 17
nop 1
1
<?php
2
/**
3
 * Defines the DominionEnterprises\Mongo\Queue class.
4
 */
5
6
namespace DominionEnterprises\Mongo;
7
8
/**
9
 * Abstraction of mongo db collection as priority queue.
10
 *
11
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
12
 * this purpose).  Using a random priority achieves random get()
13
 */
14
abstract class AbstractQueue implements QueueInterface
15
{
16
    /**
17
     * Maximum millisecond value to use for UTCDateTime creation.
18
     *
19
     * @var integer
20
     */
21
    const MONGO_INT32_MAX = PHP_INT_MAX;
22
23
    /**
24
     * mongo collection to use for queue.
25
     *
26
     * @var \MongoDB\Collection
27
     */
28
    protected $collection;
29
30
    /**
31
     * Ensure an index for the get() method.
32
     *
33
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
34
     *                          as \MongoDB\Collection::ensureIndex()
35
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
36
     *                          \MongoDB\Collection::ensureIndex()
37
     *
38
     * @return void
39
     *
40
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
41
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
42
     */
43
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
44
    {
45
        //using general rule: equality, sort, range or more equality tests in that order for index
46
        $completeFields = ['running' => 1];
47
48
        self::verifySort($beforeSort, 'beforeSort', $completeFields);
49
50
        $completeFields['priority'] = 1;
51
        $completeFields['created'] = 1;
52
53
        self::verifySort($afterSort, 'afterSort', $completeFields);
54
55
        $completeFields['earliestGet'] = 1;
56
57
        //for the main query in get()
58
        $this->ensureIndex($completeFields);
59
60
        //for the stuck messages query in get()
61
        $this->ensureIndex(['running' => 1, 'resetTimestamp' => 1]);
62
    }
63
64
    /**
65
     * Ensure an index for the count() method.
66
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
67
     * call it first.
68
     *
69
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
70
     * @param bool $includeRunning whether to include the running field in the index
71
     *
72
     * @return void
73
     *
74
     * @throws \InvalidArgumentException $includeRunning was not a boolean
75
     * @throws \InvalidArgumentException key in $fields was not a string
76
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
77
     */
78
    final public function ensureCountIndex(array $fields, $includeRunning)
79
    {
80
        if (!is_bool($includeRunning)) {
81
            throw new \InvalidArgumentException('$includeRunning was not a boolean');
82
        }
83
84
        $completeFields = [];
85
86
        if ($includeRunning) {
87
            $completeFields['running'] = 1;
88
        }
89
90
        self::verifySort($fields, 'fields', $completeFields);
91
92
        $this->ensureIndex($completeFields);
93
    }
94
95
    /**
96
     * Get a non running message from the queue.
97
     *
98
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
99
     *                     Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
100
     *                     invalid {$and: [{...}, {...}]}
101
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
102
     *                                  retreived again.
103
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
104
     * @param int $pollDurationInMillis millisecond duration to wait between polls.
105
     *
106
     * @return array|null the message or null if one is not found
107
     *
108
     * @throws \InvalidArgumentException $runningResetDuration, $waitDurationInMillis or $pollDurationInMillis was not
109
     *                                   an int
110
     * @throws \InvalidArgumentException key in $query was not a string
111
     */
112
    final public function get(array $query, $runningResetDuration, $waitDurationInMillis = 3000, $pollDurationInMillis = 200)
113
    {
114
        if (!is_int($runningResetDuration)) {
115
            throw new \InvalidArgumentException('$runningResetDuration was not an int');
116
        }
117
118
        if (!is_int($waitDurationInMillis)) {
119
            throw new \InvalidArgumentException('$waitDurationInMillis was not an int');
120
        }
121
122
        if (!is_int($pollDurationInMillis)) {
123
            throw new \InvalidArgumentException('$pollDurationInMillis was not an int');
124
        }
125
126
        if ($pollDurationInMillis < 0) {
127
            $pollDurationInMillis = 0;
128
        }
129
130
        //reset stuck messages
131
        $this->collection->updateMany(
132
            ['running' => true, 'resetTimestamp' => ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))]],
133
            ['$set' => ['running' => false]]
134
        );
135
136
        $completeQuery = ['running' => false];
137 View Code Duplication
        foreach ($query as $key => $value) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
138
            if (!is_string($key)) {
139
                throw new \InvalidArgumentException('key in $query was not a string');
140
            }
141
142
            $completeQuery["payload.{$key}"] = $value;
143
        }
144
145
        $completeQuery['earliestGet'] = ['$lte' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000))];
146
147
        $resetTimestamp = time() + $runningResetDuration;
148
        //ints overflow to floats
149
        if (!is_int($resetTimestamp)) {
150
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
151
        }
152
153
        $resetTimestamp = min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
154
155
        $update = ['$set' => ['resetTimestamp' => new \MongoDB\BSON\UTCDateTime($resetTimestamp), 'running' => true]];
156
        $options = ['sort' => ['priority' => 1, 'created' => 1]];
157
158
        //ints overflow to floats, should be fine
159
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);
160
161
        $sleepTime = $pollDurationInMillis * 1000;
162
        //ints overflow to floats and already checked $pollDurationInMillis was positive
163
        if (!is_int($sleepTime)) {
164
            //ignore since testing a giant sleep takes too long
165
            //@codeCoverageIgnoreStart
166
            $sleepTime = PHP_INT_MAX;
167
        }   //@codeCoverageIgnoreEnd
168
169
        while (true) {
170
            $message = $this->collection->findOneAndUpdate($completeQuery, $update, $options);
171
            //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
172
            //older mongo extension
173
            if ($message !== null && array_key_exists('_id', $message)) {
174
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
175
                $message = $this->collection->findOne(['_id' => $message->_id]);
176
                //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
177
                return ['id' => $message['_id']] + (array)$message['payload'];
178
            }
179
180
            if (microtime(true) >= $end) {
181
                return null;
182
            }
183
184
            usleep($sleepTime);
185
        }
186
187
        //ignore since always return from the function from the while loop
188
        //@codeCoverageIgnoreStart
189
    }
190
    //@codeCoverageIgnoreEnd
191
192
    /**
193
     * Count queue messages.
194
     *
195
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain operators.
196
     * Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3}, invalid {$and: [{...}, {...}]}
197
     * @param bool|null $running query a running message or not or all
198
     *
199
     * @return int the count
200
     *
201
     * @throws \InvalidArgumentException $running was not null and not a bool
202
     * @throws \InvalidArgumentException key in $query was not a string
203
     */
204
    final public function count(array $query, $running = null)
205
    {
206
        if ($running !== null && !is_bool($running)) {
207
            throw new \InvalidArgumentException('$running was not null and not a bool');
208
        }
209
210
        $totalQuery = [];
211
212
        if ($running !== null) {
213
            $totalQuery['running'] = $running;
214
        }
215
216 View Code Duplication
        foreach ($query as $key => $value) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
217
            if (!is_string($key)) {
218
                throw new \InvalidArgumentException('key in $query was not a string');
219
            }
220
221
            $totalQuery["payload.{$key}"] = $value;
222
        }
223
224
        return $this->collection->count($totalQuery);
225
    }
226
227
    /**
228
     * Acknowledge a message was processed and remove from queue.
229
     *
230
     * @param array $message message received from get()
231
     *
232
     * @return void
233
     *
234
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
235
     */
236
    final public function ack(array $message)
237
    {
238
        $id = null;
239
        if (array_key_exists('id', $message)) {
240
            $id = $message['id'];
241
        }
242
243
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
244
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
245
        }
246
247
        $this->collection->deleteOne(['_id' => $id]);
248
    }
249
250
    /**
251
     * Atomically acknowledge and send a message to the queue.
252
     *
253
     * @param array $message the message to ack received from get()
254
     * @param array $payload the data to store in the message to send. Data is handled same way
255
     *                       as \MongoDB\Collection::insertOne()
256
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
257
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
258
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
259
     *
260
     * @return void
261
     *
262
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
263
     * @throws \InvalidArgumentException $earliestGet was not an int
264
     * @throws \InvalidArgumentException $priority was not a float
265
     * @throws \InvalidArgumentException $priority is NaN
266
     * @throws \InvalidArgumentException $newTimestamp was not a bool
267
     */
268
    final public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
269
    {
270
        $id = null;
271
        if (array_key_exists('id', $message)) {
272
            $id = $message['id'];
273
        }
274
275
        if (!(is_a($id, 'MongoDB\BSON\ObjectID'))) {
276
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
277
        }
278
279
        if (!is_int($earliestGet)) {
280
            throw new \InvalidArgumentException('$earliestGet was not an int');
281
        }
282
283
        if (!is_float($priority)) {
284
            throw new \InvalidArgumentException('$priority was not a float');
285
        }
286
287
        if (is_nan($priority)) {
288
            throw new \InvalidArgumentException('$priority was NaN');
289
        }
290
291
        if ($newTimestamp !== true && $newTimestamp !== false) {
292
            throw new \InvalidArgumentException('$newTimestamp was not a bool');
293
        }
294
295
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
296
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
297
298
        $toSet = [
299
            'payload' => $payload,
300
            'running' => false,
301
            'resetTimestamp' => new \MongoDB\BSON\UTCDateTime(self::MONGO_INT32_MAX),
302
            'earliestGet' => new \MongoDB\BSON\UTCDateTime($earliestGet),
303
            'priority' => $priority,
304
        ];
305
        if ($newTimestamp) {
306
            $toSet['created'] = new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000));
307
        }
308
309
        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
310
        //so we can just send
311
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
312
    }
313
314
    /**
315
     * Requeue message to the queue. Same as ackSend() with the same message.
316
     *
317
     * @param array $message message received from get().
318
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
319
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
320
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
321
     *
322
     * @return void
323
     *
324
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
325
     * @throws \InvalidArgumentException $earliestGet was not an int
326
     * @throws \InvalidArgumentException $priority was not a float
327
     * @throws \InvalidArgumentException priority is NaN
328
     * @throws \InvalidArgumentException $newTimestamp was not a bool
329
     */
330
    final public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
331
    {
332
        $forRequeue = $message;
333
        unset($forRequeue['id']);
334
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
335
    }
336
337
    /**
338
     * Send a message to the queue.
339
     *
340
     * @param array $payload the data to store in the message. Data is handled same way as \MongoDB\Collection::insertOne()
341
     * @param int $earliestGet earliest unix timestamp the message can be retreived.
342
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
343
     *
344
     * @return void
345
     *
346
     * @throws \InvalidArgumentException $earliestGet was not an int
347
     * @throws \InvalidArgumentException $priority was not a float
348
     * @throws \InvalidArgumentException $priority is NaN
349
     */
350
    final public function send(array $payload, $earliestGet = 0, $priority = 0.0)
351
    {
352
        if (!is_int($earliestGet)) {
353
            throw new \InvalidArgumentException('$earliestGet was not an int');
354
        }
355
356
        if (!is_float($priority)) {
357
            throw new \InvalidArgumentException('$priority was not a float');
358
        }
359
360
        if (is_nan($priority)) {
361
            throw new \InvalidArgumentException('$priority was NaN');
362
        }
363
364
        //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
365
        $earliestGet = min(max(0, $earliestGet * 1000), self::MONGO_INT32_MAX);
366
367
        $message = [
368
            'payload' => $payload,
369
            'running' => false,
370
            'resetTimestamp' => new \MongoDB\BSON\UTCDateTime(self::MONGO_INT32_MAX),
371
            'earliestGet' => new \MongoDB\BSON\UTCDateTime($earliestGet),
372
            'priority' => $priority,
373
            'created' => new \MongoDB\BSON\UTCDateTime((int)(microtime(true) * 1000)),
374
        ];
375
376
        $this->collection->insertOne($message);
377
    }
378
379
    /**
380
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
381
     * Will not create index if $index is a prefix of an existing index
382
     *
383
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
384
     *
385
     * @return void
386
     *
387
     * @throws \Exception couldnt create index after 5 attempts
388
     */
389
    final protected function ensureIndex(array $index)
390
    {
391
        //if $index is a prefix of any existing index we are good
392
        foreach ($this->collection->listIndexes() as $existingIndex) {
393
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
394
            if ($slice === $index) {
395
                return;
396
            }
397
        }
398
399
        for ($i = 0; $i < 5; ++$i) {
400
            for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
401
                //creating an index with same name and different spec does nothing.
402
                //creating an index with same spec and different name does nothing.
403
                //so we use any generated name, and then find the right spec after we have called,
404
                //and just go with that name.
405
                try {
406
                    $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
407
                } catch (\MongoDB\Exception\Exception $e) {
408
                    //this happens when the name was too long, let continue
409
                }
410
411
                foreach ($this->collection->listIndexes() as $existingIndex) {
412
                    if ($existingIndex['key'] === $index) {
413
                        return;
414
                    }
415
                }
416
            }
417
        }
418
419
        throw new \Exception('couldnt create index after 5 attempts');
420
        //@codeCoverageIgnoreEnd
421
    }
422
423
    /**
424
     * Helper method to validate keys and values for the given sort array
425
     *
426
     * @param array  $sort             The proposed sort for a mongo index.
427
     * @param string $label            The name of the variable given to the public ensureXIndex method.
428
     * @param array  &$completedFields The final index array with payload. prefix added to fields.
429
     *
430
     * @return void
431
     */
432
    final protected static function verifySort(array $sort, $label, &$completeFields)
433
    {
434
        foreach ($sort as $key => $value) {
435
            if (!is_string($key)) {
436
                throw new \InvalidArgumentException("key in \${$label} was not a string");
437
            }
438
439
            if ($value !== 1 && $value !== -1) {
440
                throw new \InvalidArgumentException(
441
                    "value of \${$label} is not 1 or -1 for ascending and descending"
442
                );
443
            }
444
445
            $completeFields["payload.{$key}"] = $value;
446
        }
447
    }
448
}
449