Completed
Pull Request — master (#24)
by Matthew
06:08 queued 27s
created

JobManager::getJob()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 56
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 3.0014

Importance

Changes 0
Metric Value
dl 0
loc 56
ccs 35
cts 37
cp 0.9459
rs 9.7251
c 0
b 0
f 0
cc 3
eloc 38
nc 4
nop 4
crap 3.0014

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\QueryBuilder;
9
use Dtc\QueueBundle\Doctrine\BaseJobManager;
10
use Dtc\QueueBundle\Entity\Job;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends BaseJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20 3
21
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
22
    {
23 3
        /** @var EntityManager $objectManager */
24
        $objectManager = $this->getObjectManager();
25
26 3
        $qb = $objectManager
27 3
            ->createQueryBuilder()
28 3
            ->select('count(a.id)')
29 3
            ->from($objectName, 'a')
30
            ->where('a.status = :status');
31 3
32 1
        if (null !== $workerName) {
33 1
            $qb->andWhere('a.workerName = :workerName')
34
                ->setParameter(':workerName', $workerName);
35
        }
36 3
37 1
        if (null !== $method) {
38 1
            $qb->andWhere('a.method = :method')
39
                ->setParameter(':method', $workerName);
40
        }
41 3
42 3
        $count = $qb->setParameter(':status', $status)
43
            ->getQuery()->getSingleScalarResult();
44 3
45 1
        if (!$count) {
46
            return 0;
47
        }
48 3
49
        return $count;
50
    }
51
52
    /**
53
     * @param string|null $workerName
54
     * @param string|null $method
55
     *
56
     * @return int Count of jobs pruned
57 1
     */
58
    public function pruneErroneousJobs($workerName = null, $method = null)
59
    {
60 1
        /** @var EntityManager $objectManager */
61 1
        $objectManager = $this->getObjectManager();
62 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
63 1
        $qb->where('j.status = :status')
64
            ->setParameter(':status', BaseJob::STATUS_ERROR);
65 1
66 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
67
        $query = $qb->getQuery();
68 1
69
        return intval($query->execute());
70
    }
71 14
72
    protected function resetSaveOk($function)
73 14
    {
74 14
        $objectManager = $this->getObjectManager();
75
        $splObjectHash = spl_object_hash($objectManager);
76 14
77
        if ('save' === $function) {
78
            $compare = static::$resetInsertCalled;
79 14
        } else {
80
            $compare = static::$saveInsertCalled;
81
        }
82 14
83
        if ($splObjectHash === $compare) {
84
            // Insert SQL is cached...
85
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
86
            throw new LogicException($msg);
87
        }
88 14
89
        if ('save' === $function) {
90
            static::$saveInsertCalled = spl_object_hash($objectManager);
91 14
        } else {
92
            static::$resetInsertCalled = spl_object_hash($objectManager);
93 14
        }
94
    }
95
96
    /**
97
     * @param string $workerName
98
     * @param string $method
99 8
     */
100
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
101 8
    {
102 3
        if (null !== $workerName) {
103
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
104
        }
105 8
106 2
        if (null !== $method) {
107
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
108 8
        }
109
    }
110 1
111
    protected function updateExpired($workerName = null, $method = null)
112
    {
113 1
        /** @var EntityManager $objectManager */
114 1
        $objectManager = $this->getObjectManager();
115 1
        $qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
116 1
        $qb->set('j.status', ':newStatus');
117 1
        $qb->where('j.expiresAt <= :expiresAt')
118 1
            ->setParameter(':expiresAt', new \DateTime());
119 1
        $qb->andWhere('j.status = :status')
120 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
121
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
122 1
123 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
124
        $query = $qb->getQuery();
125 1
126
        return intval($query->execute());
127
    }
128 1
129
    protected function getJobCurrentStatus(\Dtc\QueueBundle\Model\Job $job)
130
    {
131 1
        /** @var EntityManager $objectManager */
132 1
        $objectManager = $this->getObjectManager();
133 1
        $qb = $objectManager->createQueryBuilder()->select('j.status')->from($this->getObjectName(), 'j');
134
        $qb->where('j.id = :id')->setParameter(':id', $job->getId());
135 1
136
        return $qb->getQuery()->getSingleScalarResult();
137
    }
138
139
    /**
140
     * Removes archived jobs older than $olderThan.
141
     *
142
     * @param \DateTime $olderThan
143 1
     */
144
    public function pruneArchivedJobs(\DateTime $olderThan)
145 1
    {
146 1
        return $this->removeOlderThan($this->getArchiveObjectName(),
147 1
                'updatedAt',
148
                $olderThan);
149
    }
150 2
151
    public function getJobCount($workerName = null, $method = null)
152
    {
153 2
        /** @var EntityManager $objectManager */
154 2
        $objectManager = $this->getObjectManager();
155
        $qb = $objectManager->createQueryBuilder();
156 2
157
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
158 2
159 2
        $where = 'where';
160
        if (null !== $workerName) {
161
            if (null !== $method) {
162
                $qb->where($qb->expr()->andX(
163
                    $qb->expr()->eq('j.workerName', ':workerName'),
164
                                                $qb->expr()->eq('j.method', ':method')
165
                ))
166
                    ->setParameter(':method', $method);
167
            } else {
168
                $qb->where('j.workerName = :workerName');
169
            }
170
            $qb->setParameter(':workerName', $workerName);
171 2
            $where = 'andWhere';
172
        } elseif (null !== $method) {
173
            $qb->where('j.method = :method')->setParameter(':method', $method);
174
            $where = 'andWhere';
175
        }
176 2
177
        $dateTime = new \DateTime();
178
        // Filter
179 2
        $qb
180 2
            ->$where($qb->expr()->orX(
181 2
                $qb->expr()->isNull('j.whenAt'),
182
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
183 2
            ))
184 2
            ->andWhere($qb->expr()->orX(
185 2
                $qb->expr()->isNull('j.expiresAt'),
186
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
187 2
            ))
188 2
            ->andWhere('j.locked is NULL')
189 2
            ->setParameter(':whenAt', $dateTime)
190
            ->setParameter(':expiresAt', $dateTime);
191 2
192
        $query = $qb->getQuery();
193 2
194
        return $query->getSingleScalarResult();
195
    }
196
197
    /**
198
     * For ORM it's prudent to wrap things in a transaction.
199
     *
200
     * @param $i
201
     * @param $count
202
     * @param array $stalledJobs
203
     * @param $countProcessed
204 1
     */
205
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
206
    {
207 1
        /** @var EntityManager $objectManager */
208
        $objectManager = $this->getObjectManager();
209 1
        try {
210 1
            $objectManager->beginTransaction();
211 1
            parent::runStalledLoop($i, $count, $stalledJobs, $countProcessed);
212
            $objectManager->commit();
213
        } catch (\Exception $exception) {
214
            $objectManager->rollback();
215
216
            // Try again
217
            parent::runStalledLoop($i, $count, $stalledJobs, $countProcessed);
218 1
        }
219
    }
220
221
    /**
222
     * Get Jobs statuses.
223 2
     */
224
    public function getStatus()
225 2
    {
226 2
        $result = [];
227 2
        $this->getStatusByEntityName($this->getObjectName(), $result);
228
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
229 2
230 2
        $finalResult = [];
231 1
        foreach ($result as $key => $item) {
232 1
            ksort($item);
233 1
            foreach ($item as $status => $count) {
234
                if (isset($finalResult[$key][$status])) {
235
                    $finalResult[$key][$status] += $count;
236 1
                } else {
237
                    $finalResult[$key][$status] = $count;
238
                }
239
            }
240
        }
241 2
242
        return $finalResult;
243
    }
244
245
    /**
246
     * @param string $entityName
247 2
     */
248
    protected function getStatusByEntityName($entityName, array &$result)
249
    {
250 2
        /** @var EntityManager $objectManager */
251 2
        $objectManager = $this->getObjectManager();
252 2
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
253
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
254 2
255 1
        foreach ($result1 as $item) {
256 1
            $method = $item['workerName'].'->'.$item['method'].'()';
257 1
            if (!isset($result[$method])) {
258
                $result[$method] = [BaseJob::STATUS_NEW => 0,
259
                    BaseJob::STATUS_RUNNING => 0,
260
                    RetryableJob::STATUS_EXPIRED => 0,
261
                    RetryableJob::STATUS_MAX_ERROR => 0,
262
                    RetryableJob::STATUS_MAX_STALLED => 0,
263
                    RetryableJob::STATUS_MAX_RETRIES => 0,
264
                    BaseJob::STATUS_SUCCESS => 0,
265
                    BaseJob::STATUS_ERROR => 0, ];
266 1
            }
267
            $result[$method][$item['status']] += intval($item['c']);
268 2
        }
269
    }
270
271
    /**
272
     * Get the next job to run (can be filtered by workername and method name).
273
     *
274
     * @param string $workerName
275
     * @param string $methodName
276
     * @param bool   $prioritize
277
     *
278
     * @return Job|null
279 6
     */
280
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
281 6
    {
282 6
        /** @var EntityManager $objectManager */
283
        $objectManager = $this->getObjectManager();
284
285 6
        $objectManager->beginTransaction();
286
287 6
        /** @var EntityRepository $repository */
288
        $repository = $this->getRepository();
289
        $qb = $repository->createQueryBuilder('j');
290 6
        $dateTime = new \DateTime();
291 6
        $qb
292 6
            ->select('j')
293
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
294 6
            ->andWhere('j.locked is NULL')
295 6
            ->andWhere($qb->expr()->orX(
296 6
                $qb->expr()->isNull('j.whenAt'),
297 6
                        $qb->expr()->lte('j.whenAt', ':whenAt')
298 6
            ))
299 6
            ->andWhere($qb->expr()->orX(
300
                $qb->expr()->isNull('j.expiresAt'),
301 6
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
302 6
            ))
303 6
            ->setParameter(':whenAt', $dateTime)
304
            ->setParameter(':expiresAt', $dateTime);
305 6
306 6
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
307
308 6
        if ($prioritize) {
309
            $qb->add('orderBy', 'j.priority DESC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority DESC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
310 6
        } else {
311 6
            $qb->orderBy('j.whenAt', 'ASC');
312
        }
313
        $qb->setMaxResults(1);
314
315 6
        /** @var QueryBuilder $qb */
316
        $query = $qb->getQuery();
317
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
318 6
        $jobs = $query->getResult();
319 6
320 6
        if ($jobs) {
321
            /** @var Job $job */
322 6
            $job = $jobs[0];
323
            $job->setLocked(true);
324 5
            $job->setLockedAt(new \DateTime());
325 5
            $job->setStatus(BaseJob::STATUS_RUNNING);
326
            $job->setRunId($runId);
327
            $objectManager->commit();
328 5
            $objectManager->flush();
329 5
            return $job;
330 5
        }
331 5
332 5
        $objectManager->rollback();
333 5
334
        return null;
335 5
    }
336
337
    /**
338 3
     * Tries to update the nearest job as a batch.
339
     *
340 3
     * @param \Dtc\QueueBundle\Model\Job $job
341
     *
342
     * @return mixed|null
343
     */
344
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
345
    {
346
        $oldJob = null;
347
        do {
348
            try {
349
                /** @var EntityManager $entityManager */
350 1
                $entityManager = $this->getObjectManager();
351
                $entityManager->beginTransaction();
352 1
353
                /** @var QueryBuilder $queryBuilder */
354
                $queryBuilder = $this->getRepository()->createQueryBuilder('j');
355
                $queryBuilder->select()
356 1
                    ->where('j.crcHash = :crcHash')
357 1
                    ->andWhere('j.status = :status')
358
                    ->setParameter(':status', BaseJob::STATUS_NEW)
359
                    ->setParameter(':crcHash', $job->getCrcHash())
360 1
                    ->orderBy('j.whenAt', 'ASC')
361 1
                    ->setMaxResults(1);
362 1
                $oldJob = $queryBuilder->getQuery()->getSingleResult();
363 1
364 1
                if (!$oldJob) {
365 1
                    return null;
366 1
                }
367 1
368 1
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
369
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
370 1
371
                $entityManager->persist($oldJob);
372
                $entityManager->commit();
373
                $this->flush();
374 1
            } catch (\Exception $exception) {
375 1
                $entityManager->rollback();
376
            }
377 1
        } while (null === $oldJob);
378 1
379 1
        return $oldJob;
380
    }
381
}
382