Completed
Push — master ( 8a683b...96b103 )
by Matthew
05:26 queued 53s
created

JobManager::pruneErroneousJobs()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\BaseJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\RetryableJob;
12
use Symfony\Component\Process\Exception\LogicException;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
21
    {
22
        /** @var EntityManager $objectManager */
23 3
        $objectManager = $this->getObjectManager();
24
25
        $queryBuilder = $objectManager
26 3
            ->createQueryBuilder()
27 3
            ->select('count(a.id)')
28 3
            ->from($objectName, 'a')
29 3
            ->where('a.status = :status');
30
31 3
        if (null !== $workerName) {
32 1
            $queryBuilder->andWhere('a.workerName = :workerName')
33 1
                ->setParameter(':workerName', $workerName);
34 1
        }
35
36 3
        if (null !== $method) {
37 1
            $queryBuilder->andWhere('a.method = :method')
38 1
                ->setParameter(':method', $workerName);
39 1
        }
40
41 3
        $count = $queryBuilder->setParameter(':status', $status)
42 3
            ->getQuery()->getSingleScalarResult();
43
44 3
        if (!$count) {
45 1
            return 0;
46
        }
47
48 3
        return $count;
49
    }
50
51
    /**
52
     * @param string|null $workerName
53
     * @param string|null $method
54
     *
55
     * @return int Count of jobs pruned
56
     */
57 1
    public function pruneErroneousJobs($workerName = null, $method = null)
58
    {
59
        /** @var EntityManager $objectManager */
60 1
        $objectManager = $this->getObjectManager();
61 1
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
62 1
        $queryBuilder->where('j.status = :status')
63 1
            ->setParameter(':status', BaseJob::STATUS_ERROR);
64
65 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
66 1
        $query = $queryBuilder->getQuery();
67
68 1
        return intval($query->execute());
69
    }
70
71 14
    protected function resetSaveOk($function)
72 1
    {
73 14
        $objectManager = $this->getObjectManager();
74 14
        $splObjectHash = spl_object_hash($objectManager);
75
76 14
        if ('save' === $function) {
77
            $compare = static::$resetInsertCalled;
78
        } else {
79 14
            $compare = static::$saveInsertCalled;
80
        }
81
82 14
        if ($splObjectHash === $compare) {
83
            // Insert SQL is cached...
84
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
85
            throw new LogicException($msg);
86
        }
87
88 14
        if ('save' === $function) {
89
            static::$saveInsertCalled = spl_object_hash($objectManager);
90
        } else {
91 14
            static::$resetInsertCalled = spl_object_hash($objectManager);
92
        }
93 14
    }
94
95
    /**
96
     * @param string $workerName
97
     * @param string $method
98
     */
99 8
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
100
    {
101 8
        if (null !== $workerName) {
102 3
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
103 3
        }
104
105 8
        if (null !== $method) {
106 2
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
107 2
        }
108 8
    }
109
110 1
    protected function updateExpired($workerName = null, $method = null)
111
    {
112
        /** @var EntityManager $objectManager */
113 1
        $objectManager = $this->getObjectManager();
114 1
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
115 1
        $queryBuilder->set('j.status', ':newStatus');
116 1
        $queryBuilder->where('j.expiresAt <= :expiresAt')
117 1
            ->setParameter(':expiresAt', new \DateTime());
118 1
        $queryBuilder->andWhere('j.status = :status')
119 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
120 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
121
122 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
123 1
        $query = $queryBuilder->getQuery();
124
125 1
        return intval($query->execute());
126
    }
127
128
    /**
129
     * Removes archived jobs older than $olderThan.
130
     *
131
     * @param \DateTime $olderThan
132
     */
133 1
    public function pruneArchivedJobs(\DateTime $olderThan)
134
    {
135 1
        return $this->removeOlderThan($this->getArchiveObjectName(),
136 1
                'updatedAt',
137 1
                $olderThan);
138
    }
139
140 2
    public function getJobCount($workerName = null, $method = null)
141
    {
142
        /** @var EntityManager $objectManager */
143 2
        $objectManager = $this->getObjectManager();
144 2
        $queryBuilder = $objectManager->createQueryBuilder();
145
146 2
        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getObjectName(), 'j');
147
148 2
        $where = 'where';
149 2
        if (null !== $workerName) {
150
            if (null !== $method) {
151
                $queryBuilder->where($queryBuilder->expr()->andX(
152
                    $queryBuilder->expr()->eq('j.workerName', ':workerName'),
153
                                                $queryBuilder->expr()->eq('j.method', ':method')
154
                ))
155
                    ->setParameter(':method', $method);
156
            } else {
157
                $queryBuilder->where('j.workerName = :workerName');
158
            }
159
            $queryBuilder->setParameter(':workerName', $workerName);
160
            $where = 'andWhere';
161 2
        } elseif (null !== $method) {
162
            $queryBuilder->where('j.method = :method')->setParameter(':method', $method);
163
            $where = 'andWhere';
164
        }
165
166 2
        $dateTime = new \DateTime();
167
        // Filter
168
        $queryBuilder
169 2
            ->$where($queryBuilder->expr()->orX(
170 2
                $queryBuilder->expr()->isNull('j.whenAt'),
171 2
                                        $queryBuilder->expr()->lte('j.whenAt', ':whenAt')
172 2
            ))
173 2
            ->andWhere($queryBuilder->expr()->orX(
174 2
                $queryBuilder->expr()->isNull('j.expiresAt'),
175 2
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
176 2
            ))
177 2
            ->andWhere('j.locked is NULL')
178 2
            ->setParameter(':whenAt', $dateTime)
179 2
            ->setParameter(':expiresAt', $dateTime);
180
181 2
        $query = $queryBuilder->getQuery();
182
183 2
        return $query->getSingleScalarResult();
184
    }
185
186
    /**
187
     * For ORM it's prudent to wrap things in a transaction.
188
     *
189
     * @param $i
190
     * @param $count
191
     * @param array $stalledJobs
192
     * @param $countProcessed
193
     */
194 1
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
195
    {
196
        /** @var EntityManager $objectManager */
197 1
        $objectManager = $this->getObjectManager();
198
        try {
199 1
            $objectManager->beginTransaction();
200 1
            parent::runStalledLoop($i, $count, $stalledJobs, $countProcessed);
201 1
            $objectManager->commit();
202 1
        } catch (\Exception $exception) {
203
            $objectManager->rollback();
204
205
            // Try again
206
            parent::runStalledLoop($i, $count, $stalledJobs, $countProcessed);
207
        }
208 1
    }
209
210
    /**
211
     * Get Jobs statuses.
212
     */
213 2
    public function getStatus()
214
    {
215 2
        $result = [];
216 2
        $this->getStatusByEntityName($this->getObjectName(), $result);
217 2
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
218
219 2
        $finalResult = [];
220 2
        foreach ($result as $key => $item) {
221 1
            ksort($item);
222 1
            foreach ($item as $status => $count) {
223 1
                if (isset($finalResult[$key][$status])) {
224
                    $finalResult[$key][$status] += $count;
225
                } else {
226 1
                    $finalResult[$key][$status] = $count;
227
                }
228 1
            }
229 2
        }
230
231 2
        return $finalResult;
232
    }
233
234
    /**
235
     * @param string $entityName
236
     */
237 2
    protected function getStatusByEntityName($entityName, array &$result)
238
    {
239
        /** @var EntityManager $objectManager */
240 2
        $objectManager = $this->getObjectManager();
241 2
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
242 2
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
243
244 2
        foreach ($result1 as $item) {
245 1
            $method = $item['workerName'].'->'.$item['method'].'()';
246 1
            if (!isset($result[$method])) {
247 1
                $result[$method] = [BaseJob::STATUS_NEW => 0,
248 1
                    BaseJob::STATUS_RUNNING => 0,
249 1
                    RetryableJob::STATUS_EXPIRED => 0,
250 1
                    RetryableJob::STATUS_MAX_ERROR => 0,
251 1
                    RetryableJob::STATUS_MAX_STALLED => 0,
252 1
                    RetryableJob::STATUS_MAX_RETRIES => 0,
253 1
                    BaseJob::STATUS_SUCCESS => 0,
254 1
                    BaseJob::STATUS_ERROR => 0, ];
255 1
            }
256 1
            $result[$method][$item['status']] += intval($item['c']);
257 2
        }
258 2
    }
259
260
    /**
261
     * Get the next job to run (can be filtered by workername and method name).
262
     *
263
     * @param string $workerName
264
     * @param string $methodName
265
     * @param bool   $prioritize
266
     *
267
     * @return Job|null
268
     */
269 6
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
270
    {
271
        /** @var EntityRepository $repository */
272 6
        $repository = $this->getRepository();
273 6
        $queryBuilder = $repository->createQueryBuilder('j');
274 6
        $dateTime = new \DateTime();
275
        $queryBuilder
276 6
            ->select('j.id')
277 6
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
278 6
            ->andWhere('j.locked is NULL')
279 6
            ->andWhere($queryBuilder->expr()->orX(
280 6
                $queryBuilder->expr()->isNull('j.whenAt'),
281 6
                        $queryBuilder->expr()->lte('j.whenAt', ':whenAt')
282 6
            ))
283 6
            ->andWhere($queryBuilder->expr()->orX(
284 6
                $queryBuilder->expr()->isNull('j.expiresAt'),
285 6
                        $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
286 6
            ))
287 6
            ->setParameter(':whenAt', $dateTime)
288 6
            ->setParameter(':expiresAt', $dateTime);
289
290 6
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
291
292 6
        if ($prioritize) {
293 6
            $queryBuilder->add('orderBy', 'j.priority DESC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority DESC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
294 6
        } else {
295
            $queryBuilder->orderBy('j.whenAt', 'ASC');
296
        }
297 6
        $queryBuilder->setMaxResults(1);
298
299
        /** @var QueryBuilder $queryBuilder */
300 6
        $query = $queryBuilder->getQuery();
301 6
        $jobs = $query->getResult();
302
303 6
        return $this->takeJob($jobs, $runId);
304
    }
305
306 6
    protected function takeJob($jobs, $runId = null)
307
    {
308 6
        if (isset($jobs[0]['id'])) {
309 5
            $repository = $this->getRepository();
310 5
            $queryBuilder = $repository->createQueryBuilder('j');
311
            $queryBuilder
312 5
                ->update()
313 5
                ->set('j.locked', ':locked')
314 5
                ->setParameter(':locked', true)
315 5
                ->set('j.lockedAt', ':lockedAt')
316 5
                ->setParameter(':lockedAt', new \DateTime())
317 5
                ->set('j.status', ':status')
318 5
                ->setParameter(':status', BaseJob::STATUS_RUNNING);
319 5
            if (null !== $runId) {
320
                $queryBuilder
321
                    ->set('runId', ':runId')
322
                    ->setParameter(':runId', $runId);
323
            }
324 5
            $queryBuilder->where('j.id = :id');
325 5
            $queryBuilder->andWhere('j.locked is NULL');
326 5
            $queryBuilder->setParameter(':id', $jobs[0]['id']);
327 5
            $resultCount = $queryBuilder->getQuery()->execute();
328
329 5
            if (1 === $resultCount) {
330 5
                return $repository->find($jobs[0]['id']);
331
            }
332
        }
333
334 4
        return null;
335
    }
336
337
    /**
338
     * Tries to update the nearest job as a batch.
339
     *
340
     * @param \Dtc\QueueBundle\Model\Job $job
341
     *
342
     * @return mixed|null
343
     */
344 1
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
345
    {
346 1
        $oldJob = null;
347 1
        $retries = 0;
348
        do {
349
            try {
350
                /** @var EntityManager $entityManager */
351 1
                $entityManager = $this->getObjectManager();
352 1
                $entityManager->beginTransaction();
353
354
                /** @var QueryBuilder $queryBuilder */
355 1
                $queryBuilder = $this->getRepository()->createQueryBuilder('j');
356 1
                $queryBuilder->select()
357 1
                    ->where('j.crcHash = :crcHash')
358 1
                    ->andWhere('j.status = :status')
359 1
                    ->setParameter(':status', BaseJob::STATUS_NEW)
360 1
                    ->setParameter(':crcHash', $job->getCrcHash())
361 1
                    ->orderBy('j.whenAt', 'ASC')
362 1
                    ->setMaxResults(1);
363 1
                $oldJobs = $queryBuilder->getQuery()->execute();
364
365 1
                if (empty($oldJobs)) {
366
                    return null;
367
                }
368 1
                $oldJob = $oldJobs[0];
369
370 1
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
371 1
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
372
373 1
                $entityManager->persist($oldJob);
374 1
                $entityManager->commit();
375 1
                $this->flush();
376 1
            } catch (\Exception $exception) {
377
                ++$retries;
378
                $entityManager->rollback();
379
            }
380 1
        } while (null === $oldJob && $retries < 5); // After 5 retries assume database is down or too much contention
381
382 1
        return $oldJob;
383
    }
384
}
385