Completed
Push — master ( c6ec30...6c3722 )
by Matthew
18:20
created

BaseJobManagerTest::testGetStatus()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 12
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 10
nc 1
nop 0
1
<?php
2
3
namespace Dtc\QueueBundle\Tests\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentManager;
6
use Doctrine\ORM\EntityManager;
7
use Dtc\QueueBundle\Doctrine\BaseJobManager;
8
use Dtc\QueueBundle\Doctrine\DtcQueueListener;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\RetryableJob;
12
use Dtc\QueueBundle\Tests\FibonacciWorker;
13
use Dtc\QueueBundle\Tests\Model\BaseJobManagerTest as BaseBaseJobManagerTest;
14
use Dtc\QueueBundle\ODM\JobManager;
15
use Dtc\QueueBundle\Tests\ORM\JobManagerTest;
16
use Symfony\Component\DependencyInjection\Container;
17
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;
18
19
/**
20
 * @author David
21
 *
22
 * This test requires local mongodb running
23
 */
24
abstract class BaseJobManagerTest extends BaseBaseJobManagerTest
25
{
26
    protected static $dtcQueueListener;
27
28
    /** @var DocumentManager|EntityManager */
29
    protected static $objectManager;
30
    protected static $objectName;
31
    protected static $archiveObjectName;
32
    protected static $runClass;
33
    protected static $runArchiveClass;
34
    protected static $jobManagerClass;
35
36
    public static function setUpBeforeClass()
37
    {
38
        self::$jobManager = new self::$jobManagerClass(self::$objectManager, self::$objectName, self::$archiveObjectName, self::$runClass, self::$runArchiveClass);
39
40
        /** @var BaseJobManager $jobManager */
41
        $jobManager = self::$jobManager;
42
43
        $parameters = new ParameterBag();
44
45
        $container = new Container($parameters);
46
        $container->set('dtc_queue.job_manager', $jobManager);
47
48
        self::$dtcQueueListener = new DtcQueueListener($container);
49
        self::$objectManager->getEventManager()->addEventListener('preUpdate', self::$dtcQueueListener);
50
        self::$objectManager->getEventManager()->addEventListener('prePersist', self::$dtcQueueListener);
51
        self::$objectManager->getEventManager()->addEventListener('preRemove', self::$dtcQueueListener);
52
53
        self::$worker = new FibonacciWorker();
54
55
        self::$worker->setJobClass($jobManager->getRepository()->getClassName());
56
        parent::setUpBeforeClass();
57
    }
58
59
    public static function tearDownAfterClass()
60
    {
61
        self::$objectManager->getEventManager()->removeEventListener('preUpdate', self::$dtcQueueListener);
62
        self::$objectManager->getEventManager()->removeEventListener('prePersist', self::$dtcQueueListener);
63
        self::$objectManager->getEventManager()->removeEventListener('preRemove', self::$dtcQueueListener);
64
        parent::tearDownAfterClass();
65
    }
66
67
    public function testDeleteJob()
68
    {
69
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
70
        $jobManager = self::$jobManager;
71
72
        /** @var Job $job */
73
        $job = $this->getNewJob();
74
        $id = $job->getId();
75
        $jobManager->deleteJob($job);
76
77
        $nextJob = $jobManager->getJob();
78
        self::assertNull($nextJob, "Shouldn't be any jobs left in queue");
79
80
        $archiveObjectName = $jobManager->getArchiveObjectName();
81
82
        self::assertNotNull($id);
83
        $archiveRepository = $jobManager->getObjectManager()->getRepository($archiveObjectName);
84
        $result = $archiveRepository->find($id);
85
        self::assertNotNull($result);
86
        self::assertEquals($id, $result->getId());
87
    }
88
89
    public function testResetErroneousJobs()
90
    {
91
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
92
        $jobManager = self::$jobManager;
93
94
        /** @var Job $job */
95
        $job = $this->getNewJob();
96
        $id = $job->getId();
97
        $jobManager->deleteJob($job);
98
99
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
100
        $jobManager = self::$jobManager;
101
        $archiveObjectName = $jobManager->getArchiveObjectName();
102
103
        $objectManager = $jobManager->getObjectManager();
104
105
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
106
        $result = $archiveRepository->find($id);
107
        self::assertNotNull($result);
108
        self::assertEquals($id, $result->getId());
109
110
        $result->setStatus(BaseJob::STATUS_ERROR);
111
        $result->setLocked(true);
112
        $result->setLockedAt(new \DateTime());
113
        $result->setFinishedAt(new \DateTime());
114
        $result->setElapsed(12345);
115
        $result->setMessage('soomething');
116
        $objectManager->persist($result);
117
        $objectManager->flush();
118
119
        if ($objectManager instanceof EntityManager) {
120
            JobManagerTest::createObjectManager();
121
            $jobManager = new self::$jobManagerClass(self::$objectManager, self::$objectName, self::$archiveObjectName, self::$runClass, self::$runArchiveClass);
122
            $jobManager->getObjectManager()->clear();
123
            $objectManager = $jobManager->getObjectManager();
124
        }
125
126
        $count = $jobManager->resetErroneousJobs();
127
128
        self::assertEquals(1, $count);
129
        $repository = $jobManager->getRepository();
130
        $job = $repository->find($id);
131
132
        self::assertNotNull($job);
133
        self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
134
        self::assertNull($job->getLockedAt());
135
        self::assertNull($job->getFinishedAt());
136
        self::assertNull($job->getElapsed());
137
        self::assertNull($job->getMessage());
138
        self::assertNull($job->getLocked());
139
140
        $objectManager->remove($job);
141
        $objectManager->flush();
142
    }
143
144
    public function testResetStalledJobs()
145
    {
146
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
147
        $jobManager = self::$jobManager;
148
149
        $job = new self::$jobClass(self::$worker, false, null);
150
        $job->fibonacci(1);
151
        self::assertNotNull($job->getId(), 'Job id should be generated');
152
        $job->setStatus(BaseJob::STATUS_RUNNING);
153
        $job->setLocked(true);
154
        $time = time();
155
        $date = new \DateTime("@$time");
156
        $job->setLockedAt($date);
157
        $id = $job->getId();
158
        $job = $jobManager->getRepository()->find($id);
159
160
        self::assertNotNull($job);
161
162
        $runClass = $jobManager->getRunClass();
163
164
        $objectManager = $jobManager->getObjectManager();
165
        $run = new $runClass();
166
        $run->setLastHeartbeatAt(new \DateTime());
167
        $objectManager->persist($run);
168
        $objectManager->flush();
169
        $runId = $run->getId();
170
        self::assertNotNull($runId);
171
        $job->setRunId($runId);
172
        $objectManager->persist($job);
173
        $objectManager->flush();
174
        $objectManager->remove($run);
175
        $objectManager->flush();
176
        $id = $job->getId();
177
        $job = $jobManager->getRepository()->find($id);
178
179
        self::assertNotNull($job);
180
181
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
182
183
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
184
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
185
186
        $objectManager->persist($archivedRun);
187
        $objectManager->flush();
188
189
        $count = $jobManager->resetStalledJobs();
190
191
        self::assertEquals(1, $count);
192
193
        $id = $job->getId();
194
        $job = $jobManager->getRepository()->find($id);
195
196
        self::assertNotNull($job);
197
        self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
198
        self::assertNull($job->getLockedAt());
199
        self::assertNull($job->getFinishedAt());
200
        self::assertNull($job->getElapsed());
201
        self::assertNull($job->getMessage());
202
        self::assertNull($job->getLocked());
203
        self::assertEquals(1, $job->getStalledCount());
204
205
        $objectManager->remove($job);
206
        $objectManager->flush();
207
    }
208
209
    public function testPruneErroneousJobs()
210
    {
211
        $job = $this->getJob();
212
        $id = $job->getId();
213
214
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
215
        $jobManager = self::$jobManager;
216
        $jobManager->deleteJob($job);
217
        $archiveObjectName = $jobManager->getArchiveObjectName();
218
219
        $objectManager = $jobManager->getObjectManager();
220
221
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
222
        $result = $archiveRepository->find($id);
223
        self::assertNotNull($result);
224
        self::assertEquals($id, $result->getId());
225
226
        $result->setStatus(BaseJob::STATUS_ERROR);
227
        $result->setLocked(true);
228
        $result->setLockedAt(new \DateTime());
229
        $result->setFinishedAt(new \DateTime());
230
        $result->setElapsed(12345);
231
        $result->setMessage('soomething');
232
        $objectManager->persist($result);
233
        $objectManager->flush();
234
235
        $count = $jobManager->pruneErroneousJobs('asdf');
236
        self::assertEquals(0, $count);
237
        $count = $jobManager->pruneErroneousJobs(null, 'asdf');
238
        self::assertEquals(0, $count);
239
        $count = $jobManager->pruneErroneousJobs('fibonacci', 'asdf');
240
        self::assertEquals(0, $count);
241
        $count = $jobManager->pruneErroneousJobs('fibonacci', 'asdf');
242
        self::assertEquals(0, $count);
243
        $count = $jobManager->pruneErroneousJobs('fibonacci', 'fibonacci');
244
        self::assertEquals(1, $count);
245
        $repository = $jobManager->getRepository();
246
        $job = $repository->find($id);
247
        $objectManager->clear();
248
        self::assertNull($job);
249
        $archiveJob = $archiveRepository->find($id);
250
        self::assertNull($archiveJob);
251
252
        $job = $this->getJob();
253
        $id = $job->getId();
254
        $objectManager->remove($job);
255
        $objectManager->flush();
256
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
257
        $jobManager = self::$jobManager;
258
        $archiveObjectName = $jobManager->getArchiveObjectName();
259
260
        $objectManager = $jobManager->getObjectManager();
261
262
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
263
        $result = $archiveRepository->find($id);
264
        self::assertNotNull($result);
265
        self::assertEquals($id, $result->getId());
266
267
        $result->setStatus(BaseJob::STATUS_ERROR);
268
        $result->setLocked(true);
269
        $result->setLockedAt(new \DateTime());
270
        $result->setFinishedAt(new \DateTime());
271
        $result->setElapsed(12345);
272
        $result->setMessage('soomething');
273
        $objectManager->persist($result);
274
        $objectManager->flush();
275
276
        $job = $this->getJob();
277
        $id = $job->getId();
278
        $objectManager->remove($job);
279
        $objectManager->flush();
280
281
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
282
        $jobManager = self::$jobManager;
283
        $archiveObjectName = $jobManager->getArchiveObjectName();
284
        $objectManager = $jobManager->getObjectManager();
285
286
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
287
        $result = $archiveRepository->find($id);
288
        self::assertNotNull($result);
289
        self::assertEquals($id, $result->getId());
290
291
        $result->setStatus(BaseJob::STATUS_ERROR);
292
        $result->setLocked(true);
293
        $result->setLockedAt(new \DateTime());
294
        $result->setFinishedAt(new \DateTime());
295
        $result->setElapsed(12345);
296
        $result->setMessage('soomething');
297
        $objectManager->persist($result);
298
        $objectManager->flush();
299
        $count = $jobManager->pruneErroneousJobs();
300
        self::assertEquals(2, $count);
301
    }
302
303
    public function testPruneStalledJobs()
304
    {
305
        static::setUpBeforeClass();
306
307
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
308
        $jobManager = self::$jobManager;
309
310
        $job = new self::$jobClass(self::$worker, false, null);
311
        $job->fibonacci(1);
312
        self::assertNotNull($job->getId(), 'Job id should be generated');
313
        $job->setStatus(BaseJob::STATUS_RUNNING);
314
        $job->setLocked(true);
315
        $time = time();
316
        $date = new \DateTime("@$time");
317
        $job->setLockedAt($date);
318
        $id = $job->getId();
319
        $job = $jobManager->getRepository()->find($id);
320
321
        self::assertNotNull($job);
322
323
        $runClass = $jobManager->getRunClass();
324
325
        $objectManager = $jobManager->getObjectManager();
326
        $run = new $runClass();
327
        $run->setLastHeartbeatAt(new \DateTime());
328
        $objectManager->persist($run);
329
        $objectManager->flush();
330
        $runId = $run->getId();
331
        self::assertNotNull($runId);
332
        $job->setRunId($runId);
333
        $objectManager->persist($job);
334
        $objectManager->flush();
335
        $objectManager->remove($run);
336
        $objectManager->flush();
337
        $id = $job->getId();
338
        $job = $jobManager->getRepository()->find($id);
339
340
        self::assertNotNull($job);
341
342
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
343
344
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
345
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
346
347
        $objectManager->persist($archivedRun);
348
        $objectManager->flush();
349
350
        $count = $jobManager->pruneStalledJobs('asdf');
351
        self::assertEquals(0, $count);
352
        $count = $jobManager->pruneStalledJobs(null, 'asdf');
353
        self::assertEquals(0, $count);
354
        $count = $jobManager->pruneStalledJobs('fibonacci', 'asdf');
355
        self::assertEquals(0, $count);
356
        $count = $jobManager->pruneStalledJobs('fibonacci', 'fibonacci');
357
        self::assertEquals(1, $count);
358
359
        $job = $jobManager->getRepository()->find($id);
360
361
        self::assertNull($job);
362
363
        $archivedJob = $jobManager->getObjectManager()->getRepository($jobManager->getArchiveObjectName())->find($id);
364
365
        self::assertNotNull($archivedJob);
366
        self::assertEquals(BaseJob::STATUS_ERROR, $archivedJob->getStatus());
367
        self::assertEquals(1, $archivedJob->getStalledCount());
368
        $objectManager->remove($archivedJob);
369
        $objectManager->flush();
370
371
        // multiple
372
373
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
374
        $jobManager = self::$jobManager;
375
376
        $job = new self::$jobClass(self::$worker, false, null);
377
        $job->fibonacci(1);
378
        self::assertNotNull($job->getId(), 'Job id should be generated');
379
        $job->setStatus(BaseJob::STATUS_RUNNING);
380
        $job->setLocked(true);
381
        $time = time();
382
        $date = new \DateTime("@$time");
383
        $job->setLockedAt($date);
384
        $id = $job->getId();
385
        $job = $jobManager->getRepository()->find($id);
386
387
        self::assertNotNull($job);
388
389
        $runClass = $jobManager->getRunClass();
390
391
        $objectManager = $jobManager->getObjectManager();
392
        $run = new $runClass();
393
        $run->setLastHeartbeatAt(new \DateTime());
394
        $objectManager->persist($run);
395
        $objectManager->flush();
396
        $runId = $run->getId();
397
        self::assertNotNull($runId);
398
        $job->setRunId($runId);
399
        $objectManager->persist($job);
400
        $objectManager->flush();
401
        $objectManager->remove($run);
402
        $objectManager->flush();
403
        $id = $job->getId();
404
        $job = $jobManager->getRepository()->find($id);
405
406
        self::assertNotNull($job);
407
408
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
409
410
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
411
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
412
413
        $objectManager->persist($archivedRun);
414
        $objectManager->flush();
415
416
        $job = new self::$jobClass(self::$worker, false, null);
417
        $job->fibonacci(1);
418
        self::assertNotNull($job->getId(), 'Job id should be generated');
419
        $job->setStatus(BaseJob::STATUS_RUNNING);
420
        $job->setLocked(true);
421
        $time = time();
422
        $date = new \DateTime("@$time");
423
        $job->setLockedAt($date);
424
        $id = $job->getId();
425
        $job = $jobManager->getRepository()->find($id);
426
427
        self::assertNotNull($job);
428
429
        $runClass = $jobManager->getRunClass();
430
431
        $objectManager = $jobManager->getObjectManager();
432
        $run = new $runClass();
433
        $objectManager->persist($run);
434
        $objectManager->flush();
435
        $runId = $run->getId();
436
        self::assertNotNull($runId);
437
        $job->setRunId($runId);
438
        $objectManager->persist($job);
439
        $objectManager->flush();
440
        $objectManager->remove($run);
441
        $objectManager->flush();
442
        $id = $job->getId();
443
        $job = $jobManager->getRepository()->find($id);
444
445
        self::assertNotNull($job);
446
447
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
448
449
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
450
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
451
452
        $objectManager->persist($archivedRun);
453
        $objectManager->flush();
454
        $count = $jobManager->pruneStalledJobs();
455
        self::assertEquals(2, $count);
456
    }
457
458
    public function testPruneExpiredJobs()
459
    {
460
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
461
        $jobManager = self::$jobManager;
462
        $objectManager = $jobManager->getObjectManager();
463
464
        $job = new self::$jobClass(self::$worker, false, null);
465
        $job->fibonacci(1);
466
        self::assertNotNull($job->getId(), 'Job id should be generated');
467
        $time = time() - 1;
468
        $date = new \DateTime("@$time");
469
        $job->setExpiresAt($date);
470
        $objectManager->persist($job);
471
        $objectManager->flush();
472
473
        $count = $jobManager->pruneExpiredJobs('asdf');
474
        self::assertEquals(0, $count);
475
        $count = $jobManager->pruneExpiredJobs(null, 'asdf');
476
        self::assertEquals(0, $count);
477
        $count = $jobManager->pruneExpiredJobs(null, 'fibonacci');
478
        self::assertEquals(1, $count);
479
480
        $job = new self::$jobClass(self::$worker, false, null);
481
        $job->fibonacci(1);
482
        self::assertNotNull($job->getId(), 'Job id should be generated');
483
        $time = time() - 1;
484
        $date = new \DateTime("@$time");
485
        $job->setExpiresAt($date);
486
        $objectManager->persist($job);
487
        $objectManager->flush();
488
489
        $job = new self::$jobClass(self::$worker, false, null);
490
        $job->fibonacci(1);
491
        self::assertNotNull($job->getId(), 'Job id should be generated');
492
        $time = time() - 1;
493
        $date = new \DateTime("@$time");
494
        $job->setExpiresAt($date);
495
        $objectManager->persist($job);
496
        $objectManager->flush();
497
498
        $count = $jobManager->pruneExpiredJobs(null, 'fibonacci');
499
        self::assertEquals(2, $count);
500
501
        $job = new self::$jobClass(self::$worker, false, null);
502
        $job->fibonacci(1);
503
        self::assertNotNull($job->getId(), 'Job id should be generated');
504
        $time = time() - 1;
505
        $date = new \DateTime("@$time");
506
        $job->setExpiresAt($date);
507
        $objectManager->persist($job);
508
        $objectManager->flush();
509
510
        $job = new self::$jobClass(self::$worker, false, null);
511
        $job->fibonacci(1);
512
        self::assertNotNull($job->getId(), 'Job id should be generated');
513
        $time = time() - 1;
514
        $date = new \DateTime("@$time");
515
        $job->setExpiresAt($date);
516
        $objectManager->persist($job);
517
        $objectManager->flush();
518
519
        $count = $jobManager->pruneExpiredJobs('fibonacci', 'fibonacci');
520
        self::assertEquals(2, $count);
521
522
        $job = new self::$jobClass(self::$worker, false, null);
523
        $job->fibonacci(1);
524
        self::assertNotNull($job->getId(), 'Job id should be generated');
525
        $time = time() - 1;
526
        $date = new \DateTime("@$time");
527
        $job->setExpiresAt($date);
528
        $objectManager->persist($job);
529
        $objectManager->flush();
530
531
        $job = new self::$jobClass(self::$worker, false, null);
532
        $job->fibonacci(1);
533
        self::assertNotNull($job->getId(), 'Job id should be generated');
534
        $time = time() - 1;
535
        $date = new \DateTime("@$time");
536
        $job->setExpiresAt($date);
537
        $objectManager->persist($job);
538
        $objectManager->flush();
539
540
        $count = $jobManager->pruneExpiredJobs('fibonacci');
541
        self::assertEquals(2, $count);
542
543
        $job = new self::$jobClass(self::$worker, false, null);
544
        $job->fibonacci(1);
545
        self::assertNotNull($job->getId(), 'Job id should be generated');
546
        $time = time() - 1;
547
        $date = new \DateTime("@$time");
548
        $job->setExpiresAt($date);
549
        $objectManager->persist($job);
550
        $objectManager->flush();
551
552
        $jobId1 = $job->getId();
553
554
        $job = new self::$jobClass(self::$worker, false, null);
555
        $job->fibonacci(1);
556
        self::assertNotNull($job->getId(), 'Job id should be generated');
557
        $time = time() - 1;
558
        $date = new \DateTime("@$time");
559
        $job->setExpiresAt($date);
560
        $objectManager->persist($job);
561
        $objectManager->flush();
562
563
        $jobId2 = $job->getId();
564
565
        $count = $jobManager->pruneExpiredJobs();
566
        self::assertEquals(2, $count);
567
568
        $archiveRepository = $jobManager->getObjectManager()->getRepository($jobManager->getArchiveObjectName());
569
570
        $job = $archiveRepository->find($jobId1);
571
        self::assertNotNull($job);
572
        self::assertEquals(Job::STATUS_EXPIRED, $job->getStatus());
573
574
        $job = $archiveRepository->find($jobId2);
575
        self::assertNotNull($job);
576
        self::assertEquals(Job::STATUS_EXPIRED, $job->getStatus());
577
    }
578
579
    public function testPruneArchivedJobs()
580
    {
581
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
582
        $jobManager = self::$jobManager;
583
        $objectManager = $jobManager->getObjectManager();
584
        $jobArchiveClass = $jobManager->getArchiveObjectName();
585
        $jobArchiveRepository = $objectManager->getRepository($jobArchiveClass);
586
587
        self::$objectManager->getEventManager()->removeEventListener('preUpdate', self::$dtcQueueListener);
588
589
        $job = new self::$jobClass(self::$worker, false, null);
590
        $job->fibonacci(1);
591
        $id = $job->getId();
592
        $objectManager->remove($job);
593
        $objectManager->flush();
594
595
        $jobArchive = $jobArchiveRepository->find($id);
596
        self::assertNotNull($jobArchive);
597
        $time = time() - 86401;
598
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
599
        $objectManager->persist($jobArchive);
600
        $objectManager->flush();
601
602
        $older = $time + 1;
603
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$time"));
604
        self::assertEquals(0, $count);
605
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$older"));
606
        self::assertEquals(1, $count);
607
608
        $job = new self::$jobClass(self::$worker, false, null);
609
        $job->fibonacci(1);
610
        $id = $job->getId();
611
        $objectManager->remove($job);
612
        $objectManager->flush();
613
614
        $jobArchive = $jobArchiveRepository->find($id);
615
        self::assertNotNull($jobArchive);
616
        $time = time() - 86401;
617
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
618
        $objectManager->persist($jobArchive);
619
        $objectManager->flush();
620
621
        $job = new self::$jobClass(self::$worker, false, null);
622
        $job->fibonacci(1);
623
        $id = $job->getId();
624
        $objectManager->remove($job);
625
        $objectManager->flush();
626
627
        $jobArchive = $jobArchiveRepository->find($id);
628
        self::assertNotNull($jobArchive);
629
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
630
        $objectManager->persist($jobArchive);
631
        $objectManager->flush();
632
        $older = $time + 1;
633
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$time"));
634
        self::assertEquals(0, $count);
635
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$older"));
636
        self::assertEquals(2, $count);
637
638
        self::$objectManager->getEventManager()->addEventListener('preUpdate', self::$dtcQueueListener);
639
    }
640
641
    public function testPerformance()
642
    {
643
        $jobs = self::$jobManager->getRepository()->findAll();
644
        foreach ($jobs as $job) {
645
            self::$jobManager->getObjectManager()->remove($job);
646
        }
647
        self::$jobManager->getObjectManager()->flush();
648
649
        self::$jobManager->getObjectManager()->clear();
650
        parent::testPerformance();
651
    }
652
653
    protected function getBaseStatus() {
654
        /** @var BaseJobManager $jobManager */
655
        $jobManager = self::$jobManager;
656
        $status = $jobManager->getStatus();
657
        $job = new self::$jobClass(self::$worker, false, null);
658
        $job->fibonacci(1);
659
        self::assertArrayHasKey('fibonacci->fibonacci', $status);
660
        $fibonacciStatus = $status['fibonacci->fibonacci'];
661
662
        self::assertArrayHasKey(BaseJob::STATUS_NEW, $fibonacciStatus);
663
        self::assertArrayHasKey(BaseJob::STATUS_ERROR, $fibonacciStatus);
664
        self::assertArrayHasKey(BaseJob::STATUS_RUNNING, $fibonacciStatus);
665
        self::assertArrayHasKey(BaseJob::STATUS_SUCCESS, $fibonacciStatus);
666
        self::assertArrayHasKey(RetryableJob::STATUS_MAX_STALLED, $fibonacciStatus);
667
        self::assertArrayHasKey(RetryableJob::STATUS_MAX_ERROR, $fibonacciStatus);
668
        self::assertArrayHasKey(RetryableJob::STATUS_MAX_RETRIES, $fibonacciStatus);
669
        self::assertArrayHasKey(RetryableJob::STATUS_EXPIRED, $fibonacciStatus);
670
        return [$job, $status];
671
    }
672
673
    public function testGetStatus() {
674
        list ($job1, $status1) = $this->getBaseStatus();
675
        list ($job2, $status2) = $this->getBaseStatus();
676
        $fibonacciStatus1 = $status1['fibonacci->fibonacci'];
677
        $fibonacciStatus2 = $status2['fibonacci->fibonacci'];
678
679
        self::assertEquals($fibonacciStatus1[BaseJob::STATUS_NEW] + 1, $fibonacciStatus2[BaseJob::STATUS_NEW]);
680
        $jobManager = self::$jobManager;
681
        $objectManager = $jobManager->getObjectManager();
682
        $objectManager->remove($job1);
683
        $objectManager->remove($job2);
684
    }
685
}
686