Passed
Pull Request — master (#60)
by Chad
03:55
created

AbstractQueue::count()   C

Complexity

Conditions 8
Paths 7

Size

Total Lines 23
Code Lines 12

Duplication

Lines 7
Ratio 30.43 %

Importance

Changes 0
Metric Value
dl 7
loc 23
rs 6.1403
c 0
b 0
f 0
cc 8
eloc 12
nc 7
nop 2
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\AbstractQueue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use MongoDB\BSON\UTCDateTime;
9
10
/**
11
 * Abstraction of mongo db collection as priority queue.
12
 *
13
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
14
 * this purpose).  Using a random priority achieves random get()
15
 */
16
abstract class AbstractQueue implements QueueInterface
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * NOTE: despite the name, the value is PHP_INT_MAX (the platform integer maximum). The name is kept because the
     * constant is part of the public interface of this class.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    protected $collection;

    /**
     * Ensure an index for the get() method.
     *
     * The index is built as: earliestGet, the $beforeSort payload fields, priority, created, and finally the
     * $afterSort payload fields.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        self::verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        self::verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the running field in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        self::verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }

    /**
     * Get a non running message from the queue.
     *
     * The matched message has its earliestGet pushed forward by $runningResetDuration seconds, which is what marks
     * it as running. The collection is polled until a message is found or $waitDurationInMillis has elapsed.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param int $runningResetDuration second duration the message can stay unacked before it resets and can be
     *                                  retrieved again.
     * @param int $waitDurationInMillis millisecond duration to wait for a message.
     * @param int $pollDurationInMillis millisecond duration to wait between polls. Negative values are treated as 0.
     *
     * @return array|null the message or null if one is not found
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(
        array $query,
        int $runningResetDuration,
        int $waitDurationInMillis = 3000,
        int $pollDurationInMillis = 200
    ) {
        if ($pollDurationInMillis < 0) {
            $pollDurationInMillis = 0;
        }

        $completeQuery = self::buildPayloadQuery(
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
            $query
        );

        $resetTimestamp = time() + $runningResetDuration;
        //ints overflow to floats
        if (!is_int($resetTimestamp)) {
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
        }

        $update = ['$set' => ['earliestGet' => new UTCDateTime(self::clampedMilliseconds($resetTimestamp))]];
        $options = ['sort' => ['priority' => 1, 'created' => 1]];

        //ints overflow to floats, should be fine
        $end = microtime(true) + ($waitDurationInMillis / 1000.0);

        $sleepTime = $pollDurationInMillis * 1000;
        //ints overflow to floats and already checked $pollDurationInMillis was positive
        if (!is_int($sleepTime)) {
            //ignore since testing a giant sleep takes too long
            //@codeCoverageIgnoreStart
            $sleepTime = PHP_INT_MAX;
        }   //@codeCoverageIgnoreEnd

        while (true) {
            $message = $this->collection->findOneAndUpdate($completeQuery, $update, $options);
            //checking if _id exist because findAndModify doesnt seem to return null when it can't match the query on
            //older mongo extension. isset() is used instead of array_key_exists() because the result may be an
            //ArrayAccess document object, which array_key_exists() rejects on PHP 8.
            if ($message !== null && isset($message['_id'])) {
                // findOneAndUpdate does not correctly return result according to typeMap options so just refetch.
                $message = $this->collection->findOne(['_id' => $message['_id']]);
                //id on left of union operator so a possible id in payload doesnt wipe it out the generated one
                return ['id' => $message['_id']] + (array)$message['payload'];
            }

            if (microtime(true) >= $end) {
                return null;
            }

            usleep($sleepTime);
        }

        //ignore since always return from the function from the while loop
        //@codeCoverageIgnoreStart
    }
    //@codeCoverageIgnoreEnd

    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running true to count only running messages, false to count only non running messages,
     *                           null to count all
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException $running was not null and not a bool
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, $running = null)
    {
        if ($running !== null && !is_bool($running)) {
            throw new \InvalidArgumentException('$running was not null and not a bool');
        }

        $totalQuery = [];

        //after the check above $running is either null or a bool
        if ($running !== null) {
            //a running message has an earliestGet in the future
            $operator = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$operator => new UTCDateTime((int)(microtime(true) * 1000))];
        }

        return $this->collection->count(self::buildPayloadQuery($totalQuery, $query));
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param array $message message received from get()
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a MongoDB\BSON\ObjectID
     */
    final public function ack(array $message)
    {
        $this->collection->deleteOne(['_id' => self::getIdFromMessage($message)]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * @param array $message the message to ack received from get()
     * @param array $payload the data to store in the message to send. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function ackSend(
        array $message,
        array $payload,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        $id = self::getIdFromMessage($message);

        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        $toSet = [
            'payload' => $payload,
            //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
            'earliestGet' => new UTCDateTime(self::clampedMilliseconds($earliestGet)),
            'priority' => $priority,
        ];
        if ($newTimestamp) {
            $toSet['created'] = new UTCDateTime((int)(microtime(true) * 1000));
        }

        //using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY)
        //so we can just send
        $this->collection->updateOne(['_id' => $id], ['$set' => $toSet], ['upsert' => true]);
    }

    /**
     * Requeue message to the queue. Same as ackSend() with the same message.
     *
     * @param array $message message received from get().
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     * @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
     *
     * @return void
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     * @throws \InvalidArgumentException priority is NaN
     */
    final public function requeue(
        array $message,
        int $earliestGet = 0,
        float $priority = 0.0,
        bool $newTimestamp = true
    ) {
        //the payload is the message as returned by get() minus the generated id
        $forRequeue = $message;
        unset($forRequeue['id']);
        $this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
    }

    /**
     * Send a message to the queue.
     *
     * @param array $payload the data to store in the message. Data is handled same way
     *                       as \MongoDB\Collection::insertOne()
     * @param int $earliestGet earliest unix timestamp the message can be retrieved.
     * @param float $priority priority for order out of get(). 0 is higher priority than 1
     *
     * @return void
     *
     * @throws \InvalidArgumentException $priority is NaN
     */
    final public function send(array $payload, int $earliestGet = 0, float $priority = 0.0)
    {
        if (is_nan($priority)) {
            throw new \InvalidArgumentException('$priority was NaN');
        }

        $message = [
            'payload' => $payload,
            //Ensure $earliestGet is between 0 and MONGO_INT32_MAX
            'earliestGet' => new UTCDateTime(self::clampedMilliseconds($earliestGet)),
            'priority' => $priority,
            'created' => new UTCDateTime((int)(microtime(true) * 1000)),
        ];

        $this->collection->insertOne($message);
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->listIndexes() as $existingIndex) {
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
            if ($slice === $index) {
                return;
            }
        }

        for ($i = 0; $i < 5; ++$i) {
            //start from a generated name and shorten it by one character on every failed try
            for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
                //creating an index with same name and different spec does nothing.
                //creating an index with same spec and different name does nothing.
                //so we use any generated name, and then find the right spec after we have called,
                //and just go with that name.
                try {
                    $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
                } catch (\MongoDB\Exception\Exception $e) {
                    //this happens when the name was too long, let continue
                }

                foreach ($this->collection->listIndexes() as $existingIndex) {
                    if ($existingIndex['key'] === $index) {
                        return;
                    }
                }
            }
        }

        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }

    /**
     * Helper method to validate keys and values for the given sort array
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value in $sort is not 1 or -1
     */
    private static function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException("key in \${$label} was not a string");
            }

            if ($value !== 1 && $value !== -1) {
                throw new \InvalidArgumentException(
                    "value of \${$label} is not 1 or -1 for ascending and descending"
                );
            }

            $completeFields["payload.{$key}"] = $value;
        }
    }

    /**
     * Helper method to add the caller supplied query fields to a query, prefixing each field with "payload."
     *
     * @param array $baseQuery The query fields to start from.
     * @param array $query     The caller supplied fields to add under the payload. prefix.
     *
     * @return array the combined query
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    private static function buildPayloadQuery(array $baseQuery, array $query): array
    {
        foreach ($query as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $baseQuery["payload.{$key}"] = $value;
        }

        return $baseQuery;
    }

    /**
     * Helper method to extract and validate the id of a message received from get()
     *
     * @param array $message The message received from get().
     *
     * @return \MongoDB\BSON\ObjectID the id of the message
     *
     * @throws \InvalidArgumentException $message does not have a field "id" that is a ObjectID
     */
    private static function getIdFromMessage(array $message)
    {
        $id = $message['id'] ?? null;

        if (!($id instanceof \MongoDB\BSON\ObjectID)) {
            throw new \InvalidArgumentException('$message does not have a field "id" that is a ObjectID');
        }

        return $id;
    }

    /**
     * Helper method to convert a unix timestamp in seconds to milliseconds between 0 and MONGO_INT32_MAX
     *
     * @param int $seconds The unix timestamp in seconds.
     *
     * @return int|float the millisecond value to give to UTCDateTime
     */
    private static function clampedMilliseconds(int $seconds)
    {
        //the multiplication overflows to a float for huge values, min() then caps it at MONGO_INT32_MAX
        return min(max(0, $seconds * 1000), self::MONGO_INT32_MAX);
    }
}
403