Completed
Push — master ( be2940...32ab89 )
by Matthew
05:55
created

JobManager::restoreIdGenerator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 9
cts 9
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
use Dtc\QueueBundle\Model\BaseJob;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
16
class JobManager extends BaseJobManager
17
{
18
    use CommonTrait;
19
    protected $formerIdGenerators;
20
    protected static $saveInsertCalled = null;
21
    protected static $resetInsertCalled = null;
22
23 9
    public function stopIdGenerator($objectName)
24
    {
25 9
        $objectManager = $this->getObjectManager();
26 9
        $repository = $objectManager->getRepository($objectName);
27
        /** @var ClassMetadata $metadata */
28 9
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
29 9
        $this->formerIdGenerators[$objectName]['generator'] = $metadata->idGenerator;
30 9
        $this->formerIdGenerators[$objectName]['type'] = $metadata->generatorType;
31 9
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
32 9
        $metadata->setIdGenerator(new AssignedGenerator());
33 9
    }
34
35 1
    public function restoreIdGenerator($objectName)
36
    {
37 1
        $objectManager = $this->getObjectManager();
38 1
        $repository = $objectManager->getRepository($objectName);
39
        /** @var ClassMetadata $metadata */
40 1
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
41 1
        $generator = $this->formerIdGenerators[$objectName]['generator'];
42 1
        $type = $this->formerIdGenerators[$objectName]['type'];
43 1
        $metadata->setIdGeneratorType($type);
44 1
        $metadata->setIdGenerator($generator);
45 1
    }
46
47 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
48
    {
49
        /** @var EntityManager $objectManager */
50 3
        $objectManager = $this->getObjectManager();
51
52
        $qb = $objectManager
53 3
            ->createQueryBuilder()
54 3
            ->select('count(a.id)')
55 3
            ->from($objectName, 'a')
56 3
            ->where('a.status = :status');
57
58 3
        if (null !== $workerName) {
59 1
            $qb->andWhere('a.workerName = :workerName')
60 1
                ->setParameter(':workerName', $workerName);
61
        }
62
63 3
        if (null !== $method) {
64 1
            $qb->andWhere('a.method = :method')
65 1
                ->setParameter(':method', $workerName);
66
        }
67
68 3
        $count = $qb->setParameter(':status', $status)
69 3
            ->getQuery()->getSingleScalarResult();
70
71 3
        if (!$count) {
72 1
            return 0;
73
        }
74
75 3
        return $count;
76
    }
77
78
    /**
79
     * @param string|null $workerName
80
     * @param string|null $method
81
     *
82
     * @return int Count of jobs pruned
83
     */
84 1
    public function pruneErroneousJobs($workerName = null, $method = null)
85
    {
86
        /** @var EntityManager $objectManager */
87 1
        $objectManager = $this->getObjectManager();
88 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
89 1
        $qb->where('j.status = :status')
90 1
            ->setParameter(':status', BaseJob::STATUS_ERROR);
91
92 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
93 1
        $query = $qb->getQuery();
94
95 1
        return intval($query->execute());
96
    }
97
98 13
    protected function resetSaveOk($function)
99
    {
100 13
        $objectManager = $this->getObjectManager();
101 13
        $splObjectHash = spl_object_hash($objectManager);
102
103 13
        if ('save' === $function) {
104
            $compare = static::$resetInsertCalled;
105
        } else {
106 13
            $compare = static::$saveInsertCalled;
107
        }
108
109 13
        if ($splObjectHash === $compare) {
110
            // Insert SQL is cached...
111
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
112
            throw new \Exception($msg);
113
        }
114
115 13
        if ('save' === $function) {
116
            static::$saveInsertCalled = spl_object_hash($objectManager);
117
        } else {
118 13
            static::$resetInsertCalled = spl_object_hash($objectManager);
119
        }
120 13
    }
121
122
    /**
123
     * @param string $workerName
124
     * @param string $method
125
     */
126 8
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
127
    {
128 8
        if (null !== $workerName) {
129 3
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
130
        }
131
132 8
        if (null !== $method) {
133 2
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
134
        }
135 8
    }
136
137 1
    protected function updateExpired($workerName = null, $method = null)
138
    {
139
        /** @var EntityManager $objectManager */
140 1
        $objectManager = $this->getObjectManager();
141 1
        $qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
142 1
        $qb->set('j.status', ':newStatus');
143 1
        $qb->where('j.expiresAt <= :expiresAt')
144 1
            ->setParameter(':expiresAt', new \DateTime());
145 1
        $qb->andWhere('j.status = :status')
146 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
147 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
148
149 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
150 1
        $query = $qb->getQuery();
151
152 1
        return intval($query->execute());
153
    }
154
155
    /**
156
     * Removes archived jobs older than $olderThan.
157
     *
158
     * @param \DateTime $olderThan
159
     */
160 1
    public function pruneArchivedJobs(\DateTime $olderThan)
161
    {
162
        /** @var EntityManager $entityManager */
163 1
        $entityManager = $this->getObjectManager();
164
165 1
        return $this->removeOlderThan($entityManager,
166 1
                $this->getArchiveObjectName(),
167 1
                'updatedAt',
168 1
                $olderThan);
169
    }
170
171 2
    public function getJobCount($workerName = null, $method = null)
172
    {
173
        /** @var EntityManager $objectManager */
174 2
        $objectManager = $this->getObjectManager();
175 2
        $qb = $objectManager->createQueryBuilder();
176
177 2
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
178
179 2
        $where = 'where';
180 2
        if (null !== $workerName) {
181
            if (null !== $method) {
182
                $qb->where($qb->expr()->andX(
183
                    $qb->expr()->eq('j.workerName', ':workerName'),
184
                                                $qb->expr()->eq('j.method', ':method')
185
                ))
186
                    ->setParameter(':method', $method);
187
            } else {
188
                $qb->where('j.workerName = :workerName');
189
            }
190
            $qb->setParameter(':workerName', $workerName);
191
            $where = 'andWhere';
192 2
        } elseif (null !== $method) {
193
            $qb->where('j.method = :method')->setParameter(':method', $method);
194
            $where = 'andWhere';
195
        }
196
197 2
        $dateTime = new \DateTime();
198
        // Filter
199
        $qb
200 2
            ->$where($qb->expr()->orX(
201 2
                $qb->expr()->isNull('j.whenAt'),
202 2
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
203
            ))
204 2
            ->andWhere($qb->expr()->orX(
205 2
                $qb->expr()->isNull('j.expiresAt'),
206 2
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
207
            ))
208 2
            ->andWhere('j.locked is NULL')
209 2
            ->setParameter(':whenAt', $dateTime)
210 2
            ->setParameter(':expiresAt', $dateTime);
211
212 2
        $query = $qb->getQuery();
213
214 2
        return $query->getSingleScalarResult();
215
    }
216
217
    /**
218
     * Get Jobs statuses.
219
     */
220 2
    public function getStatus()
221
    {
222 2
        $result = [];
223 2
        $this->getStatusByEntityName($this->getObjectName(), $result);
224 2
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);
225
226 2
        $finalResult = [];
227 2
        foreach ($result as $key => $item) {
228 1
            ksort($item);
229 1
            foreach ($item as $status => $count) {
230 1
                if (isset($finalResult[$key][$status])) {
231
                    $finalResult[$key][$status] += $count;
232
                } else {
233 1
                    $finalResult[$key][$status] = $count;
234
                }
235
            }
236
        }
237
238 2
        return $finalResult;
239
    }
240
241
    /**
242
     * @param string $entityName
243
     */
244 2
    protected function getStatusByEntityName($entityName, array &$result)
245
    {
246
        /** @var EntityManager $objectManager */
247 2
        $objectManager = $this->getObjectManager();
248 2
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
249 2
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
250
251 2
        foreach ($result1 as $item) {
252 1
            $method = $item['workerName'].'->'.$item['method'].'()';
253 1
            if (!isset($result[$method])) {
254 1
                $result[$method] = [BaseJob::STATUS_NEW => 0,
255
                    BaseJob::STATUS_RUNNING => 0,
256
                    RetryableJob::STATUS_EXPIRED => 0,
257
                    RetryableJob::STATUS_MAX_ERROR => 0,
258
                    RetryableJob::STATUS_MAX_STALLED => 0,
259
                    RetryableJob::STATUS_MAX_RETRIES => 0,
260
                    BaseJob::STATUS_SUCCESS => 0,
261
                    BaseJob::STATUS_ERROR => 0, ];
262
            }
263 1
            $result[$method][$item['status']] += intval($item['c']);
264
        }
265 2
    }
266
267
    /**
268
     * Get the next job to run (can be filtered by workername and method name).
269
     *
270
     * @param string $workerName
271
     * @param string $methodName
272
     * @param bool   $prioritize
273
     *
274
     * @return Job|null
275
     */
276 6
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
277
    {
278 6
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
279 6
        $hash = hash('sha256', $uniqid);
280
281
        /** @var EntityManager $objectManager */
282 6
        $objectManager = $this->getObjectManager();
283
284 6
        $objectManager->beginTransaction();
285
286
        /** @var EntityRepository $repository */
287 6
        $repository = $this->getRepository();
288 6
        $qb = $repository->createQueryBuilder('j');
289 6
        $dateTime = new \DateTime();
290
        $qb
291 6
            ->select('j')
292 6
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
293 6
            ->andWhere('j.locked is NULL')
294 6
            ->andWhere($qb->expr()->orX(
295 6
                $qb->expr()->isNull('j.whenAt'),
296 6
                        $qb->expr()->lte('j.whenAt', ':whenAt')
297
            ))
298 6
            ->andWhere($qb->expr()->orX(
299 6
                $qb->expr()->isNull('j.expiresAt'),
300 6
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
301
            ))
302 6
            ->setParameter(':whenAt', $dateTime)
303 6
            ->setParameter(':expiresAt', $dateTime);
304
305 6
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
306
307 6
        if ($prioritize) {
308 6
            $qb->add('orderBy', 'j.priority DESC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority DESC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
309
        } else {
310
            $qb->orderBy('j.whenAt', 'ASC');
311
        }
312 6
        $qb->setMaxResults(1);
313
314
        /** @var QueryBuilder $qb */
315 6
        $query = $qb->getQuery();
316 6
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
317 6
        $jobs = $query->getResult();
318
319 6
        if ($jobs) {
320
            /** @var Job $job */
321 5
            $job = $jobs[0];
322 5
            if (!$job) {
323
                throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
324
            }
325 5
            $job->setLocked(true);
326 5
            $job->setLockedAt(new \DateTime());
327 5
            $job->setStatus(BaseJob::STATUS_RUNNING);
328 5
            $job->setRunId($runId);
329 5
            $objectManager->commit();
330 5
            $objectManager->flush();
331
332 5
            return $job;
333
        }
334
335 3
        $objectManager->rollback();
336
337 3
        return null;
338
    }
339
}
340