Completed
Push — master ( dbde40...c702e0 )
by Matthew
08:07
created

BaseJobManagerTest::testPruneArchivedJobs()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 61
Code Lines 49

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 61
rs 9.5147
c 0
b 0
f 0
cc 1
eloc 49
nc 1
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

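Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.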

1
<?php
2
3
namespace Dtc\QueueBundle\Tests\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentManager;
6
use Doctrine\ORM\EntityManager;
7
use Dtc\QueueBundle\Doctrine\BaseJobManager;
8
use Dtc\QueueBundle\Doctrine\DtcQueueListener;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Tests\Model\PriorityTestTrait;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Tests\FibonacciWorker;
14
use Dtc\QueueBundle\Tests\Model\BaseJobManagerTest as BaseBaseJobManagerTest;
15
use Dtc\QueueBundle\ODM\JobManager;
16
use Dtc\QueueBundle\Tests\ORM\JobManagerTest;
17
use Symfony\Component\DependencyInjection\Container;
18
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;
19
20
/**
21
 * @author David
22
 *
23
 * This test requires local mongodb running
24
 */
25
abstract class BaseJobManagerTest extends BaseBaseJobManagerTest
26
{
27
    use PriorityTestTrait;
28
29
    protected static $dtcQueueListener;
30
31
    /** @var DocumentManager|EntityManager */
32
    protected static $objectManager;
33
    protected static $objectName;
34
    protected static $archiveObjectName;
35
    protected static $runClass;
36
    protected static $runArchiveClass;
37
    protected static $jobTimingClass;
38
    protected static $jobManagerClass;
39
    protected static $runManagerClass;
40
    public static $runManager;
41
42
    /**
     * Builds the job manager, run manager, queue listener and worker shared by
     * every test in the class, then delegates to the parent fixture setup.
     *
     * The concrete classes and the object manager come from the static
     * properties that the extending test class is expected to have populated.
     */
    public static function setUpBeforeClass()
    {
        $jobManagerClass = self::$jobManagerClass;
        $runManagerClass = self::$runManagerClass;

        /** @var BaseJobManager $jobManager */
        $jobManager = new $jobManagerClass(
            self::$objectManager,
            self::$objectName,
            self::$archiveObjectName,
            self::$runClass,
            self::$runArchiveClass
        );
        self::$jobManager = $jobManager;
        $jobManager->setMaxPriority(255);

        $runManager = new $runManagerClass(self::$runClass, self::$jobTimingClass, true);
        self::$runManager = $runManager;
        $runManager->setObjectManager(self::$objectManager);
        $runManager->setRunArchiveClass(self::$runArchiveClass);

        // Sanity-check the priority configuration; the direction is flipped to
        // ASC and then set back to DESC.
        self::assertEquals(255, $jobManager->getMaxPriority());
        self::assertEquals(JobManager::PRIORITY_DESC, $jobManager->getPriorityDirection());
        $jobManager->setPriorityDirection(JobManager::PRIORITY_ASC);
        self::assertEquals(JobManager::PRIORITY_ASC, $jobManager->getPriorityDirection());
        $jobManager->setPriorityDirection(JobManager::PRIORITY_DESC);

        // The listener looks its collaborators up through a container.
        $container = new Container(new ParameterBag());
        $container->set('dtc_queue.job_manager', $jobManager);
        $container->set('dtc_queue.run_manager', $runManager);

        $listener = new DtcQueueListener($container);
        self::$dtcQueueListener = $listener;
        foreach (['preUpdate', 'prePersist', 'preRemove'] as $eventName) {
            self::$objectManager->getEventManager()->addEventListener($eventName, $listener);
        }

        $worker = new FibonacciWorker();
        self::$worker = $worker;
        $worker->setJobClass($jobManager->getRepository()->getClassName());

        parent::setUpBeforeClass();
    }
75
76
    /**
     * Unregisters the queue listener from the three events it was attached to
     * in setUpBeforeClass(), then runs the parent teardown.
     */
    public static function tearDownAfterClass()
    {
        foreach (['preUpdate', 'prePersist', 'preRemove'] as $eventName) {
            self::$objectManager->getEventManager()->removeEventListener($eventName, self::$dtcQueueListener);
        }

        parent::tearDownAfterClass();
    }
83
84
    /**
     * Deleting a job must empty the queue and leave a copy with the same id
     * in the archive repository.
     */
    public function testDeleteJob()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        /** @var Job $queuedJob */
        $queuedJob = $this->getJob();
        $jobId = $queuedJob->getId();
        $manager->deleteJob($queuedJob);

        self::assertNull($manager->getJob(), "Shouldn't be any jobs left in queue");

        $archiveClass = $manager->getArchiveObjectName();
        self::assertNotNull($jobId);

        $archivedJob = $manager->getObjectManager()->getRepository($archiveClass)->find($jobId);
        self::assertNotNull($archivedJob);
        self::assertEquals($jobId, $archivedJob->getId());
    }
105
106
    /**
     * Archives a job, flags the archived copy as errored, and checks that
     * resetErroneousJobs() returns it to the live repository with status NEW
     * and all lock/timing/message fields cleared.
     */
    public function testResetErroneousJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        /** @var Job $queuedJob */
        $queuedJob = $this->getJob();
        $jobId = $queuedJob->getId();
        $manager->deleteJob($queuedJob);

        $om = $manager->getObjectManager();
        $archivedJob = $om->getRepository($manager->getArchiveObjectName())->find($jobId);
        self::assertNotNull($archivedJob);
        self::assertEquals($jobId, $archivedJob->getId());

        // Make the archived copy look like a job that failed while locked.
        $archivedJob->setStatus(BaseJob::STATUS_ERROR);
        $archivedJob->setLocked(true);
        $archivedJob->setLockedAt(new \DateTime());
        $archivedJob->setFinishedAt(new \DateTime());
        $archivedJob->setElapsed(12345);
        $archivedJob->setMessage('soomething');
        $om->persist($archivedJob);
        $om->flush();

        if ($om instanceof EntityManager) {
            // ORM only: rebuild the job manager on top of self::$objectManager
            // after JobManagerTest::createObjectManager() has run (presumably
            // that call replaces self::$objectManager — confirm), and clear it.
            JobManagerTest::createObjectManager();
            $manager = new self::$jobManagerClass(self::$objectManager, self::$objectName, self::$archiveObjectName, self::$runClass, self::$runArchiveClass);
            $om = $manager->getObjectManager();
            $om->clear();
        }

        self::assertEquals(1, $manager->resetErroneousJobs());

        $restoredJob = $manager->getRepository()->find($jobId);
        self::assertNotNull($restoredJob);
        self::assertEquals(BaseJob::STATUS_NEW, $restoredJob->getStatus());
        self::assertNull($restoredJob->getLockedAt());
        self::assertNull($restoredJob->getFinishedAt());
        self::assertNull($restoredJob->getElapsed());
        self::assertNull($restoredJob->getMessage());
        self::assertNull($restoredJob->getLocked());

        // Clean up so the restored job does not leak into other tests.
        $om->remove($restoredJob);
        $om->flush();
    }
160
161
    /**
     * Creates a locked, running job tied to a run that has been removed and
     * whose archived copy ended more than STALLED_SECONDS ago, then checks
     * that resetStalledJobs() resets it to NEW with a stalled count of 1.
     */
    public function testResetStalledJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $manager */
        $manager = self::$jobManager;

        $stalledJob = new self::$jobClass(self::$worker, false, null);
        $stalledJob->fibonacci(1);
        self::assertNotNull($stalledJob->getId(), 'Job id should be generated');
        $stalledJob->setStatus(BaseJob::STATUS_RUNNING);
        $stalledJob->setLocked(true);
        $lockedTimestamp = time();
        $stalledJob->setLockedAt(new \DateTime("@$lockedTimestamp"));
        $jobId = $stalledJob->getId();

        $stalledJob = $manager->getRepository()->find($jobId);
        self::assertNotNull($stalledJob);

        // Persist a run, attach the job to it, then remove the run again.
        $runClass = $manager->getRunClass();
        $om = $manager->getObjectManager();
        $run = new $runClass();
        $run->setLastHeartbeatAt(new \DateTime());
        $om->persist($run);
        $om->flush();
        $runId = $run->getId();
        self::assertNotNull($runId);

        $stalledJob->setRunId($runId);
        $om->persist($stalledJob);
        $om->flush();
        $om->remove($run);
        $om->flush();

        $stalledJob = $manager->getRepository()->find($jobId);
        self::assertNotNull($stalledJob);

        // Back-date the archived run so it ended just past the stall threshold.
        $archivedRun = $om->getRepository($manager->getRunArchiveClass())->find($runId);
        $endedTimestamp = $lockedTimestamp - (BaseJobManager::STALLED_SECONDS + 1);
        $archivedRun->setEndedAt(new \DateTime("@$endedTimestamp"));
        $om->persist($archivedRun);
        $om->flush();

        self::assertEquals(1, $manager->resetStalledJobs());

        $resetJob = $manager->getRepository()->find($jobId);
        self::assertNotNull($resetJob);
        self::assertEquals(BaseJob::STATUS_NEW, $resetJob->getStatus());
        self::assertNull($resetJob->getLockedAt());
        self::assertNull($resetJob->getFinishedAt());
        self::assertNull($resetJob->getElapsed());
        self::assertNull($resetJob->getMessage());
        self::assertNull($resetJob->getLocked());
        self::assertEquals(1, $resetJob->getStalledCount());

        $om->remove($resetJob);
        $om->flush();
    }
225
226
    /**
     * Checks pruneErroneousJobs() against a single errored archived job using
     * non-matching and matching worker/method filters, then against two
     * errored archived jobs with no filter at all.
     */
    public function testPruneErroneousJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();
        $archiveRepository = $objectManager->getRepository($jobManager->getArchiveObjectName());

        $id = $this->archiveErroredJobForPrune(true);

        // Filters that do not match the fibonacci worker/method prune nothing.
        self::assertEquals(0, $jobManager->pruneErroneousJobs('asdf'));
        self::assertEquals(0, $jobManager->pruneErroneousJobs(null, 'asdf'));
        self::assertEquals(0, $jobManager->pruneErroneousJobs('fibonacci', 'asdf'));
        // The matching worker/method pair prunes the single errored job.
        self::assertEquals(1, $jobManager->pruneErroneousJobs('fibonacci', 'fibonacci'));

        // The job must be gone from both the live and the archive repository.
        $job = $jobManager->getRepository()->find($id);
        $objectManager->clear();
        self::assertNull($job);
        self::assertNull($archiveRepository->find($id));

        // Two errored archived jobs, pruned without any filter.
        $this->archiveErroredJobForPrune(false);
        $this->archiveErroredJobForPrune(false);
        self::assertEquals(2, $jobManager->pruneErroneousJobs());
    }

    /**
     * Creates a job, moves it to the archive and marks the archived copy as
     * an errored, locked, finished job.
     *
     * @param bool $viaDeleteJob true to archive through $jobManager->deleteJob(),
     *                           false to remove the job through the object
     *                           manager and flush
     *
     * @return mixed id shared by the job and its archived copy
     */
    private function archiveErroredJobForPrune($viaDeleteJob)
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();

        $job = $this->getJob();
        $id = $job->getId();

        if ($viaDeleteJob) {
            $jobManager->deleteJob($job);
        } else {
            $objectManager->remove($job);
            $objectManager->flush();
        }

        $archivedJob = $objectManager->getRepository($jobManager->getArchiveObjectName())->find($id);
        self::assertNotNull($archivedJob);
        self::assertEquals($id, $archivedJob->getId());

        $archivedJob->setStatus(BaseJob::STATUS_ERROR);
        $archivedJob->setLocked(true);
        $archivedJob->setLockedAt(new \DateTime());
        $archivedJob->setFinishedAt(new \DateTime());
        $archivedJob->setElapsed(12345);
        $archivedJob->setMessage('soomething');
        $objectManager->persist($archivedJob);
        $objectManager->flush();

        return $id;
    }
319
320
    /**
     * Checks pruneStalledJobs() against a single stalled job using
     * non-matching and matching worker/method filters, verifies the pruned
     * job ends up archived with status ERROR, then prunes two stalled jobs
     * with no filter.
     */
    public function testPruneStalledJobs()
    {
        // Rebuilds the shared managers/listener before the test body runs.
        // NOTE(review): this registers a second DtcQueueListener without
        // removing the one added by the first setUpBeforeClass() call —
        // confirm that is intended.
        static::setUpBeforeClass();

        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();

        $id = $this->createStalledJobForPrune();

        // Filters that do not match the fibonacci worker/method prune nothing.
        self::assertEquals(0, $jobManager->pruneStalledJobs('asdf'));
        self::assertEquals(0, $jobManager->pruneStalledJobs(null, 'asdf'));
        self::assertEquals(0, $jobManager->pruneStalledJobs('fibonacci', 'asdf'));
        self::assertEquals(1, $jobManager->pruneStalledJobs('fibonacci', 'fibonacci'));

        self::assertNull($jobManager->getRepository()->find($id));

        $archivedJob = $objectManager->getRepository($jobManager->getArchiveObjectName())->find($id);
        self::assertNotNull($archivedJob);
        self::assertEquals(BaseJob::STATUS_ERROR, $archivedJob->getStatus());
        self::assertEquals(1, $archivedJob->getStalledCount());
        $objectManager->remove($archivedJob);
        $objectManager->flush();

        // Multiple stalled jobs, pruned without any filter.
        $this->createStalledJobForPrune();
        $this->createStalledJobForPrune();
        self::assertEquals(2, $jobManager->pruneStalledJobs());
    }

    /**
     * Creates a locked, running fibonacci job attached to a run that has been
     * removed, and back-dates the archived run so that it ended
     * STALLED_SECONDS + 1 seconds before the job's lock time.
     *
     * @return mixed id of the stalled job
     */
    private function createStalledJobForPrune()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');
        $job->setStatus(BaseJob::STATUS_RUNNING);
        $job->setLocked(true);
        $time = time();
        $job->setLockedAt(new \DateTime("@$time"));
        $id = $job->getId();

        $job = $jobManager->getRepository()->find($id);
        self::assertNotNull($job);

        // Persist a run, attach the job to it, then remove the run again.
        $runClass = $jobManager->getRunClass();
        $objectManager = $jobManager->getObjectManager();
        $run = new $runClass();
        $run->setLastHeartbeatAt(new \DateTime());
        $objectManager->persist($run);
        $objectManager->flush();
        $runId = $run->getId();
        self::assertNotNull($runId);

        $job->setRunId($runId);
        $objectManager->persist($job);
        $objectManager->flush();
        $objectManager->remove($run);
        $objectManager->flush();

        $job = $jobManager->getRepository()->find($id);
        self::assertNotNull($job);

        // The test expects the removed run to be findable in the run archive.
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
        $objectManager->persist($archivedRun);
        $objectManager->flush();

        return $id;
    }
475
476
    /**
     * Empties the job repository, queues one job with later() and one with
     * batchLater() using identical arguments, and expects a single stored job.
     */
    public function testBatchJobs()
    {
        // Start from an empty repository.
        $existingJobs = self::$jobManager->getRepository()->findAll();
        $objectManager = self::$jobManager->getObjectManager();
        foreach ($existingJobs as $existingJob) {
            $objectManager->remove($existingJob);
        }
        $objectManager->flush();
        $objectManager->clear();

        $fibonacciWorker = self::$worker;
        $fibonacciWorker->later()->fibonacci(1);
        $fibonacciWorker->batchLater()->fibonacci(1);

        $storedJobs = self::$jobManager->getRepository()->findAll();
        self::assertCount(1, $storedJobs);
    }
493
494
    /**
     * Checks pruneExpiredJobs() with every combination of worker/method
     * filters used by the suite, and verifies that pruned jobs are archived
     * with status EXPIRED.
     */
    public function testPruneExpiredJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;

        // One expired job: non-matching filters prune nothing, a matching
        // method filter prunes it.
        $this->createExpiredJobForPrune();
        self::assertEquals(0, $jobManager->pruneExpiredJobs('asdf'));
        self::assertEquals(0, $jobManager->pruneExpiredJobs(null, 'asdf'));
        self::assertEquals(1, $jobManager->pruneExpiredJobs(null, 'fibonacci'));

        // Two expired jobs, method filter only.
        $this->createExpiredJobForPrune();
        $this->createExpiredJobForPrune();
        self::assertEquals(2, $jobManager->pruneExpiredJobs(null, 'fibonacci'));

        // Two expired jobs, worker and method filter.
        $this->createExpiredJobForPrune();
        $this->createExpiredJobForPrune();
        self::assertEquals(2, $jobManager->pruneExpiredJobs('fibonacci', 'fibonacci'));

        // Two expired jobs, worker filter only.
        $this->createExpiredJobForPrune();
        $this->createExpiredJobForPrune();
        self::assertEquals(2, $jobManager->pruneExpiredJobs('fibonacci'));

        // Two expired jobs, no filter; both must end up archived as EXPIRED.
        $jobId1 = $this->createExpiredJobForPrune();
        $jobId2 = $this->createExpiredJobForPrune();
        self::assertEquals(2, $jobManager->pruneExpiredJobs());

        $archiveRepository = $jobManager->getObjectManager()->getRepository($jobManager->getArchiveObjectName());
        foreach ([$jobId1, $jobId2] as $jobId) {
            $job = $archiveRepository->find($jobId);
            self::assertNotNull($job);
            self::assertEquals(Job::STATUS_EXPIRED, $job->getStatus());
        }
    }

    /**
     * Creates a fibonacci job whose expiry time is one second in the past and
     * flushes it through the job manager's object manager.
     *
     * @return mixed id of the expired job
     */
    private function createExpiredJobForPrune()
    {
        $objectManager = self::$jobManager->getObjectManager();

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        self::assertNotNull($job->getId(), 'Job id should be generated');

        $time = time() - 1;
        $job->setExpiresAt(new \DateTime("@$time"));
        $objectManager->persist($job);
        $objectManager->flush();

        return $job->getId();
    }
614
615
    /**
     * Checks that pruneArchivedJobs() removes only archived jobs whose
     * updatedAt is strictly older than the given cut-off, first with one
     * archived job and then with two.
     *
     * The preUpdate listener is detached for the duration of the test
     * (presumably so it cannot overwrite the back-dated updatedAt — confirm
     * against DtcQueueListener) and is re-attached in a finally block so a
     * failing assertion cannot leave it unregistered for later tests.
     */
    public function testPruneArchivedJobs()
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;

        self::$objectManager->getEventManager()->removeEventListener('preUpdate', self::$dtcQueueListener);

        try {
            // Single archived job, last updated a little over one day ago.
            $time = time() - 86401;
            $this->archiveJobUpdatedAt($time);

            $older = $time + 1;
            // A cut-off equal to updatedAt prunes nothing...
            self::assertEquals(0, $jobManager->pruneArchivedJobs(new \DateTime("@$time")));
            // ...a cut-off one second later prunes the job.
            self::assertEquals(1, $jobManager->pruneArchivedJobs(new \DateTime("@$older")));

            // Two archived jobs sharing the same updatedAt.
            $time = time() - 86401;
            $this->archiveJobUpdatedAt($time);
            $this->archiveJobUpdatedAt($time);

            $older = $time + 1;
            self::assertEquals(0, $jobManager->pruneArchivedJobs(new \DateTime("@$time")));
            self::assertEquals(2, $jobManager->pruneArchivedJobs(new \DateTime("@$older")));
        } finally {
            self::$objectManager->getEventManager()->addEventListener('preUpdate', self::$dtcQueueListener);
        }
    }

    /**
     * Creates a fibonacci job, removes it so that it lands in the archive
     * repository, and sets the archived copy's updatedAt to the given time.
     *
     * @param int $time unix timestamp to store as the archived job's updatedAt
     */
    private function archiveJobUpdatedAt($time)
    {
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
        $jobManager = self::$jobManager;
        $objectManager = $jobManager->getObjectManager();
        $jobArchiveRepository = $objectManager->getRepository($jobManager->getArchiveObjectName());

        $job = new self::$jobClass(self::$worker, false, null);
        $job->fibonacci(1);
        $id = $job->getId();
        $objectManager->remove($job);
        $objectManager->flush();

        $jobArchive = $jobArchiveRepository->find($id);
        self::assertNotNull($jobArchive);
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
        $objectManager->persist($jobArchive);
        $objectManager->flush();
    }
676
677
    /**
     * Removes every job from the repository and clears the object manager
     * before running the parent performance test.
     */
    public function testPerformance()
    {
        $existingJobs = self::$jobManager->getRepository()->findAll();
        $objectManager = self::$jobManager->getObjectManager();
        foreach ($existingJobs as $existingJob) {
            $objectManager->remove($existingJob);
        }
        $objectManager->flush();
        $objectManager->clear();

        parent::testPerformance();
    }
688
689
    /**
     * Queues one fibonacci job, fetches the job manager's status report and
     * asserts that the 'fibonacci->fibonacci()' entry exposes every expected
     * status key.
     *
     * @return array two-element array: the created job and the full status report
     */
    protected function getBaseStatus()
    {
        /** @var BaseJobManager $manager */
        $manager = self::$jobManager;

        $createdJob = new self::$jobClass(self::$worker, false, null);
        $createdJob->fibonacci(1);

        $statusReport = $manager->getStatus();
        self::assertArrayHasKey('fibonacci->fibonacci()', $statusReport);
        $fibonacciStatus = $statusReport['fibonacci->fibonacci()'];

        $expectedKeys = [
            BaseJob::STATUS_NEW,
            BaseJob::STATUS_ERROR,
            BaseJob::STATUS_RUNNING,
            BaseJob::STATUS_SUCCESS,
            RetryableJob::STATUS_MAX_STALLED,
            RetryableJob::STATUS_MAX_ERROR,
            RetryableJob::STATUS_MAX_RETRIES,
            RetryableJob::STATUS_EXPIRED,
        ];
        foreach ($expectedKeys as $expectedKey) {
            self::assertArrayHasKey($expectedKey, $fibonacciStatus);
        }

        return [$createdJob, $statusReport];
    }
710
711
    /**
     * Takes two consecutive status snapshots, each preceded by queuing one
     * fibonacci job, and expects the NEW count to grow by exactly one.
     */
    public function testGetStatus()
    {
        list($firstJob, $firstStatus) = $this->getBaseStatus();
        list($secondJob, $secondStatus) = $this->getBaseStatus();

        $newCountBefore = $firstStatus['fibonacci->fibonacci()'][BaseJob::STATUS_NEW];
        $newCountAfter = $secondStatus['fibonacci->fibonacci()'][BaseJob::STATUS_NEW];
        self::assertEquals($newCountBefore + 1, $newCountAfter);

        // NOTE(review): the jobs are scheduled for removal but no flush()
        // follows in this method — confirm whether the removal is meant to
        // be written out here.
        $objectManager = self::$jobManager->getObjectManager();
        $objectManager->remove($firstJob);
        $objectManager->remove($secondJob);
    }
724
}
725