QueueTest   A
last analyzed

Complexity

Total Complexity 39

Size/Duplication

Total Lines 517
Duplicated Lines 0 %

Importance

Changes 13
Bugs 0 Features 0
Metric Value
eloc 189
c 13
b 0
f 0
dl 0
loc 517
rs 9.28
wmc 39

36 Methods

Rating   Name   Duplication   Size   Complexity  
A send() 0 5 1
A earliestGet() 0 14 1
A sendAllMessages() 0 4 2
A tearDown() 0 3 1
A getWithCustomPriority() 0 15 1
A getWithNegativePollDuration() 0 4 1
A getWithTimeBasedPriority() 0 15 1
A getByFullQuery() 0 11 1
A ensureCountIndex() 0 15 1
A assertSingleMessage() 0 14 1
A getWait() 0 10 1
A ensureGetIndex() 0 15 1
A getWithOverflowResetTimestamp() 0 5 1
A ensureCountIndexWithPrefixOfPrevious() 0 10 1
A getMessage() 0 7 1
A sendWithHighEarliestGet() 0 5 1
A requeue() 0 21 1
A getBySubDocQuery() 0 20 1
A assertSameMessage() 0 5 1
A getBeforeAck() 0 12 1
A ack() 0 9 2
A setUp() 0 12 2
A sendWithLowEarliestGet() 0 5 1
A getByBadQuery() 0 8 1
A testCount() 0 17 1
A countWithNonStringKey() 0 4 1
A ensureGetIndexWithTooLongCollectionName() 0 8 1
A ensureGetIndexWithBadAfterSortValue() 0 4 1
A ensureCountIndexWithNonStringKey() 0 4 1
A getWithNonStringKey() 0 4 1
A ensureIndexCannotBeCreatedAfterFiveAttempts() 0 10 1
A ensureGetIndexWithNonStringAfterSortKey() 0 4 1
A ensureCountIndexWithBadValue() 0 4 1
A ensureGetIndexWithBadBeforeSortValue() 0 4 1
A constructWithNonStringUrl() 0 4 1
A ensureGetIndexWithNonStringBeforeSortKey() 0 4 1
1
<?php
2
3
namespace TraderInteractive\Mongo;
4
5
use MongoDB\BSON\ObjectId;
6
use MongoDB\BSON\UTCDateTime;
7
use MongoDB\Client;
8
use PHPUnit\Framework\TestCase;
9
10
/**
11
 * @coversDefaultClass \TraderInteractive\Mongo\Queue
12
 * @covers ::<private>
13
 * @covers ::__construct
14
 */
15
final class QueueTest extends TestCase
16
{
17
    private $collection;
18
    private $mongoUrl;
19
    private $queue;
20
21
    public function setUp(): void
22
    {
23
        $this->mongoUrl = getenv('TESTING_MONGO_URL') ?: 'mongodb://localhost:27017';
24
        $mongo = new Client(
25
            $this->mongoUrl,
26
            [],
27
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
28
        );
29
        $this->collection = $mongo->selectDatabase('testing')->selectCollection('messages');
30
        $this->collection->deleteMany([]);
31
32
        $this->queue = new Queue($this->collection);
33
    }
34
35
    public function tearDown(): void
36
    {
37
        (new Client($this->mongoUrl))->dropDatabase('testing');
38
    }
39
40
    /**
41
     * @test
42
     * @covers ::__construct
43
     */
44
    public function constructWithNonStringUrl()
45
    {
46
        $this->expectException(\InvalidArgumentException::class);
47
        new Queue(1, 'testing', 'messages');
48
    }
49
50
    /**
51
     * @test
52
     * @covers ::ensureGetIndex
53
     */
54
    public function ensureGetIndex()
55
    {
56
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
57
        $queue = new Queue($collection);
58
        $queue->ensureGetIndex(['type' => 1], ['boo' => -1]);
59
        $queue->ensureGetIndex(['another.sub' => 1]);
60
61
        $indexes = iterator_to_array($collection->listIndexes());
62
        $this->assertSame(3, count($indexes));
63
64
        $expectedOne = ['earliestGet' => 1, 'payload.type' => 1, 'priority' => 1, 'created' => 1, 'payload.boo' => -1];
65
        $this->assertSame($expectedOne, $indexes[1]['key']);
66
67
        $expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1, 'priority' => 1, 'created' => 1];
68
        $this->assertSame($expectedTwo, $indexes[2]['key']);
69
    }
70
71
    /**
72
     * @test
73
     * @covers ::ensureGetIndex
74
     */
75
    public function ensureGetIndexWithTooLongCollectionName()
76
    {
77
        $this->expectException(\Exception::class);
78
        $collectionName = 'messages012345678901234567890123456789012345678901234567890123456789';
79
        $collectionName .= '012345678901234567890123456789012345678901234567890123456789';//128 chars
80
81
        $queue = new Queue($this->mongoUrl, 'testing', $collectionName);
82
        $queue->ensureGetIndex([]);
83
    }
84
85
    /**
86
     * @test
87
     * @covers ::ensureGetIndex
88
     */
89
    public function ensureGetIndexWithNonStringBeforeSortKey()
90
    {
91
        $this->expectException(\InvalidArgumentException::class);
92
        $this->queue->ensureGetIndex([0 => 1]);
93
    }
94
95
    /**
96
     * @test
97
     * @covers ::ensureGetIndex
98
     */
99
    public function ensureGetIndexWithNonStringAfterSortKey()
100
    {
101
        $this->expectException(\InvalidArgumentException::class);
102
        $this->queue->ensureGetIndex(['field' => 1], [0 => 1]);
103
    }
104
105
    /**
106
     * @test
107
     * @covers ::ensureGetIndex
108
     */
109
    public function ensureGetIndexWithBadBeforeSortValue()
110
    {
111
        $this->expectException(\InvalidArgumentException::class);
112
        $this->queue->ensureGetIndex(['field' => 'NotAnInt']);
113
    }
114
115
    /**
116
     * @test
117
     * @covers ::ensureGetIndex
118
     */
119
    public function ensureGetIndexWithBadAfterSortValue()
120
    {
121
        $this->expectException(\InvalidArgumentException::class);
122
        $this->queue->ensureGetIndex([], ['field' => 'NotAnInt']);
123
    }
124
125
    /**
126
     * Verifies the behaviour of the Queue when it cannot create an index after 5 attempts.
127
     *
128
     * @test
129
     * @covers ::ensureGetIndex
130
     */
131
    public function ensureIndexCannotBeCreatedAfterFiveAttempts()
132
    {
133
        $this->expectException(\Exception::class);
134
        $this->expectExceptionMessage('couldnt create index after 5 attempts');
135
        $mockCollection = $this->getMockBuilder('\MongoDB\Collection')->disableOriginalConstructor()->getMock();
136
137
        $mockCollection->method('listIndexes')->willReturn([]);
138
139
        $queue = new Queue($mockCollection);
140
        $queue->ensureCountIndex(['type' => 1], false);
141
    }
142
143
    /**
144
     * @test
145
     * @covers ::ensureCountIndex
146
     */
147
    public function ensureCountIndex()
148
    {
149
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
150
        $queue = new Queue($collection);
151
        $queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
152
        $queue->ensureCountIndex(['another.sub' => 1], true);
153
154
        $indexes = iterator_to_array($collection->listIndexes());
155
        $this->assertSame(3, count($indexes));
156
157
        $expectedOne = ['payload.type' => 1, 'payload.boo' => -1];
158
        $this->assertSame($expectedOne, $indexes[1]['key']);
159
160
        $expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1];
161
        $this->assertSame($expectedTwo, $indexes[2]['key']);
162
    }
163
164
    /**
165
     * @test
166
     * @covers ::ensureCountIndex
167
     */
168
    public function ensureCountIndexWithPrefixOfPrevious()
169
    {
170
        $this->queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
171
        $this->queue->ensureCountIndex(['type' => 1], false);
172
173
        $indexes = iterator_to_array($this->collection->listIndexes());
174
        $this->assertSame(2, count($indexes));
175
176
        $expected = ['payload.type' => 1, 'payload.boo' => -1];
177
        $this->assertSame($expected, $indexes[1]['key']);
178
    }
179
180
    /**
181
     * @test
182
     * @covers ::ensureCountIndex
183
     */
184
    public function ensureCountIndexWithNonStringKey()
185
    {
186
        $this->expectException(\InvalidArgumentException::class);
187
        $this->queue->ensureCountIndex([0 => 1], false);
188
    }
189
190
    /**
191
     * @test
192
     * @covers ::ensureCountIndex
193
     */
194
    public function ensureCountIndexWithBadValue()
195
    {
196
        $this->expectException(\InvalidArgumentException::class);
197
        $this->queue->ensureCountIndex(['field' => 'NotAnInt'], false);
198
    }
199
200
    /**
201
     * @test
202
     * @covers ::get
203
     */
204
    public function getByBadQuery()
205
    {
206
        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));
207
208
        $result = $this->queue->get(['key3' => 0]);
209
        $this->assertSame([], $result);
210
211
        $this->assertSame(1, $this->collection->count());
212
    }
213
214
    /**
215
     * @test
216
     * @covers ::get
217
     *
218
     * @return void
219
     */
220
    public function getWithOverflowResetTimestamp()
221
    {
222
        $this->queue->send($this->getMessage());
223
        $message = $this->queue->get([], ['runningResetDuration' => PHP_INT_MAX])[0];
224
        $this->assertEquals(new UTCDateTime(PHP_INT_MAX), $message->getEarliestGet());
225
    }
226
227
    /**
228
     * @test
229
     * @covers ::get
230
     */
231
    public function getWithNegativePollDuration()
232
    {
233
        $this->queue->send($this->getMessage(['key1' => 0]));
234
        $this->assertNotNull($this->queue->get([], ['pollDurationInMillis' => -1]));
235
    }
236
237
    /**
238
     * @test
239
     * @covers ::get
240
     */
241
    public function getWithNonStringKey()
242
    {
243
        $this->expectException(\InvalidArgumentException::class);
244
        $this->queue->get([0 => 'a value']);
245
    }
246
247
    /**
248
     * @test
249
     * @covers ::get
250
     */
251
    public function getByFullQuery()
252
    {
253
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);
254
255
        $this->queue->send($messageOne);
256
        $this->queue->send($this->getMessage(['key' => 'value']));
257
258
        $result = $this->queue->get($messageOne->getPayload())[0];
259
260
        $this->assertSame((string)$messageOne->getId(), (string)$result->getId());
261
        $this->assertSame($messageOne->getPayload(), $result->getPayload());
262
    }
263
264
    /**
265
     * @test
266
     * @covers ::get
267
     */
268
    public function getBySubDocQuery()
269
    {
270
        $expected = $this->getMessage(
271
            [
272
                'one' => [
273
                    'two' => [
274
                        'three' => 5,
275
                        'notused' => 'notused',
276
                    ],
277
                    'notused' => 'notused',
278
                ],
279
                'notused' => 'notused',
280
            ]
281
        );
282
283
        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));
284
        $this->queue->send($expected);
285
286
        $actual = $this->queue->get(['one.two.three' => ['$gt' => 4]])[0];
287
        $this->assertSameMessage($expected, $actual);
288
    }
289
290
    /**
291
     * @test
292
     * @covers ::get
293
     */
294
    public function getBeforeAck()
295
    {
296
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);
297
298
        $this->queue->send($messageOne);
299
        $this->queue->send($this->getMessage(['key' => 'value']));
300
301
        $this->queue->get($messageOne->getPayload());
302
303
        //try get message we already have before ack
304
        $result = $this->queue->get($messageOne->getPayload());
305
        $this->assertSame([], $result);
306
    }
307
308
    /**
309
     * @test
310
     * @covers ::get
311
     */
312
    public function getWithCustomPriority()
313
    {
314
        $messageOne = $this->getMessage(['key' => 0], null, 0.5);
315
        $messageTwo = $this->getMessage(['key' => 1], null, 0.4);
316
        $messageThree = $this->getMessage(['key' => 2], null, 0.3);
317
318
        $this->queue->send($messageOne);
319
        $this->queue->send($messageTwo);
320
        $this->queue->send($messageThree);
321
322
        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);
323
324
        $this->assertSameMessage($messageThree, $messages[0]);
325
        $this->assertSameMessage($messageTwo, $messages[1]);
326
        $this->assertSameMessage($messageOne, $messages[2]);
327
    }
328
329
    /**
330
     * @test
331
     * @covers ::get
332
     */
333
    public function getWithTimeBasedPriority()
334
    {
335
        $messageOne = $this->getMessage(['key' => 0]);
336
        $messageTwo = $this->getMessage(['key' => 1]);
337
        $messageThree = $this->getMessage(['key' => 2]);
338
339
        $this->queue->send($messageOne);
340
        $this->queue->send($messageTwo);
341
        $this->queue->send($messageThree);
342
343
        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);
344
345
        $this->assertSameMessage($messageOne, $messages[0]);
346
        $this->assertSameMessage($messageTwo, $messages[1]);
347
        $this->assertSameMessage($messageThree, $messages[2]);
348
    }
349
350
    /**
351
     * @test
352
     * @covers ::get
353
     */
354
    public function getWait()
355
    {
356
        $start = microtime(true);
357
358
        $this->queue->get([]);
359
360
        $end = microtime(true);
361
362
        $this->assertTrue($end - $start >= 0.200);
363
        $this->assertTrue($end - $start < 0.300);
364
    }
365
366
    /**
367
     * @test
368
     * @covers ::get
369
     */
370
    public function earliestGet()
371
    {
372
        $messageOne = $this->getMessage(
373
            ['key1' => 0, 'key2' => true],
374
            new UTCDateTime((time() + 1) * 1000)
375
        );
376
377
        $this->queue->send($messageOne);
378
379
        $this->assertSame([], $this->queue->get($messageOne->getPayload()));
380
381
        sleep(1);
382
383
        $this->assertCount(1, $this->queue->get($messageOne->getPayload()));
384
    }
385
386
    /**
387
     * @test
388
     * @covers ::count
389
     */
390
    public function countWithNonStringKey()
391
    {
392
        $this->expectException(\InvalidArgumentException::class);
393
        $this->queue->count([0 => 'a value']);
394
    }
395
396
    /**
397
     * @test
398
     * @covers ::count
399
     */
400
    public function testCount()
401
    {
402
        $message = $this->getMessage(['boo' => 'scary']);
403
404
        $this->assertSame(0, $this->queue->count($message->getPayload(), true));
405
        $this->assertSame(0, $this->queue->count($message->getPayload(), false));
406
        $this->assertSame(0, $this->queue->count($message->getPayload()));
407
408
        $this->queue->send($message);
409
        $this->assertSame(1, $this->queue->count($message->getPayload(), false));
410
        $this->assertSame(0, $this->queue->count($message->getPayload(), true));
411
        $this->assertSame(1, $this->queue->count($message->getPayload()));
412
413
        $this->queue->get($message->getPayload());
414
        $this->assertSame(0, $this->queue->count($message->getPayload(), false));
415
        $this->assertSame(1, $this->queue->count($message->getPayload(), true));
416
        $this->assertSame(1, $this->queue->count($message->getPayload()));
417
    }
418
419
    /**
420
     * @test
421
     * @covers ::ack
422
     */
423
    public function ack()
424
    {
425
        $messages = [$this->getMessage(), $this->getMessage()];
426
        $this->sendAllMessages($messages);
427
        $count = $this->collection->count();
428
        $this->assertSame(2, $count);
429
        foreach ($this->queue->get([], ['maxNumberOfMessages' => 10]) as $message) {
430
            $this->queue->ack($message);
431
            $this->assertSame(--$count, $this->collection->count());
432
        }
433
    }
434
435
    /**
436
     * @test
437
     * @covers ::requeue
438
     */
439
    public function requeue()
440
    {
441
        $messages = [
442
            $this->getMessage(['key' => 1]),
443
            $this->getMessage(['key' => 2]),
444
            $this->getMessage(['key' => 3]),
445
        ];
446
447
        $this->sendAllMessages($messages);
448
449
        $message = $this->queue->get([])[0];
450
451
        $this->assertSameMessage($messages[0], $message);
452
453
        $this->queue->requeue($message->withEarliestGet(new UTCDateTime((int)(microtime(true) * 1000))));
454
455
        $actual = $this->queue->get([], ['maxNumberOfMessages' => 10]);
456
457
        $this->assertSameMessage($messages[1], $actual[0]);
458
        $this->assertSameMessage($messages[2], $actual[1]);
459
        $this->assertSameMessage($messages[0], $actual[2]);
460
    }
461
462
    /**
463
     * @test
464
     * @covers ::send
465
     */
466
    public function send()
467
    {
468
        $message = $this->getMessage(['key1' => 0, 'key2' => true], new UTCDateTime(34 * 1000), 0.8);
469
        $this->queue->send($message);
470
        $this->assertSingleMessage($message);
471
    }
472
473
    /**
474
     * @test
475
     * @covers ::send
476
     */
477
    public function sendWithHighEarliestGet()
478
    {
479
        $message = $this->getMessage([], new UTCDateTime(PHP_INT_MAX));
480
        $this->queue->send($message);
481
        $this->assertSingleMessage($message);
482
    }
483
484
    /**
485
     * @test
486
     * @covers ::send
487
     */
488
    public function sendWithLowEarliestGet()
489
    {
490
        $message = $this->getMessage([], new UTCDateTime(0));
491
        $this->queue->send($message);
492
        $this->assertSingleMessage($message);
493
    }
494
495
    private function assertSameMessage(Message $expected, Message $actual)
496
    {
497
        $this->assertSame((string)$expected->getId(), (string)$actual->getId());
498
        $this->assertSame($expected->getPayload(), $actual->getPayload());
499
        $this->assertSame($expected->getPriority(), $actual->getPriority());
500
    }
501
502
    private function assertSingleMessage(Message $expected)
503
    {
504
        $this->assertSame(1, $this->collection->count());
505
506
        $actual = $this->collection->findOne(
507
            [],
508
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
509
        );
510
511
        $this->assertSame((string)$expected->getId(), (string)$actual['_id']);
512
        $this->assertSame($expected->getPayload(), $actual['payload']);
513
        $this->assertSame($expected->getPriority(), $actual['priority']);
514
        $this->assertSame(gethostname(), $actual['machineName']);
515
        $this->assertEquals($expected->getEarliestGet(), $actual['earliestGet']);
516
    }
517
518
    private function getMessage(array $payload = [], UTCDateTime $earliestGet = null, float $priority = 0.0)
519
    {
520
        return new Message(
521
            new ObjectId(),
522
            $payload,
523
            $earliestGet ?? new UTCDateTime((int)microtime() * 1000),
524
            $priority
525
        );
526
    }
527
528
    private function sendAllMessages(array $messages)
529
    {
530
        foreach ($messages as $message) {
531
            $this->queue->send($message);
532
        }
533
    }
534
}
535