Completed
Pull Request — master (#60)
by Chad
01:24
created

QueueTest::resetStuck()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 35
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 35
rs 8.8571
c 0
b 0
f 0
cc 1
eloc 20
nc 1
nop 0
1
<?php
2
3
namespace TraderInteractive\Mongo;
4
5
use MongoDB\BSON\ObjectId;
6
use MongoDB\BSON\UTCDateTime;
7
use MongoDB\Client;
8
use PHPUnit\Framework\TestCase;
9
10
/**
 * Integration tests for Queue.
 *
 * The tests talk to the MongoDB server named by the TESTING_MONGO_URL
 * environment variable, falling back to mongodb://localhost:27017, and use
 * the `testing` database, which is dropped after every test.
 *
 * @coversDefaultClass \TraderInteractive\Mongo\Queue
 * @covers ::<private>
 * @covers ::__construct
 */
final class QueueTest extends TestCase
{
    /**
     * The `testing.messages` collection that backs $queue.
     */
    private $collection;

    /**
     * Connection string of the MongoDB server used by the tests.
     */
    private $mongoUrl;

    /**
     * The queue under test, built on top of $collection.
     */
    private $queue;

    /**
     * Connects to MongoDB, empties the `testing.messages` collection and builds a fresh queue on it.
     */
    public function setUp()
    {
        $this->mongoUrl = getenv('TESTING_MONGO_URL') ?: 'mongodb://localhost:27017';
        $mongo = new Client(
            $this->mongoUrl,
            [],
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
        );
        $this->collection = $mongo->selectDatabase('testing')->selectCollection('messages');
        $this->collection->deleteMany([]);

        $this->queue = new Queue($this->collection);
    }

    /**
     * Drops the whole `testing` database so collections created by individual tests do not leak.
     */
    public function tearDown()
    {
        (new Client($this->mongoUrl))->dropDatabase('testing');
    }

    /**
     * @test
     * @covers ::__construct
     * @expectedException \InvalidArgumentException
     */
    public function constructWithNonStringUrl()
    {
        new Queue(1, 'testing', 'messages');
    }

    /**
     * Two ensureGetIndex() calls on a fresh collection must yield the default index plus two compound indexes.
     *
     * @test
     * @covers ::ensureGetIndex
     */
    public function ensureGetIndex()
    {
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
        $queue = new Queue($collection);
        $queue->ensureGetIndex(['type' => 1], ['boo' => -1]);
        $queue->ensureGetIndex(['another.sub' => 1]);

        $indexes = iterator_to_array($collection->listIndexes());
        $this->assertCount(3, $indexes);

        $expectedOne = [
            'earliestGet' => 1,
            'payload.type' => 1,
            'priority' => 1,
            'created' => 1,
            'payload.boo' => -1,
        ];
        $this->assertSame($expectedOne, $indexes[1]['key']);

        $expectedTwo = [
            'earliestGet' => 1,
            'payload.another.sub' => 1,
            'priority' => 1,
            'created' => 1,
        ];
        $this->assertSame($expectedTwo, $indexes[2]['key']);
    }

    /**
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \Exception
     */
    public function ensureGetIndexWithTooLongCollectionName()
    {
        $collectionName = 'messages012345678901234567890123456789012345678901234567890123456789';
        $collectionName .= '012345678901234567890123456789012345678901234567890123456789';//128 chars

        $queue = new Queue($this->mongoUrl, 'testing', $collectionName);
        $queue->ensureGetIndex([]);
    }

    /**
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithNonStringBeforeSortKey()
    {
        $this->queue->ensureGetIndex([0 => 1]);
    }

    /**
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithNonStringAfterSortKey()
    {
        $this->queue->ensureGetIndex(['field' => 1], [0 => 1]);
    }

    /**
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithBadBeforeSortValue()
    {
        $this->queue->ensureGetIndex(['field' => 'NotAnInt']);
    }

    /**
     * @test
     * @covers ::ensureGetIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureGetIndexWithBadAfterSortValue()
    {
        $this->queue->ensureGetIndex([], ['field' => 'NotAnInt']);
    }

    /**
     * Verifies the behaviour of the Queue when it cannot create an index after 5 attempts.
     *
     * The mocked collection always reports an empty index list, so the queue never sees the
     * index it is trying to create.
     *
     * @test
     * @covers ::ensureCountIndex
     * @expectedException \Exception
     * @expectedExceptionMessage couldnt create index after 5 attempts
     */
    public function ensureIndexCannotBeCreatedAfterFiveAttempts()
    {
        $mockCollection = $this->getMockBuilder('\MongoDB\Collection')->disableOriginalConstructor()->getMock();

        $mockCollection->method('listIndexes')->willReturn([]);

        $queue = new Queue($mockCollection);
        $queue->ensureCountIndex(['type' => 1], false);
    }

    /**
     * Two ensureCountIndex() calls on a fresh collection must yield the default index plus two new ones;
     * the second call (made with `true`) is expected to be prefixed with `earliestGet`.
     *
     * @test
     * @covers ::ensureCountIndex
     */
    public function ensureCountIndex()
    {
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
        $queue = new Queue($collection);
        $queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
        $queue->ensureCountIndex(['another.sub' => 1], true);

        $indexes = iterator_to_array($collection->listIndexes());
        $this->assertCount(3, $indexes);

        $expectedOne = ['payload.type' => 1, 'payload.boo' => -1];
        $this->assertSame($expectedOne, $indexes[1]['key']);

        $expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1];
        $this->assertSame($expectedTwo, $indexes[2]['key']);
    }

    /**
     * Requesting an index that is a prefix of an existing one must not create a second index.
     *
     * @test
     * @covers ::ensureCountIndex
     */
    public function ensureCountIndexWithPrefixOfPrevious()
    {
        $this->queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
        $this->queue->ensureCountIndex(['type' => 1], false);

        $indexes = iterator_to_array($this->collection->listIndexes());
        $this->assertCount(2, $indexes);

        $expected = ['payload.type' => 1, 'payload.boo' => -1];
        $this->assertSame($expected, $indexes[1]['key']);
    }

    /**
     * @test
     * @covers ::ensureCountIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureCountIndexWithNonStringKey()
    {
        $this->queue->ensureCountIndex([0 => 1], false);
    }

    /**
     * @test
     * @covers ::ensureCountIndex
     * @expectedException \InvalidArgumentException
     */
    public function ensureCountIndexWithBadValue()
    {
        $this->queue->ensureCountIndex(['field' => 'NotAnInt'], false);
    }

    /**
     * A query that matches nothing returns an empty array and leaves the stored message in place.
     *
     * @test
     * @covers ::get
     */
    public function getByBadQuery()
    {
        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));

        $result = $this->queue->get(['key3' => 0]);
        $this->assertSame([], $result);

        $this->assertSame(1, $this->collection->count());
    }

    /**
     * A negative poll duration must not prevent the waiting message from being returned.
     *
     * @test
     * @covers ::get
     */
    public function getWithNegativePollDuration()
    {
        $this->queue->send($this->getMessage(['key1' => 0]));

        // get() returns an array, so assert on its size; a not-null check could never fail.
        $this->assertCount(1, $this->queue->get([], ['pollDurationInMillis' => -1]));
    }

    /**
     * @test
     * @covers ::get
     * @expectedException \InvalidArgumentException
     */
    public function getWithNonStringKey()
    {
        $this->queue->get([0 => 'a value']);
    }

    /**
     * Querying with a message's complete payload returns that message and not the other one.
     *
     * @test
     * @covers ::get
     */
    public function getByFullQuery()
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $result = $this->queue->get($messageOne->getPayload())[0];

        $this->assertSame((string)$messageOne->getId(), (string)$result->getId());
        $this->assertSame($messageOne->getPayload(), $result->getPayload());
    }

    /**
     * A dotted-path query with a Mongo operator matches on a nested payload field.
     *
     * @test
     * @covers ::get
     */
    public function getBySubDocQuery()
    {
        $expected = $this->getMessage(
            [
                'one' => [
                    'two' => [
                        'three' => 5,
                        'notused' => 'notused',
                    ],
                    'notused' => 'notused',
                ],
                'notused' => 'notused',
            ]
        );

        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));
        $this->queue->send($expected);

        $actual = $this->queue->get(['one.two.three' => ['$gt' => 4]])[0];
        $this->assertSameMessage($expected, $actual);
    }

    /**
     * A message that has been fetched but not acknowledged must not be handed out a second time.
     *
     * @test
     * @covers ::get
     */
    public function getBeforeAck()
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $this->queue->get($messageOne->getPayload());

        //try get message we already have before ack
        $result = $this->queue->get($messageOne->getPayload());
        $this->assertSame([], $result);
    }

    /**
     * Messages come back ordered by ascending priority value, regardless of send order.
     *
     * @test
     * @covers ::get
     */
    public function getWithCustomPriority()
    {
        $messageOne = $this->getMessage(['key' => 0], null, 0.5);
        $messageTwo = $this->getMessage(['key' => 1], null, 0.4);
        $messageThree = $this->getMessage(['key' => 2], null, 0.3);

        $this->queue->send($messageOne);
        $this->queue->send($messageTwo);
        $this->queue->send($messageThree);

        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        $this->assertSameMessage($messageThree, $messages[0]);
        $this->assertSameMessage($messageTwo, $messages[1]);
        $this->assertSameMessage($messageOne, $messages[2]);
    }

    /**
     * With equal priorities, messages come back in the order they were sent.
     *
     * @test
     * @covers ::get
     */
    public function getWithTimeBasedPriority()
    {
        $messageOne = $this->getMessage(['key' => 0]);
        $messageTwo = $this->getMessage(['key' => 1]);
        $messageThree = $this->getMessage(['key' => 2]);

        $this->queue->send($messageOne);
        $this->queue->send($messageTwo);
        $this->queue->send($messageThree);

        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        $this->assertSameMessage($messageOne, $messages[0]);
        $this->assertSameMessage($messageTwo, $messages[1]);
        $this->assertSameMessage($messageThree, $messages[2]);
    }

    /**
     * get() on an empty queue with default options must take at least 200 ms and less than 300 ms.
     *
     * @test
     * @covers ::get
     */
    public function getWait()
    {
        $start = microtime(true);

        $this->queue->get([]);

        $elapsed = microtime(true) - $start;

        // Dedicated comparison assertions report the measured value when they fail.
        $this->assertGreaterThanOrEqual(0.200, $elapsed);
        $this->assertLessThan(0.300, $elapsed);
    }

    /**
     * A message whose earliestGet lies one second in the future is withheld until that time has passed.
     *
     * @test
     * @covers ::get
     */
    public function earliestGet()
    {
        $messageOne = $this->getMessage(
            ['key1' => 0, 'key2' => true],
            new UTCDateTime((time() + 1) * 1000)
        );

        $this->queue->send($messageOne);

        $this->assertSame([], $this->queue->get($messageOne->getPayload()));

        sleep(1);

        $this->assertCount(1, $this->queue->get($messageOne->getPayload()));
    }

    /**
     * Of two messages whose earliestGet is "now", fetching one must leave exactly one still eligible.
     *
     * @test
     * @covers ::get
     */
    public function resetStuck()
    {
        $messageOne = $this->getMessage(['key' => 0]);
        $messageTwo = $this->getMessage(['key' => 1]);

        $this->queue->send($messageOne);
        $this->queue->send($messageTwo);

        // Move earliestGet of both messages to the current second.
        $this->collection->updateOne(
            ['payload.key' => 0],
            ['$set' => ['earliestGet' => new UTCDateTime(time() * 1000)]]
        );
        $this->collection->updateOne(
            ['payload.key' => 1],
            ['$set' => ['earliestGet' => new UTCDateTime(time() * 1000)]]
        );

        $this->assertSame(
            2,
            $this->collection->count(
                ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]]
            )
        );

        // Fetch messageOne; get() returns an array, so assert on its size rather than on not-null.
        $this->assertCount(1, $this->queue->get($messageOne->getPayload()));

        $this->assertSame(
            1,
            $this->collection->count(
                ['earliestGet' => ['$lte' => new UTCDateTime((int)(microtime(true) * 1000))]]
            )
        );
    }

    /**
     * @test
     * @covers ::count
     * @expectedException \InvalidArgumentException
     */
    public function countWithNonStringKey()
    {
        $this->queue->count([0 => 'a value']);
    }

    /**
     * Tracks count() through the life of one message: absent, sent, then fetched.
     *
     * The second argument of count() is asserted with `true`, `false` and omitted at every stage.
     *
     * @test
     * @covers ::count
     */
    public function testCount()
    {
        $message = $this->getMessage(['boo' => 'scary']);

        $this->assertSame(0, $this->queue->count($message->getPayload(), true));
        $this->assertSame(0, $this->queue->count($message->getPayload(), false));
        $this->assertSame(0, $this->queue->count($message->getPayload()));

        $this->queue->send($message);
        $this->assertSame(1, $this->queue->count($message->getPayload(), false));
        $this->assertSame(0, $this->queue->count($message->getPayload(), true));
        $this->assertSame(1, $this->queue->count($message->getPayload()));

        $this->queue->get($message->getPayload());
        $this->assertSame(0, $this->queue->count($message->getPayload(), false));
        $this->assertSame(1, $this->queue->count($message->getPayload(), true));
        $this->assertSame(1, $this->queue->count($message->getPayload()));
    }

    /**
     * Acknowledging a fetched message removes it from the collection.
     *
     * @test
     * @covers ::ack
     */
    public function ack()
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $result = $this->queue->get($messageOne->getPayload())[0];
        $this->assertSame(2, $this->collection->count());

        $this->queue->ack($result);
        $this->assertSame(1, $this->collection->count());
    }

    /**
     * A requeued message becomes available again and is delivered after the messages still waiting.
     *
     * @test
     * @covers ::requeue
     */
    public function requeue()
    {
        $messages = [
            $this->getMessage(['id' => 1, 'key' => 'value']),
            $this->getMessage(['id' => 2, 'key' => 'value']),
            $this->getMessage(['id' => 3, 'key' => 'value']),
        ];

        foreach ($messages as $message) {
            $this->queue->send($message);
        }

        $query = ['key' => 'value'];

        $message = $this->queue->get($query)[0];

        $this->assertSameMessage($messages[0], $message);

        $this->queue->requeue($message->withEarliestGet(new UTCDateTime((int)(microtime(true) * 1000))));

        $actual = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        $this->assertSameMessage($messages[1], $actual[0]);
        $this->assertSameMessage($messages[2], $actual[1]);
        $this->assertSameMessage($messages[0], $actual[2]);
    }

    /**
     * A sent message is stored with its id, payload, priority and earliestGet intact.
     *
     * @test
     * @covers ::send
     */
    public function send()
    {
        $message = $this->getMessage(['key1' => 0, 'key2' => true], new UTCDateTime(34 * 1000), 0.8);
        $this->queue->send($message);
        $this->assertSingleMessage($message);
    }

    /**
     * send() must store an earliestGet of PHP_INT_MAX milliseconds unchanged.
     *
     * @test
     * @covers ::send
     */
    public function sendWithHighEarliestGet()
    {
        $message = $this->getMessage([], new UTCDateTime(PHP_INT_MAX));
        $this->queue->send($message);
        $this->assertSingleMessage($message);
    }

    /**
     * send() must store an earliestGet of 0 milliseconds unchanged.
     *
     * @test
     * @covers ::send
     */
    public function sendWithLowEarliestGet()
    {
        $message = $this->getMessage([], new UTCDateTime(0));
        $this->queue->send($message);
        $this->assertSingleMessage($message);
    }

    /**
     * Asserts that two messages share the same id, payload and priority.
     *
     * @param Message $expected The message that was sent.
     * @param Message $actual   The message that came back from the queue.
     */
    private function assertSameMessage(Message $expected, Message $actual)
    {
        $this->assertSame((string)$expected->getId(), (string)$actual->getId());
        $this->assertSame($expected->getPayload(), $actual->getPayload());
        $this->assertSame($expected->getPriority(), $actual->getPriority());
    }

    /**
     * Asserts that the collection holds exactly one document and that it matches the given message.
     *
     * @param Message $expected The message the stored document is compared against.
     */
    private function assertSingleMessage(Message $expected)
    {
        $this->assertSame(1, $this->collection->count());

        $actual = $this->collection->findOne(
            [],
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
        );

        $this->assertSame((string)$expected->getId(), (string)$actual['_id']);
        $this->assertSame($expected->getPayload(), $actual['payload']);
        $this->assertSame($expected->getPriority(), $actual['priority']);
        // UTCDateTime instances are distinct objects, so compare by value rather than identity.
        $this->assertEquals($expected->getEarliestGet(), $actual['earliestGet']);
    }

    /**
     * Builds a Message with a freshly generated ObjectId.
     *
     * @param array            $payload     The message payload.
     * @param UTCDateTime|null $earliestGet Earliest time the message may be fetched; null means UTCDateTime(0).
     * @param float            $priority    The message priority.
     *
     * @return Message
     */
    private function getMessage(array $payload = [], UTCDateTime $earliestGet = null, float $priority = 0.0)
    {
        return new Message(new ObjectId(), $payload, $earliestGet ?? new UTCDateTime(0), $priority);
    }
}
566