Completed
Pull Request — master (#60)
by Chad
01:24
created

QueueTest::resetStuck()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 35
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 35
rs 8.8571
c 0
b 0
f 0
cc 1
eloc 20
nc 1
nop 0
1
<?php
2
3
namespace TraderInteractive\Mongo;
4
5
use MongoDB\BSON\ObjectId;
6
use MongoDB\BSON\UTCDateTime;
7
use MongoDB\Client;
8
use PHPUnit\Framework\TestCase;
9
10
/**
11
 * @coversDefaultClass \TraderInteractive\Mongo\Queue
12
 * @covers ::<private>
13
 * @covers ::__construct
14
 */
15
final class QueueTest extends TestCase
16
{
17
    /** @var \MongoDB\Collection The "messages" collection of the "testing" database, selected in setUp(). */
    private $collection;

    /** @var string Connection string: TESTING_MONGO_URL when set, otherwise mongodb://localhost:27017. */
    private $mongoUrl;

    /** @var Queue Queue under test, constructed around $collection in setUp(). */
    private $queue;
20
21
    /**
     * Connects to the test server, empties the "testing.messages" collection and builds the queue under test.
     *
     * The connection string is read from the TESTING_MONGO_URL environment variable; when that is unset or
     * empty, mongodb://localhost:27017 is used instead.
     */
    public function setUp()
    {
        $configuredUrl = getenv('TESTING_MONGO_URL');
        $this->mongoUrl = $configuredUrl ?: 'mongodb://localhost:27017';

        // Ask the driver to decode every document level as a plain PHP array.
        $driverOptions = ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']];
        $client = new Client($this->mongoUrl, [], $driverOptions);

        $this->collection = $client->selectDatabase('testing')->selectCollection('messages');
        $this->collection->deleteMany([]);

        $this->queue = new Queue($this->collection);
    }
34
35
    /**
     * Drops the whole "testing" database through a fresh client connection.
     */
    public function tearDown()
    {
        $client = new Client($this->mongoUrl);
        $client->dropDatabase('testing');
    }
39
40
    /**
     * Passing an integer as the first constructor argument is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::__construct
     * @expectedException \InvalidArgumentException
     */
    public function constructWithNonStringUrl()
    {
        $notAUrl = 1;

        new Queue($notAUrl, 'testing', 'messages');
    }
49
50
    /**
     * Two ensureGetIndex() calls on a fresh collection must leave three indexes behind.
     *
     * The expected keys show the layout being asserted: earliestGet first, then the "before sort" fields
     * prefixed with "payload.", then priority and created, then the "after sort" fields (also prefixed).
     *
     * @test
     * @covers ::ensureGetIndex
     */
    public function ensureGetIndex()
    {
        // A uniquely named collection starts without any index created by other tests.
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
        $queue = new Queue($collection);
        $queue->ensureGetIndex(['type' => 1], ['boo' => -1]);
        $queue->ensureGetIndex(['another.sub' => 1]);

        $indexes = iterator_to_array($collection->listIndexes());
        // Entry 0 is not inspected here (presumably the built-in _id index); the two new ones follow it.
        $this->assertCount(3, $indexes);

        $expectedOne = [
            'earliestGet' => 1,
            'payload.type' => 1,
            'priority' => 1,
            'created' => 1,
            'payload.boo' => -1,
        ];
        $this->assertSame($expectedOne, $indexes[1]['key']);

        $expectedTwo = [
            'earliestGet' => 1,
            'payload.another.sub' => 1,
            'priority' => 1,
            'created' => 1,
        ];
        $this->assertSame($expectedTwo, $indexes[2]['key']);
    }
81
82
    /**
     * Requesting an index on a collection whose name is 128 characters long is expected to throw.
     *
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \Exception
     */
    public function ensureGetIndexWithTooLongCollectionName()
    {
        // 8 characters of "messages" followed by 2 x 60 digits = 128 chars
        $collectionName = 'messages012345678901234567890123456789012345678901234567890123456789'
            . '012345678901234567890123456789012345678901234567890123456789';

        $queueOnLongName = new Queue($this->mongoUrl, 'testing', $collectionName);
        $queueOnLongName->ensureGetIndex([]);
    }
95
96
    /**
     * An integer key in the "before sort" field list is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithNonStringBeforeSortKey()
    {
        $beforeSort = [0 => 1];

        $this->queue->ensureGetIndex($beforeSort);
    }
105
106
    /**
     * An integer key in the "after sort" field list is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithNonStringAfterSortKey()
    {
        $beforeSort = ['field' => 1];
        $afterSort = [0 => 1];

        $this->queue->ensureGetIndex($beforeSort, $afterSort);
    }
115
116
    /**
     * A non-integer direction in the "before sort" field list is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithBadBeforeSortValue()
    {
        $beforeSort = ['field' => 'NotAnInt'];

        $this->queue->ensureGetIndex($beforeSort);
    }
125
126
    /**
     * A non-integer direction in the "after sort" field list is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithBadAfterSortValue()
    {
        $beforeSort = [];
        $afterSort = ['field' => 'NotAnInt'];

        $this->queue->ensureGetIndex($beforeSort, $afterSort);
    }
135
136
    /**
     * Verifies the behaviour of the Queue when it cannot create an index after 5 attempts.
     *
     * The mocked collection always reports an empty index list, so the index requested through
     * ensureCountIndex() never becomes visible to the queue.
     *
     * @test
     * @covers ::ensureCountIndex
     * @expectedException \Exception
     * @expectedExceptionMessage couldnt create index after 5 attempts
     */
    public function ensureIndexCannotBeCreatedAfterFiveAttempts()
    {
        $mockCollection = $this->getMockBuilder('\MongoDB\Collection')->disableOriginalConstructor()->getMock();

        // Every listing comes back empty, regardless of what the queue asked the collection to create.
        $mockCollection->method('listIndexes')->willReturn([]);

        $queue = new Queue($mockCollection);
        $queue->ensureCountIndex(['type' => 1], false);
    }
153
154
    /**
     * Two ensureCountIndex() calls on a fresh collection must leave three indexes behind.
     *
     * With the second argument false the index holds only the "payload."-prefixed fields; with true the
     * expected key additionally starts with earliestGet.
     *
     * @test
     * @covers ::ensureCountIndex
     */
    public function ensureCountIndex()
    {
        // A uniquely named collection starts without any index created by other tests.
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
        $queue = new Queue($collection);
        $queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
        $queue->ensureCountIndex(['another.sub' => 1], true);

        $indexes = iterator_to_array($collection->listIndexes());
        // Entry 0 is not inspected here (presumably the built-in _id index); the two new ones follow it.
        $this->assertCount(3, $indexes);

        $expectedOne = ['payload.type' => 1, 'payload.boo' => -1];
        $this->assertSame($expectedOne, $indexes[1]['key']);

        $expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1];
        $this->assertSame($expectedTwo, $indexes[2]['key']);
    }
174
175
    /**
     * Requesting a count index whose fields are a prefix of an existing one must not add another index.
     *
     * @test
     * @covers ::ensureCountIndex
     */
    public function ensureCountIndexWithPrefixOfPrevious()
    {
        $this->queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
        // ['type' => 1] is a leading prefix of the index requested above.
        $this->queue->ensureCountIndex(['type' => 1], false);

        $indexes = iterator_to_array($this->collection->listIndexes());
        $this->assertCount(2, $indexes);

        $expected = ['payload.type' => 1, 'payload.boo' => -1];
        $this->assertSame($expected, $indexes[1]['key']);
    }
190
191
    /**
     * An integer field key passed to ensureCountIndex() is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::ensureCountIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureCountIndexWithNonStringKey()
    {
        $fields = [0 => 1];

        $this->queue->ensureCountIndex($fields, false);
    }
200
201
    /**
     * A non-integer direction passed to ensureCountIndex() is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::ensureCountIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureCountIndexWithBadValue()
    {
        $fields = ['field' => 'NotAnInt'];

        $this->queue->ensureCountIndex($fields, false);
    }
210
211
    /**
     * A query on a key the stored message does not have returns an empty array and leaves the message stored.
     *
     * @test
     * @covers ::get
     */
    public function getByBadQuery()
    {
        $stored = $this->getMessage(['key1' => 0, 'key2' => true]);
        $this->queue->send($stored);

        $found = $this->queue->get(['key3' => 0]);

        $this->assertSame([], $found);
        $this->assertSame(1, $this->collection->count());
    }
224
225
    /**
     * With runningResetDuration set to PHP_INT_MAX, the returned message must report an earliestGet
     * equal to UTCDateTime(PHP_INT_MAX).
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getWithOverflowResetTimestamp()
    {
        $this->queue->send($this->getMessage());

        $received = $this->queue->get([], ['runningResetDuration' => PHP_INT_MAX]);
        $message = $received[0];

        $this->assertEquals(new UTCDateTime(PHP_INT_MAX), $message->getEarliestGet());
    }
237
238
    /**
     * A negative pollDurationInMillis must not stop get() from returning the queued message.
     *
     * @test
     * @covers ::get
     */
    public function getWithNegativePollDuration()
    {
        $this->queue->send($this->getMessage(['key1' => 0]));

        // get() yields an array even on a miss, so a not-null check could never fail; count the result.
        $this->assertCount(1, $this->queue->get([], ['pollDurationInMillis' => -1]));
    }
247
248
    /**
     * An integer key in the get() query is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::get
     * @expectedException \InvalidArgumentException
     */
    public function getWithNonStringKey()
    {
        $query = [0 => 'a value'];

        $this->queue->get($query);
    }
257
258
    /**
     * Querying with a message's complete payload returns that message and not the other queued one.
     *
     * @test
     * @covers ::get
     */
    public function getByFullQuery()
    {
        $wanted = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($wanted);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $received = $this->queue->get($wanted->getPayload());
        $actual = $received[0];

        $this->assertSame((string)$wanted->getId(), (string)$actual->getId());
        $this->assertSame($wanted->getPayload(), $actual->getPayload());
    }
274
275
    /**
     * A dotted-path query with a comparison operator selects the message whose nested value matches.
     *
     * @test
     * @covers ::get
     */
    public function getBySubDocQuery()
    {
        $nestedPayload = [
            'one' => [
                'two' => [
                    'three' => 5,
                    'notused' => 'notused',
                ],
                'notused' => 'notused',
            ],
            'notused' => 'notused',
        ];
        $expected = $this->getMessage($nestedPayload);

        // A message without the nested path is queued first and must not be the one returned.
        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));
        $this->queue->send($expected);

        $received = $this->queue->get(['one.two.three' => ['$gt' => 4]]);

        $this->assertSameMessage($expected, $received[0]);
    }
300
301
    /**
     * A message that was already handed out and not yet acknowledged is not returned by a second get().
     *
     * @test
     * @covers ::get
     */
    public function getBeforeAck()
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);
        $query = $messageOne->getPayload();

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        // First get hands the message out; its result is deliberately ignored.
        $this->queue->get($query);

        // Asking again before any ack must come back empty.
        $secondAttempt = $this->queue->get($query);
        $this->assertSame([], $secondAttempt);
    }
318
319
    /**
     * Messages come back ordered by ascending priority value: 0.3, then 0.4, then 0.5.
     *
     * @test
     * @covers ::get
     */
    public function getWithCustomPriority()
    {
        $sent = [
            $this->getMessage(['key' => 0], null, 0.5),
            $this->getMessage(['key' => 1], null, 0.4),
            $this->getMessage(['key' => 2], null, 0.3),
        ];

        foreach ($sent as $message) {
            $this->queue->send($message);
        }

        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        // Priorities were sent in descending order, so the expected output is the reverse of the input.
        foreach (array_reverse($sent) as $position => $expected) {
            $this->assertSameMessage($expected, $messages[$position]);
        }
    }
339
340
    /**
     * Messages with equal priority come back in the order in which they were sent.
     *
     * @test
     * @covers ::get
     */
    public function getWithTimeBasedPriority()
    {
        $sent = [
            $this->getMessage(['key' => 0]),
            $this->getMessage(['key' => 1]),
            $this->getMessage(['key' => 2]),
        ];

        foreach ($sent as $message) {
            $this->queue->send($message);
        }

        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        foreach ($sent as $position => $expected) {
            $this->assertSameMessage($expected, $messages[$position]);
        }
    }
360
361
    /**
     * get() on an empty queue with default options must block for at least 0.2 s and less than 0.3 s.
     *
     * @test
     * @covers ::get
     */
    public function getWait()
    {
        $start = microtime(true);

        $this->queue->get([]);

        $elapsed = microtime(true) - $start;

        // Dedicated comparison assertions report the measured time on failure, unlike assertTrue().
        $this->assertGreaterThanOrEqual(0.200, $elapsed);
        $this->assertLessThan(0.300, $elapsed);
    }
376
377
    /**
     * A message whose earliestGet lies one second in the future is invisible at first and returned
     * once that moment has passed.
     *
     * @test
     * @covers ::get
     */
    public function earliestGet()
    {
        // Anchor on the sub-second clock. With whole seconds ((time() + 1) * 1000) a test that starts late
        // in a second would see the message fall due while the first get() below is still waiting
        // (getWait() in this class shows an empty get() blocking for 0.2-0.3 s).
        $availableAt = new UTCDateTime((int)((microtime(true) + 1) * 1000));
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true], $availableAt);

        $this->queue->send($messageOne);

        $this->assertSame([], $this->queue->get($messageOne->getPayload()));

        // The first get() already consumed some time, so one more second is certainly past $availableAt.
        sleep(1);

        $this->assertCount(1, $this->queue->get($messageOne->getPayload()));
    }
396
397
    /**
     * After both messages are made due "now", a get() for the first one must return exactly that message
     * and leave only the second one due.
     *
     * @test
     * @covers ::get
     */
    public function resetStuck()
    {
        $messageOne = $this->getMessage(['key' => 0]);
        $messageTwo = $this->getMessage(['key' => 1]);

        $this->queue->send($messageOne);
        $this->queue->send($messageTwo);

        // Stamp both documents with the current second as their earliestGet.
        foreach ([0, 1] as $key) {
            $this->collection->updateOne(
                ['payload.key' => $key],
                ['$set' => ['earliestGet' => new UTCDateTime(time() * 1000)]]
            );
        }

        // Counts the documents whose earliestGet is not later than the current millisecond.
        $countDue = function () {
            return $this->collection->count(
                ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]]
            );
        };

        $this->assertSame(2, $countDue());

        // get() yields an array even on a miss, so a not-null check could never fail; count the result.
        $this->assertCount(1, $this->queue->get($messageOne->getPayload()));

        // messageOne is no longer due after being handed out; messageTwo still is.
        $this->assertSame(1, $countDue());
    }
436
437
    /**
     * An integer key in the count() query is expected to raise an InvalidArgumentException.
     *
     * @test
     * @covers ::count
     * @expectedException \InvalidArgumentException
     */
    public function countWithNonStringKey()
    {
        $query = [0 => 'a value'];

        $this->queue->count($query);
    }
446
447
    /**
     * Tracks count() through a message's life: nothing before send, counted with the flag false after
     * send, counted with the flag true after get(); the call without a flag counts it in both states.
     *
     * @test
     * @covers ::count
     */
    public function testCount()
    {
        $message = $this->getMessage(['boo' => 'scary']);
        $query = $message->getPayload();

        // Empty queue.
        $this->assertSame(0, $this->queue->count($query, true));
        $this->assertSame(0, $this->queue->count($query, false));
        $this->assertSame(0, $this->queue->count($query));

        // Sent, not yet handed out.
        $this->queue->send($message);
        $this->assertSame(1, $this->queue->count($query, false));
        $this->assertSame(0, $this->queue->count($query, true));
        $this->assertSame(1, $this->queue->count($query));

        // Handed out by get().
        $this->queue->get($query);
        $this->assertSame(0, $this->queue->count($query, false));
        $this->assertSame(1, $this->queue->count($query, true));
        $this->assertSame(1, $this->queue->count($query));
    }
469
470
    /**
     * get() leaves both documents in the collection; ack() on the received message removes one.
     *
     * @test
     * @covers ::ack
     */
    public function ack()
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $received = $this->queue->get($messageOne->getPayload());
        $this->assertSame(2, $this->collection->count());

        $this->queue->ack($received[0]);
        $this->assertSame(1, $this->collection->count());
    }
487
488
    /**
     * A message that was received and then requeued with earliestGet set to "now" is delivered again,
     * after the two messages that had not been handed out yet.
     *
     * @test
     * @covers ::requeue
     */
    public function requeue()
    {
        $messages = [];
        foreach ([1, 2, 3] as $id) {
            $messages[] = $this->getMessage(['id' => $id, 'key' => 'value']);
        }

        foreach ($messages as $outgoing) {
            $this->queue->send($outgoing);
        }

        $received = $this->queue->get(['key' => 'value']);
        $message = $received[0];

        $this->assertSameMessage($messages[0], $message);

        $now = new UTCDateTime((int)(microtime(true) * 1000));
        $this->queue->requeue($message->withEarliestGet($now));

        $actual = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        // Second and third message first, the requeued first message last.
        $this->assertSameMessage($messages[1], $actual[0]);
        $this->assertSameMessage($messages[2], $actual[1]);
        $this->assertSameMessage($messages[0], $actual[2]);
    }
518
519
    /**
     * A message sent with an explicit earliestGet (34000 ms) and priority (0.8) is stored as the only
     * document, with matching fields.
     *
     * @test
     * @covers ::send
     */
    public function send()
    {
        $earliestGet = new UTCDateTime(34 * 1000);
        $message = $this->getMessage(['key1' => 0, 'key2' => true], $earliestGet, 0.8);

        $this->queue->send($message);

        $this->assertSingleMessage($message);
    }
529
530
    /**
     * A message whose earliestGet is UTCDateTime(PHP_INT_MAX) is stored unchanged.
     *
     * @test
     * @covers ::send
     */
    public function sendWithHighEarliestGet()
    {
        $latestPossible = new UTCDateTime(PHP_INT_MAX);
        $message = $this->getMessage([], $latestPossible);

        $this->queue->send($message);

        $this->assertSingleMessage($message);
    }
540
541
    /**
     * A message whose earliestGet is UTCDateTime(0) is stored unchanged.
     *
     * @test
     * @covers ::send
     */
    public function sendWithLowEarliestGet()
    {
        $epoch = new UTCDateTime(0);
        $message = $this->getMessage([], $epoch);

        $this->queue->send($message);

        $this->assertSingleMessage($message);
    }
551
552
    /**
     * Asserts that two messages agree on id (compared as strings), payload and priority.
     *
     * @param Message $expected The message the test sent.
     * @param Message $actual   The message the queue returned.
     */
    private function assertSameMessage(Message $expected, Message $actual)
    {
        $expectedId = (string)$expected->getId();
        $actualId = (string)$actual->getId();
        $this->assertSame($expectedId, $actualId);

        $this->assertSame($expected->getPayload(), $actual->getPayload());
        $this->assertSame($expected->getPriority(), $actual->getPriority());
    }
558
559
    /**
     * Asserts that the collection holds exactly one document and that its _id, payload, priority and
     * earliestGet match the given message.
     *
     * @param Message $expected The message that should be the only stored document.
     */
    private function assertSingleMessage(Message $expected)
    {
        $this->assertSame(1, $this->collection->count());

        // Read the raw document back as plain arrays at every level.
        $arrayTypeMap = ['root' => 'array', 'document' => 'array', 'array' => 'array'];
        $stored = $this->collection->findOne([], ['typeMap' => $arrayTypeMap]);

        $this->assertSame((string)$expected->getId(), (string)$stored['_id']);
        $this->assertSame($expected->getPayload(), $stored['payload']);
        $this->assertSame($expected->getPriority(), $stored['priority']);
        // Compared by value rather than identity: two separate date objects are involved.
        $this->assertEquals($expected->getEarliestGet(), $stored['earliestGet']);
    }
573
574
    /**
     * Builds a Message with a freshly generated ObjectId.
     *
     * @param array            $payload     Message payload; empty by default.
     * @param UTCDateTime|null $earliestGet Earliest delivery time; UTCDateTime(0) is used when null.
     * @param float            $priority    Message priority; 0.0 by default.
     *
     * @return Message
     */
    private function getMessage(array $payload = [], UTCDateTime $earliestGet = null, float $priority = 0.0)
    {
        if ($earliestGet === null) {
            $earliestGet = new UTCDateTime(0);
        }

        return new Message(new ObjectId(), $payload, $earliestGet, $priority);
    }
578
}
579