Completed
Push — master ( 1204eb...5574d8 )
by Matthew
05:57
created

BaseJobManagerTest::testResetStalledJobs()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 105
Code Lines 79

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 105
rs 8.2857
c 0
b 0
f 0
cc 1
eloc 79
nc 1
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Dtc\QueueBundle\Tests\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentManager;
6
use Doctrine\ORM\EntityManager;
7
use Dtc\QueueBundle\Doctrine\BaseJobManager;
8
use Dtc\QueueBundle\Doctrine\DtcQueueListener;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Tests\Model\PriorityTestTrait;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Tests\FibonacciWorker;
14
use Dtc\QueueBundle\Tests\Model\BaseJobManagerTest as BaseBaseJobManagerTest;
15
use Dtc\QueueBundle\ODM\JobManager;
16
use Dtc\QueueBundle\Tests\ORM\JobManagerTest;
17
use Symfony\Component\DependencyInjection\Container;
18
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;
19
20
/**
21
 * @author David
22
 *
23
 * This test requires local mongodb running
24
 */
25
abstract class BaseJobManagerTest extends BaseBaseJobManagerTest
26
{
27
    use PriorityTestTrait;
28
29
    protected static $dtcQueueListener;
30
31
    /** @var DocumentManager|EntityManager */
32
    protected static $objectManager;
33
    protected static $objectName;
34
    protected static $archiveObjectName;
35
    protected static $runClass;
36
    protected static $runArchiveClass;
37
    protected static $jobTimingClass;
38
    protected static $jobManagerClass;
39
    protected static $runManagerClass;
40
    public static $runManager;
41
42
    public static function setUpBeforeClass()
43
    {
44
        self::$jobManager = new self::$jobManagerClass(self::$objectManager, self::$objectName, self::$archiveObjectName, self::$runClass, self::$runArchiveClass);
45
        self::$jobManager->setMaxPriority(255);
46
        self::$runManager = new self::$runManagerClass(self::$objectManager, self::$runClass, self::$jobTimingClass, true);
47
        self::$runManager->setRunArchiveClass(self::$runArchiveClass);
48
49
        self::assertEquals(255, self::$jobManager->getMaxPriority());
50
        self::assertEquals(JobManager::PRIORITY_DESC, self::$jobManager->getPriorityDirection());
51
        self::$jobManager->setPriorityDirection(JobManager::PRIORITY_ASC);
52
        self::assertEquals(JobManager::PRIORITY_ASC, self::$jobManager->getPriorityDirection());
53
        self::$jobManager->setPriorityDirection(JobManager::PRIORITY_DESC);
54
55
        /** @var BaseJobManager $jobManager */
56
        $jobManager = self::$jobManager;
57
58
        $parameters = new ParameterBag();
59
60
        $container = new Container($parameters);
61
        $container->set('dtc_queue.job_manager', $jobManager);
62
        $container->set('dtc_queue.run_manager', self::$runManager);
63
64
        self::$dtcQueueListener = new DtcQueueListener(self::$jobManager->getArchiveObjectName(), self::$runManager->getRunArchiveClass());
65
        self::$objectManager->getEventManager()->addEventListener('preUpdate', self::$dtcQueueListener);
66
        self::$objectManager->getEventManager()->addEventListener('prePersist', self::$dtcQueueListener);
67
        self::$objectManager->getEventManager()->addEventListener('preRemove', self::$dtcQueueListener);
68
69
        self::$worker = new FibonacciWorker();
70
71
        self::$worker->setJobClass($jobManager->getRepository()->getClassName());
72
        parent::setUpBeforeClass();
73
    }
74
75
    public static function tearDownAfterClass()
76
    {
77
        self::$objectManager->getEventManager()->removeEventListener('preUpdate', self::$dtcQueueListener);
78
        self::$objectManager->getEventManager()->removeEventListener('prePersist', self::$dtcQueueListener);
79
        self::$objectManager->getEventManager()->removeEventListener('preRemove', self::$dtcQueueListener);
80
        parent::tearDownAfterClass();
81
    }
82
83
    public function testDeleteJob()
84
    {
85
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
86
        $jobManager = self::$jobManager;
87
88
        /** @var Job $job */
89
        $job = $this->getJob();
90
        $id = $job->getId();
91
        $jobManager->deleteJob($job);
92
93
        $nextJob = $jobManager->getJob();
94
        self::assertNull($nextJob, "Shouldn't be any jobs left in queue");
95
96
        $archiveObjectName = $jobManager->getArchiveObjectName();
97
98
        self::assertNotNull($id);
99
        $archiveRepository = $jobManager->getObjectManager()->getRepository($archiveObjectName);
100
        $result = $archiveRepository->find($id);
101
        self::assertNotNull($result);
102
        self::assertEquals($id, $result->getId());
103
    }
104
105
    public function testResetErroneousJobs()
106
    {
107
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
108
        $jobManager = self::$jobManager;
109
110
        $id = $this->createErroredJob();
111
        $archiveObjectName = $jobManager->getArchiveObjectName();
112
        $objectManager = $jobManager->getObjectManager();
113
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
114
        $result = $archiveRepository->find($id);
0 ignored issues
show
Unused Code introduced by
$result is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
115
116
        if ($objectManager instanceof EntityManager) {
117
            JobManagerTest::createObjectManager();
118
            $jobManager = new self::$jobManagerClass(self::$objectManager, self::$objectName, self::$archiveObjectName, self::$runClass, self::$runArchiveClass);
119
            $jobManager->getObjectManager()->clear();
120
            $objectManager = $jobManager->getObjectManager();
121
        }
122
123
        $count = $jobManager->resetErroneousJobs();
124
125
        self::assertEquals(1, $count);
126
        $repository = $jobManager->getRepository();
127
        $job = $repository->find($id);
128
129
        self::assertNotNull($job);
130
        self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
131
        self::assertNull($job->getLockedAt());
132
        self::assertNull($job->getFinishedAt());
133
        self::assertNull($job->getElapsed());
134
        self::assertNull($job->getMessage());
135
        self::assertNull($job->getLocked());
136
137
        $objectManager->remove($job);
138
        $objectManager->flush();
139
140
        $id = $this->createErroredJob();
141
        $archiveObjectName = $jobManager->getArchiveObjectName();
142
        $objectManager = $jobManager->getObjectManager();
143
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
144
        $result = $archiveRepository->find($id);
145
        $result->setMaxRetries(10);
146
        $result->setRetries(10);
147
        $objectManager->persist($result);
148
        $objectManager->flush();
149
        $count = $jobManager->resetErroneousJobs();
150
        self::assertEquals(0, $count);
151
        $job = $repository->find($id);
152
        self::assertNull($job);
153
        $job = $archiveRepository->find($id);
154
        self::assertNotNull($job);
155
        $objectManager->remove($job);
156
        $objectManager->flush();
157
    }
158
159
    protected function createErroredJob()
160
    {
161
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
162
        $jobManager = self::$jobManager;
163
164
        /** @var Job $job */
165
        $job = $this->getJob();
166
        $id = $job->getId();
167
        $jobManager->deleteJob($job);
168
169
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
170
        $archiveObjectName = $jobManager->getArchiveObjectName();
171
172
        $objectManager = $jobManager->getObjectManager();
173
174
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
175
        $result = $archiveRepository->find($id);
176
        self::assertNotNull($result);
177
        self::assertEquals($id, $result->getId());
178
179
        $result->setStatus(BaseJob::STATUS_ERROR);
180
        $result->setLocked(true);
181
        $result->setLockedAt(new \DateTime());
182
        $result->setFinishedAt(new \DateTime());
183
        $result->setElapsed(12345);
184
        $result->setMessage('soomething');
185
        $objectManager->persist($result);
186
        $objectManager->flush();
187
188
        return $id;
189
    }
190
191
    /**
192
     * @param bool $flushRun
0 ignored issues
show
Bug introduced by
There is no parameter named $flushRun. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
193
     *
194
     * @return mixed
195
     */
196
    public function createStalledJob($endRun, $setId)
197
    {
198
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
199
        $jobManager = self::$jobManager;
200
201
        $job = new self::$jobClass(self::$worker, false, null);
202
        $job->fibonacci(1);
203
        self::assertNotNull($job->getId(), 'Job id should be generated');
204
        $job->setStatus(BaseJob::STATUS_RUNNING);
205
        $job->setLocked(true);
206
        $time = time();
207
        $date = new \DateTime("@$time");
208
        $job->setLockedAt($date);
209
        $id = $job->getId();
210
        $job = $jobManager->getRepository()->find($id);
211
212
        self::assertNotNull($job);
213
214
        $runClass = $jobManager->getRunClass();
215
216
        $objectManager = $jobManager->getObjectManager();
217
        $run = new $runClass();
218
        $run->setLastHeartbeatAt(new \DateTime());
219
        if ($setId) {
220
            $run->setCurrentJobId($job->getId);
221
        }
222
        $objectManager->persist($run);
223
        $objectManager->flush();
224
        $runId = $run->getId();
225
        self::assertNotNull($runId);
226
        $job->setRunId($runId);
227
        $objectManager->persist($job);
228
        $objectManager->flush();
229
        if ($endRun) {
230
            $objectManager->remove($run);
231
            $objectManager->flush();
232
        }
233
        $id = $job->getId();
234
        $job = $jobManager->getRepository()->find($id);
235
236
        self::assertNotNull($job);
237
238
        if ($endRun) {
239
            $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
240
241
            $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
242
            $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
243
244
            $objectManager->persist($archivedRun);
245
            $objectManager->flush();
246
        }
247
        $id = $job->getId();
248
249
        return $id;
250
    }
251
252
    public function testResetStalledJobs()
253
    {
254
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
255
        $jobManager = self::$jobManager;
256
        $id = $this->createStalledJob(true, false);
257
258
        $objectManager = $jobManager->getObjectManager();
259
        $count = $jobManager->resetStalledJobs();
260
        self::assertEquals(1, $count);
261
262
        $job = $jobManager->getRepository()->find($id);
263
264
        self::assertNotNull($job);
265
        self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
266
        self::assertNull($job->getLockedAt());
267
        self::assertNull($job->getFinishedAt());
268
        self::assertNull($job->getElapsed());
269
        self::assertNull($job->getMessage());
270
        self::assertNull($job->getLocked());
271
        self::assertEquals(1, $job->getStalledCount());
272
273
        $objectManager->remove($job);
274
        $objectManager->flush();
275
276
        $jobManager = self::$jobManager;
277
        $id = $this->createStalledJob(true, true);
278
279
        $objectManager = $jobManager->getObjectManager();
280
        $count = $jobManager->resetStalledJobs();
281
        self::assertEquals(1, $count);
282
283
        $job = $jobManager->getRepository()->find($id);
284
285
        self::assertNotNull($job);
286
        $objectManager->remove($job);
287
        $objectManager->flush();
288
289
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
290
        $id = $this->createStalledJob(false, false);
291
292
        $objectManager = $jobManager->getObjectManager();
293
        $count = $jobManager->resetStalledJobs();
294
        self::assertEquals(1, $count);
295
296
        $job = $jobManager->getRepository()->find($id);
297
298
        self::assertNotNull($job);
299
        self::assertEquals(BaseJob::STATUS_NEW, $job->getStatus());
300
        self::assertNull($job->getLockedAt());
301
        self::assertNull($job->getFinishedAt());
302
        self::assertNull($job->getElapsed());
303
        self::assertNull($job->getMessage());
304
        self::assertNull($job->getLocked());
305
        self::assertEquals(1, $job->getStalledCount());
306
307
        $objectManager->remove($job);
308
        $objectManager->flush();
309
310
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
311
        $jobManager = self::$jobManager;
312
        $id = $this->createStalledJob(false, true);
313
314
        $count = $jobManager->resetStalledJobs();
315
        self::assertEquals(1, $count);
316
317
        $job = $jobManager->getRepository()->find($id);
318
        $objectManager = $jobManager->getObjectManager();
319
        $count = $jobManager->resetStalledJobs();
320
        self::assertEquals(0, $count);
321
322
        $objectManager->remove($job);
323
        $objectManager->flush();
324
325
        $id = $this->createStalledJob(true, false);
326
        $job = $jobManager->getRepository()->find($id);
327
        $job->setMaxRetries(10);
328
        $job->setRetries(10);
329
        $objectManager->persist($job);
330
        $objectManager->flush();
331
332
        $count = $jobManager->resetStalledJobs();
333
        self::assertEquals(0, $count);
334
        $job = $jobManager->getRepository()->find($id);
335
        self::assertNull($job);
336
        $job = $objectManager->getRepository($jobManager->getArchiveObjectName())->find($id);
337
        self::assertNotNull($job);
338
        $objectManager->remove($job);
339
        $objectManager->flush();
340
341
        $id = $this->createStalledJob(true, false);
342
        $job = $jobManager->getRepository()->find($id);
343
        $job->setMaxStalled(10);
344
        $job->setStalledCount(10);
345
        $objectManager->persist($job);
346
        $objectManager->flush();
347
348
        $count = $jobManager->resetStalledJobs();
349
        self::assertEquals(0, $count);
350
        $job = $jobManager->getRepository()->find($id);
351
        self::assertNull($job);
352
        $job = $objectManager->getRepository($jobManager->getArchiveObjectName())->find($id);
353
        self::assertNotNull($job);
354
        $objectManager->remove($job);
355
        $objectManager->flush();
356
    }
357
358
    public function testPruneErroneousJobs()
359
    {
360
        $job = $this->getJob();
361
        $id = $job->getId();
362
363
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
364
        $jobManager = self::$jobManager;
365
        $jobManager->deleteJob($job);
366
        $archiveObjectName = $jobManager->getArchiveObjectName();
367
368
        $objectManager = $jobManager->getObjectManager();
369
370
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
371
        $result = $archiveRepository->find($id);
372
        self::assertNotNull($result);
373
        self::assertEquals($id, $result->getId());
374
375
        $result->setStatus(BaseJob::STATUS_ERROR);
376
        $result->setLocked(true);
377
        $result->setLockedAt(new \DateTime());
378
        $result->setFinishedAt(new \DateTime());
379
        $result->setElapsed(12345);
380
        $result->setMessage('soomething');
381
        $objectManager->persist($result);
382
        $objectManager->flush();
383
384
        $count = $jobManager->pruneErroneousJobs('asdf');
385
        self::assertEquals(0, $count);
386
        $count = $jobManager->pruneErroneousJobs(null, 'asdf');
387
        self::assertEquals(0, $count);
388
        $count = $jobManager->pruneErroneousJobs('fibonacci', 'asdf');
389
        self::assertEquals(0, $count);
390
        $count = $jobManager->pruneErroneousJobs('fibonacci', 'asdf');
391
        self::assertEquals(0, $count);
392
        $count = $jobManager->pruneErroneousJobs('fibonacci', 'fibonacci');
393
        self::assertEquals(1, $count);
394
        $repository = $jobManager->getRepository();
395
        $job = $repository->find($id);
396
        $objectManager->clear();
397
        self::assertNull($job);
398
        $archiveJob = $archiveRepository->find($id);
399
        self::assertNull($archiveJob);
400
401
        $job = $this->getJob();
402
        $id = $job->getId();
403
        $objectManager->remove($job);
404
        $objectManager->flush();
405
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
406
        $jobManager = self::$jobManager;
407
        $archiveObjectName = $jobManager->getArchiveObjectName();
408
409
        $objectManager = $jobManager->getObjectManager();
410
411
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
412
        $result = $archiveRepository->find($id);
413
        self::assertNotNull($result);
414
        self::assertEquals($id, $result->getId());
415
416
        $result->setStatus(BaseJob::STATUS_ERROR);
417
        $result->setLocked(true);
418
        $result->setLockedAt(new \DateTime());
419
        $result->setFinishedAt(new \DateTime());
420
        $result->setElapsed(12345);
421
        $result->setMessage('soomething');
422
        $objectManager->persist($result);
423
        $objectManager->flush();
424
425
        $job = $this->getJob();
426
        $id = $job->getId();
427
        $objectManager->remove($job);
428
        $objectManager->flush();
429
430
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
431
        $jobManager = self::$jobManager;
432
        $archiveObjectName = $jobManager->getArchiveObjectName();
433
        $objectManager = $jobManager->getObjectManager();
434
435
        $archiveRepository = $objectManager->getRepository($archiveObjectName);
436
        $result = $archiveRepository->find($id);
437
        self::assertNotNull($result);
438
        self::assertEquals($id, $result->getId());
439
440
        $result->setStatus(BaseJob::STATUS_ERROR);
441
        $result->setLocked(true);
442
        $result->setLockedAt(new \DateTime());
443
        $result->setFinishedAt(new \DateTime());
444
        $result->setElapsed(12345);
445
        $result->setMessage('soomething');
446
        $objectManager->persist($result);
447
        $objectManager->flush();
448
        $count = $jobManager->pruneErroneousJobs();
449
        self::assertEquals(2, $count);
450
    }
451
452
    public function testPruneStalledJobs()
453
    {
454
        static::setUpBeforeClass();
455
456
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
457
        $jobManager = self::$jobManager;
458
459
        $job = new self::$jobClass(self::$worker, false, null);
460
        $job->fibonacci(1);
461
        self::assertNotNull($job->getId(), 'Job id should be generated');
462
        $job->setStatus(BaseJob::STATUS_RUNNING);
463
        $job->setLocked(true);
464
        $time = time();
465
        $date = new \DateTime("@$time");
466
        $job->setLockedAt($date);
467
        $id = $job->getId();
468
        $job = $jobManager->getRepository()->find($id);
469
470
        self::assertNotNull($job);
471
472
        $runClass = $jobManager->getRunClass();
473
474
        $objectManager = $jobManager->getObjectManager();
475
        $run = new $runClass();
476
        $run->setLastHeartbeatAt(new \DateTime());
477
        $objectManager->persist($run);
478
        $objectManager->flush();
479
        $runId = $run->getId();
480
        self::assertNotNull($runId);
481
        $job->setRunId($runId);
482
        $objectManager->persist($job);
483
        $objectManager->flush();
484
        $objectManager->remove($run);
485
        $objectManager->flush();
486
        $id = $job->getId();
487
        $job = $jobManager->getRepository()->find($id);
488
489
        self::assertNotNull($job);
490
491
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
492
493
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
494
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
495
496
        $objectManager->persist($archivedRun);
497
        $objectManager->flush();
498
499
        $count = $jobManager->pruneStalledJobs('asdf');
500
        self::assertEquals(0, $count);
501
        $count = $jobManager->pruneStalledJobs(null, 'asdf');
502
        self::assertEquals(0, $count);
503
        $count = $jobManager->pruneStalledJobs('fibonacci', 'asdf');
504
        self::assertEquals(0, $count);
505
        $count = $jobManager->pruneStalledJobs('fibonacci', 'fibonacci');
506
        self::assertEquals(1, $count);
507
508
        $job = $jobManager->getRepository()->find($id);
509
510
        self::assertNull($job);
511
512
        $archivedJob = $jobManager->getObjectManager()->getRepository($jobManager->getArchiveObjectName())->find($id);
513
514
        self::assertNotNull($archivedJob);
515
        self::assertEquals(BaseJob::STATUS_ERROR, $archivedJob->getStatus());
516
        self::assertEquals(1, $archivedJob->getStalledCount());
517
        $objectManager->remove($archivedJob);
518
        $objectManager->flush();
519
520
        // multiple
521
522
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
523
        $jobManager = self::$jobManager;
524
525
        $job = new self::$jobClass(self::$worker, false, null);
526
        $job->fibonacci(1);
527
        self::assertNotNull($job->getId(), 'Job id should be generated');
528
        $job->setStatus(BaseJob::STATUS_RUNNING);
529
        $job->setLocked(true);
530
        $time = time();
531
        $date = new \DateTime("@$time");
532
        $job->setLockedAt($date);
533
        $id = $job->getId();
534
        $job = $jobManager->getRepository()->find($id);
535
536
        self::assertNotNull($job);
537
538
        $runClass = $jobManager->getRunClass();
539
540
        $objectManager = $jobManager->getObjectManager();
541
        $run = new $runClass();
542
        $run->setLastHeartbeatAt(new \DateTime());
543
        $objectManager->persist($run);
544
        $objectManager->flush();
545
        $runId = $run->getId();
546
        self::assertNotNull($runId);
547
        $job->setRunId($runId);
548
        $objectManager->persist($job);
549
        $objectManager->flush();
550
        $objectManager->remove($run);
551
        $objectManager->flush();
552
        $id = $job->getId();
553
        $job = $jobManager->getRepository()->find($id);
554
555
        self::assertNotNull($job);
556
557
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
558
559
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
560
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
561
562
        $objectManager->persist($archivedRun);
563
        $objectManager->flush();
564
565
        $job = new self::$jobClass(self::$worker, false, null);
566
        $job->fibonacci(1);
567
        self::assertNotNull($job->getId(), 'Job id should be generated');
568
        $job->setStatus(BaseJob::STATUS_RUNNING);
569
        $job->setLocked(true);
570
        $time = time();
571
        $date = new \DateTime("@$time");
572
        $job->setLockedAt($date);
573
        $id = $job->getId();
574
        $job = $jobManager->getRepository()->find($id);
575
576
        self::assertNotNull($job);
577
578
        $runClass = $jobManager->getRunClass();
579
580
        $objectManager = $jobManager->getObjectManager();
581
        $run = new $runClass();
582
        $run->setLastHeartbeatAt(new \DateTime());
583
        $objectManager->persist($run);
584
        $objectManager->flush();
585
        $runId = $run->getId();
586
        self::assertNotNull($runId);
587
        $job->setRunId($runId);
588
        $objectManager->persist($job);
589
        $objectManager->flush();
590
        $objectManager->remove($run);
591
        $objectManager->flush();
592
        $id = $job->getId();
593
        $job = $jobManager->getRepository()->find($id);
594
595
        self::assertNotNull($job);
596
597
        $archivedRun = $objectManager->getRepository($jobManager->getRunArchiveClass())->find($runId);
598
599
        $minusTime = $time - (BaseJobManager::STALLED_SECONDS + 1);
600
        $archivedRun->setEndedAt(new \DateTime("@$minusTime"));
601
602
        $objectManager->persist($archivedRun);
603
        $objectManager->flush();
604
        $count = $jobManager->pruneStalledJobs();
605
        self::assertEquals(2, $count);
606
    }
607
608
    public function testBatchJobs()
609
    {
610
        $jobs = self::$jobManager->getRepository()->findAll();
611
        foreach ($jobs as $job) {
612
            self::$jobManager->getObjectManager()->remove($job);
613
        }
614
        self::$jobManager->getObjectManager()->flush();
615
        self::$jobManager->getObjectManager()->clear();
616
617
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
618
        $worker = self::$worker;
619
        $job1 = $worker->later()->fibonacci(1);
620
        $job2 = $worker->batchLater()->fibonacci(1);
621
        self::assertEquals($job1, $job2);
622
623
        $jobs = self::$jobManager->getRepository()->findAll();
624
        self::assertCount(1, $jobs);
625
        self::assertEquals($job1, $jobs[0]);
626
        self::assertNull($jobs[0]->getPriority());
627
        self::$jobManager->getObjectManager()->remove($jobs[0]);
628
        self::$jobManager->getObjectManager()->flush();
629
        self::$jobManager->getObjectManager()->clear();
630
631
        $job1 = $worker->later()->fibonacci(1);
632
        self::assertNull($job1->getPriority());
633
        $job2 = $worker->batchLater()->setPriority(3)->fibonacci(1);
634
        self::assertEquals($job1, $job2);
635
        self::assertNotNull($job2->getPriority());
636
637
        $jobs = self::$jobManager->getRepository()->findAll();
638
        self::assertCount(1, $jobs);
639
        self::assertEquals($job1, $jobs[0]);
640
        self::assertNotNull($jobs[0]->getPriority());
641
642
        // Not
643
        $jobs = self::$jobManager->getRepository()->findAll();
644
        foreach ($jobs as $job) {
645
            self::$jobManager->getObjectManager()->remove($job);
646
        }
647
        self::$jobManager->getObjectManager()->remove($jobs[0]);
648
        self::$jobManager->getObjectManager()->flush();
649
        self::$jobManager->getObjectManager()->clear();
650
651
        $job1 = $worker->later(100)->fibonacci(1);
652
653
        $time1 = new \DateTime('@'.time());
654
        $job2 = $worker->batchLater(0)->fibonacci(1);
655
        $time2 = new \DateTime();
656
657
        self::assertEquals($job1, $job2);
658
        self::assertGreaterThanOrEqual($time1, $job2->getWhenAt());
659
        self::assertLessThanOrEqual($time2, $job2->getWhenAt());
660
661
        $jobs = self::$jobManager->getRepository()->findAll();
662
        self::assertCount(1, $jobs);
663
        self::assertEquals($job1, $jobs[0]);
664
        self::assertGreaterThanOrEqual($time1, $jobs[0]->getWhenAt());
665
        self::assertLessThanOrEqual($time2, $jobs[0]->getWhenAt());
666
        self::$jobManager->getObjectManager()->remove($jobs[0]);
667
        self::$jobManager->getObjectManager()->flush();
668
        self::$jobManager->getObjectManager()->clear();
669
670
        $job1 = $worker->later(100)->setPriority(3)->fibonacci(1);
671
        $priority1 = $job1->getPriority();
672
        $time1 = new \DateTime('@'.time());
673
        $job2 = $worker->batchLater(0)->setPriority(1)->fibonacci(1);
674
        $time2 = new \DateTime();
675
        self::assertEquals($job1, $job2);
676
        self::assertNotEquals($priority1, $job2->getPriority());
677
678
        self::assertEquals($job1, $job2);
679
        self::assertGreaterThanOrEqual($time1, $job2->getWhenAt());
680
        self::assertLessThanOrEqual($time2, $job2->getWhenAt());
681
    }
682
683
    public function testPruneExpiredJobs()
684
    {
685
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
686
        $jobManager = self::$jobManager;
687
        $objectManager = $jobManager->getObjectManager();
688
689
        $job = new self::$jobClass(self::$worker, false, null);
690
        $job->fibonacci(1);
691
        self::assertNotNull($job->getId(), 'Job id should be generated');
692
        $time = time() - 1;
693
        $date = new \DateTime("@$time");
694
        $job->setExpiresAt($date);
695
        $objectManager->persist($job);
696
        $objectManager->flush();
697
698
        $count = $jobManager->pruneExpiredJobs('asdf');
699
        self::assertEquals(0, $count);
700
        $count = $jobManager->pruneExpiredJobs(null, 'asdf');
701
        self::assertEquals(0, $count);
702
        $count = $jobManager->pruneExpiredJobs(null, 'fibonacci');
703
        self::assertEquals(1, $count);
704
705
        $job = new self::$jobClass(self::$worker, false, null);
706
        $job->fibonacci(1);
707
        self::assertNotNull($job->getId(), 'Job id should be generated');
708
        $time = time() - 1;
709
        $date = new \DateTime("@$time");
710
        $job->setExpiresAt($date);
711
        $objectManager->persist($job);
712
        $objectManager->flush();
713
714
        $job = new self::$jobClass(self::$worker, false, null);
715
        $job->fibonacci(1);
716
        self::assertNotNull($job->getId(), 'Job id should be generated');
717
        $time = time() - 1;
718
        $date = new \DateTime("@$time");
719
        $job->setExpiresAt($date);
720
        $objectManager->persist($job);
721
        $objectManager->flush();
722
723
        $count = $jobManager->pruneExpiredJobs(null, 'fibonacci');
724
        self::assertEquals(2, $count);
725
726
        $job = new self::$jobClass(self::$worker, false, null);
727
        $job->fibonacci(1);
728
        self::assertNotNull($job->getId(), 'Job id should be generated');
729
        $time = time() - 1;
730
        $date = new \DateTime("@$time");
731
        $job->setExpiresAt($date);
732
        $objectManager->persist($job);
733
        $objectManager->flush();
734
735
        $job = new self::$jobClass(self::$worker, false, null);
736
        $job->fibonacci(1);
737
        self::assertNotNull($job->getId(), 'Job id should be generated');
738
        $time = time() - 1;
739
        $date = new \DateTime("@$time");
740
        $job->setExpiresAt($date);
741
        $objectManager->persist($job);
742
        $objectManager->flush();
743
744
        $count = $jobManager->pruneExpiredJobs('fibonacci', 'fibonacci');
745
        self::assertEquals(2, $count);
746
747
        $job = new self::$jobClass(self::$worker, false, null);
748
        $job->fibonacci(1);
749
        self::assertNotNull($job->getId(), 'Job id should be generated');
750
        $time = time() - 1;
751
        $date = new \DateTime("@$time");
752
        $job->setExpiresAt($date);
753
        $objectManager->persist($job);
754
        $objectManager->flush();
755
756
        $job = new self::$jobClass(self::$worker, false, null);
757
        $job->fibonacci(1);
758
        self::assertNotNull($job->getId(), 'Job id should be generated');
759
        $time = time() - 1;
760
        $date = new \DateTime("@$time");
761
        $job->setExpiresAt($date);
762
        $objectManager->persist($job);
763
        $objectManager->flush();
764
765
        $count = $jobManager->pruneExpiredJobs('fibonacci');
766
        self::assertEquals(2, $count);
767
768
        $job = new self::$jobClass(self::$worker, false, null);
769
        $job->fibonacci(1);
770
        self::assertNotNull($job->getId(), 'Job id should be generated');
771
        $time = time() - 1;
772
        $date = new \DateTime("@$time");
773
        $job->setExpiresAt($date);
774
        $objectManager->persist($job);
775
        $objectManager->flush();
776
777
        $jobId1 = $job->getId();
778
779
        $job = new self::$jobClass(self::$worker, false, null);
780
        $job->fibonacci(1);
781
        self::assertNotNull($job->getId(), 'Job id should be generated');
782
        $time = time() - 1;
783
        $date = new \DateTime("@$time");
784
        $job->setExpiresAt($date);
785
        $objectManager->persist($job);
786
        $objectManager->flush();
787
788
        $jobId2 = $job->getId();
789
790
        $count = $jobManager->pruneExpiredJobs();
791
        self::assertEquals(2, $count);
792
793
        $archiveRepository = $jobManager->getObjectManager()->getRepository($jobManager->getArchiveObjectName());
794
795
        $job = $archiveRepository->find($jobId1);
796
        self::assertNotNull($job);
797
        self::assertEquals(Job::STATUS_EXPIRED, $job->getStatus());
798
799
        $job = $archiveRepository->find($jobId2);
800
        self::assertNotNull($job);
801
        self::assertEquals(Job::STATUS_EXPIRED, $job->getStatus());
802
    }
803
804
    public function testPruneArchivedJobs()
805
    {
806
        /** @var JobManager|\Dtc\QueueBundle\ORM\JobManager $jobManager */
807
        $jobManager = self::$jobManager;
808
        $objectManager = $jobManager->getObjectManager();
809
        $jobArchiveClass = $jobManager->getArchiveObjectName();
810
        $jobArchiveRepository = $objectManager->getRepository($jobArchiveClass);
811
812
        self::$objectManager->getEventManager()->removeEventListener('preUpdate', self::$dtcQueueListener);
813
814
        $job = new self::$jobClass(self::$worker, false, null);
815
        $job->fibonacci(1);
816
        $id = $job->getId();
817
        $objectManager->remove($job);
818
        $objectManager->flush();
819
820
        $jobArchive = $jobArchiveRepository->find($id);
821
        self::assertNotNull($jobArchive);
822
        $time = time() - 86401;
823
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
824
        $objectManager->persist($jobArchive);
825
        $objectManager->flush();
826
827
        $older = $time + 1;
828
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$time"));
829
        self::assertEquals(0, $count);
830
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$older"));
831
        self::assertEquals(1, $count);
832
833
        $job = new self::$jobClass(self::$worker, false, null);
834
        $job->fibonacci(1);
835
        $id = $job->getId();
836
        $objectManager->remove($job);
837
        $objectManager->flush();
838
839
        $jobArchive = $jobArchiveRepository->find($id);
840
        self::assertNotNull($jobArchive);
841
        $time = time() - 86401;
842
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
843
        $objectManager->persist($jobArchive);
844
        $objectManager->flush();
845
846
        $job = new self::$jobClass(self::$worker, false, null);
847
        $job->fibonacci(1);
848
        $id = $job->getId();
849
        $objectManager->remove($job);
850
        $objectManager->flush();
851
852
        $jobArchive = $jobArchiveRepository->find($id);
853
        self::assertNotNull($jobArchive);
854
        $jobArchive->setUpdatedAt(new \DateTime("@$time"));
855
        $objectManager->persist($jobArchive);
856
        $objectManager->flush();
857
        $older = $time + 1;
858
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$time"));
859
        self::assertEquals(0, $count);
860
        $count = $jobManager->pruneArchivedJobs(new \DateTime("@$older"));
861
        self::assertEquals(2, $count);
862
863
        self::$objectManager->getEventManager()->addEventListener('preUpdate', self::$dtcQueueListener);
864
    }
865
866
    public function testPerformance()
867
    {
868
        $jobs = self::$jobManager->getRepository()->findAll();
869
        foreach ($jobs as $job) {
870
            self::$jobManager->getObjectManager()->remove($job);
871
        }
872
        self::$jobManager->getObjectManager()->flush();
873
874
        self::$jobManager->getObjectManager()->clear();
875
        parent::testPerformance();
876
    }
877
878
    protected function getBaseStatus()
879
    {
880
        /** @var BaseJobManager $jobManager */
881
        $jobManager = self::$jobManager;
882
        $job = new self::$jobClass(self::$worker, false, null);
883
        $job->fibonacci(1);
884
        $status = $jobManager->getStatus();
885
        self::assertArrayHasKey('fibonacci->fibonacci()', $status);
886
        $fibonacciStatus = $status['fibonacci->fibonacci()'];
887
888
        self::assertArrayHasKey(BaseJob::STATUS_NEW, $fibonacciStatus);
889
        self::assertArrayHasKey(BaseJob::STATUS_ERROR, $fibonacciStatus);
890
        self::assertArrayHasKey(BaseJob::STATUS_RUNNING, $fibonacciStatus);
891
        self::assertArrayHasKey(BaseJob::STATUS_SUCCESS, $fibonacciStatus);
892
        self::assertArrayHasKey(RetryableJob::STATUS_MAX_STALLED, $fibonacciStatus);
893
        self::assertArrayHasKey(RetryableJob::STATUS_MAX_ERROR, $fibonacciStatus);
894
        self::assertArrayHasKey(RetryableJob::STATUS_MAX_RETRIES, $fibonacciStatus);
895
        self::assertArrayHasKey(RetryableJob::STATUS_EXPIRED, $fibonacciStatus);
896
897
        return [$job, $status];
898
    }
899
900
    public function testGetStatus()
901
    {
902
        list($job1, $status1) = $this->getBaseStatus();
903
        list($job2, $status2) = $this->getBaseStatus();
904
        $fibonacciStatus1 = $status1['fibonacci->fibonacci()'];
905
        $fibonacciStatus2 = $status2['fibonacci->fibonacci()'];
906
907
        self::assertEquals($fibonacciStatus1[BaseJob::STATUS_NEW] + 1, $fibonacciStatus2[BaseJob::STATUS_NEW]);
908
        $jobManager = self::$jobManager;
909
        $objectManager = $jobManager->getObjectManager();
910
        $objectManager->remove($job1);
911
        $objectManager->remove($job2);
912
    }
913
}
914