Completed
Pull Request — master (#60)
by Chad
01:24
created

AbstractQueue::ensureGetIndex()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 15
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 2
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\Queue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use ArrayObject;
9
use MongoDB\BSON\ObjectID;
10
use MongoDB\BSON\UTCDateTime;
11
use MongoDB\Operation\FindOneAndUpdate;
12
13
/**
14
 * Abstraction of mongo db collection as priority queue.
15
 *
16
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
17
 * this purpose).  Using a random priority achieves random get()
18
 */
19
abstract class AbstractQueue
20
{
21
    /**
22
     * Maximum millisecond value to use for UTCDateTime creation.
23
     *
24
     * @var integer
25
     */
26
    const MONGO_INT32_MAX = PHP_INT_MAX;
27
28
    /**
29
     * mongo collection to use for queue.
30
     *
31
     * @var \MongoDB\Collection
32
     */
33
    protected $collection;
34
35
    /**
36
     * @var array
37
     */
38
    const FIND_ONE_AND_UPDATE_OPTIONS = [
39
        'sort' => ['priority' => 1, 'created' => 1],
40
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
41
        'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
42
    ];
43
44
    /**
45
     * @var integer
46
     */
47
    const DEFAULT_MAX_NUMBER_OF_MESSAGES = 1;
48
49
    /**
50
     * @var integer
51
     */
52
    const DEFAULT_RUNNING_RESET_DURATION = 600000;
53
54
    /**
55
     * @var integer
56
     */
57
    const DEFAULT_WAIT_DURATION_IN_MILLISECONDS = 3000;
58
59
    /**
60
     * @var integer
61
     */
62
    const DEFAULT_POLL_DURATION_IN_MILLISECONDS = 200;
63
64
    /**
65
     * @var array
66
     */
67
    const DEFAULT_GET_OPTIONS = [
68
       'maxNumberOfMessages' => self::DEFAULT_MAX_NUMBER_OF_MESSAGES,
69
       'runningResetDuration' => self::DEFAULT_RUNNING_RESET_DURATION,
70
       'waitDurationInMillis' => self::DEFAULT_WAIT_DURATION_IN_MILLISECONDS,
71
       'pollDurationInMillis' => self::DEFAULT_POLL_DURATION_IN_MILLISECONDS,
72
    ];
73
74
    /**
75
     * Ensure an index for the get() method.
76
     *
77
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
78
     *                          as \MongoDB\Collection::ensureIndex()
79
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
80
     *                          \MongoDB\Collection::ensureIndex()
81
     *
82
     * @return void
83
     *
84
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
85
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
86
     */
87
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
88
    {
89
        //using general rule: equality, sort, range or more equality tests in that order for index
90
        $completeFields = ['earliestGet' => 1];
91
92
        $this->verifySort($beforeSort, 'beforeSort', $completeFields);
93
94
        $completeFields['priority'] = 1;
95
        $completeFields['created'] = 1;
96
97
        $this->verifySort($afterSort, 'afterSort', $completeFields);
98
99
        //for the main query in get()
100
        $this->ensureIndex($completeFields);
101
    }
102
103
    /**
104
     * Ensure an index for the count() method.
105
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
106
     * call it first.
107
     *
108
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
109
     * @param bool $includeRunning whether to include the running field in the index
110
     *
111
     * @return void
112
     *
113
     * @throws \InvalidArgumentException key in $fields was not a string
114
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
115
     */
116
    final public function ensureCountIndex(array $fields, bool $includeRunning)
117
    {
118
        $completeFields = [];
119
120
        if ($includeRunning) {
121
            $completeFields['earliestGet'] = 1;
122
        }
123
124
        $this->verifySort($fields, 'fields', $completeFields);
125
126
        $this->ensureIndex($completeFields);
127
    }
128
129
    /**
130
     * Get a non running message from the queue.
131
     *
132
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
133
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
134
     *                     invalid {$and: [{...}, {...}]}
135
     * @param array $options Associative array of get options.
136
     *                           runningResetDuration => integer
137
     *                               The duration (in miiliseconds) that the received messages are hidden from
138
     *                               subsequent retrieve requests after being retrieved by a get() request.
139
     *                           waitDurationInMillis => integer
140
     *                               The duration (in milliseconds) for which the call will wait for a message to
141
     *                               arrive in the queue before returning. If a message is available, the call will
142
     *                               return sooner than WaitTimeSeconds.
143
     *                           pollDurationInMillis => integer
144
     *                               The millisecond duration to wait between polls.
145
     *                           maxNumberOfMessages => integer
146
     *                               The maximum number of messages to return with get(). All of the messages are not
147
     *                               necessarily returned.
148
     *
149
     * @return array Array of messages.
150
     *
151
     * @throws \InvalidArgumentException key in $query was not a string
152
     */
153
    final public function get(array $query, array $options = []) : array
154
    {
155
        $options += self::DEFAULT_GET_OPTIONS;
156
157
        $completeQuery = $this->buildPayloadQuery(
158
            ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]],
159
            $query
160
        );
161
162
        $resetTimestamp = $this->calcuateResetTimestamp($options['runningResetDuration']);
163
164
        $update = ['$set' => ['earliestGet' => new UTCDateTime($resetTimestamp)]];
165
166
        //ints overflow to floats, should be fine
167
        $end = microtime(true) + ($options['waitDurationInMillis'] / 1000.0);
168
        $sleepTime = $this->calculateSleepTime($options['pollDurationInMillis']);
169
170
        $messages = new ArrayObject();
171
172
        while (count($messages) < $options['maxNumberOfMessages']) {
173
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $messages)) {
174
                continue;
175
            }
176
177
            if (microtime(true) < $end) {
178
                usleep($sleepTime);
179
            }
180
181
            break;
182
        }
183
184
        return $messages->getArrayCopy();
185
    }
186
187
    /**
188
     * Count queue messages.
189
     *
190
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
191
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
192
     *                     invalid {$and: [{...}, {...}]}
193
     * @param bool|null $running query a running message or not or all
194
     *
195
     * @return int the count
196
     *
197
     * @throws \InvalidArgumentException key in $query was not a string
198
     */
199
    final public function count(array $query, bool $running = null) : int
200
    {
201
        $totalQuery = [];
202
203
        if ($running === true || $running === false) {
204
            $key = $running ? '$gt' : '$lte';
205
            $totalQuery['earliestGet'] = [$key => new UTCDateTime((int)(microtime(true) * 1000))];
206
        }
207
208
        return $this->collection->count($this->buildPayloadQuery($totalQuery, $query));
209
    }
210
211
    /**
212
     * Acknowledge a message was processed and remove from queue.
213
     *
214
     * @param Message $message message received from get()
215
     *
216
     * @return void
217
     */
218
    final public function ack(Message $message)
219
    {
220
        $this->collection->deleteOne(['_id' => $message->getId()]);
221
    }
222
223
    /**
224
     * Atomically acknowledge and send a message to the queue.
225
     *
226
     * @param Message $message message received from get().
227
     *
228
     * @return void
229
     */
230
    final public function requeue(Message $message)
231
    {
232
        $set = [
233
            'payload' => $message->getPayload(),
234
            'earliestGet' => $message->getEarliestGet(),
235
            'priority' => $message->getPriority(),
236
            'created' => new UTCDateTime(),
237
        ];
238
239
        $this->collection->updateOne(['_id' => $message->getId()], ['$set' => $set], ['upsert' => true]);
240
    }
241
242
    /**
243
     * Send a message to the queue.
244
     *
245
     * @param Message $message The message to send.
246
     *
247
     * @return void
248
     */
249
    final public function send(Message $message)
250
    {
251
        $document = [
252
            '_id' => $message->getId(),
253
            'payload' => $message->getPayload(),
254
            'earliestGet' => $message->getEarliestGet(),
255
            'priority' => $message->getPriority(),
256
            'created' => new UTCDateTime(),
257
        ];
258
259
        $this->collection->insertOne($document);
260
    }
261
262
    /**
263
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
264
     * Will not create index if $index is a prefix of an existing index
265
     *
266
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
267
     *
268
     * @return void
269
     *
270
     * @throws \Exception couldnt create index after 5 attempts
271
     */
272
    final private function ensureIndex(array $index)
273
    {
274
        if ($this->isIndexIncludedInExistingIndex($index)) {
275
            return;
276
        }
277
278
        for ($i = 0; $i < 5; ++$i) {
279
            if ($this->tryCreateIndex($index)) {
280
                return;
281
            }
282
        }
283
284
        throw new \Exception('couldnt create index after 5 attempts');
285
        //@codeCoverageIgnoreEnd
286
    }
287
288
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
289
    {
290
        foreach ($payloadQuery as $key => $value) {
291
            if (!is_string($key)) {
292
                throw new \InvalidArgumentException('key in $query was not a string');
293
            }
294
295
            $initialQuery["payload.{$key}"] = $value;
296
        }
297
298
        return $initialQuery;
299
    }
300
301
    private function calculateSleepTime(int $pollDurationInMillis) : int
302
    {
303
        $pollDurationInMillis = max($pollDurationInMillis, 0);
304
        $sleepTime = $pollDurationInMillis * 1000;
305
        //ints overflow to floats and already checked $pollDurationInMillis was positive
306
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
307
    }
308
309
    private function calcuateResetTimeStamp(int $runningResetDuration) : int
310
    {
311
        $resetTimestamp = time() + $runningResetDuration;
312
        //ints overflow to floats
313
        if (!is_int($resetTimestamp)) {
314
            $resetTimestamp = $runningResetDuration > 0 ? self::MONGO_INT32_MAX : 0;
315
        }
316
317
        return min(max(0, $resetTimestamp * 1000), self::MONGO_INT32_MAX);
318
    }
319
320
    private function tryFindOneAndUpdate(array $query, array $update, ArrayObject $messages) : bool
321
    {
322
        $document = $this->collection->findOneAndUpdate($query, $update, self::FIND_ONE_AND_UPDATE_OPTIONS);
323
        if ($document === null) {
324
            return false;
325
        }
326
327
        $messages[] = new Message(
328
            $document['_id'],
329
            $document['payload'],
330
            $document['earliestGet'],
331
            $document['priority']
332
        );
333
334
        return true;
335
    }
336
337
    private function isIndexIncludedInExistingIndex(array $index) : bool
338
    {
339
        //if $index is a prefix of any existing index we are good
340
        foreach ($this->collection->listIndexes() as $existingIndex) {
341
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
342
            if ($slice === $index) {
343
                return true;
344
            }
345
        }
346
347
        return false;
348
    }
349
350
    private function tryCreateIndex(array $index) : bool
351
    {
352
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
353
            if ($this->tryCreateNamedIndex($index, $name)) {
354
                return true;
355
            }
356
        }
357
358
        return false;
359
    }
360
361
    private function tryCreateNamedIndex(array $index, string $name) : bool
362
    {
363
        //creating an index with same name and different spec does nothing.
364
        //creating an index with same spec and different name does nothing.
365
        //so we use any generated name, and then find the right spec after we have called,
366
        //and just go with that name.
367
        try {
368
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
369
        } catch (\MongoDB\Exception\Exception $e) {
370
            //this happens when the name was too long, let continue
371
        }
372
373
        return $this->indexExists($index);
374
    }
375
376
    private function indexExists(array $index) : bool
377
    {
378
        foreach ($this->collection->listIndexes() as $existingIndex) {
379
            if ($existingIndex['key'] === $index) {
380
                return true;
381
            }
382
        }
383
384
        return false;
385
    }
386
387
    /**
388
     * Helper method to validate keys and values for the given sort array
389
     *
390
     * @param array  $sort             The proposed sort for a mongo index.
391
     * @param string $label            The name of the variable given to the public ensureXIndex method.
392
     * @param array  &$completedFields The final index array with payload. prefix added to fields.
393
     *
394
     * @return void
395
     */
396
    final private function verifySort(array $sort, string $label, array &$completeFields)
397
    {
398
        foreach ($sort as $key => $value) {
399
            $this->throwIfTrue(!is_string($key), "key in \${$label} was not a string");
400
            $this->throwIfTrue(
401
                $value !== 1 && $value !== -1,
402
                "value of \${$label} is not 1 or -1 for ascending and descending"
403
            );
404
405
            $completeFields["payload.{$key}"] = $value;
406
        }
407
    }
408
409
    private function throwIfTrue(
410
        bool $condition,
411
        string $message,
412
        string $exceptionClass = '\\InvalidArgumentException'
413
    ) {
414
        if ($condition === true) {
415
            $reflectionClass = new \ReflectionClass($exceptionClass);
416
            throw $reflectionClass->newInstanceArgs([$message]);
417
        }
418
    }
419
}
420