Completed
Push — master ( f4b85b...244823 )
by Alexandre
02:30
created

DoctrineAdapterTest::testCountMessages()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 14
rs 9.4285
cc 1
eloc 7
nc 1
nop 0
1
2
<?php
3
4
use Heri\Bundle\JobQueueBundle\Tests\TestCase;
5
use Heri\Bundle\JobQueueBundle\Adapter as Adapter;
6
use Heri\Bundle\JobQueueBundle\Service\QueueService;
7
use Heri\Bundle\JobQueueBundle\Command\QueueListenCommand;
8
use Symfony\Bundle\FrameworkBundle\Console\Application;
9
use Symfony\Component\Console\Output\ConsoleOutput;
10
11
class DoctrineAdapterTest extends TestCase
12
{
13
    /**
14
     * @var QueueService
15
     */
16
    protected $queue;
17
18
    /**
19
     * @var string
20
     */
21
    protected $queueName = 'my:queue';
22
23
    /**
24
     * @var int
25
     */
26
    protected $maxMessages = 1;
27
28
    /**
29
     * @var bool
30
     */
31
    protected $verbose = true;
32
33
    /**
34
     * {@inheritdoc}
35
     */
36
    public function setUp()
37
    {
38
        parent::setUp();
39
40
        if (!class_exists('Doctrine\ORM\EntityManager')) {
41
            $this->markTestSkipped('Doctrine not installed');
42
        }
43
44
        $adapter = new Adapter\DoctrineAdapter(array());
45
        $adapter->em = $this->em;
46
47
        $this->queue = new QueueService(
48
            $this->container->get('logger'),
49
            $this->container->getParameter('jobqueue.config')
50
        );
51
        $this->queue->adapter = $adapter;
52
        $this->queue->attach($this->queueName.'1');
53
54
        $application = new Application($this->kernel);
55
        $application->add(new QueueListenCommand());
56
57
        $command = $application->find('jobqueue:listen');
58
        $this->queue->setCommand($command);
59
60
        if ($this->verbose) {
61
            $this->queue->setOutput(new ConsoleOutput());
62
        }
63
    }
64
65
    public function testPushAndReceive()
66
    {
67
        $queue1 = $this->em
68
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
69
            ->findOneByName($this->queueName.'1');
70
        $this->assertNotNull($queue1, 'Queue created');
71
72
        // Queue 1 list command
73
        $command1 = [
74
            'command' => 'list',
75
        ];
76
        $this->queue->push($command1);
77
78
        $messages = $this->getMessages($queue1);
79
        $this->assertEquals(1, count($messages), 'Count number of messages');
80
81
        $message1 = $this->em
82
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
83
            ->find(1);
84
        $this->assertEquals($this->queueName.'1', $message1->getQueue()->getName(), 'Message1 queue relation check');
85
        $this->assertEquals($command1, json_decode($message1->getBody(), true), 'Verify encoded message in table');
86
        $this->assertEquals(0, $message1->getPriority(), 'Message1 priority');
87
        $this->assertEquals(false, $message1->getEnded(), 'Message1 no ended');
88
        $this->assertEquals(false, $message1->getFailed(), 'Message1 no failed');
89
90
        $queue1 = $this->em
91
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
92
            ->findOneByName($this->queueName.'1');
93
        $this->assertNotNull($queue1, 'Queue created');
94
95
        // Queue 1 demo:great command
96
        $command2 = [
97
            'command' => 'demo:great',
98
            'argument' => [
99
                'name' => 'Alexandre',
100
                '--yell' => true,
101
            ],
102
        ];
103
        $this->queue
104
            ->highPriority()
105
            ->push($command2);
106
107
        $messages = $this->getMessages($queue1);
108
        $this->assertEquals(2, count($messages), 'Count number of messages');
109
        $message2 = $this->em
110
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
111
            ->find(2);
112
        $this->assertEquals($this->queueName.'1', $message2->getQueue()->getName(), 'Message2 queue relation check');
113
        $this->assertEquals($command2, json_decode($message2->getBody(), true), 'Verify encoded message in table');
114
        $this->assertEquals(1, $message2->getPriority(), 'Message2 priority');
115
        $this->assertEquals(false, $message2->getEnded(), 'Message2 no ended');
116
        $this->assertEquals(false, $message2->getFailed(), 'Message2 no failed');
117
118
        // Run demo:great command using directly receive method (priority 1)
119
        $this->queue->receive($this->maxMessages);
120
121
        $exceptions = $this->getMessageLogs();
122
        $this->assertEquals(1, count($exceptions), 'Exception logged from demo:great');
123
124
        $messages = $this->getMessages($queue1);
125
        $this->assertEquals(2, count($messages), '2 pending message not yet handled');
126
127
        // Run list command using listen method  (priority 0)
128
        $this->queue->listen($this->queueName.'1');
129
130
        $exceptions = $this->getMessageLogs();
131
        $this->assertEquals(1, count($exceptions), 'No more exception logged');
132
133
        $messages = $this->getMessages($queue1);
134
        $this->assertEquals(1, count($messages), '1 pending message left');
135
136
        $message2 = $this->em
137
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Message')
138
            ->find(2);
139
        $this->assertEquals(false, $message2->getEnded(), 'Message2 no ended');
140
        $this->assertEquals(true, $message2->getFailed(), 'Message2 failed');
141
142
        $exceptions = $this->getMessageLogs();
143
        $this->assertEquals(1, count($exceptions), '1 exception logged');
144
145
        $exception = $this->em
146
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\MessageLog')
147
            ->find(1);
148
        $this->assertRegExp('/There are no commands defined in the "demo" namespace/', $exception->getLog(), 'Logged exception in database');
149
    }
150
151
    public function testRetryCounter()
152
    {
153
        $queue1 = $this->em
154
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
155
            ->findOneByName($this->queueName.'1');
156
        $queue1->setTimeout(0);
157
        $this->assertNotNull($queue1, 'Queue created');
158
        $this->em->persist($queue1);
159
        $this->em->flush();
160
161
        $messages = $this->getMessages($queue1);
162
        $this->assertEquals(0, count($messages), 'Count number of messages');
163
164
        // Queue 1 demo:great command
165
        $command2 = [
166
            'command' => 'demo:great',
167
            'argument' => [
168
                'name' => 'Alexandre',
169
                '--yell' => true,
170
            ],
171
        ];
172
        $this->queue
173
            ->highPriority()
174
            ->push($command2)
175
        ;
176
177
        $messages = $this->getMessages($queue1);
178
        $this->assertEquals(1, count($messages), 'Count number of messages');
179
        $this->assertEquals(false, $messages[0]->getFailed());
180
        $this->assertEquals(0, $messages[0]->getNumRetries());
181
        $this->assertNull($messages[0]->getTimeout());
182
        $exceptions = $this->getMessageLogs();
183
        $this->assertEquals(0, count($exceptions), 'Exception logged from demo:great');
184
185
        // 1st try
186
        $this->queue->receive($this->maxMessages);
187
        $messages = $this->getMessages($queue1);
188
        $this->assertEquals(true, $messages[0]->getFailed());
189
        $this->assertEquals(0, $messages[0]->getNumRetries());
190
        $this->assertNotNull($messages[0]->getTimeout());
191
        $previousTimeout = $messages[0]->getTimeout();
192
        $exceptions = $this->getMessageLogs();
193
        $this->assertEquals(1, count($exceptions), 'Exception logged from demo:great');
194
195
        // 2nd try (1st retry)
196
        sleep(1); // bypass timeout
197
        $this->queue->receive($this->maxMessages, 0);
198
        $messages = $this->getMessages($queue1);
199
        $this->assertEquals(true, $messages[0]->getFailed());
200
        $this->assertEquals(1, $messages[0]->getNumRetries());
201
        $this->assertGreaterThan($previousTimeout, $messages[0]->getTimeout());
202
        $previousTimeout = $messages[0]->getTimeout();
203
        $exceptions = $this->getMessageLogs();
204
        $this->assertEquals(2, count($exceptions), 'Exception logged from demo:great');
205
206
        // 3rd try (2nd retry)
207
        sleep(1); // bypass timeout
208
        $this->queue->receive($this->maxMessages, 0);
209
        $messages = $this->getMessages($queue1);
210
        $this->assertEquals(true, $messages[0]->getFailed());
211
        $this->assertEquals(2, $messages[0]->getNumRetries());
212
        $this->assertGreaterThan($previousTimeout, $messages[0]->getTimeout());
213
        $previousTimeout = $messages[0]->getTimeout();
214
        $exceptions = $this->getMessageLogs();
215
        $this->assertEquals(3, count($exceptions), 'Exception logged from demo:great');
216
217
        // 4th try (3rd retry)
218
        sleep(1); // bypass timeout
219
        $this->queue->receive($this->maxMessages, 0);
220
        $messages = $this->getMessages($queue1);
221
        $this->assertEquals(true, $messages[0]->getFailed());
222
        $this->assertEquals(3, $messages[0]->getNumRetries());
223
        $this->assertGreaterThan($previousTimeout, $messages[0]->getTimeout());
224
        $exceptions = $this->getMessageLogs();
225
        $this->assertEquals(4, count($exceptions), 'Exception logged from demo:great');
226
    }
227
228
    public function testMaxRetries()
229
    {
230
        $queue1 = $this->em
231
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
232
            ->findOneByName($this->queueName.'1');
233
        $queue1->setTimeout(0);
234
        $queue1->setMaxRetries(2);
235
        $this->assertNotNull($queue1, 'Queue created');
236
        $this->em->persist($queue1);
237
        $this->em->flush();
238
239
        $messages = $this->getMessages($queue1);
240
        $this->assertEquals(0, count($messages), 'Count number of messages');
241
242
        // Queue 1 demo:great command
243
        $command2 = [
244
            'command' => 'demo:great',
245
            'argument' => [
246
                'name' => 'Alexandre',
247
                '--yell' => true,
248
            ],
249
        ];
250
        $this->queue
251
            ->highPriority()
252
            ->push($command2)
253
        ;
254
255
        $messages = $this->getMessages($queue1);
256
        $this->assertEquals(1, count($messages), 'Count number of messages');
257
        $this->assertEquals(false, $messages[0]->getFailed());
258
        $this->assertEquals(0, $messages[0]->getNumRetries());
259
        $this->assertNull($messages[0]->getTimeout());
260
        $exceptions = $this->getMessageLogs();
261
        $this->assertEquals(0, count($exceptions), 'Exception logged from demo:great');
262
263
        // 1st try
264
        $this->queue->receive($this->maxMessages);
265
        $messages = $this->getMessages($queue1);
266
        $this->assertEquals(true, $messages[0]->getFailed());
267
        $this->assertEquals(0, $messages[0]->getNumRetries());
268
        $this->assertNotNull($messages[0]->getTimeout());
269
        $previousTimeout = $messages[0]->getTimeout();
270
        $exceptions = $this->getMessageLogs();
271
        $this->assertEquals(1, count($exceptions), 'Exception logged from demo:great');
272
273
        // 2nd try (1st retry)
274
        sleep(1); // bypass timeout
275
        $this->queue->receive($this->maxMessages, 0);
276
        $messages = $this->getMessages($queue1);
277
        $this->assertEquals(true, $messages[0]->getFailed());
278
        $this->assertEquals(1, $messages[0]->getNumRetries());
279
        $this->assertGreaterThan($previousTimeout, $messages[0]->getTimeout());
280
        $previousTimeout = $messages[0]->getTimeout();
281
        $exceptions = $this->getMessageLogs();
282
        $this->assertEquals(2, count($exceptions), 'Exception logged from demo:great');
283
284
        // 3rd try (2nd retry)
285
        sleep(1); // bypass timeout
286
        $this->queue->receive($this->maxMessages, 0);
287
        $messages = $this->getMessages($queue1);
288
        $this->assertEquals(true, $messages[0]->getFailed());
289
        $this->assertEquals(2, $messages[0]->getNumRetries());
290
        $this->assertGreaterThan($previousTimeout, $messages[0]->getTimeout());
291
        $previousTimeout = $messages[0]->getTimeout();
292
        $exceptions = $this->getMessageLogs();
293
        $this->assertEquals(3, count($exceptions), 'Exception logged from demo:great');
294
295
        // no more retries for this message
296
        sleep(1); // bypass timeout
297
        $this->queue->receive($this->maxMessages, 0);
298
        $messages = $this->getMessages($queue1);
299
        $this->assertEquals(true, $messages[0]->getFailed());
300
        $this->assertEquals(2, $messages[0]->getNumRetries());
301
        $this->assertEquals($previousTimeout, $messages[0]->getTimeout());
302
        $exceptions = $this->getMessageLogs();
303
        $this->assertEquals(3, count($exceptions), 'Exception logged from demo:great');
304
    }
305
306
    public function testDuplicatedMessages()
307
    {
308
        $queue1 = $this->em
309
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\Queue')
310
            ->findOneByName($this->queueName.'1');
311
        $this->assertNotNull($queue1, 'Queue created');
312
313
        // Queue 1 list command
314
        $command1 = [
315
            'command' => 'list',
316
        ];
317
318
        // Push $command1
319
        $this->queue->push($command1);
320
        $messages = $this->getMessages($queue1);
321
        $this->assertEquals(1, count($messages), 'Count number of messages');
322
323
        // Re Push same command $command1
324
        $this->queue->push($command1);
325
        $messages = $this->getMessages($queue1);
326
        $this->assertEquals(1, count($messages), 'Count number of messages');
327
328
        // Push $command2
329
        $command2 = [
330
            'command' => 'list2',
331
        ];
332
        $this->queue->push($command2);
333
        $messages = $this->getMessages($queue1);
334
        $this->assertEquals(2, count($messages), 'Count number of messages');
335
    }
336
337
    public function testCountMessages()
338
    {
339
        $count1 = $this->queue->count();
340
341
        // Queue list command
342
        $command1 = [
343
            'command' => 'list',
344
        ];
345
        $this->queue->push($command1);
346
347
        $count2 = $this->queue->count();
348
349
        $this->assertEquals($count1 + 1, $count2, 'countMessages retrieve added message');
350
    }
351
352
    protected function getMessages()
353
    {
354
        $this->em->clear();
355
356
        // Search for all messages inside our timeout
357
        $query = $this->em->createQuery("
358
            SELECT m
359
            FROM Heri\Bundle\JobQueueBundle\Entity\Message m
360
            LEFT JOIN m.queue q
361
            WHERE (q.name = :queue_name)
362
        ");
363
        $query->setParameter('queue_name', $this->queueName.'1');
364
365
        return $query->getResult();
366
    }
367
368
    protected function getMessageLogs()
369
    {
370
        $this->em->clear();
371
372
        return $this->em
373
            ->getRepository('Heri\Bundle\JobQueueBundle\Entity\MessageLog')
374
            ->findAll();
375
    }
376
}
377