Completed
Push — master ( cf2de2...e02161 )
by Matthew
10:11
created

JobManager   B

Complexity

Total Complexity 38

Size/Duplication

Total Lines 366
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 91%

Importance

Changes 0
Metric Value
wmc 38
lcom 1
cbo 10
dl 0
loc 366
ccs 182
cts 200
cp 0.91
rs 8.3999
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
B countJobsByStatus() 0 30 4
A pruneErroneousJobs() 0 13 1
B resetSaveOk() 0 23 4
A addWorkerNameCriterion() 0 10 3
A updateExpired() 0 17 1
A pruneArchivedJobs() 0 6 1
B getJobCount() 0 45 4
A getStatus() 0 20 4
B getJob() 0 37 2
B takeJob() 0 32 4
B updateNearestBatch() 0 26 2
B updateBatchJob() 0 29 5
A getStatusByEntityName() 0 22 3
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\BaseJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\RetryableJob;
12
use Symfony\Component\Process\Exception\LogicException;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20 3
    /**
     * Counts the rows of an entity class that have the given status.
     *
     * @param string      $objectName Entity class name the count query selects from
     * @param string      $status     Status value the counted rows must have
     * @param string|null $workerName When not null, only rows with this worker name are counted
     * @param string|null $method     When not null, only rows with this method name are counted
     *
     * @return int|string The scalar result of the count query, or 0 when that result is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // The :method placeholder must carry the method name, not the worker name.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $queryBuilder->getQuery()->getSingleScalarResult();

        return $count ? $count : 0;
    }
50
51
    /**
     * Deletes rows of the archive entity (getArchiveObjectName()) whose status
     * is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName When not null, only rows with this worker name are deleted
     * @param string|null $method     When not null, only rows with this method name are deleted
     *
     * @return int Result of executing the delete query, converted with intval()
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder->delete($this->getArchiveObjectName(), 'j');
        $deleteBuilder->where('j.status = :status');
        $deleteBuilder->setParameter(':status', BaseJob::STATUS_ERROR);

        // Optional narrowing by worker name and/or method name.
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        return intval($deleteBuilder->getQuery()->execute());
    }
70
71 15
    /**
     * Refuses to mix the 'save' operation and the other operation on the same
     * object manager instance.
     *
     * The spl_object_hash() of the object manager is remembered per operation in
     * the static properties $saveInsertCalled / $resetInsertCalled. If the
     * opposite operation has already recorded the same hash, an exception is
     * thrown; otherwise the hash is recorded for the current operation.
     *
     * @param string $function 'save' selects the save side; any other value is
     *                         treated as the other (reset) side
     *
     * @throws LogicException When the opposite operation already ran with this object manager
     */
    protected function resetSaveOk($function)
    {
        $isSave = 'save' === $function;
        $managerHash = spl_object_hash($this->getObjectManager());

        // Look at the marker left by the *other* operation.
        $oppositeMarker = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;
        if ($managerHash === $oppositeMarker) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
94
95
    /**
     * Appends optional worker-name and method-name filters to a query builder.
     *
     * Each non-null value adds an AND condition on the 'j' alias and binds the
     * matching named parameter; null values add nothing.
     *
     * @param QueryBuilder $queryBuilder Builder to extend; must use 'j' as the entity alias
     * @param string|null  $workerName   Worker name to filter on, or null for no filter
     * @param string|null  $method       Method name to filter on, or null for no filter
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        // clause, placeholder, value - applied in this order
        $criteria = [
            ['j.workerName = :workerName', ':workerName', $workerName],
            ['j.method = :method', ':method', $method],
        ];

        foreach ($criteria as $criterion) {
            list($clause, $placeholder, $value) = $criterion;
            if (null === $value) {
                continue;
            }

            $queryBuilder->andWhere($clause);
            $queryBuilder->setParameter($placeholder, $value);
        }
    }
109
110 1
    /**
     * Flags overdue jobs as expired.
     *
     * Rows of the job entity (getObjectName()) with status BaseJob::STATUS_NEW
     * and an expiresAt at or before the current time get their status set to
     * Job::STATUS_EXPIRED.
     *
     * @param string|null $workerName When not null, only rows with this worker name are updated
     * @param string|null $method     When not null, only rows with this method name are updated
     *
     * @return int Result of executing the update query, converted with intval()
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder()
            ->update($this->getObjectName(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->andWhere('j.status = :status')
            ->setParameter(':expiresAt', new \DateTime())
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        // Optional narrowing by worker name and/or method name.
        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        return intval($updateBuilder->getQuery()->execute());
    }
127
128
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the archive entity, using its
     * 'updatedAt' field as the age reference.
     *
     * @param \DateTime $olderThan Cut-off passed through to removeOlderThan()
     *
     * @return mixed Whatever removeOlderThan() returns
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveEntityName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveEntityName, 'updatedAt', $olderThan);
    }
139
140 2
    /**
     * Counts the jobs that are currently eligible to be picked up.
     *
     * A row of the job entity (getObjectName()) is counted when:
     *  - whenAt is NULL or not later than the current time,
     *  - expiresAt is NULL or later than the current time,
     *  - locked is NULL,
     * and, when given, its worker name / method name match.
     *
     * NOTE(review): unlike getJob(), no status condition is applied here - confirm this is intended.
     *
     * @param string|null $workerName When not null, only rows with this worker name are counted
     * @param string|null $method     When not null, only rows with this method name are counted
     *
     * @return mixed The scalar result of the count query
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder();
        $queryBuilder->select('count(j)')->from($this->getObjectName(), 'j');

        $expr = $queryBuilder->expr();
        $dateTime = new \DateTime();

        // Time-window and lock filter; the same instant is used for both bounds.
        $queryBuilder
            ->where($expr->orX(
                $expr->isNull('j.whenAt'),
                $expr->lte('j.whenAt', ':whenAt')
            ))
            ->andWhere($expr->orX(
                $expr->isNull('j.expiresAt'),
                $expr->gt('j.expiresAt', ':expiresAt')
            ))
            ->andWhere('j.locked is NULL')
            ->setParameter(':whenAt', $dateTime)
            ->setParameter(':expiresAt', $dateTime);

        // Shared helper adds the optional worker/method conditions (all parts are AND-ed,
        // so their position in the predicate does not change the result).
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        return $queryBuilder->getQuery()->getSingleScalarResult();
    }
185
186
    /**
     * Get Jobs statuses.
     *
     * Collects per-status job counts from both the job entity and the archive
     * entity via getStatusByEntityName(), which accumulates into one shared
     * array, then returns that data with each inner status map sorted by key.
     *
     * @return array Map of result key => (status => count); keys are whatever
     *               getStatusByEntityName() produced
     */
    public function getStatus()
    {
        $countsByKey = [];
        $this->getStatusByEntityName($this->getObjectName(), $countsByKey);
        $this->getStatusByEntityName($this->getArchiveObjectName(), $countsByKey);

        $finalResult = [];
        foreach ($countsByKey as $key => $statusCounts) {
            ksort($statusCounts);
            // Every (key, status) pair occurs exactly once here, so a plain
            // assignment is sufficient - nothing can already be present to add to.
            foreach ($statusCounts as $status => $count) {
                $finalResult[$key][$status] = $count;
            }
        }

        return $finalResult;
    }
209
210
    /**
     * Adds the per-status row counts of one entity class to $result.
     *
     * Rows are grouped by worker name, method and status. Counts are accumulated
     * under the key "<workerName>-><method>()"; a key seen for the first time is
     * seeded with zero counts for the known statuses.
     *
     * @param string $entityName Entity class name to query
     * @param array  $result     Accumulator, modified in place: key => (status => count)
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $rows = $objectManager->getRepository($entityName)->createQueryBuilder('j')
            ->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')
            ->getQuery()
            ->getArrayResult();

        foreach ($rows as $row) {
            $method = $row['workerName'].'->'.$row['method'].'()';
            if (!isset($result[$method])) {
                $result[$method] = [
                    BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    RetryableJob::STATUS_EXPIRED => 0,
                    RetryableJob::STATUS_MAX_ERROR => 0,
                    RetryableJob::STATUS_MAX_STALLED => 0,
                    RetryableJob::STATUS_MAX_RETRIES => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0,
                ];
            }

            $status = $row['status'];
            if (!isset($result[$method][$status])) {
                // A stored status outside the seeded list: start it at zero so the
                // addition below never reads an undefined array key.
                $result[$method][$status] = 0;
            }
            $result[$method][$status] += intval($row['c']);
        }
    }
235
236
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Selects the id of one candidate row (status BaseJob::STATUS_NEW, not
     * locked, whenAt NULL or reached, expiresAt NULL or still in the future)
     * and hands it to takeJob(), which performs the actual claim.
     *
     * @param string|null $workerName Only consider jobs of this worker when not null
     * @param string|null $methodName Only consider jobs of this method when not null
     * @param bool        $prioritize Order by priority (descending) before whenAt when true;
     *                                order by whenAt only when false
     * @param mixed       $runId      Passed through to takeJob()
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var EntityRepository $jobRepository */
        $jobRepository = $this->getRepository();
        /** @var QueryBuilder $candidateBuilder */
        $candidateBuilder = $jobRepository->createQueryBuilder('j');
        $now = new \DateTime();
        $expr = $candidateBuilder->expr();

        $dueNow = $expr->orX(
            $expr->isNull('j.whenAt'),
            $expr->lte('j.whenAt', ':whenAt')
        );
        $notExpired = $expr->orX(
            $expr->isNull('j.expiresAt'),
            $expr->gt('j.expiresAt', ':expiresAt')
        );

        $candidateBuilder->select('j.id');
        $candidateBuilder->where('j.status = :status');
        $candidateBuilder->setParameter(':status', BaseJob::STATUS_NEW);
        $candidateBuilder->andWhere('j.locked is NULL');
        $candidateBuilder->andWhere($dueNow);
        $candidateBuilder->andWhere($notExpired);
        $candidateBuilder->setParameter(':whenAt', $now);
        $candidateBuilder->setParameter(':expiresAt', $now);

        $this->addWorkerNameCriterion($candidateBuilder, $workerName, $methodName);

        if ($prioritize) {
            $candidateBuilder
                ->addOrderBy('j.priority', 'DESC')
                ->addOrderBy('j.whenAt', 'ASC');
        } else {
            $candidateBuilder->orderBy('j.whenAt', 'ASC');
        }

        // Only a single candidate id is needed.
        $candidateBuilder->setMaxResults(1);
        $candidates = $candidateBuilder->getQuery()->getResult();

        return $this->takeJob($candidates, $runId);
    }
282
283 7
    /**
     * Tries to claim the first job of a candidate list.
     *
     * Issues an UPDATE that sets locked, lockedAt, status (BaseJob::STATUS_RUNNING)
     * and optionally runId on the row with the candidate id, restricted to rows
     * whose locked column is still NULL. The job entity is loaded and returned
     * only when that UPDATE reports exactly one affected row.
     *
     * @param mixed $jobs  Candidate rows; only $jobs[0]['id'] is used
     * @param mixed $runId When not null, stored in the job's runId column
     *
     * @return object|null The claimed job as returned by the repository's find(), or null
     *                     when there is no candidate or the UPDATE did not affect exactly one row
     */
    protected function takeJob($jobs, $runId = null)
    {
        if (!isset($jobs[0]['id'])) {
            return null;
        }
        $jobId = $jobs[0]['id'];

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $claimBuilder */
        $claimBuilder = $repository->createQueryBuilder('j');
        $claimBuilder->update();
        $claimBuilder->set('j.locked', ':locked')->setParameter(':locked', true);
        $claimBuilder->set('j.lockedAt', ':lockedAt')->setParameter(':lockedAt', new \DateTime());
        $claimBuilder->set('j.status', ':status')->setParameter(':status', BaseJob::STATUS_RUNNING);
        if (null !== $runId) {
            $claimBuilder->set('j.runId', ':runId')->setParameter(':runId', $runId);
        }

        // The "locked is NULL" condition makes the claim fail if the row was locked in the meantime.
        $claimBuilder
            ->where('j.id = :id')
            ->andWhere('j.locked is NULL')
            ->setParameter(':id', $jobId);

        $affectedRows = $claimBuilder->getQuery()->execute();
        if (1 !== $affectedRows) {
            return null;
        }

        return $repository->find($jobId);
    }
315
316
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the job with the same crcHash and status BaseJob::STATUS_NEW that
     * has the earliest whenAt. If one exists, it is updated (via updateBatchJob())
     * to the higher of the two priorities and the lower of the two whenAt values.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job whose crcHash, priority and whenAt are compared
     *
     * @return mixed|null The existing job that was matched, or null when none was found
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var QueryBuilder $lookupBuilder */
        $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
        $lookupBuilder->select();
        $lookupBuilder->where('j.crcHash = :crcHash');
        $lookupBuilder->andWhere('j.status = :status');
        $lookupBuilder->setParameter(':status', BaseJob::STATUS_NEW);
        $lookupBuilder->setParameter(':crcHash', $job->getCrcHash());
        $lookupBuilder->orderBy('j.whenAt', 'ASC');
        $lookupBuilder->setMaxResults(1);

        $matches = $lookupBuilder->getQuery()->execute();
        if (!$matches) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $matches[0];

        $mergedPriority = max($job->getPriority(), $existingJob->getPriority());
        $mergedWhenAt = min($job->getWhenAt(), $existingJob->getWhenAt());
        $this->updateBatchJob($existingJob, $mergedPriority, $mergedWhenAt);

        return $existingJob;
    }
349
350 1
    /**
     * Applies a new priority and/or whenAt to an existing job.
     *
     * Values are compared with strict inequality against the job's current
     * ones. For each value that differs, the entity is updated in memory and
     * the column is included in a single UPDATE, which is restricted to the
     * job's id and to rows whose locked column is NULL. When nothing differs,
     * no query is issued.
     *
     * @param Job   $existingJob Job to update
     * @param mixed $newPriority Priority to apply
     * @param mixed $newWhenAt   whenAt value to apply
     *
     * @return Job The same $existingJob instance
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenAt)
    {
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenAtChanged = $newWhenAt !== $existingJob->getWhenAt();

        if (!$priorityChanged && !$whenAtChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $repository->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter(':priority', $newPriority);
        }

        if ($whenAtChanged) {
            $existingJob->setWhenAt($newWhenAt);
            $updateBuilder->set('j.whenAt', ':whenAt');
            $updateBuilder->setParameter(':whenAt', $newWhenAt);
        }

        $updateBuilder
            ->where('j.id = :id')
            ->andWhere('j.locked is NULL')
            ->setParameter(':id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
379
}
380