Completed
Push — master ( ed304f...e36e8e )
by Matthew
05:35
created

JobManager   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 312
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 79.07%

Importance

Changes 0
Metric Value
wmc 29
lcom 1
cbo 10
dl 0
loc 312
ccs 136
cts 172
cp 0.7907
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A stopIdGenerator() 0 11 1
A restoreIdGenerator() 0 11 1
B countJobsByStatus() 0 30 4
A pruneErroneousJobs() 0 13 1
B resetSaveOk() 0 23 4
A addWorkerNameCriterion() 0 10 3
A updateExpired() 0 17 1
A pruneArchivedJobs() 0 13 1
B getJobCount() 0 45 4
A getStatus() 0 14 2
A getStatusByEntityName() 0 18 3
B getJob() 0 63 4
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
use Dtc\QueueBundle\Model\BaseJob;
14
15
class JobManager extends BaseJobManager
16
{
17
    protected $formerIdGenerators;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 8
    public function stopIdGenerator($objectName)
22
    {
23 8
        $objectManager = $this->getObjectManager();
24 8
        $repository = $objectManager->getRepository($objectName);
25
        /** @var ClassMetadata $metadata */
26 8
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
27 8
        $this->formerIdGenerators[$objectName]['generator'] = $metadata->idGenerator;
28 8
        $this->formerIdGenerators[$objectName]['type'] = $metadata->generatorType;
29 8
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
30 8
        $metadata->setIdGenerator(new AssignedGenerator());
31 8
    }
32
33 1
    public function restoreIdGenerator($objectName)
34
    {
35 1
        $objectManager = $this->getObjectManager();
36 1
        $repository = $objectManager->getRepository($objectName);
37
        /** @var ClassMetadata $metadata */
38 1
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
39 1
        $generator = $this->formerIdGenerators[$objectName]['generator'];
40 1
        $type = $this->formerIdGenerators[$objectName]['type'];
41 1
        $metadata->setIdGeneratorType($type);
42 1
        $metadata->setIdGenerator($generator);
43 1
    }
44
45 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
46
    {
47
        /** @var EntityManager $objectManager */
48 3
        $objectManager = $this->getObjectManager();
49
50
        $qb = $objectManager
51 3
            ->createQueryBuilder()
52 3
            ->select('count(a.id)')
53 3
            ->from($objectName, 'a')
54 3
            ->where('a.status = :status');
55
56 3
        if (null !== $workerName) {
57 1
            $qb->andWhere('a.workerName = :workerName')
58 1
                ->setParameter(':workerName', $workerName);
59
        }
60
61 3
        if (null !== $method) {
62 1
            $qb->andWhere('a.method = :method')
63 1
                ->setParameter(':method', $workerName);
64
        }
65
66 3
        $count = $qb->setParameter(':status', $status)
67 3
            ->getQuery()->getSingleScalarResult();
68
69 3
        if (!$count) {
70 1
            return 0;
71
        }
72
73 3
        return $count;
74
    }
75
76
    /**
77
     * @param string|null $workerName
78
     * @param string|null $method
79
     *
80
     * @return int Count of jobs pruned
81
     */
82 1
    public function pruneErroneousJobs($workerName = null, $method = null)
83
    {
84
        /** @var EntityManager $objectManager */
85 1
        $objectManager = $this->getObjectManager();
86 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
87 1
        $qb->where('j.status = :status')
88 1
            ->setParameter(':status', BaseJob::STATUS_ERROR);
89
90 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
91 1
        $query = $qb->getQuery();
92
93 1
        return intval($query->execute());
94
    }
95
96 11
    protected function resetSaveOk($function)
97
    {
98 11
        $objectManager = $this->getObjectManager();
99 11
        $splObjectHash = spl_object_hash($objectManager);
100
101 11
        if ('save' === $function) {
102 11
            $compare = static::$resetInsertCalled;
103
        } else {
104 1
            $compare = static::$saveInsertCalled;
105
        }
106
107 11
        if ($splObjectHash === $compare) {
108
            // Insert SQL is cached...
109
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
110
            throw new \Exception($msg);
111
        }
112
113 11
        if ('save' === $function) {
114 11
            static::$saveInsertCalled = spl_object_hash($objectManager);
115
        } else {
116 1
            static::$resetInsertCalled = spl_object_hash($objectManager);
117
        }
118 11
    }
119
120 7
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
121
    {
122 7
        if (null !== $workerName) {
123 3
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
124
        }
125
126 7
        if (null !== $method) {
127 2
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
128
        }
129 7
    }
130
131 1
    protected function updateExpired($workerName = null, $method = null)
132
    {
133
        /** @var EntityManager $objectManager */
134 1
        $objectManager = $this->getObjectManager();
135 1
        $qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j');
136 1
        $qb->set('j.status', ':newStatus');
137 1
        $qb->where('j.expiresAt <= :expiresAt')
138 1
            ->setParameter(':expiresAt', new \DateTime());
139 1
        $qb->andWhere('j.status = :status')
140 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
141 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
142
143 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
144 1
        $query = $qb->getQuery();
145
146 1
        return intval($query->execute());
147
    }
148
149
    /**
150
     * Removes archived jobs older than $olderThan.
151
     *
152
     * @param \DateTime $olderThan
153
     */
154 1
    public function pruneArchivedJobs(\DateTime $olderThan)
155
    {
156
        /** @var EntityManager $objectManager */
157 1
        $objectManager = $this->getObjectManager();
158 1
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
159
        $qb = $qb
160 1
            ->where('j.updatedAt < :updatedAt')
161 1
            ->setParameter(':updatedAt', $olderThan);
162
163 1
        $query = $qb->getQuery();
164
165 1
        return $query->execute();
166
    }
167
168 1
    public function getJobCount($workerName = null, $method = null)
169
    {
170
        /** @var EntityManager $objectManager */
171 1
        $objectManager = $this->getObjectManager();
172 1
        $qb = $objectManager->createQueryBuilder();
173
174 1
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
175
176 1
        $where = 'where';
177 1
        if (null !== $workerName) {
178
            if (null !== $method) {
179
                $qb->where($qb->expr()->andX(
180
                    $qb->expr()->eq('j.workerName', ':workerName'),
181
                                                $qb->expr()->eq('j.method', ':method')
182
                ))
183
                    ->setParameter(':method', $method);
184
            } else {
185
                $qb->where('j.workerName = :workerName');
186
            }
187
            $qb->setParameter(':workerName', $workerName);
188
            $where = 'andWhere';
189 1
        } elseif (null !== $method) {
190
            $qb->where('j.method = :method')->setParameter(':method', $method);
191
            $where = 'andWhere';
192
        }
193
194 1
        $dateTime = new \DateTime();
195
        // Filter
196
        $qb
197 1
            ->$where($qb->expr()->orX(
198 1
                $qb->expr()->isNull('j.whenAt'),
199 1
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
200
            ))
201 1
            ->andWhere($qb->expr()->orX(
202 1
                $qb->expr()->isNull('j.expiresAt'),
203 1
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
204
            ))
205 1
            ->andWhere('j.locked is NULL')
206 1
            ->setParameter(':whenAt', $dateTime)
207 1
            ->setParameter(':expiresAt', $dateTime);
208
209 1
        $query = $qb->getQuery();
210
211 1
        return $query->getSingleScalarResult();
212
    }
213
214
    /**
215
     * Get Jobs statuses.
216
     */
217
    public function getStatus()
218
    {
219
        $result = [];
220
        $this->getStatusByEntityName($this->getObjectName(), $result);
221
        $this->getStatusByEntityName($this->getObjectName(), $result);
222
223
        $finalResult = [];
224
        foreach ($result as $key => $item) {
225
            ksort($item);
226
            $finalResult[$key] = $item;
227
        }
228
229
        return $finalResult;
230
    }
231
232
    /**
233
     * @param string $entityName
234
     */
235
    protected function getStatusByEntityName($entityName, array &$result)
236
    {
237
        /** @var EntityManager $objectManager */
238
        $objectManager = $this->getObjectManager();
239
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
240
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
241
242
        foreach ($result1 as $item) {
243
            $method = $item['workerName'].'->'.$item['method'];
244
            if (!isset($result[$method])) {
245
                $result[$method] = [BaseJob::STATUS_NEW => 0,
246
                    BaseJob::STATUS_RUNNING => 0,
247
                    BaseJob::STATUS_SUCCESS => 0,
248
                    BaseJob::STATUS_ERROR => 0, ];
249
            }
250
            $result[$method][$item['status']] += intval($item['c']);
251
        }
252
    }
253
254
    /**
255
     * Get the next job to run (can be filtered by workername and method name).
256
     *
257
     * @param string $workerName
258
     * @param string $methodName
259
     * @param bool   $prioritize
260
     *
261
     * @return Job|null
262
     */
263 5
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
264
    {
265 5
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
266 5
        $hash = hash('sha256', $uniqid);
267
268
        /** @var EntityManager $objectManager */
269 5
        $objectManager = $this->getObjectManager();
270
271 5
        $objectManager->beginTransaction();
272
273
        /** @var EntityRepository $repository */
274 5
        $repository = $this->getRepository();
275 5
        $qb = $repository->createQueryBuilder('j');
276 5
        $dateTime = new \DateTime();
277
        $qb
278 5
            ->select('j')
279 5
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
280 5
            ->andWhere('j.locked is NULL')
281 5
            ->andWhere($qb->expr()->orX(
282 5
                $qb->expr()->isNull('j.whenAt'),
283 5
                        $qb->expr()->lte('j.whenAt', ':whenAt')
284
            ))
285 5
            ->andWhere($qb->expr()->orX(
286 5
                $qb->expr()->isNull('j.expiresAt'),
287 5
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
288
            ))
289 5
            ->setParameter(':whenAt', $dateTime)
290 5
            ->setParameter(':expiresAt', $dateTime);
291
292 5
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
293
294 5
        if ($prioritize) {
295 5
            $qb->add('orderBy', 'j.priority ASC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority ASC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
296
        } else {
297
            $qb->orderBy('j.whenAt', 'ASC');
298
        }
299 5
        $qb->setMaxResults(1);
300
301
        /** @var QueryBuilder $qb */
302 5
        $query = $qb->getQuery();
303 5
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
304 5
        $jobs = $query->getResult();
305
306 5
        if ($jobs) {
307
            /** @var Job $job */
308 4
            $job = $jobs[0];
309 4
            if (!$job) {
310
                throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
311
            }
312 4
            $job->setLocked(true);
313 4
            $job->setLockedAt(new \DateTime());
314 4
            $job->setStatus(BaseJob::STATUS_RUNNING);
315 4
            $job->setRunId($runId);
316 4
            $objectManager->commit();
317 4
            $objectManager->flush();
318
319 4
            return $job;
320
        }
321
322 2
        $objectManager->rollback();
323
324 2
        return null;
325
    }
326
}
327