Completed
Push — master ( 16d212...b7b31c )
by Matthew
07:34 queued 05:11
created

JobManager::pruneErroneousJobs()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\BaseJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\RetryableJob;
12
use Symfony\Component\Process\Exception\LogicException;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
21
    {
22
        /** @var EntityManager $objectManager */
23 3
        $objectManager = $this->getObjectManager();
24
25
        $queryBuilder = $objectManager
26 3
            ->createQueryBuilder()
27 3
            ->select('count(a.id)')
28 3
            ->from($objectName, 'a')
29 3
            ->where('a.status = :status');
30
31 3
        if (null !== $workerName) {
32 1
            $queryBuilder->andWhere('a.workerName = :workerName')
33 1
                ->setParameter(':workerName', $workerName);
34 1
        }
35
36 3
        if (null !== $method) {
37 1
            $queryBuilder->andWhere('a.method = :method')
38 1
                ->setParameter(':method', $workerName);
39 1
        }
40
41 3
        $count = $queryBuilder->setParameter(':status', $status)
42 3
            ->getQuery()->getSingleScalarResult();
43
44 3
        if (!$count) {
45 1
            return 0;
46
        }
47
48 3
        return $count;
49
    }
50
51
    /**
52
     * @param string|null $workerName
53
     * @param string|null $method
54
     *
55
     * @return int Count of jobs pruned
56
     */
57 1
    public function pruneErroneousJobs($workerName = null, $method = null)
58
    {
59
        /** @var EntityManager $objectManager */
60 1
        $objectManager = $this->getObjectManager();
61 1
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
62 1
        $queryBuilder->where('j.status = :status')
63 1
            ->setParameter(':status', BaseJob::STATUS_ERROR);
64
65 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
66 1
        $query = $queryBuilder->getQuery();
67
68 1
        return intval($query->execute());
69
    }
70
71 16
    protected function resetSaveOk($function)
72 1
    {
73 16
        $objectManager = $this->getObjectManager();
74 16
        $splObjectHash = spl_object_hash($objectManager);
75
76 16
        if ('save' === $function) {
77
            $compare = static::$resetInsertCalled;
78
        } else {
79 16
            $compare = static::$saveInsertCalled;
80
        }
81
82 16
        if ($splObjectHash === $compare) {
83
            // Insert SQL is cached...
84
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
85
            throw new LogicException($msg);
86
        }
87
88 16
        if ('save' === $function) {
89
            static::$saveInsertCalled = spl_object_hash($objectManager);
90
        } else {
91 16
            static::$resetInsertCalled = spl_object_hash($objectManager);
92
        }
93 16
    }
94
95
    /**
96
     * @param string $workerName
97
     * @param string $method
98
     */
99 10
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
100
    {
101 10
        if (null !== $workerName) {
102 3
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
103 3
        }
104
105 10
        if (null !== $method) {
106 2
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
107 2
        }
108 10
    }
109
110 1
    protected function updateExpired($workerName = null, $method = null)
111
    {
112
        /** @var EntityManager $objectManager */
113 1
        $objectManager = $this->getObjectManager();
114 1
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
115 1
        $queryBuilder->set('j.status', ':newStatus');
116 1
        $queryBuilder->where('j.expiresAt <= :expiresAt')
117 1
            ->setParameter(':expiresAt', new \DateTime());
118 1
        $queryBuilder->andWhere('j.status = :status')
119 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
120 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
121
122 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
123 1
        $query = $queryBuilder->getQuery();
124
125 1
        return intval($query->execute());
126
    }
127
128
    /**
129
     * Removes archived jobs older than $olderThan.
130
     *
131
     * @param \DateTime $olderThan
132
     */
133 1
    public function pruneArchivedJobs(\DateTime $olderThan)
134
    {
135 1
        return $this->removeOlderThan($this->getArchiveObjectName(),
136 1
                'updatedAt',
137 1
                $olderThan);
138
    }
139
140 2
    public function getJobCount($workerName = null, $method = null)
141
    {
142
        /** @var EntityManager $objectManager */
143 2
        $objectManager = $this->getObjectManager();
144 2
        $queryBuilder = $objectManager->createQueryBuilder();
145
146 2
        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getObjectName(), 'j');
147
148 2
        $where = 'where';
149 2
        if (null !== $workerName) {
150
            if (null !== $method) {
151
                $queryBuilder->where($queryBuilder->expr()->andX(
152
                    $queryBuilder->expr()->eq('j.workerName', ':workerName'),
153
                                                $queryBuilder->expr()->eq('j.method', ':method')
154
                ))
155
                    ->setParameter(':method', $method);
156
            } else {
157
                $queryBuilder->where('j.workerName = :workerName');
158
            }
159
            $queryBuilder->setParameter(':workerName', $workerName);
160
            $where = 'andWhere';
161 2
        } elseif (null !== $method) {
162
            $queryBuilder->where('j.method = :method')->setParameter(':method', $method);
163
            $where = 'andWhere';
164
        }
165
166 2
        $dateTime = new \DateTime();
167
        // Filter
168
        $queryBuilder
169 2
            ->$where($queryBuilder->expr()->orX(
170 2
                $queryBuilder->expr()->isNull('j.whenAt'),
171 2
                                        $queryBuilder->expr()->lte('j.whenAt', ':whenAt')
172 2
            ))
173 2
            ->andWhere($queryBuilder->expr()->orX(
174 2
                $queryBuilder->expr()->isNull('j.expiresAt'),
175 2
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
176 2
            ))
177 2
            ->andWhere('j.locked is NULL')
178 2
            ->setParameter(':whenAt', $dateTime)
179 2
            ->setParameter(':expiresAt', $dateTime);
180
181 2
        $query = $queryBuilder->getQuery();
182
183 2
        return $query->getSingleScalarResult();
184
    }
185
186
    /**
187
     * Get Jobs statuses.
188
     */
189 2
    public function getStatus()
190
    {
191 2
        $result = [];
192 2
        $this->getStatusByEntityName($this->getObjectName(), $result);
193 2
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
194
195 2
        $finalResult = [];
196 2
        foreach ($result as $key => $item) {
197 1
            ksort($item);
198 1
            foreach ($item as $status => $count) {
199 1
                if (isset($finalResult[$key][$status])) {
200
                    $finalResult[$key][$status] += $count;
201
                } else {
202 1
                    $finalResult[$key][$status] = $count;
203
                }
204 1
            }
205 2
        }
206
207 2
        return $finalResult;
208
    }
209
210
    /**
211
     * @param string $entityName
212
     */
213 2
    protected function getStatusByEntityName($entityName, array &$result)
214
    {
215
        /** @var EntityManager $objectManager */
216 2
        $objectManager = $this->getObjectManager();
217 2
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
218 2
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
219
220 2
        foreach ($result1 as $item) {
221 1
            $method = $item['workerName'].'->'.$item['method'].'()';
222 1
            if (!isset($result[$method])) {
223 1
                $result[$method] = [BaseJob::STATUS_NEW => 0,
224 1
                    BaseJob::STATUS_RUNNING => 0,
225 1
                    RetryableJob::STATUS_EXPIRED => 0,
226 1
                    RetryableJob::STATUS_MAX_ERROR => 0,
227 1
                    RetryableJob::STATUS_MAX_STALLED => 0,
228 1
                    RetryableJob::STATUS_MAX_RETRIES => 0,
229 1
                    BaseJob::STATUS_SUCCESS => 0,
230 1
                    BaseJob::STATUS_ERROR => 0, ];
231 1
            }
232 1
            $result[$method][$item['status']] += intval($item['c']);
233 2
        }
234 2
    }
235
236
    /**
237
     * Get the next job to run (can be filtered by workername and method name).
238
     *
239
     * @param string $workerName
240
     * @param string $methodName
241
     * @param bool   $prioritize
242
     *
243
     * @return Job|null
244
     */
245 8
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
246
    {
247 8
        $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 245 can also be of type string; however, Dtc\QueueBundle\ORM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $methodName defined by parameter $methodName on line 245 can also be of type string; however, Dtc\QueueBundle\ORM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
248 8
        $queryBuilder->setMaxResults(1);
249
250
        /** @var QueryBuilder $queryBuilder */
251 8
        $query = $queryBuilder->getQuery();
252 8
        $jobs = $query->getResult();
253
254 8
        return $this->takeJob($jobs, $runId);
255
    }
256
257
    /**
258
     * @param null $workerName
259
     * @param null $methodName
260
     * @param bool $prioritize
261
     *
262
     * @return QueryBuilder
263
     */
264 8
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
265
    {
266
        /** @var EntityRepository $repository */
267 8
        $repository = $this->getRepository();
268 8
        $queryBuilder = $repository->createQueryBuilder('j');
269 8
        $dateTime = new \DateTime();
270
        $queryBuilder
271 8
            ->select('j.id')
272 8
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
273 8
            ->andWhere('j.locked is NULL')
274 8
            ->andWhere($queryBuilder->expr()->orX(
275 8
                $queryBuilder->expr()->isNull('j.whenAt'),
276 8
                $queryBuilder->expr()->lte('j.whenAt', ':whenAt')
277 8
            ))
278 8
            ->andWhere($queryBuilder->expr()->orX(
279 8
                $queryBuilder->expr()->isNull('j.expiresAt'),
280 8
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
281 8
            ))
282 8
            ->setParameter(':whenAt', $dateTime)
283 8
            ->setParameter(':expiresAt', $dateTime);
284
285 8
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
286
287 8
        if ($prioritize) {
288 8
            $queryBuilder->addOrderBy('j.priority', 'DESC');
289 8
            $queryBuilder->addOrderBy('j.whenAt', 'ASC');
290 8
        } else {
291 1
            $queryBuilder->orderBy('j.whenAt', 'ASC');
292
        }
293
294 8
        return $queryBuilder;
295
    }
296
297 8
    protected function takeJob($jobs, $runId = null)
298
    {
299 8
        if (isset($jobs[0]['id'])) {
300
            /** @var EntityRepository $repository */
301 7
            $repository = $this->getRepository();
302
            /** @var QueryBuilder $queryBuilder */
303 7
            $queryBuilder = $repository->createQueryBuilder('j');
304
            $queryBuilder
305 7
                ->update()
306 7
                ->set('j.locked', ':locked')
307 7
                ->setParameter(':locked', true)
308 7
                ->set('j.lockedAt', ':lockedAt')
309 7
                ->setParameter(':lockedAt', new \DateTime())
310 7
                ->set('j.status', ':status')
311 7
                ->setParameter(':status', BaseJob::STATUS_RUNNING);
312 7
            if (null !== $runId) {
313
                $queryBuilder
314 1
                    ->set('j.runId', ':runId')
315 1
                    ->setParameter(':runId', $runId);
316 1
            }
317 7
            $queryBuilder->where('j.id = :id');
318 7
            $queryBuilder->andWhere('j.locked is NULL');
319 7
            $queryBuilder->setParameter(':id', $jobs[0]['id']);
320 7
            $resultCount = $queryBuilder->getQuery()->execute();
321
322 7
            if (1 === $resultCount) {
323 7
                return $repository->find($jobs[0]['id']);
324
            }
325
        }
326
327 4
        return null;
328
    }
329
330
    /**
331
     * Tries to update the nearest job as a batch.
332
     *
333
     * @param \Dtc\QueueBundle\Model\Job $job
334
     *
335
     * @return mixed|null
336
     */
337 1
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
338
    {
339
        /** @var QueryBuilder $queryBuilder */
340 1
        $queryBuilder = $this->getRepository()->createQueryBuilder('j');
341 1
        $queryBuilder->select()
342 1
            ->where('j.crcHash = :crcHash')
343 1
            ->andWhere('j.status = :status')
344 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
345 1
            ->setParameter(':crcHash', $job->getCrcHash())
346 1
            ->orderBy('j.whenAt', 'ASC')
347 1
            ->setMaxResults(1);
348 1
        $existingJobs = $queryBuilder->getQuery()->execute();
349
350 1
        if (empty($existingJobs)) {
351
            return null;
352
        }
353
        /** @var Job $existingJob */
354 1
        $existingJob = $existingJobs[0];
355
356 1
        $newPriority = max($job->getPriority(), $existingJob->getPriority());
357 1
        $newWhenAt = min($job->getWhenAt(), $existingJob->getWhenAt());
358
359 1
        $this->updateBatchJob($existingJob, $newPriority, $newWhenAt);
360
361 1
        return $existingJob;
362
    }
363
364 1
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenAt)
365
    {
366 1
        $existingPriority = $existingJob->getPriority();
367 1
        $existingWhenAt = $existingJob->getWhenAt();
368
369 1
        if ($newPriority !== $existingPriority || $newWhenAt !== $existingWhenAt) {
370
            /** @var EntityRepository $repository */
371 1
            $repository = $this->getRepository();
372
            /** @var QueryBuilder $queryBuilder */
373 1
            $queryBuilder = $repository->createQueryBuilder('j');
374 1
            $queryBuilder->update();
375 1
            if ($newPriority !== $existingPriority) {
376 1
                $existingJob->setPriority($newPriority);
377 1
                $queryBuilder->set('j.priority', ':priority')
378 1
                    ->setParameter(':priority', $newPriority);
379 1
            }
380 1
            if ($newWhenAt !== $existingWhenAt) {
381 1
                $existingJob->setWhenAt($newWhenAt);
382 1
                $queryBuilder->set('j.whenAt', ':whenAt')
383 1
                    ->setParameter(':whenAt', $newWhenAt);
384 1
            }
385 1
            $queryBuilder->where('j.id = :id');
386 1
            $queryBuilder->andWhere('j.locked is NULL');
387 1
            $queryBuilder->setParameter(':id', $existingJob->getId());
388 1
            $queryBuilder->getQuery()->execute();
389 1
        }
390
391 1
        return $existingJob;
392
    }
393
}
394