Completed
Push — master ( fa7a2a...0b2176 )
by Matthew
08:03
created

JobManager::pruneErroneousJobs()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\QueryBuilder;
9
use Dtc\QueueBundle\Doctrine\BaseJobManager;
10
use Dtc\QueueBundle\Entity\Job;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected $formerIdGenerators;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
22
    {
23
        /** @var EntityManager $objectManager */
24 3
        $objectManager = $this->getObjectManager();
25
26
        $qb = $objectManager
27 3
            ->createQueryBuilder()
28 3
            ->select('count(a.id)')
29 3
            ->from($objectName, 'a')
30 3
            ->where('a.status = :status');
31
32 3
        if (null !== $workerName) {
33 1
            $qb->andWhere('a.workerName = :workerName')
34 1
                ->setParameter(':workerName', $workerName);
35 1
        }
36
37 3
        if (null !== $method) {
38 1
            $qb->andWhere('a.method = :method')
39 1
                ->setParameter(':method', $workerName);
40 1
        }
41
42 3
        $count = $qb->setParameter(':status', $status)
43 3
            ->getQuery()->getSingleScalarResult();
44
45 3
        if (!$count) {
46 1
            return 0;
47
        }
48
49 3
        return $count;
50
    }
51
52
    /**
53
     * @param string|null $workerName
54
     * @param string|null $method
55
     *
56
     * @return int Count of jobs pruned
57
     */
58 1
    public function pruneErroneousJobs($workerName = null, $method = null)
59
    {
60
        /** @var EntityManager $objectManager */
61 1
        $objectManager = $this->getObjectManager();
62 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
63 1
        $qb->where('j.status = :status')
64 1
            ->setParameter(':status', BaseJob::STATUS_ERROR);
65
66 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
67 1
        $query = $qb->getQuery();
68
69 1
        return intval($query->execute());
70
    }
71
72 14
    protected function resetSaveOk($function)
73
    {
74 14
        $objectManager = $this->getObjectManager();
75 14
        $splObjectHash = spl_object_hash($objectManager);
76
77 14
        if ('save' === $function) {
78
            $compare = static::$resetInsertCalled;
79
        } else {
80 14
            $compare = static::$saveInsertCalled;
81
        }
82
83 14
        if ($splObjectHash === $compare) {
84
            // Insert SQL is cached...
85
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
86
            throw new \Exception($msg);
87
        }
88
89 14
        if ('save' === $function) {
90
            static::$saveInsertCalled = spl_object_hash($objectManager);
91
        } else {
92 14
            static::$resetInsertCalled = spl_object_hash($objectManager);
93
        }
94 14
    }
95
96
    /**
97
     * @param string $workerName
98
     * @param string $method
99
     */
100 8
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
101
    {
102 8
        if (null !== $workerName) {
103 3
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
104 3
        }
105
106 8
        if (null !== $method) {
107 2
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
108 2
        }
109 8
    }
110
111 1
    protected function updateExpired($workerName = null, $method = null)
112
    {
113
        /** @var EntityManager $objectManager */
114 1
        $objectManager = $this->getObjectManager();
115 1
        $qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
116 1
        $qb->set('j.status', ':newStatus');
117 1
        $qb->where('j.expiresAt <= :expiresAt')
118 1
            ->setParameter(':expiresAt', new \DateTime());
119 1
        $qb->andWhere('j.status = :status')
120 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
121 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
122
123 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
124 1
        $query = $qb->getQuery();
125
126 1
        return intval($query->execute());
127
    }
128
129
    /**
130
     * Removes archived jobs older than $olderThan.
131
     *
132
     * @param \DateTime $olderThan
133
     */
134 1
    public function pruneArchivedJobs(\DateTime $olderThan)
135
    {
136
        /** @var EntityManager $entityManager */
137 1
        $entityManager = $this->getObjectManager();
0 ignored issues
show
Unused Code introduced by
$entityManager is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
138
139 1
        return $this->removeOlderThan($this->getArchiveObjectName(),
140 1
                'updatedAt',
141 1
                $olderThan);
142
    }
143
144 2
    public function getJobCount($workerName = null, $method = null)
145
    {
146
        /** @var EntityManager $objectManager */
147 2
        $objectManager = $this->getObjectManager();
148 2
        $qb = $objectManager->createQueryBuilder();
149
150 2
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
151
152 2
        $where = 'where';
153 2
        if (null !== $workerName) {
154
            if (null !== $method) {
155
                $qb->where($qb->expr()->andX(
156
                    $qb->expr()->eq('j.workerName', ':workerName'),
157
                                                $qb->expr()->eq('j.method', ':method')
158
                ))
159
                    ->setParameter(':method', $method);
160
            } else {
161
                $qb->where('j.workerName = :workerName');
162
            }
163
            $qb->setParameter(':workerName', $workerName);
164
            $where = 'andWhere';
165 2
        } elseif (null !== $method) {
166
            $qb->where('j.method = :method')->setParameter(':method', $method);
167
            $where = 'andWhere';
168
        }
169
170 2
        $dateTime = new \DateTime();
171
        // Filter
172
        $qb
173 2
            ->$where($qb->expr()->orX(
174 2
                $qb->expr()->isNull('j.whenAt'),
175 2
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
176 2
            ))
177 2
            ->andWhere($qb->expr()->orX(
178 2
                $qb->expr()->isNull('j.expiresAt'),
179 2
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
180 2
            ))
181 2
            ->andWhere('j.locked is NULL')
182 2
            ->setParameter(':whenAt', $dateTime)
183 2
            ->setParameter(':expiresAt', $dateTime);
184
185 2
        $query = $qb->getQuery();
186
187 2
        return $query->getSingleScalarResult();
188
    }
189
190
    /**
191
     * Get Jobs statuses.
192
     */
193 2
    public function getStatus()
194
    {
195 2
        $result = [];
196 2
        $this->getStatusByEntityName($this->getObjectName(), $result);
197 2
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
198
199 2
        $finalResult = [];
200 2
        foreach ($result as $key => $item) {
201 1
            ksort($item);
202 1
            foreach ($item as $status => $count) {
203 1
                if (isset($finalResult[$key][$status])) {
204
                    $finalResult[$key][$status] += $count;
205
                } else {
206 1
                    $finalResult[$key][$status] = $count;
207
                }
208 1
            }
209 2
        }
210
211 2
        return $finalResult;
212
    }
213
214
    /**
215
     * @param string $entityName
216
     */
217 2
    protected function getStatusByEntityName($entityName, array &$result)
218
    {
219
        /** @var EntityManager $objectManager */
220 2
        $objectManager = $this->getObjectManager();
221 2
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
222 2
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
223
224 2
        foreach ($result1 as $item) {
225 1
            $method = $item['workerName'].'->'.$item['method'].'()';
226 1
            if (!isset($result[$method])) {
227 1
                $result[$method] = [BaseJob::STATUS_NEW => 0,
228 1
                    BaseJob::STATUS_RUNNING => 0,
229 1
                    RetryableJob::STATUS_EXPIRED => 0,
230 1
                    RetryableJob::STATUS_MAX_ERROR => 0,
231 1
                    RetryableJob::STATUS_MAX_STALLED => 0,
232 1
                    RetryableJob::STATUS_MAX_RETRIES => 0,
233 1
                    BaseJob::STATUS_SUCCESS => 0,
234 1
                    BaseJob::STATUS_ERROR => 0, ];
235 1
            }
236 1
            $result[$method][$item['status']] += intval($item['c']);
237 2
        }
238 2
    }
239
240
    /**
241
     * Get the next job to run (can be filtered by workername and method name).
242
     *
243
     * @param string $workerName
244
     * @param string $methodName
245
     * @param bool   $prioritize
246
     *
247
     * @return Job|null
248
     */
249 6
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
250
    {
251 6
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
252 6
        $hash = hash('sha256', $uniqid);
253
254
        /** @var EntityManager $objectManager */
255 6
        $objectManager = $this->getObjectManager();
256
257 6
        $objectManager->beginTransaction();
258
259
        /** @var EntityRepository $repository */
260 6
        $repository = $this->getRepository();
261 6
        $qb = $repository->createQueryBuilder('j');
262 6
        $dateTime = new \DateTime();
263
        $qb
264 6
            ->select('j')
265 6
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
266 6
            ->andWhere('j.locked is NULL')
267 6
            ->andWhere($qb->expr()->orX(
268 6
                $qb->expr()->isNull('j.whenAt'),
269 6
                        $qb->expr()->lte('j.whenAt', ':whenAt')
270 6
            ))
271 6
            ->andWhere($qb->expr()->orX(
272 6
                $qb->expr()->isNull('j.expiresAt'),
273 6
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
274 6
            ))
275 6
            ->setParameter(':whenAt', $dateTime)
276 6
            ->setParameter(':expiresAt', $dateTime);
277
278 6
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
279
280 6
        if ($prioritize) {
281 6
            $qb->add('orderBy', 'j.priority DESC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority DESC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
282 6
        } else {
283
            $qb->orderBy('j.whenAt', 'ASC');
284
        }
285 6
        $qb->setMaxResults(1);
286
287
        /** @var QueryBuilder $qb */
288 6
        $query = $qb->getQuery();
289 6
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
290 6
        $jobs = $query->getResult();
291
292 6
        if ($jobs) {
293
            /** @var Job $job */
294 5
            $job = $jobs[0];
295 5
            if (!$job) {
296
                throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
297
            }
298 5
            $job->setLocked(true);
299 5
            $job->setLockedAt(new \DateTime());
300 5
            $job->setStatus(BaseJob::STATUS_RUNNING);
301 5
            $job->setRunId($runId);
302 5
            $objectManager->commit();
303 5
            $objectManager->flush();
304
305 5
            return $job;
306
        }
307
308 3
        $objectManager->rollback();
309
310 3
        return null;
311
    }
312
313
    /**
314
     * Tries to update the nearest job as a batch.
315
     *
316
     * @param \Dtc\QueueBundle\Model\Job $job
317
     *
318
     * @return mixed|null
319
     */
320 2
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
321
    {
322 1
        $oldJob = null;
323
        do {
324
            try {
325
                /** @var EntityManager $entityManager */
326 1
                $entityManager = $this->getObjectManager();
327 1
                $entityManager->beginTransaction();
328
329
                /** @var QueryBuilder $queryBuilder */
330 1
                $queryBuilder = $this->getRepository()->createQueryBuilder('j');
331 1
                $queryBuilder->select()
332 1
                    ->where('j.crcHash = :crcHash')
333 1
                    ->andWhere('j.status = :status')
334 2
                    ->setParameter(':status', BaseJob::STATUS_NEW)
335 1
                    ->setParameter(':crcHash', $job->getCrcHash())
336 1
                    ->orderBy('j.whenAt', 'ASC')
337 1
                    ->setMaxResults(1);
338 1
                $oldJob = $queryBuilder->getQuery()->getSingleResult();
339
340 1
                if (!$oldJob) {
341
                    return null;
342
                }
343
344 1
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
345 1
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
346
347 1
                $entityManager->persist($oldJob);
348 1
                $entityManager->commit();
349 1
                $entityManager->flush();
350 1
            } catch (\Exception $exception) {
351
                $entityManager->rollback();
352
            }
353 1
        } while (null === $oldJob);
354
355 1
        return $oldJob;
356
    }
357
}
358