Completed
Push — master ( 4cd93a...e8f4ec )
by Thomas Mauro
10:51
created

AbstractAdapterTestSuite::createExchangeProvider()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 0
loc 27
rs 8.8571
cc 1
eloc 18
nc 1
nop 0
1
<?php
2
3
namespace AMQPAL\FunctionalTest\Adapter;
4
5
use AMQPAL\Adapter\AdapterInterface;
6
use AMQPAL\Adapter\QueueInterface;
7
use AMQPAL\FunctionalTest\Exception\TimeoutException;
8
use RabbitMq\ManagementApi\Client;
9
use AMQPAL\Adapter\ChannelInterface;
10
use AMQPAL\Adapter\Message;
11
use AMQPAL\Options\ExchangeOptions;
12
use AMQPAL\Options\QueueOptions;
13
14
abstract class AbstractAdapterTestSuite extends \PHPUnit_Framework_TestCase
15
{
16
    /**
     * AMQP broker connection settings.
     *
     * These defaults are replaced in the constructor by the 'connection'
     * entry of the loaded test options file.
     *
17
     * @var array
18
     */
19
    protected $connectionOptions = [
20
        'host' => 'localhost',
21
        'port' => 5672,
22
        'username' => 'guest',
23
        'password' => 'guest',
24
        'vhost' => 'test'
25
    ];
26
27
    /**
     * Management API endpoint settings; 'host' and 'port' are used in setUp()
     * to build the client's base URL.
     *
     * These defaults are replaced in the constructor by the 'management'
     * entry of the loaded test options file.
     *
28
     * @var array
29
     */
30
    protected $managementOptions = [
31
        'host' => 'localhost',
32
        'port' => 15672
33
    ];
34
    
35
    /**
     * Adapter under test. It is never assigned in this class - presumably each
     * concrete subclass sets it before the tests run (TODO confirm).
     *
36
     * @var AdapterInterface
37
     */
38
    protected $adapter;
39
    /**
     * Management API client, created afresh in setUp().
     *
40
     * @var Client
41
     */
42
    protected $client;
43
44
    /**
     * Loads the broker and management settings, then delegates to PHPUnit.
     *
     * A local `test.options.php` two directories above this file takes
     * precedence; when it does not exist, `test.options.php.dist` from the
     * same directory is included instead.
     *
     * @param string|null $name     Forwarded unchanged to the parent constructor.
     * @param array       $data     Forwarded unchanged to the parent constructor.
     * @param string      $dataName Forwarded unchanged to the parent constructor.
     */
    public function __construct($name = null, array $data = [], $dataName = '')
    {
        // Resolve which file to load first, so there is a single include below.
        $path = __DIR__ . '/../../test.options.php';
        if (!file_exists($path)) {
            $path = __DIR__ . '/../../test.options.php.dist';
        }
        $settings = include $path;

        $this->connectionOptions = $settings['connection'];
        $this->managementOptions = $settings['management'];

        parent::__construct($name, $data, $dataName);
    }
58
59
60
    /**
     * Creates the management API client and wipes the broker before every test.
     */
    public function setUp()
    {
        parent::setUp();

        $managementUrl = sprintf(
            'http://%s:%d',
            $this->managementOptions['host'],
            $this->managementOptions['port']
        );
        $this->client = new Client(null, $managementUrl);

        // Start each test from an empty broker.
        $this->cleanAll($this->client);
    }
71
72
    /**
     * Declares an exchange through the adapter and verifies it via the management API.
     *
     * The exchange is looked up under the name carried by the data set rather
     * than a hard-coded literal, so the provider is free to vary the name.
     *
     * @dataProvider createExchangeProvider
     * @param array $exchangeOptionsArray Options passed to ExchangeOptions; the keys name,
     *                                    type, durable, auto_delete, internal and arguments
     *                                    are compared with what the management API reports.
     */
    public function testCreateExchange(array $exchangeOptionsArray)
    {
        $channel = $this->adapter->createChannel();
        static::assertInstanceOf(ChannelInterface::class, $channel);

        $exchangeOptions = new ExchangeOptions($exchangeOptionsArray);

        $exchange = $channel->createExchange($exchangeOptions);
        $exchange->declareExchange();

        // The vhost stays fixed to 'test', as in the other tests of this suite.
        $exchangeInfo = $this->client->exchanges()->get('test', $exchangeOptionsArray['name']);

        static::assertInternalType('array', $exchangeInfo);
        foreach (['name', 'type', 'durable', 'auto_delete', 'internal', 'arguments'] as $key) {
            static::assertEquals(
                $exchangeOptionsArray[$key],
                $exchangeInfo[$key],
                sprintf('Exchange attribute "%s" does not match', $key)
            );
        }
    }
96
97
    /**
     * Declares a queue through the adapter and verifies it via the management API.
     *
     * The queue is looked up under the name carried by the data set rather
     * than a hard-coded literal, so the provider is free to vary the name.
     *
     * @dataProvider createQueueProvider
     * @param array $queueOptionsArray Options passed to QueueOptions; the keys name, durable,
     *                                 auto_delete, exclusive and arguments are compared with
     *                                 what the management API reports.
     */
    public function testCreateQueue(array $queueOptionsArray)
    {
        $channel = $this->adapter->createChannel();
        static::assertInstanceOf(ChannelInterface::class, $channel);

        $queueOptions = new QueueOptions($queueOptionsArray);

        $queue = $channel->createQueue($queueOptions);
        $queue->declareQueue();

        // The vhost stays fixed to 'test', as in the other tests of this suite.
        $queueInfo = $this->client->queues()->get('test', $queueOptionsArray['name']);

        static::assertInternalType('array', $queueInfo);
        foreach (['name', 'durable', 'auto_delete', 'exclusive', 'arguments'] as $key) {
            static::assertEquals(
                $queueOptionsArray[$key],
                $queueInfo[$key],
                sprintf('Queue attribute "%s" does not match', $key)
            );
        }
    }
120
121
    /**
     * Publishes one message and fetches it back with `get()` (no arguments).
     *
     * The fetched message is never acknowledged by this test, and the
     * management API is expected to end up reporting exactly one message on
     * the queue, counted as unacknowledged.
     */
    public function testPublishAndGet()
    {
        $channel = $this->adapter->createChannel();
        $exchangeSettings = new ExchangeOptions([
            'name' => 'exchange-name',
            'type' => 'direct'
        ]);
        $queueSettings = new QueueOptions([
            'name' => 'queue-name',
        ]);

        $exchange = $channel->createExchange($exchangeSettings);
        $exchange->declareExchange();

        $queue = $channel->createQueue($queueSettings);
        $queue->declareQueue();
        $queue->bind('exchange-name');

        $exchange->publish('foo');

        // Poll for up to 5 seconds, pausing 10000 microseconds between attempts.
        $fetch = function () use ($queue) {
            return $queue->get();
        };
        $gotSomething = function ($candidate) {
            return null !== $candidate;
        };
        $message = $this->doUntil($fetch, $gotSomething, 5, 10000);

        static::assertInstanceOf(Message::class, $message);
        static::assertEquals('foo', $message->getBody());

        // Management statistics are polled until they show the expected counters.
        $readStats = function () {
            return $this->client->queues()->get('test', 'queue-name');
        };
        $oneUnacked = function ($stats) {
            if (!is_array($stats)) {
                return false;
            }

            return 1 === $stats['messages'] && 1 === $stats['messages_unacknowledged'];
        };
        $queueInfo = $this->doUntil($readStats, $oneUnacked, 5, 10000);

        static::assertEquals(1, $queueInfo['messages']);
        static::assertEquals(1, $queueInfo['messages_unacknowledged']);
    }
173
174
    /**
     * Publishes one message and fetches it back with `get(true)`.
     *
     * After the fetch, the management API is expected to report an empty
     * queue with no unacknowledged messages.
     */
    public function testPublishAndGetWithAutoAck()
    {
        $channel = $this->adapter->createChannel();
        $exchangeSettings = new ExchangeOptions([
            'name' => 'exchange-name',
            'type' => 'direct'
        ]);
        $queueSettings = new QueueOptions([
            'name' => 'queue-name',
        ]);

        $exchange = $channel->createExchange($exchangeSettings);
        $exchange->declareExchange();

        $queue = $channel->createQueue($queueSettings);
        $queue->declareQueue();
        $queue->bind('exchange-name');

        $exchange->publish('foo');

        // Poll for up to 10 seconds, pausing 10000 microseconds between attempts.
        $fetch = function () use ($queue) {
            return $queue->get(true);
        };
        $gotSomething = function ($candidate) {
            return null !== $candidate;
        };
        $message = $this->doUntil($fetch, $gotSomething, 10, 10000);

        static::assertInstanceOf(Message::class, $message);
        static::assertEquals('foo', $message->getBody());

        // Management statistics are polled until they show the expected counters.
        $readStats = function () {
            return $this->client->queues()->get('test', 'queue-name');
        };
        $drained = function ($stats) {
            if (!is_array($stats)) {
                return false;
            }

            return 0 === $stats['messages'] && 0 === $stats['messages_unacknowledged'];
        };
        $queueInfo = $this->doUntil($readStats, $drained, 10, 10000);

        static::assertEquals(0, $queueInfo['messages']);
        static::assertEquals(0, $queueInfo['messages_unacknowledged']);
    }
227
228
    /**
     * Consumes two published messages, acknowledging each one inside the callback.
     *
     * The callback returns false once as many messages were received as were
     * published. Afterwards the management API is expected to report an empty
     * queue with no unacknowledged messages.
     */
    public function testConsumeWithAck()
    {
        $channel = $this->adapter->createChannel();
        $exchangeSettings = new ExchangeOptions([
            'name' => 'exchange-name',
            'type' => 'direct'
        ]);
        $queueSettings = new QueueOptions([
            'name' => 'queue-name',
        ]);

        $exchange = $channel->createExchange($exchangeSettings);
        $exchange->declareExchange();

        $queue = $channel->createQueue($queueSettings);
        $queue->declareQueue();
        $queue->bind('exchange-name');

        $messages = ['foo', 'bar'];
        foreach ($messages as $body) {
            $exchange->publish($body);
        }

        $messageReceived = [];

        $onMessage = function (Message $delivery, QueueInterface $source) use (&$messageReceived, $messages) {
            $messageReceived[] = $delivery;
            $source->ack($delivery->getDeliveryTag());
            if (count($messageReceived) >= count($messages)) {
                return false;
            }
        };

        $queue->consume($onMessage, false, false, false, 'consumer-tag');

        static::assertCount(count($messages), $messageReceived);

        // Poll the management API for up to 10 seconds, 5000 microseconds apart.
        $readStats = function () {
            return $this->client->queues()->get('test', 'queue-name');
        };
        $drained = function ($stats) {
            if (!is_array($stats)) {
                return false;
            }

            return 0 === $stats['messages'] && 0 === $stats['messages_unacknowledged'];
        };
        $queueInfo = $this->doUntil($readStats, $drained, 10, 5000);

        static::assertEquals(0, $queueInfo['messages']);
        static::assertEquals(0, $queueInfo['messages_unacknowledged']);
    }
284
285
    /**
     * Consumes two published messages without acknowledging them in the callback.
     *
     * The consumer is started with `true` as the third `consume()` argument
     * (presumably the no-ack / auto-ack flag - confirm against QueueInterface).
     * The callback returns false once as many messages were received as were
     * published. Afterwards the management API is expected to report an empty
     * queue with no unacknowledged messages.
     */
    public function testConsumeWithAutoAck()
    {
        $channel = $this->adapter->createChannel();
        $exchangeOptions = new ExchangeOptions([
            'name' => 'exchange-name',
            'type' => 'direct'
        ]);
        $queueOptions = new QueueOptions([
            'name' => 'queue-name',
        ]);

        $exchange = $channel->createExchange($exchangeOptions);
        $exchange->declareExchange();

        $queue = $channel->createQueue($queueOptions);
        $queue->declareQueue();

        $queue->bind('exchange-name');

        $messages = ['foo', 'bar'];

        foreach ($messages as $message) {
            $exchange->publish($message);
        }

        $messageReceived = [];

        // Only the message is needed here; PHP silently ignores any additional
        // argument the adapter passes to the closure.
        $callback = function (Message $message) use (&$messageReceived, $messages) {
            $messageReceived[] = $message;
            if (count($messageReceived) >= count($messages)) {
                return false;
            }
        };

        $queue->consume($callback, false, true, false, 'consumer-tag');

        static::assertCount(count($messages), $messageReceived);

        // Poll the management API for up to 10 seconds, 5000 microseconds apart.
        $client = $this->client;
        $queueInfo = $this->doUntil(
            function () use ($client) {
                return $client->queues()->get('test', 'queue-name');
            },
            function ($ret) {
                return is_array($ret)
                && 0 === $ret['messages']
                && 0 === $ret['messages_unacknowledged'];
            },
            10,
            5000
        );

        static::assertEquals(0, $queueInfo['messages']);
        static::assertEquals(0, $queueInfo['messages_unacknowledged']);
    }
340
341
    /**
     * Consumes two deliveries, rejecting each one with the requeue flag set.
     *
     * The callback returns false after two deliveries. Afterwards the
     * management API is expected to report two messages on the queue, both
     * counted as unacknowledged.
     */
    public function testConsumeWithAutoRejectAndRequeue()
    {
        $channel = $this->adapter->createChannel();
        $exchangeSettings = new ExchangeOptions([
            'name' => 'exchange-name',
            'type' => 'direct'
        ]);
        $queueSettings = new QueueOptions([
            'name' => 'queue-name',
        ]);

        $exchange = $channel->createExchange($exchangeSettings);
        $exchange->declareExchange();

        $queue = $channel->createQueue($queueSettings);
        $queue->declareQueue();
        $queue->bind('exchange-name');

        $messages = ['foo', 'bar'];
        foreach ($messages as $body) {
            $exchange->publish($body);
        }

        $messageReceived = [];

        $onMessage = function (Message $delivery, QueueInterface $source) use (&$messageReceived, $messages) {
            $messageReceived[] = $delivery;
            $source->reject($delivery->getDeliveryTag(), true);
            if (count($messageReceived) >= count($messages)) {
                return false;
            }
        };

        $queue->consume($onMessage, false, false, false, 'consumer-tag');

        static::assertCount(count($messages), $messageReceived);

        // Poll the management API for up to 10 seconds, 5000 microseconds apart.
        $readStats = function () {
            return $this->client->queues()->get('test', 'queue-name');
        };
        $twoUnacked = function ($stats) {
            if (!is_array($stats)) {
                return false;
            }

            return 2 === $stats['messages'] && 2 === $stats['messages_unacknowledged'];
        };
        $queueInfo = $this->doUntil($readStats, $twoUnacked, 10, 5000);

        static::assertEquals(2, $queueInfo['messages']);
        static::assertEquals(2, $queueInfo['messages_unacknowledged']);
    }
397
    
398
    /**
     * Calls $doFunction repeatedly until $until accepts its result.
     *
     * $doFunction is always invoked at least once. The timeout is only
     * checked after a rejected attempt, so the total running time can exceed
     * $timeout by the duration of one attempt.
     *
     * @param callable $doFunction Producer invoked without arguments on every attempt.
     * @param callable $until      Receives the producer's result; a truthy return ends the loop.
     * @param float    $timeout    Maximum time to keep retrying, in seconds.
     * @param int|null $usleep     Pause between attempts in microseconds; a falsy
     *                             value retries immediately without sleeping.
     * @return mixed The first producer result accepted by $until.
     * @throws TimeoutException When no result was accepted within $timeout seconds.
     */
    public function doUntil(callable $doFunction, callable $until, $timeout, $usleep = null)
    {
        $begin = microtime(true);
        for (;;) {
            $result = $doFunction();
            if ($until($result)) {
                return $result;
            }

            $elapsed = microtime(true) - $begin;
            if ($elapsed > $timeout) {
                throw new TimeoutException('Timeout');
            }

            if ($usleep) {
                usleep($usleep);
            }
        }
    }
421
422
    /**
     * Data sets for testCreateExchange().
     *
     * Each data set holds exactly one options array, matching the single
     * parameter of the test method. Keeping both arrays in one data set would
     * pass the second one as an extra argument that PHP silently ignores, so
     * it would never be exercised.
     *
     * @return array Data sets keyed by a descriptive name.
     */
    public function createExchangeProvider()
    {
        return [
            'transient auto-delete fanout' => [
                [
                    'name' => 'exchange-name',
                    'type' => 'fanout',
                    'passive' => false,
                    'durable' => false,
                    'auto_delete' => true,
                    'internal' => false,
                    'no_wait' => false,
                    'arguments' => []
                ]
            ],
            'durable internal fanout with arguments' => [
                [
                    'name' => 'exchange-name',
                    'type' => 'fanout',
                    'passive' => false,
                    'durable' => true,
                    'auto_delete' => false,
                    'internal' => true,
                    'no_wait' => true,
                    'arguments' => ['foo' => 'bar']
                ]
            ]
        ];
    }
449
450
    /**
     * Data sets for testCreateQueue().
     *
     * Each data set holds exactly one options array, matching the single
     * parameter of the test method. Keeping both arrays in one data set would
     * pass the second one as an extra argument that PHP silently ignores, so
     * it would never be exercised.
     *
     * NOTE(review): the second data set was previously never run; confirm it
     * passes against the broker version used in CI.
     *
     * @return array Data sets keyed by a descriptive name.
     */
    public function createQueueProvider()
    {
        return [
            'transient auto-delete queue' => [
                [
                    'name' => 'queue-name',
                    'passive' => false,
                    'exclusive' => false,
                    'durable' => false,
                    'auto_delete' => true,
                    'arguments' => []
                ]
            ],
            'durable exclusive queue with arguments' => [
                [
                    'name' => 'queue-name',
                    'passive' => false,
                    'exclusive' => true,
                    'durable' => true,
                    'auto_delete' => false,
                    'arguments' => ['foo' => 'bar']
                ]
            ]
        ];
    }
473
474
    /**
     * Resets the broker through the management API.
     *
     * Deletes every exchange except the one with an empty name and those
     * whose name starts with "amq", then every queue, then every connection.
     *
     * @param Client $client Management API client used for listing and deleting.
     */
    protected function cleanAll(Client $client)
    {
        foreach ($client->exchanges()->all() as $exchangeRow) {
            $exchangeName = $exchangeRow['name'];
            // Exchanges with an empty name or an "amq" prefix are left in place.
            if ('' !== $exchangeName && 0 !== strpos($exchangeName, 'amq')) {
                $client->exchanges()->delete($exchangeRow['vhost'], $exchangeName);
            }
        }

        foreach ($client->queues()->all() as $queueRow) {
            $client->queues()->delete($queueRow['vhost'], $queueRow['name']);
        }

        foreach ($client->connections()->all() as $connectionRow) {
            $client->connections()->delete($connectionRow['name']);
        }
    }
499
500
    /**
     * Disconnects the adapter's connection after each test, ignoring failures.
     *
     * The adapter may still be unset when setUp() failed before it was
     * assigned. Calling a method on null is not an \Exception and would
     * replace the original setUp() failure with an unrelated error, so the
     * disconnect is skipped in that case.
     */
    public function tearDown()
    {
        if (null !== $this->adapter) {
            try {
                $this->adapter->getConnection()->disconnect();
            } catch (\Exception $e) {
                // Best-effort cleanup: a failed disconnect must not fail the test.
            }
        }

        parent::tearDown();
    }
510
}
511