Issues (8)

src/AbstractQueue.php (1 issue)

Severity
1
<?php
2
/**
3
 * Defines the TraderInteractive\Mongo\AbstractQueue class.
4
 */
5
6
namespace TraderInteractive\Mongo;
7
8
use ArrayObject;
9
use MongoDB\BSON\ObjectID;
10
use MongoDB\BSON\UTCDateTime;
11
use MongoDB\Operation\FindOneAndUpdate;
12
13
/**
14
 * Abstraction of mongo db collection as priority queue.
15
 *
16
 * Tied priorities are ordered by time. So you may use a single priority for normal queuing (default args exist for
17
 * this purpose).  Using a random priority achieves random get()
18
 */
19
abstract class AbstractQueue
{
    /**
     * Maximum millisecond value to use for UTCDateTime creation.
     *
     * @var integer
     */
    const MONGO_INT32_MAX = PHP_INT_MAX;

    /**
     * mongo collection to use for queue.
     *
     * @var \MongoDB\Collection
     */
    protected $collection;

    /**
     * Options passed to every findOneAndUpdate() call made by get(): lowest priority value first, oldest first,
     * plain arrays as result, and the document as it looks after the update.
     *
     * @var array
     */
    const FIND_ONE_AND_UPDATE_OPTIONS = [
        'sort' => ['priority' => 1, 'created' => 1],
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
        'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
    ];

    /**
     * @var integer
     */
    const DEFAULT_MAX_NUMBER_OF_MESSAGES = 1;

    /**
     * Default time, in milliseconds, a retrieved message stays hidden from other get() calls.
     *
     * @var integer
     */
    const DEFAULT_RUNNING_RESET_DURATION = 600000;

    /**
     * @var integer
     */
    const DEFAULT_WAIT_DURATION_IN_MILLISECONDS = 3000;

    /**
     * @var integer
     */
    const DEFAULT_POLL_DURATION_IN_MILLISECONDS = 200;

    /**
     * Defaults merged into the $options argument of get().
     *
     * @var array
     */
    const DEFAULT_GET_OPTIONS = [
        'maxNumberOfMessages' => self::DEFAULT_MAX_NUMBER_OF_MESSAGES,
        'runningResetDuration' => self::DEFAULT_RUNNING_RESET_DURATION,
        'waitDurationInMillis' => self::DEFAULT_WAIT_DURATION_IN_MILLISECONDS,
        'pollDurationInMillis' => self::DEFAULT_POLL_DURATION_IN_MILLISECONDS,
    ];

    /**
     * Ensure an index for the get() method.
     *
     * @param array $beforeSort Fields in get() call to index before the sort field in same format
     *                          as \MongoDB\Collection::createIndex()
     * @param array $afterSort  Fields in get() call to index after the sort field in same format as
     *                          \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \InvalidArgumentException value of $beforeSort or $afterSort is not 1 or -1 for ascending and descending
     * @throws \InvalidArgumentException key in $beforeSort or $afterSort was not a string
     */
    final public function ensureGetIndex(array $beforeSort = [], array $afterSort = [])
    {
        //using general rule: equality, sort, range or more equality tests in that order for index
        $completeFields = ['earliestGet' => 1];

        $this->verifySort($beforeSort, 'beforeSort', $completeFields);

        $completeFields['priority'] = 1;
        $completeFields['created'] = 1;

        $this->verifySort($afterSort, 'afterSort', $completeFields);

        //for the main query in get()
        $this->ensureIndex($completeFields);
    }

    /**
     * Ensure an index for the count() method.
     * Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call,
     * call it first.
     *
     * @param array $fields fields in count() call to index in same format as \MongoDB\Collection::createIndex()
     * @param bool $includeRunning whether to include the earliestGet field (used to tell running messages apart)
     *                             in the index
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $fields was not a string
     * @throws \InvalidArgumentException value of $fields is not 1 or -1 for ascending and descending
     */
    final public function ensureCountIndex(array $fields, bool $includeRunning)
    {
        $completeFields = [];

        if ($includeRunning) {
            $completeFields['earliestGet'] = 1;
        }

        $this->verifySort($fields, 'fields', $completeFields);

        $this->ensureIndex($completeFields);
    }

    /**
     * Get non running messages from the queue.
     *
     * When the queue has no matching message the call polls until one arrives or the wait duration has elapsed.
     * Once at least one message has been retrieved the call returns as soon as no further message is available.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param array $options Associative array of get options.
     *                           runningResetDuration => integer
     *                               The duration (in milliseconds) that the received messages are hidden from
     *                               subsequent retrieve requests after being retrieved by a get() request.
     *                           waitDurationInMillis => integer
     *                               The duration (in milliseconds) for which the call will wait for a message to
     *                               arrive in the queue before returning. If a message is available, the call will
     *                               return sooner than waitDurationInMillis.
     *                           pollDurationInMillis => integer
     *                               The millisecond duration to wait between polls.
     *                           maxNumberOfMessages => integer
     *                               The maximum number of messages to return with get(). All of the messages are not
     *                               necessarily returned.
     *
     * @return array Array of messages.
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function get(array $query, array $options = []) : array
    {
        $completeQuery = $this->buildPayloadQuery(
            ['earliestGet' => ['$lte' => $this->nowAsUtcDateTime()]],
            $query
        );

        $options += static::DEFAULT_GET_OPTIONS;
        $update = ['$set' => ['earliestGet' => $this->calculateEarliestGet($options['runningResetDuration'])]];
        $end = $this->calculateEndTime($options['waitDurationInMillis']);
        $sleepTime = $this->calculateSleepTime($options['pollDurationInMillis']);
        $messages = new ArrayObject();
        while (count($messages) < $options['maxNumberOfMessages']) {
            if ($this->tryFindOneAndUpdate($completeQuery, $update, $messages)) {
                continue;
            }

            //nothing (more) is available right now: hand back what we have, or give up once the wait is over
            $now = microtime(true);
            if (count($messages) > 0 || $now >= $end) {
                break;
            }

            //never sleep past the deadline so one last attempt is made right when the wait elapses
            $remainingMicroseconds = ($end - $now) * 1000000;
            usleep((int)min($sleepTime, $remainingMicroseconds));

            //time has passed, so messages that became due while sleeping must match on the next attempt
            $completeQuery['earliestGet'] = ['$lte' => $this->nowAsUtcDateTime()];
        }

        return $messages->getArrayCopy();
    }

    /**
     * Count queue messages.
     *
     * @param array $query in same format as \MongoDB\Collection::find() where top level fields do not contain
     *                     operators. Lower level fields can however. eg: valid {a: {$gt: 1}, "b.c": 3},
     *                     invalid {$and: [{...}, {...}]}
     * @param bool|null $running true counts messages whose earliestGet is in the future, false counts messages
     *                           whose earliestGet is now or in the past, null counts both
     *
     * @return int the count
     *
     * @throws \InvalidArgumentException key in $query was not a string
     */
    final public function count(array $query, bool $running = null) : int
    {
        $totalQuery = [];

        if ($running !== null) {
            $operator = $running ? '$gt' : '$lte';
            $totalQuery['earliestGet'] = [$operator => $this->nowAsUtcDateTime()];
        }

        return $this->collection->countDocuments($this->buildPayloadQuery($totalQuery, $query));
    }

    /**
     * Acknowledge a message was processed and remove from queue.
     *
     * @param Message $message message received from get()
     *
     * @return void
     */
    final public function ack(Message $message)
    {
        $this->collection->deleteOne(['_id' => $message->getId()]);
    }

    /**
     * Atomically acknowledge and send a message to the queue.
     *
     * The document with the id of the message is overwritten in a single upsert, so it is created again if it
     * has been removed in the meantime.
     *
     * @param Message $message message received from get().
     *
     * @return void
     */
    final public function requeue(Message $message)
    {
        $set = [
            'payload' => $message->getPayload(),
            'earliestGet' => $message->getEarliestGet(),
            'priority' => $message->getPriority(),
            'machineName' => gethostname(),
            'created' => new UTCDateTime(),
        ];

        $this->collection->updateOne(['_id' => $message->getId()], ['$set' => $set], ['upsert' => true]);
    }

    /**
     * Send a message to the queue.
     *
     * @param Message $message The message to send.
     *
     * @return void
     */
    final public function send(Message $message)
    {
        $document = [
            '_id' => $message->getId(),
            'payload' => $message->getPayload(),
            'earliestGet' => $message->getEarliestGet(),
            'priority' => $message->getPriority(),
            'machineName' => gethostname(),
            'created' => new UTCDateTime(),
        ];

        $this->collection->insertOne($document);
    }

    /**
     * Ensure index of correct specification and a unique name whether the specification or name already exist or not.
     * Will not create index if $index is a prefix of an existing index
     *
     * @param array $index index to create in same format as \MongoDB\Collection::createIndex()
     *
     * @return void
     *
     * @throws \Exception couldnt create index after 5 attempts
     */
    private function ensureIndex(array $index)
    {
        if ($this->isIndexIncludedInExistingIndex($index)) {
            return;
        }

        for ($i = 0; $i < 5; ++$i) {
            if ($this->tryCreateIndex($index)) {
                return;
            }
        }

        //@codeCoverageIgnoreStart
        throw new \Exception('couldnt create index after 5 attempts');
        //@codeCoverageIgnoreEnd
    }

    /**
     * Adds every field of the caller supplied query, prefixed with "payload.", to the given base query.
     *
     * @param array $initialQuery Query on the queue's own fields to start from.
     * @param array $payloadQuery Caller supplied query on the message payload.
     *
     * @return array The combined query.
     *
     * @throws \InvalidArgumentException key in $payloadQuery was not a string
     */
    private function buildPayloadQuery(array $initialQuery, array $payloadQuery)
    {
        foreach ($payloadQuery as $key => $value) {
            if (!is_string($key)) {
                throw new \InvalidArgumentException('key in $query was not a string');
            }

            $initialQuery["payload.{$key}"] = $value;
        }

        return $initialQuery;
    }

    /**
     * Converts the poll duration to the microsecond value expected by usleep().
     *
     * @param int $pollDurationInMillis Poll duration in milliseconds, negative values are treated as 0.
     *
     * @return int Microseconds to sleep, at most PHP_INT_MAX.
     */
    private function calculateSleepTime(int $pollDurationInMillis) : int
    {
        $pollDurationInMillis = max($pollDurationInMillis, 0);
        $sleepTime = $pollDurationInMillis * 1000;
        //the check is needed even though static analysis reports it as always true: an int multiplication that
        //overflows yields a float in PHP, and $pollDurationInMillis is known to be non-negative at this point
        return is_int($sleepTime) ? $sleepTime : PHP_INT_MAX;
    }

    /**
     * Calculates the point in time until which a retrieved message stays hidden from other get() calls.
     *
     * @param int $runningResetDuration Duration in milliseconds, counted from now.
     *
     * @return UTCDateTime Now plus the duration, limited to the range 0 to MONGO_INT32_MAX milliseconds.
     */
    private function calculateEarliestGet(int $runningResetDuration) : UTCDateTime
    {
        $resetTimestamp = (int)(microtime(true) * 1000) + $runningResetDuration;
        //ints overflow to floats; "now" is positive so that only happens for a huge positive duration
        if (!is_int($resetTimestamp)) {
            return new UTCDateTime(static::MONGO_INT32_MAX);
        }

        return new UTCDateTime(min(max(0, $resetTimestamp), static::MONGO_INT32_MAX));
    }

    /**
     * Tries to claim one message and appends it to $messages on success.
     *
     * @param array       $query    Query selecting the candidate messages.
     * @param array       $update   Update applied to the claimed document.
     * @param ArrayObject $messages Collector the claimed message is appended to.
     *
     * @return bool true if a message was claimed, false if no document matched.
     */
    private function tryFindOneAndUpdate(array $query, array $update, ArrayObject $messages) : bool
    {
        $document = $this->collection->findOneAndUpdate($query, $update, static::FIND_ONE_AND_UPDATE_OPTIONS);
        if ($document === null) {
            return false;
        }

        $messages[] = new Message(
            $document['_id'],
            $document['payload'],
            $document['earliestGet'],
            $document['priority']
        );

        return true;
    }

    /**
     * Tells whether $index is a prefix of (or identical to) the key of any existing index.
     *
     * @param array $index Index specification to look for.
     *
     * @return bool
     */
    private function isIndexIncludedInExistingIndex(array $index) : bool
    {
        //if $index is a prefix of any existing index we are good
        foreach ($this->collection->listIndexes() as $existingIndex) {
            $slice = array_slice($existingIndex['key'], 0, count($index), true);
            if ($slice === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Tries to create the index under a generated name, retrying with ever shorter names.
     *
     * @param array $index Index specification to create.
     *
     * @return bool true as soon as an index with the specification exists, false if every name failed.
     */
    private function tryCreateIndex(array $index) : bool
    {
        for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
            if ($this->tryCreateNamedIndex($index, $name)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Tries to create the index under the given name.
     *
     * @param array  $index Index specification to create.
     * @param string $name  Name to create the index with.
     *
     * @return bool true if an index with the specification exists after the attempt.
     */
    private function tryCreateNamedIndex(array $index, string $name) : bool
    {
        //creating an index with same name and different spec does nothing.
        //creating an index with same spec and different name does nothing.
        //so we use any generated name, and then find the right spec after we have called,
        //and just go with that name.
        try {
            $this->collection->createIndex($index, ['name' => $name, 'background' => true]);
        } catch (\MongoDB\Exception\Exception $e) {
            //this happens when the name was too long, let continue
        }

        return $this->indexExists($index);
    }

    /**
     * Tells whether an index with exactly the given key specification exists.
     *
     * @param array $index Index specification to look for.
     *
     * @return bool
     */
    private function indexExists(array $index) : bool
    {
        foreach ($this->collection->listIndexes() as $existingIndex) {
            if ($existingIndex['key'] === $index) {
                return true;
            }
        }

        return false;
    }

    /**
     * Helper method to validate keys and values for the given sort array
     *
     * @param array  $sort            The proposed sort for a mongo index.
     * @param string $label           The name of the variable given to the public ensureXIndex method.
     * @param array  &$completeFields The final index array with payload. prefix added to fields.
     *
     * @return void
     *
     * @throws \InvalidArgumentException key in $sort was not a string
     * @throws \InvalidArgumentException value of $sort is not 1 or -1
     */
    private function verifySort(array $sort, string $label, array &$completeFields)
    {
        foreach ($sort as $key => $value) {
            $this->throwIfTrue(!is_string($key), "key in \${$label} was not a string");
            $this->throwIfTrue(
                $value !== 1 && $value !== -1,
                "value of \${$label} is not 1 or -1 for ascending and descending"
            );

            $completeFields["payload.{$key}"] = $value;
        }
    }

    /**
     * Throws an exception of the given class when the condition holds.
     *
     * @param bool   $condition      Whether to throw.
     * @param string $message        Message of the exception.
     * @param string $exceptionClass Class name of the exception to throw.
     *
     * @return void
     */
    private function throwIfTrue(
        bool $condition,
        string $message,
        string $exceptionClass = '\\InvalidArgumentException'
    ) {
        if ($condition === true) {
            throw new $exceptionClass($message);
        }
    }

    /**
     * Calculates the unix timestamp (seconds, with fraction) at which get() stops waiting.
     *
     * @param int $waitDurationInMillis Wait duration in milliseconds, counted from now.
     *
     * @return float
     */
    private function calculateEndTime(int $waitDurationInMillis) : float
    {
        return microtime(true) + ($waitDurationInMillis / 1000.0);
    }

    /**
     * Returns the current time with millisecond precision.
     *
     * @return UTCDateTime
     */
    private function nowAsUtcDateTime() : UTCDateTime
    {
        return new UTCDateTime((int)(microtime(true) * 1000));
    }
}
416