UnorderedQueueTest   A
last analyzed

Complexity

Total Complexity 37

Size/Duplication

Total Lines 469
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 167
c 1
b 0
f 0
dl 0
loc 469
rs 9.44
wmc 37

34 Methods

Rating   Name   Duplication   Size   Complexity  
A send() 0 5 1
A countWithNonStringKey() 0 4 1
A ensureIndexCannotBeCreatedAfterFiveAttempts() 0 10 1
A getMessage() 0 7 1
A ensureCountIndexWithNonStringKey() 0 4 1
A tearDown() 0 3 1
A constructWithNonStringUrl() 0 4 1
A getByFullQuery() 0 11 1
A testCount() 0 17 1
A getWithNegativePollDuration() 0 4 1
A ensureCountIndex() 0 15 1
A assertSingleMessage() 0 14 1
A getBeforeAck() 0 12 1
A ensureGetIndexWithTooLongCollectionName() 0 8 1
A sendWithLowEarliestGet() 0 5 1
A earliestGet() 0 14 1
A getByBadQuery() 0 8 1
A ensureGetIndex() 0 15 1
A ensureGetIndexWithNonStringBeforeSortKey() 0 4 1
A ensureCountIndexWithBadValue() 0 4 1
A getWithOverflowResetTimestamp() 0 5 1
A ensureGetIndexWithBadAfterSortValue() 0 4 1
A sendAllMessages() 0 4 2
A ensureCountIndexWithPrefixOfPrevious() 0 10 1
A setUp() 0 12 2
A ack() 0 9 2
A getWithTimeBasedPriority() 0 15 1
A sendWithHighEarliestGet() 0 5 1
A getBySubDocQuery() 0 20 1
A ensureGetIndexWithNonStringAfterSortKey() 0 4 1
A getWithNonStringKey() 0 4 1
A ensureGetIndexWithBadBeforeSortValue() 0 4 1
A getWait() 0 10 1
A assertSameMessage() 0 5 1
1
<?php
2
3
namespace TraderInteractive\Mongo;
4
5
use MongoDB\BSON\ObjectId;
6
use MongoDB\BSON\UTCDateTime;
7
use MongoDB\Client;
8
use PHPUnit\Framework\TestCase;
9
10
/**
11
 * @coversDefaultClass \TraderInteractive\Mongo\UnorderedQueue
12
 * @covers ::<private>
13
 * @covers ::__construct
14
 */
15
final class UnorderedQueueTest extends TestCase
{
    /**
     * Collection the default queue under test operates on ('testing' database, 'messages' collection).
     *
     * @var \MongoDB\Collection
     */
    private $collection;

    /**
     * Connection string taken from TESTING_MONGO_URL, falling back to a local server.
     *
     * @var string
     */
    private $mongoUrl;

    /**
     * Queue instance built on top of $collection in setUp().
     *
     * @var UnorderedQueue
     */
    private $queue;

    /**
     * Connects to the test database, empties the 'messages' collection and builds a fresh queue.
     *
     * @return void
     */
    public function setUp(): void
    {
        $this->mongoUrl = getenv('TESTING_MONGO_URL') ?: 'mongodb://localhost:27017';
        $mongo = new Client(
            $this->mongoUrl,
            [],
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
        );
        $this->collection = $mongo->selectDatabase('testing')->selectCollection('messages');
        $this->collection->deleteMany([]);

        $this->queue = new UnorderedQueue($this->collection);
    }

    /**
     * Drops the whole 'testing' database so collections created by individual tests do not leak.
     *
     * @return void
     */
    public function tearDown(): void
    {
        (new Client($this->mongoUrl))->dropDatabase('testing');
    }

    /**
     * Constructing the queue with an integer as first argument must be rejected.
     *
     * @test
     * @covers ::__construct
     *
     * @return void
     */
    public function constructWithNonStringUrl(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        new UnorderedQueue(1, 'testing', 'messages');
    }

    /**
     * Two ensureGetIndex() calls on a fresh collection must yield three indexes in total
     * with the expected key layouts at positions 1 and 2.
     *
     * @test
     * @covers ::ensureGetIndex
     *
     * @return void
     */
    public function ensureGetIndex(): void
    {
        // A uniquely named collection guarantees no indexes left over from other tests.
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
        $queue = new UnorderedQueue($collection);
        $queue->ensureGetIndex(['type' => 1], ['boo' => -1]);
        $queue->ensureGetIndex(['another.sub' => 1]);

        $indexes = iterator_to_array($collection->listIndexes());
        $this->assertCount(3, $indexes);

        // assertSame() on arrays also verifies key order, which is what defines the index.
        $expectedOne = [
            'earliestGet' => 1,
            'payload.type' => 1,
            'priority' => 1,
            'created' => 1,
            'payload.boo' => -1,
        ];
        $this->assertSame($expectedOne, $indexes[1]['key']);

        $expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1, 'priority' => 1, 'created' => 1];
        $this->assertSame($expectedTwo, $indexes[2]['key']);
    }

    /**
     * Using a 128 character collection name must end in an exception.
     *
     * @test
     * @covers ::ensureGetIndex
     *
     * @return void
     */
    public function ensureGetIndexWithTooLongCollectionName(): void
    {
        $this->expectException(\Exception::class);
        $collectionName = 'messages012345678901234567890123456789012345678901234567890123456789';
        $collectionName .= '012345678901234567890123456789012345678901234567890123456789';//128 chars

        $queue = new UnorderedQueue($this->mongoUrl, 'testing', $collectionName);
        $queue->ensureGetIndex([]);
    }

    /**
     * An integer key in the before-sort fields must be rejected.
     *
     * @test
     * @covers ::ensureGetIndex
     *
     * @return void
     */
    public function ensureGetIndexWithNonStringBeforeSortKey(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->ensureGetIndex([0 => 1]);
    }

    /**
     * An integer key in the after-sort fields must be rejected.
     *
     * @test
     * @covers ::ensureGetIndex
     *
     * @return void
     */
    public function ensureGetIndexWithNonStringAfterSortKey(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->ensureGetIndex(['field' => 1], [0 => 1]);
    }

    /**
     * A non-integer direction in the before-sort fields must be rejected.
     *
     * @test
     * @covers ::ensureGetIndex
     *
     * @return void
     */
    public function ensureGetIndexWithBadBeforeSortValue(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->ensureGetIndex(['field' => 'NotAnInt']);
    }

    /**
     * A non-integer direction in the after-sort fields must be rejected.
     *
     * @test
     * @covers ::ensureGetIndex
     *
     * @return void
     */
    public function ensureGetIndexWithBadAfterSortValue(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->ensureGetIndex([], ['field' => 'NotAnInt']);
    }

    /**
     * Verifies the behaviour of the UnorderedQueue when it cannot create an index after 5 attempts.
     *
     * The collection is a mock whose listIndexes() always returns an empty array; the test
     * expects ensureCountIndex() to fail with the "after 5 attempts" exception.
     *
     * @test
     * @covers ::ensureCountIndex
     *
     * @return void
     */
    public function ensureIndexCannotBeCreatedAfterFiveAttempts(): void
    {
        $this->expectException(\Exception::class);
        $this->expectExceptionMessage('couldnt create index after 5 attempts');
        $mockCollection = $this->getMockBuilder(\MongoDB\Collection::class)
            ->disableOriginalConstructor()
            ->getMock();

        $mockCollection->method('listIndexes')->willReturn([]);

        $queue = new UnorderedQueue($mockCollection);
        $queue->ensureCountIndex(['type' => 1], false);
    }

    /**
     * Two ensureCountIndex() calls on a fresh collection must yield three indexes in total;
     * passing true as second argument prepends 'earliestGet' to the index key.
     *
     * @test
     * @covers ::ensureCountIndex
     *
     * @return void
     */
    public function ensureCountIndex(): void
    {
        $collection = (new Client($this->mongoUrl))->selectDatabase('testing')->selectCollection(uniqid());
        $queue = new UnorderedQueue($collection);
        $queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
        $queue->ensureCountIndex(['another.sub' => 1], true);

        $indexes = iterator_to_array($collection->listIndexes());
        $this->assertCount(3, $indexes);

        $expectedOne = ['payload.type' => 1, 'payload.boo' => -1];
        $this->assertSame($expectedOne, $indexes[1]['key']);

        $expectedTwo = ['earliestGet' => 1, 'payload.another.sub' => 1];
        $this->assertSame($expectedTwo, $indexes[2]['key']);
    }

    /**
     * Requesting an index that is a prefix of an already existing one must not add another index.
     *
     * @test
     * @covers ::ensureCountIndex
     *
     * @return void
     */
    public function ensureCountIndexWithPrefixOfPrevious(): void
    {
        $this->queue->ensureCountIndex(['type' => 1, 'boo' => -1], false);
        $this->queue->ensureCountIndex(['type' => 1], false);

        $indexes = iterator_to_array($this->collection->listIndexes());
        $this->assertCount(2, $indexes);

        $expected = ['payload.type' => 1, 'payload.boo' => -1];
        $this->assertSame($expected, $indexes[1]['key']);
    }

    /**
     * An integer field key must be rejected.
     *
     * @test
     * @covers ::ensureCountIndex
     *
     * @return void
     */
    public function ensureCountIndexWithNonStringKey(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->ensureCountIndex([0 => 1], false);
    }

    /**
     * A non-integer index direction must be rejected.
     *
     * @test
     * @covers ::ensureCountIndex
     *
     * @return void
     */
    public function ensureCountIndexWithBadValue(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->ensureCountIndex(['field' => 'NotAnInt'], false);
    }

    /**
     * A query matching nothing returns an empty array and leaves the stored message in place.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getByBadQuery(): void
    {
        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));

        $result = $this->queue->get(['key3' => 0]);
        $this->assertSame([], $result);

        $this->assertSame(1, $this->collection->count());
    }

    /**
     * With runningResetDuration set to PHP_INT_MAX the returned message's earliestGet
     * must equal UTCDateTime(PHP_INT_MAX).
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getWithOverflowResetTimestamp(): void
    {
        $this->queue->send($this->getMessage());
        $message = $this->queue->get([], ['runningResetDuration' => PHP_INT_MAX])[0];
        $this->assertEquals(new UTCDateTime(PHP_INT_MAX), $message->getEarliestGet());
    }

    /**
     * A negative pollDurationInMillis must still produce a non-null result.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getWithNegativePollDuration(): void
    {
        $this->queue->send($this->getMessage(['key1' => 0]));
        $this->assertNotNull($this->queue->get([], ['pollDurationInMillis' => -1]));
    }

    /**
     * An integer key in the query must be rejected.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getWithNonStringKey(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->get([0 => 'a value']);
    }

    /**
     * Querying with a message's complete payload returns that message and not the other one.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getByFullQuery(): void
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $result = $this->queue->get($messageOne->getPayload())[0];

        $this->assertSame((string)$messageOne->getId(), (string)$result->getId());
        $this->assertSame($messageOne->getPayload(), $result->getPayload());
    }

    /**
     * A dotted-path query with an operator selects the message by a nested payload value.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getBySubDocQuery(): void
    {
        $expected = $this->getMessage(
            [
                'one' => [
                    'two' => [
                        'three' => 5,
                        'notused' => 'notused',
                    ],
                    'notused' => 'notused',
                ],
                'notused' => 'notused',
            ]
        );

        $this->queue->send($this->getMessage(['key1' => 0, 'key2' => true]));
        $this->queue->send($expected);

        $actual = $this->queue->get(['one.two.three' => ['$gt' => 4]])[0];
        $this->assertSameMessage($expected, $actual);
    }

    /**
     * A message that was fetched but not acknowledged must not be handed out a second time.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getBeforeAck(): void
    {
        $messageOne = $this->getMessage(['key1' => 0, 'key2' => true]);

        $this->queue->send($messageOne);
        $this->queue->send($this->getMessage(['key' => 'value']));

        $this->queue->get($messageOne->getPayload());

        //try get message we already have before ack
        $result = $this->queue->get($messageOne->getPayload());
        $this->assertSame([], $result);
    }

    /**
     * Messages of equal priority come back in the order they were sent.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getWithTimeBasedPriority(): void
    {
        $messageOne = $this->getMessage(['key' => 0]);
        $messageTwo = $this->getMessage(['key' => 1]);
        $messageThree = $this->getMessage(['key' => 2]);

        $this->queue->send($messageOne);
        $this->queue->send($messageTwo);
        $this->queue->send($messageThree);

        $messages = $this->queue->get([], ['maxNumberOfMessages' => 10]);

        $this->assertSameMessage($messageOne, $messages[0]);
        $this->assertSameMessage($messageTwo, $messages[1]);
        $this->assertSameMessage($messageThree, $messages[2]);
    }

    /**
     * get() on an empty queue with default options must take at least 0.2s and less than 0.3s.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function getWait(): void
    {
        $start = microtime(true);

        $this->queue->get([]);

        $elapsed = microtime(true) - $start;

        // Dedicated comparison assertions report the measured value on failure.
        $this->assertGreaterThanOrEqual(0.200, $elapsed);
        $this->assertLessThan(0.300, $elapsed);
    }

    /**
     * A message whose earliestGet lies one second ahead is invisible at first
     * and becomes available after sleeping for a second.
     *
     * @test
     * @covers ::get
     *
     * @return void
     */
    public function earliestGet(): void
    {
        $messageOne = $this->getMessage(
            ['key1' => 0, 'key2' => true],
            new UTCDateTime((time() + 1) * 1000)
        );

        $this->queue->send($messageOne);

        $this->assertSame([], $this->queue->get($messageOne->getPayload()));

        sleep(1);

        $this->assertCount(1, $this->queue->get($messageOne->getPayload()));
    }

    /**
     * An integer key in the count query must be rejected.
     *
     * @test
     * @covers ::count
     *
     * @return void
     */
    public function countWithNonStringKey(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->queue->count([0 => 'a value']);
    }

    /**
     * Walks a message through "absent", "sent" and "fetched" and checks count()
     * with the second argument set to true, false and omitted at each stage.
     *
     * @test
     * @covers ::count
     *
     * @return void
     */
    public function testCount(): void
    {
        $message = $this->getMessage(['boo' => 'scary']);

        $this->assertSame(0, $this->queue->count($message->getPayload(), true));
        $this->assertSame(0, $this->queue->count($message->getPayload(), false));
        $this->assertSame(0, $this->queue->count($message->getPayload()));

        $this->queue->send($message);
        $this->assertSame(1, $this->queue->count($message->getPayload(), false));
        $this->assertSame(0, $this->queue->count($message->getPayload(), true));
        $this->assertSame(1, $this->queue->count($message->getPayload()));

        $this->queue->get($message->getPayload());
        $this->assertSame(0, $this->queue->count($message->getPayload(), false));
        $this->assertSame(1, $this->queue->count($message->getPayload(), true));
        $this->assertSame(1, $this->queue->count($message->getPayload()));
    }

    /**
     * Every ack() must remove exactly one document from the collection.
     *
     * @test
     * @covers ::ack
     *
     * @return void
     */
    public function ack(): void
    {
        $messages = [$this->getMessage(), $this->getMessage()];
        $this->sendAllMessages($messages);
        $count = $this->collection->count();
        $this->assertSame(2, $count);
        foreach ($this->queue->get([], ['maxNumberOfMessages' => 10]) as $message) {
            $this->queue->ack($message);
            $this->assertSame(--$count, $this->collection->count());
        }
    }

    /**
     * A sent message is stored as a single document carrying the message's fields.
     *
     * @test
     * @covers ::send
     *
     * @return void
     */
    public function send(): void
    {
        $message = $this->getMessage(['key1' => 0, 'key2' => true], new UTCDateTime(34 * 1000), 0.8);
        $this->queue->send($message);
        $this->assertSingleMessage($message);
    }

    /**
     * A message with earliestGet of PHP_INT_MAX milliseconds is stored unchanged.
     *
     * @test
     * @covers ::send
     *
     * @return void
     */
    public function sendWithHighEarliestGet(): void
    {
        $message = $this->getMessage([], new UTCDateTime(PHP_INT_MAX));
        $this->queue->send($message);
        $this->assertSingleMessage($message);
    }

    /**
     * A message with earliestGet of 0 milliseconds is stored unchanged.
     *
     * @test
     * @covers ::send
     *
     * @return void
     */
    public function sendWithLowEarliestGet(): void
    {
        $message = $this->getMessage([], new UTCDateTime(0));
        $this->queue->send($message);
        $this->assertSingleMessage($message);
    }

    /**
     * Asserts that two messages share id (compared as strings), payload and priority.
     *
     * @param Message $expected The message that was sent.
     * @param Message $actual   The message that came back from the queue.
     *
     * @return void
     */
    private function assertSameMessage(Message $expected, Message $actual): void
    {
        $this->assertSame((string)$expected->getId(), (string)$actual->getId());
        $this->assertSame($expected->getPayload(), $actual->getPayload());
        $this->assertSame($expected->getPriority(), $actual->getPriority());
    }

    /**
     * Asserts that the collection holds exactly one document and that it mirrors $expected.
     *
     * @param Message $expected The message the stored document is compared against.
     *
     * @return void
     */
    private function assertSingleMessage(Message $expected): void
    {
        $this->assertSame(1, $this->collection->count());

        $actual = $this->collection->findOne(
            [],
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
        );

        $this->assertSame((string)$expected->getId(), (string)$actual['_id']);
        $this->assertSame($expected->getPayload(), $actual['payload']);
        $this->assertSame($expected->getPriority(), $actual['priority']);
        $this->assertSame(gethostname(), $actual['machineName']);
        // assertEquals: the stored value is a distinct UTCDateTime instance, so compare by value.
        $this->assertEquals($expected->getEarliestGet(), $actual['earliestGet']);
    }

    /**
     * Builds a message with a fresh ObjectId.
     *
     * @param array            $payload     Payload of the message.
     * @param UTCDateTime|null $earliestGet Earliest time the message may be fetched; defaults to now.
     * @param float            $priority    Priority of the message.
     *
     * @return Message
     */
    private function getMessage(array $payload = [], ?UTCDateTime $earliestGet = null, float $priority = 0.0): Message
    {
        return new Message(
            new ObjectId(),
            $payload,
            // microtime(true) yields float seconds; scale to milliseconds BEFORE truncating.
            // The previous "(int)microtime() * 1000" cast the "msec sec" string first and was always 0.
            $earliestGet ?? new UTCDateTime((int)(microtime(true) * 1000)),
            $priority
        );
    }

    /**
     * Sends every message of the given list through the queue under test.
     *
     * @param array $messages Messages to send, in order.
     *
     * @return void
     */
    private function sendAllMessages(array $messages): void
    {
        foreach ($messages as $message) {
            $this->queue->send($message);
        }
    }
}
487