Completed
Push — master ( 239355...d39289 )
by Matthew
05:26
created

JobManager::getJobCount()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4.6558

Importance

Changes 0
Metric Value
dl 0
loc 45
ccs 19
cts 29
cp 0.6552
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 31
nc 4
nop 2
crap 4.6558
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
use Dtc\QueueBundle\Model\BaseJob;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
16
class JobManager extends BaseJobManager
17
{
18
    protected $formerIdGenerators;
19
    protected static $saveInsertCalled = null;
20
    protected static $resetInsertCalled = null;
21
22 9
    public function stopIdGenerator($objectName)
23
    {
24 9
        $objectManager = $this->getObjectManager();
25 9
        $repository = $objectManager->getRepository($objectName);
26
        /** @var ClassMetadata $metadata */
27 9
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
28 9
        $this->formerIdGenerators[$objectName]['generator'] = $metadata->idGenerator;
29 9
        $this->formerIdGenerators[$objectName]['type'] = $metadata->generatorType;
30 9
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
31 9
        $metadata->setIdGenerator(new AssignedGenerator());
32 9
    }
33
34 1
    public function restoreIdGenerator($objectName)
35
    {
36 1
        $objectManager = $this->getObjectManager();
37 1
        $repository = $objectManager->getRepository($objectName);
38
        /** @var ClassMetadata $metadata */
39 1
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
40 1
        $generator = $this->formerIdGenerators[$objectName]['generator'];
41 1
        $type = $this->formerIdGenerators[$objectName]['type'];
42 1
        $metadata->setIdGeneratorType($type);
43 1
        $metadata->setIdGenerator($generator);
44 1
    }
45
46 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
47
    {
48
        /** @var EntityManager $objectManager */
49 3
        $objectManager = $this->getObjectManager();
50
51
        $qb = $objectManager
52 3
            ->createQueryBuilder()
53 3
            ->select('count(a.id)')
54 3
            ->from($objectName, 'a')
55 3
            ->where('a.status = :status');
56
57 3
        if (null !== $workerName) {
58 1
            $qb->andWhere('a.workerName = :workerName')
59 1
                ->setParameter(':workerName', $workerName);
60
        }
61
62 3
        if (null !== $method) {
63 1
            $qb->andWhere('a.method = :method')
64 1
                ->setParameter(':method', $workerName);
65
        }
66
67 3
        $count = $qb->setParameter(':status', $status)
68 3
            ->getQuery()->getSingleScalarResult();
69
70 3
        if (!$count) {
71 1
            return 0;
72
        }
73
74 3
        return $count;
75
    }
76
77
    /**
78
     * @param string|null $workerName
79
     * @param string|null $method
80
     *
81
     * @return int Count of jobs pruned
82
     */
83 1
    public function pruneErroneousJobs($workerName = null, $method = null)
84
    {
85
        /** @var EntityManager $objectManager */
86 1
        $objectManager = $this->getObjectManager();
87 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
88 1
        $qb->where('j.status = :status')
89 1
            ->setParameter(':status', BaseJob::STATUS_ERROR);
90
91 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
92 1
        $query = $qb->getQuery();
93
94 1
        return intval($query->execute());
95
    }
96
97 12
    protected function resetSaveOk($function)
98
    {
99 12
        $objectManager = $this->getObjectManager();
100 12
        $splObjectHash = spl_object_hash($objectManager);
101
102 12
        if ('save' === $function) {
103 12
            $compare = static::$resetInsertCalled;
104
        } else {
105 1
            $compare = static::$saveInsertCalled;
106
        }
107
108 12
        if ($splObjectHash === $compare) {
109
            // Insert SQL is cached...
110
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
111
            throw new \Exception($msg);
112
        }
113
114 12
        if ('save' === $function) {
115 12
            static::$saveInsertCalled = spl_object_hash($objectManager);
116
        } else {
117 1
            static::$resetInsertCalled = spl_object_hash($objectManager);
118
        }
119 12
    }
120
121 7
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
122
    {
123 7
        if (null !== $workerName) {
124 3
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
125
        }
126
127 7
        if (null !== $method) {
128 2
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
129
        }
130 7
    }
131
132 1
    protected function updateExpired($workerName = null, $method = null)
133
    {
134
        /** @var EntityManager $objectManager */
135 1
        $objectManager = $this->getObjectManager();
136 1
        $qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
137 1
        $qb->set('j.status', ':newStatus');
138 1
        $qb->where('j.expiresAt <= :expiresAt')
139 1
            ->setParameter(':expiresAt', new \DateTime());
140 1
        $qb->andWhere('j.status = :status')
141 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
142 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
143
144 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
145 1
        $query = $qb->getQuery();
146
147 1
        return intval($query->execute());
148
    }
149
150
    /**
151
     * Removes archived jobs older than $olderThan.
152
     *
153
     * @param \DateTime $olderThan
154
     */
155 1
    public function pruneArchivedJobs(\DateTime $olderThan)
156
    {
157
        /** @var EntityManager $objectManager */
158 1
        $objectManager = $this->getObjectManager();
159 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
160
        $qb = $qb
161 1
            ->where('j.updatedAt < :updatedAt')
162 1
            ->setParameter(':updatedAt', $olderThan);
163
164 1
        $query = $qb->getQuery();
165
166 1
        return $query->execute();
167
    }
168
169 1
    public function getJobCount($workerName = null, $method = null)
170
    {
171
        /** @var EntityManager $objectManager */
172 1
        $objectManager = $this->getObjectManager();
173 1
        $qb = $objectManager->createQueryBuilder();
174
175 1
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
176
177 1
        $where = 'where';
178 1
        if (null !== $workerName) {
179
            if (null !== $method) {
180
                $qb->where($qb->expr()->andX(
181
                    $qb->expr()->eq('j.workerName', ':workerName'),
182
                                                $qb->expr()->eq('j.method', ':method')
183
                ))
184
                    ->setParameter(':method', $method);
185
            } else {
186
                $qb->where('j.workerName = :workerName');
187
            }
188
            $qb->setParameter(':workerName', $workerName);
189
            $where = 'andWhere';
190 1
        } elseif (null !== $method) {
191
            $qb->where('j.method = :method')->setParameter(':method', $method);
192
            $where = 'andWhere';
193
        }
194
195 1
        $dateTime = new \DateTime();
196
        // Filter
197
        $qb
198 1
            ->$where($qb->expr()->orX(
199 1
                $qb->expr()->isNull('j.whenAt'),
200 1
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
201
            ))
202 1
            ->andWhere($qb->expr()->orX(
203 1
                $qb->expr()->isNull('j.expiresAt'),
204 1
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
205
            ))
206 1
            ->andWhere('j.locked is NULL')
207 1
            ->setParameter(':whenAt', $dateTime)
208 1
            ->setParameter(':expiresAt', $dateTime);
209
210 1
        $query = $qb->getQuery();
211
212 1
        return $query->getSingleScalarResult();
213
    }
214
215
    /**
216
     * Get Jobs statuses.
217
     */
218 1
    public function getStatus()
219
    {
220 1
        $result = [];
221 1
        $this->getStatusByEntityName($this->getObjectName(), $result);
222 1
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
223
224 1
        $finalResult = [];
225 1
        foreach ($result as $key => $item) {
226 1
            ksort($item);
227 1
            foreach ($item as $status => $count) {
228 1
                if (isset($finalResult[$key][$status])) {
229
                    $finalResult[$key][$status] += $count;
230
                } else {
231 1
                    $finalResult[$key][$status] = $count;
232
                }
233
            }
234
        }
235
236 1
        return $finalResult;
237
    }
238
239
    /**
240
     * @param string $entityName
241
     */
242 1
    protected function getStatusByEntityName($entityName, array &$result)
243
    {
244
        /** @var EntityManager $objectManager */
245 1
        $objectManager = $this->getObjectManager();
246 1
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
247 1
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
248
249 1
        foreach ($result1 as $item) {
250 1
            $method = $item['workerName'].'->'.$item['method'].'()';
251 1
            if (!isset($result[$method])) {
252 1
                $result[$method] = [BaseJob::STATUS_NEW => 0,
253
                    BaseJob::STATUS_RUNNING => 0,
254
                    RetryableJob::STATUS_EXPIRED => 0,
255
                    RetryableJob::STATUS_MAX_ERROR => 0,
256
                    RetryableJob::STATUS_MAX_STALLED => 0,
257
                    RetryableJob::STATUS_MAX_RETRIES => 0,
258
                    BaseJob::STATUS_SUCCESS => 0,
259
                    BaseJob::STATUS_ERROR => 0, ];
260
            }
261 1
            $result[$method][$item['status']] += intval($item['c']);
262
        }
263 1
    }
264
265
    /**
266
     * Get the next job to run (can be filtered by workername and method name).
267
     *
268
     * @param string $workerName
269
     * @param string $methodName
270
     * @param bool   $prioritize
271
     *
272
     * @return Job|null
273
     */
274 5
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
275
    {
276 5
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
277 5
        $hash = hash('sha256', $uniqid);
278
279
        /** @var EntityManager $objectManager */
280 5
        $objectManager = $this->getObjectManager();
281
282 5
        $objectManager->beginTransaction();
283
284
        /** @var EntityRepository $repository */
285 5
        $repository = $this->getRepository();
286 5
        $qb = $repository->createQueryBuilder('j');
287 5
        $dateTime = new \DateTime();
288
        $qb
289 5
            ->select('j')
290 5
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
291 5
            ->andWhere('j.locked is NULL')
292 5
            ->andWhere($qb->expr()->orX(
293 5
                $qb->expr()->isNull('j.whenAt'),
294 5
                        $qb->expr()->lte('j.whenAt', ':whenAt')
295
            ))
296 5
            ->andWhere($qb->expr()->orX(
297 5
                $qb->expr()->isNull('j.expiresAt'),
298 5
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
299
            ))
300 5
            ->setParameter(':whenAt', $dateTime)
301 5
            ->setParameter(':expiresAt', $dateTime);
302
303 5
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
304
305 5
        if ($prioritize) {
306 5
            $qb->add('orderBy', 'j.priority ASC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority ASC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
307
        } else {
308
            $qb->orderBy('j.whenAt', 'ASC');
309
        }
310 5
        $qb->setMaxResults(1);
311
312
        /** @var QueryBuilder $qb */
313 5
        $query = $qb->getQuery();
314 5
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
315 5
        $jobs = $query->getResult();
316
317 5
        if ($jobs) {
318
            /** @var Job $job */
319 4
            $job = $jobs[0];
320 4
            if (!$job) {
321
                throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
322
            }
323 4
            $job->setLocked(true);
324 4
            $job->setLockedAt(new \DateTime());
325 4
            $job->setStatus(BaseJob::STATUS_RUNNING);
326 4
            $job->setRunId($runId);
327 4
            $objectManager->commit();
328 4
            $objectManager->flush();
329
330 4
            return $job;
331
        }
332
333 2
        $objectManager->rollback();
334
335 2
        return null;
336
    }
337
}
338