Passed
Pull Request — master (#57)
by Matthew
07:32
created

DoctrineJobManagerTest::testpruneExceptionJobs()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 90
Code Lines 72

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 90
rs 8.5454
c 0
b 0
f 0
cc 1
eloc 72
nc 1
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

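Commonly applied refactorings include Extract Method: move a cohesive group of statements out of the long method into a new, well-named method, and call that method from the original.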

1
<?php
2
3
namespace Dtc\QueueBundle\Tests\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentManager;
6
use Doctrine\ORM\EntityManager;
7
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
8
use Dtc\QueueBundle\Doctrine\DtcQueueListener;
9
use Dtc\QueueBundle\Manager\StallableJobManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\Job;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Tests\Manager\AutoRetryTrait;
14
use Dtc\QueueBundle\Tests\Manager\PriorityTestTrait;
15
use Dtc\QueueBundle\Model\StallableJob;
16
use Dtc\QueueBundle\Tests\FibonacciWorker;
17
use Dtc\QueueBundle\Tests\Manager\BaseJobManagerTest;
18
use Dtc\QueueBundle\ODM\JobManager;
19
use Dtc\QueueBundle\Tests\Manager\RetryableTrait;
20
use Dtc\QueueBundle\Tests\ORM\JobManagerTest;
21
use Dtc\QueueBundle\Util\Util;
22
use Symfony\Component\DependencyInjection\Container;
23
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;
24
25
/**
26
 * @author David
27
 *
28
 * This test requires local mongodb running
29
 */
30
abstract class DoctrineJobManagerTest extends BaseJobManagerTest
31
{
32
    use PriorityTestTrait;
33
    use AutoRetryTrait;
34
    use RetryableTrait;
35
36
    protected static $dtcQueueListener;
37
38
    /** @var DocumentManager|EntityManager */
39
    protected static $objectManager;
40
    protected static $objectName;
41
    protected static $archiveObjectName;
42
    protected static $runClass;
43
    protected static $runArchiveClass;
44
    protected static $jobTimingClass;
45
    protected static $jobManagerClass;
46
    protected static $runManagerClass;
47
    protected static $jobTimingManagerClass;
48
    public static $container;
49
    public static $runManager;
50
51
    /**
     * Builds the fixtures shared by every test in this class: the job-timing, run and
     * job managers, a container holding the job and run managers, the Doctrine queue
     * listener, and the worker.
     *
     * Also checks the max-priority and priority-direction accessors of the freshly
     * built job manager before delegating to the parent set-up.
     */
    public static function setUpBeforeClass()
    {
        self::$jobTimingManager = new self::$jobTimingManagerClass(self::$objectManager, self::$jobTimingClass, true);
        self::$runManager = new self::$runManagerClass(self::$objectManager, self::$runClass, self::$runArchiveClass);

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = new self::$jobManagerClass(
            self::$runManager,
            self::$jobTimingManager,
            self::$objectManager,
            self::$objectName,
            self::$archiveObjectName
        );
        self::$jobManager = $manager;

        $manager->setMaxPriority(255);
        self::assertEquals(255, $manager->getMaxPriority());

        // The direction is expected to start out as DESC; flip it to ASC and back again.
        self::assertEquals(JobManager::PRIORITY_DESC, $manager->getPriorityDirection());
        $manager->setPriorityDirection(JobManager::PRIORITY_ASC);
        self::assertEquals(JobManager::PRIORITY_ASC, $manager->getPriorityDirection());
        $manager->setPriorityDirection(JobManager::PRIORITY_DESC);

        $services = new Container(new ParameterBag());
        $services->set('dtc_queue.manager.job', $manager);
        $services->set('dtc_queue.manager.run', self::$runManager);
        self::$container = $services;

        // The same listener instance is registered for all three lifecycle events.
        self::$dtcQueueListener = new DtcQueueListener($manager->getJobArchiveClass(), self::$runManager->getRunArchiveClass());
        $eventManager = self::$objectManager->getEventManager();
        foreach (['preUpdate', 'prePersist', 'preRemove'] as $eventName) {
            $eventManager->addEventListener($eventName, self::$dtcQueueListener);
        }

        self::$worker = new FibonacciWorker();

        parent::setUpBeforeClass();
    }
81
82
    /**
     * Unregisters the queue listener from the three lifecycle events it was attached
     * to in setUpBeforeClass(), then delegates to the parent tear-down.
     */
    public static function tearDownAfterClass()
    {
        $eventManager = self::$objectManager->getEventManager();
        foreach (['preUpdate', 'prePersist', 'preRemove'] as $eventName) {
            $eventManager->removeEventListener($eventName, self::$dtcQueueListener);
        }

        parent::tearDownAfterClass();
    }
89
90
    /**
     * Checks which job the manager hands out first, once with the default getJob()
     * arguments and once with the third argument set to false.
     *
     * Each half queues four fibonacci jobs with a mix of priorities (3 or 1) and
     * "when at" times (two seconds ago or now) and then fetches all four again.
     */
    public function testOrdering()
    {
        /** @var DoctrineJobManager $manager */
        $manager = self::$jobManager;

        // ---- priority when at ----
        $time1 = time() - 2;
        $earlier = new \DateTime("@$time1");

        $time2 = time();
        $later = new \DateTime("@$time2");

        // Unlike the other three, this job calls fibonacci() before whenAt/priority are set.
        /** @var Job $job */
        $job = new static::$jobClass(static::$worker, false, null);
        $job->fibonacci(1);
        $job->setWhenAt($earlier);
        $job->setPriority(3);
        $id = $job->getId();

        $job2 = new static::$jobClass(static::$worker, false, null);
        $job2->setPriority(1);
        $job2->setWhenAt($later);
        $job2->fibonacci(1);
        $id2 = $job2->getId();

        $job3 = new static::$jobClass(static::$worker, false, null);
        $job3->setPriority(1);
        $job3->setWhenAt($earlier);
        $job3->fibonacci(1);
        $id3 = $job3->getId();

        $job4 = new static::$jobClass(static::$worker, false, null);
        $job4->setPriority(1);
        $job4->setWhenAt($later);
        $job4->fibonacci(1);
        $id4 = $job4->getId();

        // Priority 1 with the earlier time is expected first, then either of the two "now" jobs.
        $first = $manager->getJob();
        static::assertEquals($id3, $first->getId());
        $second = $manager->getJob();
        $nextNextId = $second->getId();
        static::assertTrue($id4 == $nextNextId || $id2 == $nextNextId, "$nextNextId not equals $id4 or $id2, could be $id or $id3");

        // The two remaining jobs must still be retrievable.
        static::assertNotNull($manager->getJob());
        static::assertNotNull($manager->getJob());

        // ---- non-priority when at ----
        $time1 = time() - 2;
        $earlier = new \DateTime("@$time1");

        $time2 = time();
        $later = new \DateTime("@$time2");

        /** @var Job $job */
        $job = new static::$jobClass(static::$worker, false, null);
        $job->fibonacci(1);
        $job->setWhenAt($earlier);
        $job->setPriority(3);
        $id = $job->getId();

        $job2 = new static::$jobClass(static::$worker, false, null);
        $job2->setPriority(1);
        $job2->setWhenAt($later);
        $job2->fibonacci(1);

        $job3 = new static::$jobClass(static::$worker, false, null);
        $job3->setPriority(1);
        $job3->setWhenAt($later);
        $job3->fibonacci(1);

        $job4 = new static::$jobClass(static::$worker, false, null);
        $job4->setPriority(1);
        $job4->setWhenAt($later);
        $job4->fibonacci(1);

        // With the third argument false, the job with the earlier time is expected first.
        $first = $manager->getJob(null, null, false);
        static::assertEquals($id, $first->getId());
        static::assertNotNull($manager->getJob());
        static::assertNotNull($manager->getJob());
        static::assertNotNull($manager->getJob());
    }
171
172
    /**
     * Exercises getJob() with its first two arguments set to matching and
     * non-matching variants of 'fibonacci'.
     *
     * NOTE(review): this method has no "test" prefix and no @test annotation is visible
     * here, so PHPUnit presumably does not run it on its own — confirm whether intended.
     */
    public function getJobBy()
    {
        /** @var DoctrineJobManager $manager */
        $manager = self::$jobManager;

        // Each matching argument pair must return the job that was queued just before.
        $matching = [
            ['fibonacci', null],
            ['fibonacci', 'fibonacci'],
            [null, 'fibonacci'],
        ];
        foreach ($matching as $arguments) {
            /** @var Job $job */
            $job = new static::$jobClass(static::$worker, false, null);
            $job->fibonacci(1);
            $id = $job->getId();
            $found = $manager->getJob($arguments[0], $arguments[1]);
            static::assertNotNull($found);
            static::assertEquals($id, $found->getId());
        }

        /** @var Job $job */
        $job = new static::$jobClass(static::$worker, false, null);
        $job->fibonacci(1);
        $id = $job->getId();

        // Misspelled names must not match anything.
        $mismatching = [
            [null, 'fibonaccia'],
            ['fibonacci', 'fibonaccia'],
            ['fibonaccii', 'fibonacci'],
        ];
        foreach ($mismatching as $arguments) {
            static::assertNull($manager->getJob($arguments[0], $arguments[1]));
        }

        // The job is still queued and comes back from an unfiltered fetch.
        $found = $manager->getJob();
        static::assertNotNull($found);
        static::assertEquals($id, $found->getId());
    }
215
216
    /**
     * Deletes a job through the job manager and checks that no job can be fetched
     * afterwards and that the job can be found under the archive class by its id.
     */
    public function testDeleteJob()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        /** @var Job $job */
        $job = $this->getJob();
        $id = $job->getId();
        $manager->deleteJob($job);

        self::assertNull($manager->getJob(null, null, true, 123), "Shouldn't be any jobs left in queue");

        $archiveClass = $manager->getJobArchiveClass();

        self::assertNotNull($id);
        $archived = $manager->getObjectManager()->getRepository($archiveClass)->find($id);
        self::assertNotNull($archived);
        self::assertEquals($id, $archived->getId());
    }
237
238
    /**
     * Checks countLiveJobs() without arguments while jobs are added one by one,
     * then with matching and non-matching filter arguments.
     */
    public function testCountLiveJobs()
    {
        $this->drain();

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        // Every newly fetched fixture job raises the live count by one.
        foreach ([1, 2, 3] as $expected) {
            $this->getJob();

            self::assertEquals($expected, $manager->countLiveJobs());
        }

        self::assertEquals(0, $manager->countLiveJobs('asdf'));
        self::assertEquals(3, $manager->countLiveJobs('fibonacci'));
        self::assertEquals(0, $manager->countLiveJobs('fibonacci', 'test'));
        self::assertEquals(3, $manager->countLiveJobs('fibonacci', 'fibonacci'));

        $this->drain();
    }
274
275
    /**
     * Checks archiveAllJobs() with and without filter arguments: the live count, the
     * stored counts of the job and archive classes, and the total passed to the
     * progress callback.
     */
    public function testArchiveAllJobs()
    {
        $this->drain();

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        $this->getJob();

        $liveCount = $manager->countLiveJobs();
        $archivedBefore = $this->runCountQuery($manager->getJobArchiveClass());
        self::assertEquals(1, $liveCount);
        $storedBefore = $this->runCountQuery($manager->getJobClass());

        // The callback accumulates whatever counts archiveAllJobs() reports to it.
        $reported = 0;
        $tally = function ($count) use (&$reported) {
            $reported += $count;
        };

        $manager->archiveAllJobs(null, null, $tally);
        self::assertEquals(0, $manager->countLiveJobs());
        self::assertEquals($storedBefore - 1, $this->runCountQuery($manager->getJobClass()));
        self::assertEquals($archivedBefore + 1, $this->runCountQuery($manager->getJobArchiveClass()));
        self::assertEquals(1, $reported);

        $this->getJob();
        $this->getJob();

        self::assertEquals(2, $manager->countLiveJobs());
        $archivedBefore = $this->runCountQuery($manager->getJobArchiveClass());
        $reported = 0;
        $manager->archiveAllJobs('fibonacci', null, $tally);
        self::assertEquals(0, $manager->countLiveJobs());
        self::assertEquals(2, $reported);
        self::assertEquals($archivedBefore + 2, $this->runCountQuery($manager->getJobArchiveClass()));

        $this->getJob();
        $this->getJob();

        self::assertEquals(2, $manager->countLiveJobs());

        // Non-matching filter arguments must leave both jobs live.
        $manager->archiveAllJobs('fibonacc', null, $tally);
        self::assertEquals(2, $manager->countLiveJobs());

        $manager->archiveAllJobs('fibonacci', 'fibo', $tally);
        self::assertEquals(2, $manager->countLiveJobs());

        $manager->archiveAllJobs('fibonacci', 'fibonacci', $tally);
        self::assertEquals(0, $manager->countLiveJobs());

        $this->drain();
    }
327
328
    /**
     * Returns a count for the given class; implemented by the concrete test case.
     *
     * Within this class it is called with the job class and the job archive class
     * reported by the job manager, and the results are compared arithmetically
     * before and after archiving.
     *
     * @param string $class class name to count — presumably a mapped entity/document class; confirm in implementations
     *
     * @return int presumably an integer, since callers add to and subtract from it — confirm in implementations
     */
    abstract protected function runCountQuery($class);
329
330
    /**
     * Checks resetExceptionJobs(): an archived exception job is moved back to the live
     * repository with status NEW and cleared timing/message fields, while one whose
     * retries equal its max retries is left in the archive.
     */
    public function testResetExceptionJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        $id = $this->createExceptionJob();
        $archiveClass = $manager->getJobArchiveClass();
        $objectManager = $manager->getObjectManager();
        $archive = $objectManager->getRepository($archiveClass);
        $archived = $archive->find($id);
        self::assertNotNull($archived);
        self::assertEquals(BaseJob::STATUS_EXCEPTION, $archived->getStatus());

        // For the ORM, continue with a re-created object manager and a fresh job manager on top of it.
        if ($objectManager instanceof EntityManager) {
            JobManagerTest::createObjectManager();
            $manager = new self::$jobManagerClass(
                self::$runManager,
                self::$jobTimingManager,
                self::$objectManager,
                self::$objectName,
                self::$archiveObjectName
            );
            $manager->getObjectManager()->clear();
            $objectManager = $manager->getObjectManager();
        }

        $resetCount = $manager->resetExceptionJobs();

        self::assertEquals(1, $resetCount);
        $live = $manager->getRepository();
        $job = $live->find($id);

        self::assertNotNull($job);
        self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
        self::assertNull($job->getStartedAt());
        self::assertNull($job->getFinishedAt());
        self::assertNull($job->getElapsed());
        self::assertNull($job->getMessage());

        $objectManager->remove($job);
        $objectManager->flush();

        // Second pass: retries already at the maximum, so nothing should be reset.
        $id = $this->createExceptionJob();
        $archiveClass = $manager->getJobArchiveClass();
        $objectManager = $manager->getObjectManager();
        $archive = $objectManager->getRepository($archiveClass);
        $archived = $archive->find($id);
        $archived->setMaxRetries(10);
        $archived->setRetries(10);
        $objectManager->persist($archived);
        $objectManager->flush();

        $resetCount = $manager->resetExceptionJobs();
        self::assertEquals(0, $resetCount);
        $job = $live->find($id);
        self::assertNull($job);
        $job = $archive->find($id);
        self::assertNotNull($job);

        $objectManager->remove($job);
        $objectManager->flush();
    }
383
384
    /**
     * Deletes a fixture job through the job manager, looks it up under the archive
     * class and marks that object as an exception with timing and message set.
     *
     * @return mixed id of the job
     */
    protected function createExceptionJob()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        /** @var Job $job */
        $job = $this->getJob();
        $id = $job->getId();
        $manager->deleteJob($job);

        $archiveClass = $manager->getJobArchiveClass();
        $objectManager = $manager->getObjectManager();

        $archived = $objectManager->getRepository($archiveClass)->find($id);
        self::assertNotNull($archived);
        self::assertEquals($id, $archived->getId());

        $archived->setStatus(BaseJob::STATUS_EXCEPTION);
        $archived->setStartedAt(Util::getMicrotimeDateTime());
        $archived->setFinishedAt(new \DateTime());
        $archived->setElapsed(12345);
        $archived->setMessage('soomething');

        $objectManager->persist($archived);
        $objectManager->flush();

        return $id;
    }
414
415
    /**
     * Queues a job, puts it into STATUS_RUNNING and attaches it to a newly persisted run.
     *
     * @param bool $endRun when true the run is removed again, and the object found under the
     *                     run archive class for that run id gets an end time of
     *                     STALLED_SECONDS + 1 seconds before the job's start time
     * @param bool $setId  when true the run's current job id is set to the new job's id
     *
     * @return mixed id of the job
     */
    public function createStalledJob($endRun, $setId)
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');
        $job->setStatus(BaseJob::STATUS_RUNNING);
        $time = time();
        $job->setStartedAt(new \DateTime("@$time"));
        $job = $manager->getRepository()->find($job->getId());

        self::assertNotNull($job);

        $runClass = self::$runManager->getRunClass();

        $objectManager = $manager->getObjectManager();
        $run = new $runClass();
        $run->setLastHeartbeatAt(new \DateTime());
        if ($setId) {
            $run->setCurrentJobId($job->getId());
        }
        $objectManager->persist($run);
        $objectManager->flush();

        // The run id is only read after the flush above.
        $runId = $run->getId();
        self::assertNotNull($runId);
        $job->setRunId($runId);
        $objectManager->persist($job);
        $objectManager->flush();

        if ($endRun) {
            $objectManager->remove($run);
            $objectManager->flush();
        }

        $job = $manager->getRepository()->find($job->getId());

        self::assertNotNull($job);

        if ($endRun) {
            $archivedRun = $objectManager->getRepository(self::$runManager->getRunArchiveClass())->find($runId);

            // Back-date the end of the run to just beyond the stalled threshold.
            $minusTime = $time - (DoctrineJobManager::STALLED_SECONDS + 1);
            $archivedRun->setEndedAt(new \DateTime("@$minusTime"));

            $objectManager->persist($archivedRun);
            $objectManager->flush();
        }

        return $job->getId();
    }
475
476
    /**
     * Checks resetStalledJobs() against stalled jobs created with every combination of
     * the createStalledJob() flags, and against jobs whose retries or stalls already
     * equal their maximum.
     */
    public function testResetStalledJobs()
    {
        $this->drain();

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;
        $manager->pruneStalledJobs();

        // Shared expectations for a job that has been reset once.
        $assertResetOnce = function ($job) {
            self::assertNotNull($job);
            self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
            self::assertNull($job->getStartedAt());
            self::assertNull($job->getFinishedAt());
            self::assertNull($job->getElapsed());
            self::assertNull($job->getMessage());
            self::assertEquals(1, $job->getStalls());
        };

        // Run ended, run does not reference the job.
        $id = $this->createStalledJob(true, false);

        $objectManager = $manager->getObjectManager();
        self::assertEquals(1, $manager->resetStalledJobs());

        $job = $manager->getRepository()->find($id);
        $assertResetOnce($job);

        $objectManager->remove($job);
        $objectManager->flush();

        // Run ended, run references the job.
        $id = $this->createStalledJob(true, true);

        self::assertEquals(1, $manager->resetStalledJobs());

        $job = $manager->getRepository()->find($id);

        self::assertNotNull($job);
        $objectManager->remove($job);
        $objectManager->flush();

        // Run not ended, run does not reference the job.
        $id = $this->createStalledJob(false, false);

        self::assertEquals(1, $manager->resetStalledJobs());

        $job = $manager->getRepository()->find($id);
        $assertResetOnce($job);

        $objectManager->remove($job);
        $objectManager->flush();

        // Run not ended and referencing the job: nothing is expected to be reset.
        $id = $this->createStalledJob(false, true);

        $job = $manager->getRepository()->find($id);
        self::assertEquals(0, $manager->resetStalledJobs());

        $objectManager->remove($job);
        $objectManager->flush();

        // Retries already at the maximum: not reset, found under the archive class instead.
        $id = $this->createStalledJob(true, false);
        $job = $manager->getRepository()->find($id);
        $job->setMaxRetries(10);
        $job->setRetries(10);
        $objectManager->persist($job);
        $objectManager->flush();

        self::assertEquals(0, $manager->resetStalledJobs());
        self::assertNull($manager->getRepository()->find($id));
        $job = $objectManager->getRepository($manager->getJobArchiveClass())->find($id);
        self::assertNotNull($job);
        $objectManager->remove($job);
        $objectManager->flush();

        // Stalls already at the maximum: same expectation.
        $id = $this->createStalledJob(true, false);
        $job = $manager->getRepository()->find($id);
        $job->setMaxStalls(10);
        $job->setStalls(10);
        $objectManager->persist($job);
        $objectManager->flush();

        self::assertEquals(0, $manager->resetStalledJobs());
        self::assertNull($manager->getRepository()->find($id));
        $job = $objectManager->getRepository($manager->getJobArchiveClass())->find($id);
        self::assertNotNull($job);
        $objectManager->remove($job);
        $objectManager->flush();
    }
581
582
    /**
     * Checks the counts returned by pruneExceptionJobs() for non-matching and matching
     * arguments, and that a pruned job can no longer be found in either the live or
     * the archive repository.
     */
    public function testpruneExceptionJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();
        $archiveRepository = $objectManager->getRepository($jobManager->getJobArchiveClass());

        // One job deleted through the job manager and then flagged as an exception.
        $job = $this->getJob();
        $id = $job->getId();
        $jobManager->deleteJob($job);
        $this->markArchivedJobAsException($id);

        // Non-matching arguments must not prune anything.
        self::assertEquals(0, $jobManager->pruneExceptionJobs('asdf'));
        self::assertEquals(0, $jobManager->pruneExceptionJobs(null, 'asdf'));
        self::assertEquals(0, $jobManager->pruneExceptionJobs('fibonacci', 'asdf'));
        self::assertEquals(1, $jobManager->pruneExceptionJobs('fibonacci', 'fibonacci'));

        $job = $jobManager->getRepository()->find($id);
        $objectManager->clear();
        self::assertNull($job);
        self::assertNull($archiveRepository->find($id));

        // Two more jobs, this time removed directly through the object manager; the
        // helper asserts that each of them can be found under the archive class.
        for ($i = 0; $i < 2; ++$i) {
            $job = $this->getJob();
            $id = $job->getId();
            $objectManager->remove($job);
            $objectManager->flush();
            $this->markArchivedJobAsException($id);
        }

        self::assertEquals(2, $jobManager->pruneExceptionJobs());
    }

    /**
     * Looks up the job with the given id under the job archive class, asserts that it
     * exists, and persists it with exception status, timing fields and a message.
     *
     * @param mixed $id id of a job that is expected to be found under the archive class
     */
    private function markArchivedJobAsException($id)
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();

        $archived = $objectManager->getRepository($jobManager->getJobArchiveClass())->find($id);
        self::assertNotNull($archived);
        self::assertEquals($id, $archived->getId());

        $archived->setStatus(BaseJob::STATUS_EXCEPTION);
        $archived->setStartedAt(new \DateTime());
        $archived->setFinishedAt(new \DateTime());
        $archived->setElapsed(12345);
        $archived->setMessage('soomething');
        $objectManager->persist($archived);
        $objectManager->flush();
    }
672
673
    public function testPruneStalledJobs()
674
    {
675
        static::setUpBeforeClass();
676
677
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
678
        $jobManager = self::$jobManager;
679
680
        $job = new self::$jobClass(self::$worker, false, null);
681
        $job->fibonacci(1);
682
        self::assertNotNull($job->getId(), 'Job id should be generated');
683
        $job->setStatus(BaseJob::STATUS_RUNNING);
684
        $time = time();
685
        $date = new \DateTime("@$time");
686
        $job->setStartedAt($date);
687
        $id = $job->getId();
688
        $job = $jobManager->getRepository()->find($id);
689
690
        self::assertNotNull($job);
691
692
        $runClass = self::$runManager->getRunClass();
693
694
        $objectManager = $jobManager->getObjectManager();
695
        $run = new $runClass();
696
        $run->setLastHeartbeatAt(new \DateTime());
697
        $objectManager->persist($run);
698
        $objectManager->flush();
699
        $runId = $run->getId();
700
        self::assertNotNull($runId);
701
        $job->setRunId($runId);
702
        $objectManager->persist($job);
703
        $objectManager->flush();
704
        $objectManager->remove($run);
705
        $objectManager->flush();
706
        $id = $job->getId();
707
        $job = $jobManager->getRepository()->find($id);
708
709
        self::assertNotNull($job);
710
711
        $archivedRun = $objectManager->getRepository(self::$runManager->getRunArchiveClass())->find($runId);
712
713
        $minusTime = $time - (DoctrineJobManager::STALLED_SECONDS + 1);
714
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
715
716
        $objectManager->persist($archivedRun);
717
        $objectManager->flush();
718
719
        $count = $jobManager->pruneStalledJobs('asdf');
720
        self::assertEquals(0, $count);
721
        $count = $jobManager->pruneStalledJobs(null, 'asdf');
722
        self::assertEquals(0, $count);
723
        $count = $jobManager->pruneStalledJobs('fibonacci', 'asdf');
724
        self::assertEquals(0, $count);
725
        $count = $jobManager->pruneStalledJobs('fibonacci', 'fibonacci');
726
        self::assertEquals(1, $count);
727
728
        $job = $jobManager->getRepository()->find($id);
729
730
        self::assertNull($job);
731
732
        $archivedJob = $jobManager->getObjectManager()->getRepository($jobManager->getJobArchiveClass())->find($id);
733
734
        self::assertNotNull($archivedJob);
735
        self::assertEquals(StallableJob::STATUS_STALLED, $archivedJob->getStatus());
736
        self::assertEquals(1, $archivedJob->getStalls());
737
        $objectManager->remove($archivedJob);
738
        $objectManager->flush();
739
740
        // multiple
741
742
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
743
        $jobManager = self::$jobManager;
744
745
        $job = new self::$jobClass(self::$worker, false, null);
746
        $job->fibonacci(1);
747
        self::assertNotNull($job->getId(), 'Job id should be generated');
748
        $job->setStatus(BaseJob::STATUS_RUNNING);
749
        $time = time();
750
        $date = new \DateTime("@$time");
751
        $job->setStartedAt($date);
752
        $id = $job->getId();
753
        $job = $jobManager->getRepository()->find($id);
754
755
        self::assertNotNull($job);
756
757
        $runClass = self::$runManager->getRunClass();
758
759
        $objectManager = $jobManager->getObjectManager();
760
        $run = new $runClass();
761
        $run->setLastHeartbeatAt(new \DateTime());
762
        $objectManager->persist($run);
763
        $objectManager->flush();
764
        $runId = $run->getId();
765
        self::assertNotNull($runId);
766
        $job->setRunId($runId);
767
        $objectManager->persist($job);
768
        $objectManager->flush();
769
        $objectManager->remove($run);
770
        $objectManager->flush();
771
        $id = $job->getId();
772
        $job = $jobManager->getRepository()->find($id);
773
774
        self::assertNotNull($job);
775
776
        $archivedRun = $objectManager->getRepository(self::$runManager->getRunArchiveClass())->find($runId);
777
778
        $minusTime = $time - (DoctrineJobManager::STALLED_SECONDS + 1);
779
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
780
781
        $objectManager->persist($archivedRun);
782
        $objectManager->flush();
783
784
        $job = new self::$jobClass(self::$worker, false, null);
785
        $job->fibonacci(1);
786
        self::assertNotNull($job->getId(), 'Job id should be generated');
787
        $job->setStatus(BaseJob::STATUS_RUNNING);
788
        $time = time();
789
        $date = new \DateTime("@$time");
790
        $job->setStartedAt($date);
791
        $id = $job->getId();
792
        $job = $jobManager->getRepository()->find($id);
793
794
        self::assertNotNull($job);
795
796
        $runClass = self::$runManager->getRunClass();
797
798
        $objectManager = $jobManager->getObjectManager();
799
        $run = new $runClass();
800
        $run->setLastHeartbeatAt(new \DateTime());
801
        $objectManager->persist($run);
802
        $objectManager->flush();
803
        $runId = $run->getId();
804
        self::assertNotNull($runId);
805
        $job->setRunId($runId);
806
        $objectManager->persist($job);
807
        $objectManager->flush();
808
        $objectManager->remove($run);
809
        $objectManager->flush();
810
        $id = $job->getId();
811
        $job = $jobManager->getRepository()->find($id);
812
813
        self::assertNotNull($job);
814
815
        $archivedRun = $objectManager->getRepository(self::$runManager->getRunArchiveClass())->find($runId);
816
817
        $minusTime = $time - (DoctrineJobManager::STALLED_SECONDS + 1);
818
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
819
820
        $objectManager->persist($archivedRun);
821
        $objectManager->flush();
822
        $count = $jobManager->pruneStalledJobs();
823
        self::assertEquals(2, $count);
824
    }
825
826
    /**
     * Exercises batchLater() against a job that was queued just before with later().
     *
     * Every scenario queues fibonacci(1) through later() and then again through batchLater(), and asserts
     * that both calls yield equal jobs and that exactly one job is stored. The scenarios additionally check
     * the priority and the whenAt value of the resulting job.
     */
    public function testBatchJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();

        // Start from an empty job store so the findAll() counts below are exact.
        foreach ($jobManager->getRepository()->findAll() as $job) {
            $objectManager->remove($job);
        }
        $objectManager->flush();
        $objectManager->clear();

        $worker = self::$worker;

        // Scenario 1: no priority on either call.
        $job1 = $worker->later()->fibonacci(1);
        $job2 = $worker->batchLater()->fibonacci(1);
        self::assertEquals($job1, $job2);

        $jobs = $jobManager->getRepository()->findAll();
        self::assertCount(1, $jobs);
        self::assertEquals($job1, $jobs[0]);
        self::assertNull($jobs[0]->getPriority());
        $objectManager->remove($jobs[0]);
        $objectManager->flush();
        $objectManager->clear();

        // Scenario 2: the first job has no priority, the batchLater() call sets one.
        $job1 = $worker->later()->fibonacci(1);
        self::assertNull($job1->getPriority());
        $job2 = $worker->batchLater()->setPriority(3)->fibonacci(1);
        self::assertEquals($job1, $job2);
        self::assertNotNull($job2->getPriority());

        $jobs = $jobManager->getRepository()->findAll();
        self::assertCount(1, $jobs);
        self::assertEquals($job1, $jobs[0]);
        self::assertNotNull($jobs[0]->getPriority());

        // Remove everything that scenario 2 left behind.
        foreach ($jobManager->getRepository()->findAll() as $job) {
            $objectManager->remove($job);
        }
        $objectManager->flush();
        $objectManager->clear();

        // Scenario 3: first job queued with later(100), then batchLater(0); whenAt must fall inside
        // the window measured around the batchLater() call.
        $job1 = $worker->later(100)->fibonacci(1);

        $time1 = new \DateTime('@'.time());
        $job2 = $worker->batchLater(0)->fibonacci(1);
        $time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());

        self::assertEquals($job1, $job2);
        self::assertGreaterThanOrEqual($time1, $job2->getWhenAt());
        self::assertLessThanOrEqual($time2, $job2->getWhenAt());

        $jobs = $jobManager->getRepository()->findAll();
        self::assertCount(1, $jobs);
        self::assertEquals($job1, $jobs[0]);
        self::assertGreaterThanOrEqual($time1, $jobs[0]->getWhenAt());
        self::assertLessThanOrEqual($time2, $jobs[0]->getWhenAt());
        $objectManager->remove($jobs[0]);
        $objectManager->flush();
        $objectManager->clear();

        // Scenario 4: both calls set a (different) priority and a different delay.
        $job1 = $worker->later(100)->setPriority(3)->fibonacci(1);
        // Capture the priority now: $job1 is compared for equality with $job2 below.
        $priority1 = $job1->getPriority();
        $time1 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());
        $job2 = $worker->batchLater(0)->setPriority(1)->fibonacci(1);
        $time2 = Util::getDateTimeFromDecimalFormat(Util::getMicrotimeDecimal());

        self::assertEquals($job1, $job2);
        self::assertNotEquals($priority1, $job2->getPriority());
        self::assertGreaterThanOrEqual($time1, $job2->getWhenAt());
        self::assertLessThanOrEqual($time2, $job2->getWhenAt());
    }
903
904
    /**
     * Verifies pruneExpiredJobs() with various filter arguments and checks that pruned jobs are archived.
     *
     * Jobs whose expiresAt lies one second in the past are persisted, then pruned with:
     *  - non-matching filter values ('asdf'), which must prune nothing,
     *  - matching filter values ('fibonacci') in either or both argument positions,
     *  - no arguments at all.
     * Finally the archive repository is checked for the last two pruned jobs with status EXPIRED.
     */
    public function testPruneExpiredJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();

        // Creates a fibonacci(1) job that expired one second ago, persists it and returns its id.
        $persistExpiredJob = function () use ($objectManager) {
            $job = new self::$jobClass(self::$worker, false, null);
            $job->fibonacci(1);
            self::assertNotNull($job->getId(), 'Job id should be generated');
            $expiredAt = time() - 1;
            $job->setExpiresAt(new \DateTime("@$expiredAt"));
            $objectManager->persist($job);
            $objectManager->flush();

            return $job->getId();
        };

        // A single expired job: non-matching filters leave it alone, a matching one prunes it.
        $persistExpiredJob();
        self::assertEquals(0, $jobManager->pruneExpiredJobs('asdf'));
        self::assertEquals(0, $jobManager->pruneExpiredJobs(null, 'asdf'));
        self::assertEquals(1, $jobManager->pruneExpiredJobs(null, 'fibonacci'));

        // Two expired jobs, matching value in the second argument only.
        $persistExpiredJob();
        $persistExpiredJob();
        self::assertEquals(2, $jobManager->pruneExpiredJobs(null, 'fibonacci'));

        // Two expired jobs, matching value in both arguments.
        $persistExpiredJob();
        $persistExpiredJob();
        self::assertEquals(2, $jobManager->pruneExpiredJobs('fibonacci', 'fibonacci'));

        // Two expired jobs, matching value in the first argument only.
        $persistExpiredJob();
        $persistExpiredJob();
        self::assertEquals(2, $jobManager->pruneExpiredJobs('fibonacci'));

        // Two expired jobs, no filter; remember their ids to look them up in the archive.
        $jobId1 = $persistExpiredJob();
        $jobId2 = $persistExpiredJob();
        self::assertEquals(2, $jobManager->pruneExpiredJobs());

        $archiveRepository = $jobManager->getObjectManager()->getRepository($jobManager->getJobArchiveClass());

        foreach ([$jobId1, $jobId2] as $jobId) {
            $archivedJob = $archiveRepository->find($jobId);
            self::assertNotNull($archivedJob);
            self::assertEquals(Job::STATUS_EXPIRED, $archivedJob->getStatus());
        }
    }
1024
1025
    /**
     * Verifies pruneArchivedJobs() against archived jobs whose updatedAt has been backdated.
     *
     * A job is created and removed, its archive record is looked up and its updatedAt is set to a
     * timestamp a little over one day in the past. Pruning with a cutoff equal to that timestamp must
     * remove nothing; pruning with a cutoff one second later must remove the record(s).
     *
     * The 'preUpdate' listener is detached for the duration of the test and is always re-attached,
     * even when an assertion fails, so later tests sharing the static object manager are not affected.
     */
    public function testPruneArchivedJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();
        $jobArchiveClass = $jobManager->getJobArchiveClass();
        $jobArchiveRepository = $objectManager->getRepository($jobArchiveClass);

        // Creates and removes a fibonacci(1) job, then backdates updatedAt on its archive record.
        $archiveJobUpdatedAt = function ($timestamp) use ($objectManager, $jobArchiveRepository) {
            $job = new self::$jobClass(self::$worker, false, null);
            $job->fibonacci(1);
            $id = $job->getId();
            $objectManager->remove($job);
            $objectManager->flush();

            $jobArchive = $jobArchiveRepository->find($id);
            self::assertNotNull($jobArchive);
            $jobArchive->setUpdatedAt(new \DateTime("@$timestamp"));
            $objectManager->persist($jobArchive);
            $objectManager->flush();
        };

        // NOTE(review): presumably the listener would otherwise overwrite the backdated updatedAt - confirm.
        $eventManager = self::$objectManager->getEventManager();
        $eventManager->removeEventListener('preUpdate', self::$dtcQueueListener);

        try {
            // One archived job.
            $time = time() - 86401;
            $older = $time + 1;
            $archiveJobUpdatedAt($time);

            self::assertEquals(0, $jobManager->pruneArchivedJobs(new \DateTime("@$time")));
            self::assertEquals(1, $jobManager->pruneArchivedJobs(new \DateTime("@$older")));

            // Two archived jobs sharing the same updatedAt.
            $time = time() - 86401;
            $older = $time + 1;
            $archiveJobUpdatedAt($time);
            $archiveJobUpdatedAt($time);

            self::assertEquals(0, $jobManager->pruneArchivedJobs(new \DateTime("@$time")));
            self::assertEquals(2, $jobManager->pruneArchivedJobs(new \DateTime("@$older")));
        } finally {
            $eventManager->addEventListener('preUpdate', self::$dtcQueueListener);
        }
    }
1086
1087
    /**
     * Empties the job store, then delegates to the inherited performance test.
     */
    public function testPerformance()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;
        $objectManager = $manager->getObjectManager();

        // Remove every existing job before the parent test runs.
        foreach ($manager->getRepository()->findAll() as $existingJob) {
            $objectManager->remove($existingJob);
        }

        $objectManager->flush();
        $objectManager->clear();

        parent::testPerformance();
    }
1101
1102
    /**
     * Creates a fibonacci(1) job, fetches the job manager status and asserts that the
     * 'fibonacci->fibonacci()' entry contains a key for every expected job status.
     *
     * @return array two elements: the created job and the full status array
     */
    protected function getBaseStatus()
    {
        $statusKey = 'fibonacci->fibonacci()';

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;
        $status = $manager->getStatus();
        self::assertArrayHasKey($statusKey, $status);

        $perStatusCounts = $status[$statusKey];
        $expectedStatuses = [
            BaseJob::STATUS_NEW,
            BaseJob::STATUS_EXCEPTION,
            BaseJob::STATUS_RUNNING,
            BaseJob::STATUS_SUCCESS,
            Job::STATUS_EXPIRED,
            StallableJob::STATUS_MAX_STALLS,
            RetryableJob::STATUS_MAX_EXCEPTIONS,
            RetryableJob::STATUS_MAX_FAILURES,
            RetryableJob::STATUS_MAX_RETRIES,
        ];
        foreach ($expectedStatuses as $expectedStatus) {
            self::assertArrayHasKey($expectedStatus, $perStatusCounts);
        }

        return [$job, $status];
    }
1124
1125
    /**
     * Asserts that creating one more fibonacci job raises the STATUS_NEW count reported by
     * getStatus() by exactly one, then removes the two jobs created for the check.
     */
    public function testGetStatus()
    {
        list($job1, $status1) = $this->getBaseStatus();
        list($job2, $status2) = $this->getBaseStatus();
        $fibonacciStatus1 = $status1['fibonacci->fibonacci()'];
        $fibonacciStatus2 = $status2['fibonacci->fibonacci()'];

        self::assertEquals($fibonacciStatus1[BaseJob::STATUS_NEW] + 1, $fibonacciStatus2[BaseJob::STATUS_NEW]);

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();
        $objectManager->remove($job1);
        $objectManager->remove($job2);
        // Flush here so the removals are executed by this test rather than being left pending
        // on the shared object manager for some later, unrelated flush.
        $objectManager->flush();
    }
1139
1140
    /**
     * Round-trips the default max-stalls setting: sets it to 123, reads it back,
     * then restores the original value and verifies the restore.
     */
    public function testStallableJobManager()
    {
        /** @var StallableJobManager $manager */
        $manager = self::$jobManager;
        $originalMaxStalls = $manager->getDefaultMaxStalls();

        $manager->setDefaultMaxStalls(123);
        self::assertEquals(123, $manager->getDefaultMaxStalls());

        // Put the original value back so other tests see the unchanged default.
        $manager->setDefaultMaxStalls($originalMaxStalls);
        self::assertEquals($originalMaxStalls, $manager->getDefaultMaxStalls());
    }
1151
}
1152