Completed
Push — master ( 4317fb...b1ebb8 )
by Matthew
04:36 queued 02:27
created

JobManager::updateBatchJob()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 29
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
dl 0
loc 29
ccs 0
cts 20
cp 0
rs 8.439
c 0
b 0
f 0
cc 5
eloc 20
nc 5
nop 3
crap 30
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\Query;
8
use Doctrine\ORM\QueryBuilder;
9
use Dtc\QueueBundle\Doctrine\BaseJobManager;
10
use Dtc\QueueBundle\Entity\Job;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends BaseJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21
    /**
     * Counts the rows of an entity class that are in a given status.
     *
     * @param string      $objectName Entity class name the count is selected from
     * @param string      $status     Status value the rows must match
     * @param string|null $workerName When not null, only rows with this worker name are counted
     * @param string|null $method     When not null, only rows with this method are counted
     *
     * @return int|string The scalar count as hydrated by Doctrine, or 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // Bind the method filter to $method (it was previously bound to $workerName by mistake).
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $queryBuilder->getQuery()->getSingleScalarResult();

        return $count ? $count : 0;
    }
51
52
    /**
     * Deletes rows of the job archive class whose status is BaseJob::STATUS_ERROR,
     * optionally restricted to one worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder
            ->delete($this->getJobArchiveClass(), 'j')
            ->where('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return intval($deleted);
    }
71
72
    /**
     * Guards against mixing "save" and "reset" calls on the same object manager.
     *
     * The object hash of the current object manager is compared with the hash recorded
     * by the opposite operation; when they match a LogicException is thrown. Otherwise
     * the hash is recorded for the operation named by $function.
     *
     * @param string $function 'save' for a save call; any other value is treated as a reset
     *
     * @throws LogicException When the opposite operation was already recorded for this object manager
     */
    protected function resetSaveOk($function)
    {
        $managerHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // A save is checked against the last reset, and a reset against the last save.
        $oppositeMarker = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;
        if ($managerHash === $oppositeMarker) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
95
96
    /**
     * Appends optional worker-name and method filters to a query built on alias "j".
     *
     * Each filter is added with andWhere() only when its argument is not null.
     *
     * @param QueryBuilder $queryBuilder Builder to extend in place
     * @param string|null  $workerName
     * @param string|null  $method
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $filterByWorker = null !== $workerName;
        $filterByMethod = null !== $method;

        if ($filterByWorker) {
            $queryBuilder->andWhere('j.workerName = :workerName');
            $queryBuilder->setParameter(':workerName', $workerName);
        }

        if ($filterByMethod) {
            $queryBuilder->andWhere('j.method = :method');
            $queryBuilder->setParameter(':method', $method);
        }
    }
110
111
    /**
     * Sets the status of jobs to Job::STATUS_EXPIRED when they are still in
     * BaseJob::STATUS_NEW and their expiresAt is at or before the current time.
     *
     * @param string|null $workerName Optional worker name filter
     * @param string|null $method     Optional method filter
     *
     * @return int Result of the update query, cast to int
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder
            ->update($this->getJobClass(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->andWhere('j.status = :status')
            ->setParameter(':expiresAt', new \DateTime())
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $affected = $updateBuilder->getQuery()->execute();

        return intval($affected);
    }
128
129
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() using the job archive class and its
     * "updatedAt" field.
     *
     * @param \DateTime $olderThan
     *
     * @return mixed Whatever removeOlderThan() returns
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
142
143 1
    /**
     * Counts jobs that are unlocked, whose whenAt is null or not in the future,
     * and whose expiresAt is null or still in the future.
     *
     * @param string|null $workerName Optional worker name filter
     * @param string|null $method     Optional method filter
     *
     * @return mixed The scalar count as hydrated by Doctrine
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $countBuilder = $entityManager->createQueryBuilder();
        $countBuilder = $countBuilder->select('count(j)')->from($this->getJobClass(), 'j');

        // Remembers whether a WHERE clause already exists, so the time filter below
        // knows whether to start the clause or extend it.
        $hasWhere = false;
        if (null !== $workerName && null !== $method) {
            $bothFilters = $countBuilder->expr()->andX(
                $countBuilder->expr()->eq('j.workerName', ':workerName'),
                $countBuilder->expr()->eq('j.method', ':method')
            );
            $countBuilder->where($bothFilters);
            $countBuilder->setParameter(':method', $method);
            $countBuilder->setParameter(':workerName', $workerName);
            $hasWhere = true;
        } elseif (null !== $workerName) {
            $countBuilder->where('j.workerName = :workerName');
            $countBuilder->setParameter(':workerName', $workerName);
            $hasWhere = true;
        } elseif (null !== $method) {
            $countBuilder->where('j.method = :method');
            $countBuilder->setParameter(':method', $method);
            $hasWhere = true;
        }

        $now = new \DateTime();

        $whenAtReached = $countBuilder->expr()->orX(
            $countBuilder->expr()->isNull('j.whenAt'),
            $countBuilder->expr()->lte('j.whenAt', ':whenAt')
        );
        if ($hasWhere) {
            $countBuilder->andWhere($whenAtReached);
        } else {
            $countBuilder->where($whenAtReached);
        }

        $notExpired = $countBuilder->expr()->orX(
            $countBuilder->expr()->isNull('j.expiresAt'),
            $countBuilder->expr()->gt('j.expiresAt', ':expiresAt')
        );
        $countBuilder
            ->andWhere($notExpired)
            ->andWhere('j.locked is NULL')
            ->setParameter(':whenAt', $now)
            ->setParameter(':expiresAt', $now);

        return $countBuilder->getQuery()->getSingleScalarResult();
    }
188
189
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from the job class and then the job archive class
     * into one accumulator, and returns them keyed first by the method label built in
     * getStatusByEntityName() and then by status, with the statuses sorted by key.
     *
     * @return array
     */
    public function getStatus()
    {
        $collected = [];
        $this->getStatusByEntityName($this->getJobClass(), $collected);
        $this->getStatusByEntityName($this->getJobArchiveClass(), $collected);

        $finalResult = [];
        foreach ($collected as $methodLabel => $countsByStatus) {
            ksort($countsByStatus);
            foreach ($countsByStatus as $status => $count) {
                if (!isset($finalResult[$methodLabel][$status])) {
                    $finalResult[$methodLabel][$status] = $count;
                    continue;
                }
                $finalResult[$methodLabel][$status] += $count;
            }
        }

        return $finalResult;
    }
212
213
    /**
     * Adds the per-status job counts of one entity class into $result.
     *
     * Rows are grouped by worker name, method and status. Each group's count is added
     * to $result['<workerName>-><method>()'][<status>]. The first time a method label
     * is seen, its entry is seeded with zero counts for the eight statuses listed below.
     *
     * @param string $entityName Entity class to query
     * @param array  $result     Accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();

        foreach ($result1 as $item) {
            $method = $item['workerName'].'->'.$item['method'].'()';
            if (!isset($result[$method])) {
                $result[$method] = [BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    RetryableJob::STATUS_EXPIRED => 0,
                    RetryableJob::STATUS_MAX_ERROR => 0,
                    RetryableJob::STATUS_MAX_STALLED => 0,
                    RetryableJob::STATUS_MAX_RETRIES => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0, ];
            }

            // A row may carry a status that was not seeded above; start it at zero so
            // the addition below never reads an undefined array key.
            $status = $item['status'];
            if (!isset($result[$method][$status])) {
                $result[$method][$status] = 0;
            }
            $result[$method][$status] += intval($item['c']);
        }
    }
238
239
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Candidate job ids are fetched up to 100 at a time and each one is claimed through
     * takeJob(); the first job that is successfully claimed is returned. The query is
     * repeated for as long as it still yields candidates, and null is returned once it
     * comes back empty.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed|null  $runId      Passed on to takeJob(), which stores it on the claimed job when not null
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        do {
            /** @var QueryBuilder $queryBuilder */
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $queryBuilder->select('j.id');
            $queryBuilder->setMaxResults(100);

            $candidates = $queryBuilder->getQuery()->getResult();
            foreach ($candidates as $candidate) {
                // Forward $runId so the claimed job records the run it belongs to;
                // it was previously accepted by this method but never passed on.
                $job = $this->takeJob($candidate['id'], $runId);
                if (null !== $job) {
                    return $job;
                }
            }
        } while (!empty($candidates));

        return null;
    }
269
270
    /**
     * Builds the query used to look for runnable jobs.
     *
     * Starts from the repository's query builder on alias "j", applies the standard
     * predicate and the optional worker/method filters, then orders the result.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize When truthy, order by priority (DESC) and then whenAt (ASC);
     *                                otherwise order by whenAt (ASC) only
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $jobRepository */
        $jobRepository = $this->getRepository();
        $builder = $jobRepository->createQueryBuilder('j');
        $this->addStandardPredicate($builder);
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->orderBy('j.whenAt', 'ASC');

            return $builder;
        }

        $builder->addOrderBy('j.priority', 'DESC');
        $builder->addOrderBy('j.whenAt', 'ASC');

        return $builder;
    }
294
295
    /**
     * Restricts a query on alias "j" to jobs that can currently be picked up.
     *
     * The conditions are: status is BaseJob::STATUS_NEW, locked is NULL, whenAt is null
     * or not after the current time, and expiresAt is null or after the current time.
     * The first condition is set with where(), so any WHERE clause already on the
     * builder is replaced.
     *
     * @param QueryBuilder $queryBuilder Builder to modify in place
     */
    protected function addStandardPredicate(QueryBuilder $queryBuilder)
    {
        $now = new \DateTime();

        $whenAtReached = $queryBuilder->expr()->orX(
            $queryBuilder->expr()->isNull('j.whenAt'),
            $queryBuilder->expr()->lte('j.whenAt', ':whenAt')
        );
        $notExpired = $queryBuilder->expr()->orX(
            $queryBuilder->expr()->isNull('j.expiresAt'),
            $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
        );

        $queryBuilder->where('j.status = :status');
        $queryBuilder->setParameter(':status', BaseJob::STATUS_NEW);
        $queryBuilder->andWhere('j.locked is NULL');
        $queryBuilder->andWhere($whenAtReached);
        $queryBuilder->andWhere($notExpired);
        $queryBuilder->setParameter(':whenAt', $now);
        $queryBuilder->setParameter(':expiresAt', $now);
    }
312
313
    /**
     * Tries to claim a job by id.
     *
     * Runs an UPDATE that sets locked, lockedAt and status (BaseJob::STATUS_RUNNING),
     * plus runId when one is given, restricted to the row with this id whose locked
     * column is still NULL. The job is loaded and returned only when the update
     * reports exactly one affected row.
     *
     * @param mixed      $jobId Id of the job to claim
     * @param mixed|null $runId Stored in j.runId when not null
     *
     * @return object|null The result of the repository's find() on success, null otherwise
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var EntityRepository $jobRepository */
        $jobRepository = $this->getRepository();
        /** @var QueryBuilder $claimBuilder */
        $claimBuilder = $jobRepository->createQueryBuilder('j');

        $claimBuilder->update();
        $claimBuilder->set('j.locked', ':locked')->setParameter(':locked', true);
        $claimBuilder->set('j.lockedAt', ':lockedAt')->setParameter(':lockedAt', new \DateTime());
        $claimBuilder->set('j.status', ':status')->setParameter(':status', BaseJob::STATUS_RUNNING);
        if (null !== $runId) {
            $claimBuilder->set('j.runId', ':runId')->setParameter(':runId', $runId);
        }

        $claimBuilder
            ->where('j.id = :id')
            ->andWhere('j.locked is NULL')
            ->setParameter(':id', $jobId);

        $affected = $claimBuilder->getQuery()->execute();
        if (1 !== $affected) {
            return null;
        }

        return $jobRepository->find($jobId);
    }
343
344
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the job in BaseJob::STATUS_NEW with the same crcHash and the earliest
     * whenAt. When one exists, it is handed to updateBatchJob() with the larger of the
     * two priorities and the smaller of the two whenAt values.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return mixed|null The existing job, or null when no matching job was found
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var QueryBuilder $lookupBuilder */
        $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
        $lookupBuilder->select();
        $lookupBuilder->where('j.crcHash = :crcHash');
        $lookupBuilder->andWhere('j.status = :status');
        $lookupBuilder->setParameter(':status', BaseJob::STATUS_NEW);
        $lookupBuilder->setParameter(':crcHash', $job->getCrcHash());
        $lookupBuilder->orderBy('j.whenAt', 'ASC');
        $lookupBuilder->setMaxResults(1);

        $matches = $lookupBuilder->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $matches[0];

        $this->updateBatchJob(
            $existingJob,
            max($job->getPriority(), $existingJob->getPriority()),
            min($job->getWhenAt(), $existingJob->getWhenAt())
        );

        return $existingJob;
    }
377
378
    /**
     * Applies a new priority and/or whenAt to an existing, still-unlocked job.
     *
     * Only the values that differ from the job's current ones are written, both to the
     * entity (through its setters) and to the database (through one UPDATE limited to
     * this id with locked still NULL). When nothing differs, no query is issued.
     *
     * @param Job        $existingJob Job to update
     * @param mixed      $newPriority Priority the job should end up with
     * @param mixed|null $newWhenAt   whenAt the job should end up with
     *
     * @return Job The job that was passed in
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenAt)
    {
        $existingPriority = $existingJob->getPriority();
        $existingWhenAt = $existingJob->getWhenAt();

        $priorityChanged = $newPriority !== $existingPriority;
        // Compare by value on purpose: two distinct DateTime objects holding the same
        // instant are never identical (===), which would trigger a needless UPDATE.
        $whenAtChanged = $newWhenAt != $existingWhenAt;

        if (!$priorityChanged && !$whenAtChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $repository->createQueryBuilder('j');
        $queryBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $queryBuilder->set('j.priority', ':priority')
                ->setParameter(':priority', $newPriority);
        }

        if ($whenAtChanged) {
            $existingJob->setWhenAt($newWhenAt);
            $queryBuilder->set('j.whenAt', ':whenAt')
                ->setParameter(':whenAt', $newWhenAt);
        }

        // Leave the row alone if it has been locked in the meantime.
        $queryBuilder->where('j.id = :id');
        $queryBuilder->andWhere('j.locked is NULL');
        $queryBuilder->setParameter(':id', $existingJob->getId());
        $queryBuilder->getQuery()->execute();

        return $existingJob;
    }
407
408
    /**
     * Lists the distinct worker name / method pairs among jobs matching the standard predicate.
     *
     * @return array Map of worker name to the list of its methods; empty when nothing matches
     */
    public function getWorkersAndMethods()
    {
        /** @var EntityRepository $jobRepository */
        $jobRepository = $this->getRepository();
        $builder = $jobRepository->createQueryBuilder('j');
        $this->addStandardPredicate($builder);
        $builder->select('DISTINCT j.workerName, j.method');

        $rows = $builder->getQuery()->getArrayResult();

        $methodsByWorker = [];
        if (!$rows) {
            return $methodsByWorker;
        }

        foreach ($rows as $row) {
            $worker = $row['workerName'];
            $methodsByWorker[$worker][] = $row['method'];
        }

        return $methodsByWorker;
    }
428
429
    /**
     * Counts the jobs that match the standard predicate, optionally filtered by
     * worker name and/or method name.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return mixed The scalar count as hydrated by Doctrine
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var EntityRepository $jobRepository */
        $jobRepository = $this->getRepository();

        $countBuilder = $jobRepository->createQueryBuilder('j');
        $this->addStandardPredicate($countBuilder);
        $this->addWorkerNameCriterion($countBuilder, $workerName, $methodName);
        $countBuilder->select('count(j.id)');

        $countQuery = $countBuilder->getQuery();

        return $countQuery->getSingleScalarResult();
    }
440
441
    /**
     * Marks every job matching the standard predicate (and the optional worker/method
     * filters) with Job::STATUS_ARCHIVE, then calls runArchive() when the update
     * reported any affected rows.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param callable    $progressCallback Passed through to runArchive()
     */
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback)
    {
        // First mark all Live non-running jobs as Archive
        $jobRepository = $this->getRepository();
        /** @var QueryBuilder $markBuilder */
        $markBuilder = $jobRepository->createQueryBuilder('j');
        $markBuilder->update($this->getJobClass(), 'j');
        $markBuilder->set('j.status', ':statusArchive');
        $markBuilder->setParameter(':statusArchive', Job::STATUS_ARCHIVE);
        $this->addStandardPredicate($markBuilder);
        $this->addWorkerNameCriterion($markBuilder, $workerName, $methodName);

        $marked = $markBuilder->getQuery()->execute();
        if (!$marked) {
            return;
        }

        $this->runArchive($workerName, $methodName, $progressCallback);
    }
458
459
    /**
     * Removes the jobs that are in 'archive' status from the job table.
     *
     * Jobs in Job::STATUS_ARCHIVE are read in batches of up to 10000 rows. Each row is
     * loaded by id and removed through the entity manager; a flush and a progress
     * callback happen after every 10 processed rows and again at the end of each batch.
     * The loop stops as soon as a batch comes back with fewer than 10000 rows.
     *
     * NOTE(review): presumably removing a job is what copies it to the archive table
     * (e.g. through a listener) — that is not visible in this method; confirm.
     *
     * @param string|null $workerName       Accepted but not used by this method
     * @param string|null $methodName       Accepted but not used by this method
     * @param callable    $progressCallback Called with the running count of processed rows
     */
    protected function runArchive($workerName = null, $methodName = null, $progressCallback)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $processed = 0;

        do {
            /** @var EntityRepository $jobRepository */
            $jobRepository = $this->getRepository();
            $batchBuilder = $jobRepository->createQueryBuilder('j');
            $batchBuilder->where('j.status = :status');
            $batchBuilder->setParameter(':status', Job::STATUS_ARCHIVE);
            $batchBuilder->setMaxResults(10000);

            $rows = $batchBuilder->getQuery()->getArrayResult();
            foreach ($rows as $row) {
                $entity = $jobRepository->find($row['id']);
                if ($entity) {
                    $entityManager->remove($entity);
                }

                // Rows are counted even when the entity could not be found.
                ++$processed;
                if (0 == $processed % 10) {
                    $this->flush();
                    $progressCallback($processed);
                }
            }

            $this->flush();
            $progressCallback($processed);

            // A full batch means there may be more rows left to process.
            $batchWasFull = $rows && 10000 == count($rows);
        } while ($batchWasFull);
    }
497
}
498