Completed
Push — master ( 17631a...07adb9 )
by Matthew
05:11
created

JobManager::restoreIdGenerator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 9
cts 9
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
use Dtc\QueueBundle\Model\BaseJob;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
16
class JobManager extends BaseJobManager
17
{
18
    use CommonTrait;
19
    protected $formerIdGenerators;
20
    protected static $saveInsertCalled = null;
21
    protected static $resetInsertCalled = null;
22
23 9
    /**
     * Replaces the id generator on the class metadata of $objectName with an
     * AssignedGenerator and sets the generator type to GENERATOR_TYPE_NONE.
     *
     * The generator and generator type that were in place beforehand are kept
     * in $this->formerIdGenerators under $objectName.
     *
     * @param string $objectName
     */
    public function stopIdGenerator($objectName)
    {
        $manager = $this->getObjectManager();
        $className = $manager->getRepository($objectName)->getClassName();

        /** @var ClassMetadata $classMetadata */
        $classMetadata = $manager->getClassMetadata($className);

        // Remember the current configuration, keyed by the name the caller passed in.
        $this->formerIdGenerators[$objectName]['generator'] = $classMetadata->idGenerator;
        $this->formerIdGenerators[$objectName]['type'] = $classMetadata->generatorType;

        $classMetadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
        $classMetadata->setIdGenerator(new AssignedGenerator());
    }
34
35 1
    /**
     * Puts back the id generator and generator type that stopIdGenerator()
     * stored in $this->formerIdGenerators for $objectName.
     *
     * When nothing has been stored for $objectName the metadata is left
     * untouched. Without this guard the method would read undefined array
     * offsets and overwrite the generator and generator type with null.
     *
     * @param string $objectName
     */
    public function restoreIdGenerator($objectName)
    {
        if (!isset($this->formerIdGenerators[$objectName])) {
            // Nothing was recorded for this name, so there is nothing to restore.
            return;
        }

        $objectManager = $this->getObjectManager();
        $repository = $objectManager->getRepository($objectName);
        /** @var ClassMetadata $metadata */
        $metadata = $objectManager->getClassMetadata($repository->getClassName());

        $former = $this->formerIdGenerators[$objectName];
        $metadata->setIdGeneratorType($former['type']);
        $metadata->setIdGenerator($former['generator']);
    }
46
47 3
    /**
     * Counts the rows of $objectName that have the given status, optionally
     * narrowed to a worker name and/or a method.
     *
     * @param string      $objectName Entity name to count from
     * @param mixed       $status     Value compared against the entity's status field
     * @param string|null $workerName Only count rows with this worker name when not null
     * @param string|null $method     Only count rows with this method when not null
     *
     * @return mixed the scalar count as returned by the query, or 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status');

        if (null !== $workerName) {
            $qb->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // Bind the method filter to $method (it was previously bound to $workerName,
            // which made the method filter compare against the wrong value).
            $qb->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $qb->setParameter(':status', $status)
            ->getQuery()->getSingleScalarResult();

        if (!$count) {
            return 0;
        }

        return $count;
    }
77
78
    /**
     * Deletes rows of the archive object that have status BaseJob::STATUS_ERROR,
     * optionally restricted to a worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder
            ->delete($this->getArchiveObjectName(), 'j')
            ->where('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_ERROR);

        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        return intval($deleteBuilder->getQuery()->execute());
    }
97
98 12
    /**
     * Records that either 'save' or the other operation has been performed with
     * the current object manager, and refuses to continue when the opposite
     * operation was already recorded for the same object manager instance.
     *
     * Instances are identified by spl_object_hash() and the record is kept in
     * static properties.
     *
     * @param string $function 'save' selects the save path; any other value selects the reset path
     *
     * @throws \Exception when the opposite operation was already recorded for this object manager
     */
    protected function resetSaveOk($function)
    {
        $currentHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // A save conflicts with an earlier reset, and a reset with an earlier save.
        $conflictingHash = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

        if ($currentHash === $conflictingHash) {
            // Insert SQL is cached...
            throw new \Exception("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $currentHash;

            return;
        }

        static::$resetInsertCalled = $currentHash;
    }
121
122 7
    /**
     * Adds "j.workerName = :workerName" and/or "j.method = :method" conditions
     * (with their parameters) to the query builder for each argument that is
     * not null. The query is expected to use the alias "j".
     *
     * @param QueryBuilder $queryBuilder
     * @param string|null  $workerName
     * @param string|null  $method
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $field => $value) {
            if (null === $value) {
                continue;
            }

            $queryBuilder
                ->andWhere('j.'.$field.' = :'.$field)
                ->setParameter(':'.$field, $value);
        }
    }
132
133 1
    /**
     * Sets the status to Job::STATUS_EXPIRED on every row of the job object
     * whose status is BaseJob::STATUS_NEW and whose expiresAt is at or before
     * the current time, optionally restricted to a worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int the integer value of the update query's result
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder
            ->update($this->getObjectName(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->andWhere('j.status = :status')
            ->setParameter(':expiresAt', new \DateTime())
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        return intval($updateBuilder->getQuery()->execute());
    }
150
151
    /**
     * Removes archived jobs whose updatedAt is older than $olderThan.
     *
     * Delegates to removeOlderThan() with the archive object name and the
     * 'updatedAt' field, and returns whatever that call returns.
     *
     * @param \DateTime $olderThan
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var EntityManager $manager */
        $manager = $this->getObjectManager();
        $archiveName = $this->getArchiveObjectName();

        return $this->removeOlderThan($manager, $archiveName, 'updatedAt', $olderThan);
    }
166
167 2
    /**
     * Counts rows of the job object that are not locked, whose whenAt is null
     * or not in the future, and whose expiresAt is null or still in the future,
     * optionally restricted to a worker name and/or method.
     *
     * The worker/method restriction goes through addWorkerNameCriterion(), the
     * same helper the other queries in this class use, instead of a hand-built
     * copy of those conditions combined through a variable method name.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed the scalar count as returned by the query
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder();

        // One timestamp for both bounds so the two comparisons agree on "now".
        $dateTime = new \DateTime();

        $qb->select('count(j)')
            ->from($this->getObjectName(), 'j')
            ->where('j.locked is NULL')
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.whenAt'),
                $qb->expr()->lte('j.whenAt', ':whenAt')
            ))
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.expiresAt'),
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->setParameter(':whenAt', $dateTime)
            ->setParameter(':expiresAt', $dateTime);

        $this->addWorkerNameCriterion($qb, $workerName, $method);

        return $qb->getQuery()->getSingleScalarResult();
    }
212
213
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from the job object and the archive object
     * via getStatusByEntityName(), then returns them with each entry's
     * statuses sorted by key.
     *
     * @return array counts indexed first by the key produced by getStatusByEntityName(), then by status
     */
    public function getStatus()
    {
        $collected = [];
        $this->getStatusByEntityName($this->getObjectName(), $collected);
        $this->getStatusByEntityName($this->getArchiveObjectName(), $collected);

        $sorted = [];
        foreach ($collected as $label => $statusCounts) {
            ksort($statusCounts);
            foreach ($statusCounts as $status => $count) {
                if (!isset($sorted[$label][$status])) {
                    $sorted[$label][$status] = $count;
                    continue;
                }

                $sorted[$label][$status] += $count;
            }
        }

        return $sorted;
    }
236
237
    /**
     * Adds per-status row counts of $entityName to $result.
     *
     * Rows are grouped by worker name, method and status. Each group is
     * accumulated under the key "<workerName>-><method>()", and every such key
     * starts with a zero count for each of the known statuses listed below.
     *
     * @param string $entityName
     * @param array  $result     Accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();

        foreach ($result1 as $item) {
            $method = $item['workerName'].'->'.$item['method'].'()';
            if (!isset($result[$method])) {
                $result[$method] = [BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    RetryableJob::STATUS_EXPIRED => 0,
                    RetryableJob::STATUS_MAX_ERROR => 0,
                    RetryableJob::STATUS_MAX_STALLED => 0,
                    RetryableJob::STATUS_MAX_RETRIES => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0, ];
            }

            // A stored status outside the list above would otherwise hit an
            // undefined array key on the += below.
            if (!isset($result[$method][$item['status']])) {
                $result[$method][$item['status']] = 0;
            }
            $result[$method][$item['status']] += intval($item['c']);
        }
    }
262
263
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Selects one unlocked job with status BaseJob::STATUS_NEW whose whenAt is
     * null or not in the future and whose expiresAt is null or still in the
     * future, using a pessimistic write lock inside a transaction. The job is
     * then marked locked and running, and tagged with $runId.
     *
     * The changes are flushed BEFORE the transaction is committed, so the
     * job's new state is written while the lock is still held. If anything
     * throws, the transaction is rolled back and the exception is rethrown.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize Order by priority and then whenAt when true, by whenAt only otherwise
     * @param mixed       $runId      Value passed to the job's setRunId()
     *
     * @return Job|null
     *
     * @throws \Exception when querying or persisting the job fails
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $objectManager->beginTransaction();

        $job = null;
        try {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $qb = $repository->createQueryBuilder('j');
            $dateTime = new \DateTime();
            $qb
                ->select('j')
                ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
                ->andWhere('j.locked is NULL')
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.whenAt'),
                    $qb->expr()->lte('j.whenAt', ':whenAt')
                ))
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.expiresAt'),
                    $qb->expr()->gt('j.expiresAt', ':expiresAt')
                ))
                ->setParameter(':whenAt', $dateTime)
                ->setParameter(':expiresAt', $dateTime);

            $this->addWorkerNameCriterion($qb, $workerName, $methodName);

            if ($prioritize) {
                // Same ordering as before, expressed through the typed order-by API
                // instead of handing a raw string to add().
                $qb->orderBy('j.priority', 'ASC')->addOrderBy('j.whenAt', 'ASC');
            } else {
                $qb->orderBy('j.whenAt', 'ASC');
            }
            $qb->setMaxResults(1);

            $query = $qb->getQuery();
            $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
            $jobs = $query->getResult();

            if ($jobs) {
                /** @var Job $job */
                $job = $jobs[0];
                $job->setLocked(true);
                $job->setLockedAt(new \DateTime());
                $job->setStatus(BaseJob::STATUS_RUNNING);
                $job->setRunId($runId);

                // Write the update while the lock is still held, then release it.
                $objectManager->flush();
                $objectManager->commit();
            }
        } catch (\Exception $exception) {
            // Do not leave the transaction open when the query or the flush fails.
            $objectManager->rollback();

            throw $exception;
        }

        if (null === $job) {
            // Nothing to run: close the transaction opened above.
            $objectManager->rollback();
        }

        return $job;
    }
335
}
336