Completed
Push — master ( e02161...971421 )
by Matthew
12:50
created

JobManager::getJobQueryBuilder()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 32
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 32
ccs 0
cts 24
cp 0
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 23
nc 2
nop 3
crap 6
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\BaseJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\RetryableJob;
12
use Symfony\Component\Process\Exception\LogicException;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20
    /**
     * Counts the rows of the given entity that are in the given status,
     * optionally narrowed to one worker name and/or one method.
     *
     * @param string      $objectName Entity name passed to the query's FROM clause
     * @param string      $status     Status value the rows must have
     * @param string|null $workerName When not null, only rows with this workerName are counted
     * @param string|null $method     When not null, only rows with this method are counted
     *
     * @return mixed The scalar produced by the count query, or 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status');

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // The :method placeholder must carry $method; binding $workerName here
            // would make the method filter compare against the worker name.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $queryBuilder->setParameter(':status', $status)
            ->getQuery()->getSingleScalarResult();

        if (!$count) {
            return 0;
        }

        return $count;
    }
50
51
    /**
     * Deletes archived jobs whose status is BaseJob::STATUS_ERROR, optionally
     * restricted to a worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder->delete($this->getArchiveObjectName(), 'j');
        $deleteBuilder->where('j.status = :status');
        $deleteBuilder->setParameter(':status', BaseJob::STATUS_ERROR);

        // Adds the optional workerName / method restrictions.
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return intval($deleted);
    }
70
71 1
    /**
     * Guards against mixing "save" and the other operation on the same object manager.
     *
     * The hash of the current object manager is remembered per operation in a static
     * property; if the opposite operation has already been recorded for that same
     * manager, a LogicException is thrown.
     *
     * @param string $function 'save' for the save path; any other value is treated as the reset path
     *
     * @throws LogicException When the opposite operation was already recorded for this object manager
     */
    protected function resetSaveOk($function)
    {
        $managerHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // A save conflicts with an earlier reset, and vice versa.
        $conflictingHash = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

        if ($managerHash === $conflictingHash) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
94
95
    /**
     * Appends optional workerName / method equality conditions (on alias "j")
     * to the given query builder. A null argument adds no condition.
     *
     * @param QueryBuilder $queryBuilder Builder to extend in place
     * @param string|null  $workerName
     * @param string|null  $method
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $filterByWorker = null !== $workerName;
        $filterByMethod = null !== $method;

        if ($filterByWorker) {
            $queryBuilder->andWhere('j.workerName = :workerName');
            $queryBuilder->setParameter(':workerName', $workerName);
        }

        if ($filterByMethod) {
            $queryBuilder->andWhere('j.method = :method');
            $queryBuilder->setParameter(':method', $method);
        }
    }
109
110
    /**
     * Sets the status of jobs to Job::STATUS_EXPIRED when they are still in
     * BaseJob::STATUS_NEW and their expiresAt is not later than the current time,
     * optionally restricted to a worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Integer value of the update query's execute() result
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder
            ->update($this->getObjectName(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->setParameter(':expiresAt', new \DateTime())
            ->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $affected = $updateBuilder->getQuery()->execute();

        return intval($affected);
    }
127
128
    /**
     * Removes archived jobs older than $olderThan, judged by their updatedAt field.
     *
     * @param \DateTime $olderThan Cut-off date handed to removeOlderThan()
     *
     * @return mixed Whatever removeOlderThan() returns
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveEntityName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveEntityName, 'updatedAt', $olderThan);
    }
139
140 1
    /**
     * Counts jobs that are not locked, whose whenAt is null or not later than now,
     * and whose expiresAt is null or later than now; optionally restricted to a
     * worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed The scalar result of the count query
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $countBuilder = $entityManager->createQueryBuilder();
        $countBuilder->select('count(j)')->from($this->getObjectName(), 'j');

        // Apply the optional worker/method filter; remember whether a WHERE part exists.
        $hasFilter = true;
        if (null !== $workerName && null !== $method) {
            $countBuilder->where($countBuilder->expr()->andX(
                $countBuilder->expr()->eq('j.workerName', ':workerName'),
                $countBuilder->expr()->eq('j.method', ':method')
            ));
            $countBuilder->setParameter(':method', $method);
            $countBuilder->setParameter(':workerName', $workerName);
        } elseif (null !== $workerName) {
            $countBuilder->where('j.workerName = :workerName');
            $countBuilder->setParameter(':workerName', $workerName);
        } elseif (null !== $method) {
            $countBuilder->where('j.method = :method');
            $countBuilder->setParameter(':method', $method);
        } else {
            $hasFilter = false;
        }

        // The first availability condition starts the WHERE clause unless a filter already did.
        $firstCondition = $hasFilter ? 'andWhere' : 'where';
        $now = new \DateTime();

        $whenAtReached = $countBuilder->expr()->orX(
            $countBuilder->expr()->isNull('j.whenAt'),
            $countBuilder->expr()->lte('j.whenAt', ':whenAt')
        );
        $countBuilder->$firstCondition($whenAtReached);

        $notExpired = $countBuilder->expr()->orX(
            $countBuilder->expr()->isNull('j.expiresAt'),
            $countBuilder->expr()->gt('j.expiresAt', ':expiresAt')
        );
        $countBuilder->andWhere($notExpired);

        $countBuilder->andWhere('j.locked is NULL');
        $countBuilder->setParameter(':whenAt', $now);
        $countBuilder->setParameter(':expiresAt', $now);

        return $countBuilder->getQuery()->getSingleScalarResult();
    }
185
186
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from both the live and the archive entity and
     * returns them keyed by "worker->method()" label, each inner array sorted by status key.
     *
     * @return array Map of label => (status => count)
     */
    public function getStatus()
    {
        $collected = [];
        $this->getStatusByEntityName($this->getObjectName(), $collected);
        $this->getStatusByEntityName($this->getArchiveObjectName(), $collected);

        $statuses = [];
        foreach ($collected as $label => $counts) {
            ksort($counts);
            foreach ($counts as $status => $count) {
                if (!isset($statuses[$label][$status])) {
                    $statuses[$label][$status] = $count;
                    continue;
                }
                $statuses[$label][$status] += $count;
            }
        }

        return $statuses;
    }
209
210
    /**
     * Adds the per-status job counts of one entity to $result.
     *
     * Rows are grouped by workerName, method and status; each group is accumulated
     * under the key "<workerName>-><method>()". The first time a key is seen its
     * counters are seeded with zeroes for the known statuses.
     *
     * @param string $entityName Entity whose repository is queried
     * @param array  $result     Accumulator, modified in place (label => status => count)
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();

        foreach ($result1 as $item) {
            $method = $item['workerName'].'->'.$item['method'].'()';
            if (!isset($result[$method])) {
                $result[$method] = [BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    RetryableJob::STATUS_EXPIRED => 0,
                    RetryableJob::STATUS_MAX_ERROR => 0,
                    RetryableJob::STATUS_MAX_STALLED => 0,
                    RetryableJob::STATUS_MAX_RETRIES => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0, ];
            }

            // A stored status outside the seeded list would otherwise hit "+=" on an
            // undefined key; start its counter at zero first.
            $status = $item['status'];
            if (!isset($result[$method][$status])) {
                $result[$method][$status] = 0;
            }
            $result[$method][$status] += intval($item['c']);
        }
    }
235
236
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Selects at most one candidate via getJobQueryBuilder() and hands it to takeJob().
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      Passed through to takeJob()
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        // $workerName and $methodName may each be a string or null; both are
        // forwarded unchanged to getJobQueryBuilder().
        /** @var QueryBuilder $candidateBuilder */
        $candidateBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $candidateBuilder->setMaxResults(1);

        $candidates = $candidateBuilder->getQuery()->getResult();

        return $this->takeJob($candidates, $runId);
    }
256
257
    /**
     * Builds the query that selects the ids of runnable jobs.
     *
     * A job qualifies when its status is BaseJob::STATUS_NEW, it is not locked,
     * its whenAt is null or not later than now, and its expiresAt is null or
     * later than now. Optional worker name / method filters are added via
     * addWorkerNameCriterion().
     *
     * @param string|null $workerName Restrict to this worker name when not null
     * @param string|null $methodName Restrict to this method when not null
     * @param bool        $prioritize When true, order by priority (descending) before whenAt (ascending);
     *                                otherwise order by whenAt (ascending) only
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $queryBuilder = $repository->createQueryBuilder('j');
        // One timestamp is shared by both date comparisons.
        $dateTime = new \DateTime();
        $queryBuilder
            ->select('j.id')
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
            ->andWhere('j.locked is NULL')
            ->andWhere($queryBuilder->expr()->orX(
                $queryBuilder->expr()->isNull('j.whenAt'),
                $queryBuilder->expr()->lte('j.whenAt', ':whenAt')
            ))
            ->andWhere($queryBuilder->expr()->orX(
                $queryBuilder->expr()->isNull('j.expiresAt'),
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->setParameter(':whenAt', $dateTime)
            ->setParameter(':expiresAt', $dateTime);

        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);

        if ($prioritize) {
            $queryBuilder->addOrderBy('j.priority', 'DESC');
            $queryBuilder->addOrderBy('j.whenAt', 'ASC');
        } else {
            $queryBuilder->orderBy('j.whenAt', 'ASC');
        }

        return $queryBuilder;
    }
296
297
    /**
     * Tries to claim the first candidate job by marking it locked and running.
     *
     * The update only matches while the row's locked column is still NULL, so the
     * job is returned only when exactly one row was updated by this call.
     *
     * @param mixed $jobs  Candidate rows; only $jobs[0]['id'] is used
     * @param mixed $runId When not null, stored in the job's runId field
     *
     * @return mixed The entity loaded by id after a successful claim, otherwise null
     */
    protected function takeJob($jobs, $runId = null)
    {
        if (!isset($jobs[0]['id'])) {
            return null;
        }
        $jobId = $jobs[0]['id'];

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $lockBuilder */
        $lockBuilder = $repository->createQueryBuilder('j');
        $lockBuilder->update();
        $lockBuilder->set('j.locked', ':locked')->setParameter(':locked', true);
        $lockBuilder->set('j.lockedAt', ':lockedAt')->setParameter(':lockedAt', new \DateTime());
        $lockBuilder->set('j.status', ':status')->setParameter(':status', BaseJob::STATUS_RUNNING);

        if (null !== $runId) {
            $lockBuilder->set('j.runId', ':runId')->setParameter(':runId', $runId);
        }

        $lockBuilder
            ->where('j.id = :id')
            ->andWhere('j.locked is NULL')
            ->setParameter(':id', $jobId);

        $affectedRows = $lockBuilder->getQuery()->execute();

        return 1 === $affectedRows ? $repository->find($jobId) : null;
    }
329
330
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the STATUS_NEW job with the same crcHash and the earliest whenAt.
     * When one exists, it receives the larger of the two priorities and the
     * smaller of the two whenAt values via updateBatchJob().
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job supplying crcHash, priority and whenAt
     *
     * @return mixed|null The matched existing job, or null when there is none
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var QueryBuilder $lookupBuilder */
        $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
        $lookupBuilder->select()
            ->where('j.crcHash = :crcHash')
            ->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':crcHash', $job->getCrcHash())
            ->orderBy('j.whenAt', 'ASC')
            ->setMaxResults(1);

        $matches = $lookupBuilder->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $nearest */
        $nearest = $matches[0];

        $mergedPriority = max($job->getPriority(), $nearest->getPriority());
        $mergedWhenAt = min($job->getWhenAt(), $nearest->getWhenAt());
        $this->updateBatchJob($nearest, $mergedPriority, $mergedWhenAt);

        return $nearest;
    }
363
364
    /**
     * Applies a new priority and/or whenAt to an existing job, both on the
     * object and through an update query restricted to unlocked rows.
     *
     * Nothing is executed when neither value differs (strict comparison) from
     * what the job currently holds.
     *
     * @param Job   $existingJob Job to update
     * @param mixed $newPriority Priority to apply
     * @param mixed $newWhenAt   whenAt value to apply
     *
     * @return Job The same $existingJob instance
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenAt)
    {
        // Decide up front what changed, before the job object is mutated below.
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenAtChanged = $newWhenAt !== $existingJob->getWhenAt();

        if (!$priorityChanged && !$whenAtChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $repository->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter(':priority', $newPriority);
        }

        if ($whenAtChanged) {
            $existingJob->setWhenAt($newWhenAt);
            $updateBuilder->set('j.whenAt', ':whenAt');
            $updateBuilder->setParameter(':whenAt', $newWhenAt);
        }

        $updateBuilder
            ->where('j.id = :id')
            ->andWhere('j.locked is NULL')
            ->setParameter(':id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
393
}
394