Completed
Push — master ( 1204eb...5574d8 )
by Matthew
05:57
created

JobManager::getStatusByEntityName()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 22
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.576

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 6
cts 10
cp 0.6
rs 9.2
c 0
b 0
f 0
cc 3
eloc 16
nc 3
nop 2
crap 3.576
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\QueryBuilder;
9
use Dtc\QueueBundle\Doctrine\BaseJobManager;
10
use Dtc\QueueBundle\Entity\Job;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
21
    {
22
        /** @var EntityManager $objectManager */
23
        $objectManager = $this->getObjectManager();
24
25
        $qb = $objectManager
26
            ->createQueryBuilder()
27
            ->select('count(a.id)')
28
            ->from($objectName, 'a')
29
            ->where('a.status = :status');
30
31
        if (null !== $workerName) {
32
            $qb->andWhere('a.workerName = :workerName')
33
                ->setParameter(':workerName', $workerName);
34
        }
35
36
        if (null !== $method) {
37
            $qb->andWhere('a.method = :method')
38
                ->setParameter(':method', $workerName);
39
        }
40
41
        $count = $qb->setParameter(':status', $status)
42
            ->getQuery()->getSingleScalarResult();
43
44
        if (!$count) {
45
            return 0;
46
        }
47
48
        return $count;
49
    }
50
51
    /**
52
     * @param string|null $workerName
53
     * @param string|null $method
54
     *
55
     * @return int Count of jobs pruned
56
     */
57
    public function pruneErroneousJobs($workerName = null, $method = null)
58
    {
59
        /** @var EntityManager $objectManager */
60
        $objectManager = $this->getObjectManager();
61
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
62
        $qb->where('j.status = :status')
63
            ->setParameter(':status', BaseJob::STATUS_ERROR);
64
65
        $this->addWorkerNameCriterion($qb, $workerName, $method);
66
        $query = $qb->getQuery();
67
68
        return intval($query->execute());
69
    }
70
71
    protected function resetSaveOk($function)
72
    {
73
        $objectManager = $this->getObjectManager();
74
        $splObjectHash = spl_object_hash($objectManager);
75
76
        if ('save' === $function) {
77
            $compare = static::$resetInsertCalled;
78
        } else {
79
            $compare = static::$saveInsertCalled;
80
        }
81
82
        if ($splObjectHash === $compare) {
83
            // Insert SQL is cached...
84
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
85
            throw new \Exception($msg);
86
        }
87
88
        if ('save' === $function) {
89
            static::$saveInsertCalled = spl_object_hash($objectManager);
90
        } else {
91
            static::$resetInsertCalled = spl_object_hash($objectManager);
92
        }
93
    }
94
95
    /**
96
     * @param string $workerName
97
     * @param string $method
98
     */
99
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
100
    {
101
        if (null !== $workerName) {
102
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
103
        }
104
105
        if (null !== $method) {
106
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
107
        }
108
    }
109
110
    protected function updateExpired($workerName = null, $method = null)
111
    {
112
        /** @var EntityManager $objectManager */
113
        $objectManager = $this->getObjectManager();
114
        $qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
115
        $qb->set('j.status', ':newStatus');
116
        $qb->where('j.expiresAt <= :expiresAt')
117
            ->setParameter(':expiresAt', new \DateTime());
118
        $qb->andWhere('j.status = :status')
119
            ->setParameter(':status', BaseJob::STATUS_NEW)
120
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
121
122
        $this->addWorkerNameCriterion($qb, $workerName, $method);
123
        $query = $qb->getQuery();
124
125
        return intval($query->execute());
126
    }
127
128
    protected function getJobCurrentStatus(\Dtc\QueueBundle\Model\Job $job)
129
    {
130
        /** @var EntityManager $objectManager */
131
        $objectManager = $this->getObjectManager();
132
        $qb = $objectManager->createQueryBuilder()->select('j.status')->from($this->getObjectName(), 'j');
133
        $qb->where('j.id = :id')->setParameter(':id', $job->getId());
134
135
        return $qb->getQuery()->getSingleScalarResult();
136
    }
137
138
    /**
139
     * Removes archived jobs older than $olderThan.
140
     *
141
     * @param \DateTime $olderThan
142
     */
143
    public function pruneArchivedJobs(\DateTime $olderThan)
144
    {
145
        return $this->removeOlderThan($this->getArchiveObjectName(),
146
                'updatedAt',
147
                $olderThan);
148
    }
149
150 1
    public function getJobCount($workerName = null, $method = null)
151
    {
152
        /** @var EntityManager $objectManager */
153 1
        $objectManager = $this->getObjectManager();
154 1
        $qb = $objectManager->createQueryBuilder();
155
156 1
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
157
158 1
        $where = 'where';
159 1
        if (null !== $workerName) {
160
            if (null !== $method) {
161
                $qb->where($qb->expr()->andX(
162
                    $qb->expr()->eq('j.workerName', ':workerName'),
163
                                                $qb->expr()->eq('j.method', ':method')
164
                ))
165
                    ->setParameter(':method', $method);
166
            } else {
167
                $qb->where('j.workerName = :workerName');
168
            }
169
            $qb->setParameter(':workerName', $workerName);
170
            $where = 'andWhere';
171 1
        } elseif (null !== $method) {
172
            $qb->where('j.method = :method')->setParameter(':method', $method);
173
            $where = 'andWhere';
174
        }
175
176 1
        $dateTime = new \DateTime();
177
        // Filter
178
        $qb
179 1
            ->$where($qb->expr()->orX(
180 1
                $qb->expr()->isNull('j.whenAt'),
181 1
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
182
            ))
183 1
            ->andWhere($qb->expr()->orX(
184 1
                $qb->expr()->isNull('j.expiresAt'),
185 1
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
186
            ))
187 1
            ->andWhere('j.locked is NULL')
188 1
            ->setParameter(':whenAt', $dateTime)
189 1
            ->setParameter(':expiresAt', $dateTime);
190
191 1
        $query = $qb->getQuery();
192
193 1
        return $query->getSingleScalarResult();
194
    }
195
196
    /**
197
     * For ORM it's prudent to wrap things in a transaction.
198
     *
199
     * @param $i
200
     * @param $count
201
     * @param array $stalledJobs
202
     * @param $countProcessed
203
     */
204
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
205
    {
206
        /** @var EntityManager $objectManager */
207
        $objectManager = $this->getObjectManager();
208
        try {
209
            $objectManager->beginTransaction();
210
            parent::runStalledLoop($i, $count, $stalledJobs, $countProcessed);
211
            $objectManager->commit();
212
        }
213
        catch (\Exception $exception) {
214
            $objectManager->rollback();
215
216
            // Try again
217
            parent::runStalledLoop($i, $count, $stalledJobs, $countProcessed);
218
        }
219
    }
220
221
    /**
222
     * Get Jobs statuses.
223
     */
224 1
    public function getStatus()
225
    {
226 1
        $result = [];
227 1
        $this->getStatusByEntityName($this->getObjectName(), $result);
228 1
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
229
230 1
        $finalResult = [];
231 1
        foreach ($result as $key => $item) {
232
            ksort($item);
233
            foreach ($item as $status => $count) {
234
                if (isset($finalResult[$key][$status])) {
235
                    $finalResult[$key][$status] += $count;
236
                } else {
237
                    $finalResult[$key][$status] = $count;
238
                }
239
            }
240
        }
241
242 1
        return $finalResult;
243
    }
244
245
    /**
246
     * @param string $entityName
247
     */
248 1
    protected function getStatusByEntityName($entityName, array &$result)
249
    {
250
        /** @var EntityManager $objectManager */
251 1
        $objectManager = $this->getObjectManager();
252 1
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
253 1
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
254
255 1
        foreach ($result1 as $item) {
256
            $method = $item['workerName'].'->'.$item['method'].'()';
257
            if (!isset($result[$method])) {
258
                $result[$method] = [BaseJob::STATUS_NEW => 0,
259
                    BaseJob::STATUS_RUNNING => 0,
260
                    RetryableJob::STATUS_EXPIRED => 0,
261
                    RetryableJob::STATUS_MAX_ERROR => 0,
262
                    RetryableJob::STATUS_MAX_STALLED => 0,
263
                    RetryableJob::STATUS_MAX_RETRIES => 0,
264
                    BaseJob::STATUS_SUCCESS => 0,
265
                    BaseJob::STATUS_ERROR => 0, ];
266
            }
267
            $result[$method][$item['status']] += intval($item['c']);
268
        }
269 1
    }
270
271
    /**
272
     * Get the next job to run (can be filtered by workername and method name).
273
     *
274
     * @param string $workerName
275
     * @param string $methodName
276
     * @param bool   $prioritize
277
     *
278
     * @return Job|null
279
     */
280
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
281
    {
282
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
283
        $hash = hash('sha256', $uniqid);
284
285
        /** @var EntityManager $objectManager */
286
        $objectManager = $this->getObjectManager();
287
288
        $objectManager->beginTransaction();
289
290
        /** @var EntityRepository $repository */
291
        $repository = $this->getRepository();
292
        $qb = $repository->createQueryBuilder('j');
293
        $dateTime = new \DateTime();
294
        $qb
295
            ->select('j')
296
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
297
            ->andWhere('j.locked is NULL')
298
            ->andWhere($qb->expr()->orX(
299
                $qb->expr()->isNull('j.whenAt'),
300
                        $qb->expr()->lte('j.whenAt', ':whenAt')
301
            ))
302
            ->andWhere($qb->expr()->orX(
303
                $qb->expr()->isNull('j.expiresAt'),
304
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
305
            ))
306
            ->setParameter(':whenAt', $dateTime)
307
            ->setParameter(':expiresAt', $dateTime);
308
309
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
310
311
        if ($prioritize) {
312
            $qb->add('orderBy', 'j.priority DESC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority DESC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
313
        } else {
314
            $qb->orderBy('j.whenAt', 'ASC');
315
        }
316
        $qb->setMaxResults(1);
317
318
        /** @var QueryBuilder $qb */
319
        $query = $qb->getQuery();
320
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
321
        $jobs = $query->getResult();
322
323
        if ($jobs) {
324
            /** @var Job $job */
325
            $job = $jobs[0];
326
            if (!$job) {
327
                throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
328
            }
329
            $job->setLocked(true);
330
            $job->setLockedAt(new \DateTime());
331
            $job->setStatus(BaseJob::STATUS_RUNNING);
332
            $job->setRunId($runId);
333
            $objectManager->commit();
334
            $objectManager->flush();
335
336
            return $job;
337
        }
338
339
        $objectManager->rollback();
340
341
        return null;
342
    }
343
344
    /**
345
     * Tries to update the nearest job as a batch.
346
     *
347
     * @param \Dtc\QueueBundle\Model\Job $job
348
     *
349
     * @return mixed|null
350
     */
351
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
352
    {
353
        $oldJob = null;
354
        do {
355
            try {
356
                /** @var EntityManager $entityManager */
357
                $entityManager = $this->getObjectManager();
358
                $entityManager->beginTransaction();
359
360
                /** @var QueryBuilder $queryBuilder */
361
                $queryBuilder = $this->getRepository()->createQueryBuilder('j');
362
                $queryBuilder->select()
363
                    ->where('j.crcHash = :crcHash')
364
                    ->andWhere('j.status = :status')
365
                    ->setParameter(':status', BaseJob::STATUS_NEW)
366
                    ->setParameter(':crcHash', $job->getCrcHash())
367
                    ->orderBy('j.whenAt', 'ASC')
368
                    ->setMaxResults(1);
369
                $oldJob = $queryBuilder->getQuery()->getSingleResult();
370
371
                if (!$oldJob) {
372
                    return null;
373
                }
374
375
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
376
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
377
378
                $entityManager->persist($oldJob);
379
                $entityManager->commit();
380
                $this->flush();
381
            } catch (\Exception $exception) {
382
                $entityManager->rollback();
383
            }
384
        } while (null === $oldJob);
385
386
        return $oldJob;
387
    }
388
}
389