Completed
Push — master ( 203a9c...2f9634 )
by Matthew
05:23 queued 25s
created

JobManager::getJobCount()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4.6558

Importance

Changes 0
Metric Value
dl 0
loc 45
ccs 19
cts 29
cp 0.6552
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 31
nc 4
nop 2
crap 4.6558
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
use Dtc\QueueBundle\Model\BaseJob;
14
15
class JobManager extends BaseJobManager
16
{
17
    /**
     * Identifier generator settings saved by stopIdGenerator(), keyed by object name.
     *
     * Each entry has a 'generator' key (the metadata's former idGenerator) and a
     * 'type' key (the metadata's former generatorType); restoreIdGenerator() reads
     * both to put the original settings back. Not initialised until
     * stopIdGenerator() is first called.
     *
     * @var array|null
     */
    protected $formerIdGenerators;
18
19 8
    /**
     * Switches the given entity to manually assigned identifiers.
     *
     * The generator and generator type currently set on the class metadata are
     * remembered under $objectName so restoreIdGenerator() can put them back.
     *
     * @param string $objectName Name of the entity whose id generator is replaced
     */
    public function stopIdGenerator($objectName)
    {
        $manager = $this->getObjectManager();
        $className = $manager->getRepository($objectName)->getClassName();

        /** @var ClassMetadata $classMetadata */
        $classMetadata = $manager->getClassMetadata($className);

        // Remember the current settings before they are overwritten below.
        $this->formerIdGenerators[$objectName]['generator'] = $classMetadata->idGenerator;
        $this->formerIdGenerators[$objectName]['type'] = $classMetadata->generatorType;

        $classMetadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
        $classMetadata->setIdGenerator(new AssignedGenerator());
    }
30
31 1
    /**
     * Restores the identifier generator that stopIdGenerator() replaced.
     *
     * Does nothing when no generator was saved for $objectName (i.e.
     * stopIdGenerator() was never called for it); previously that case raised
     * undefined-index notices and wrote null into the class metadata.
     *
     * @param string $objectName Name of the entity whose id generator is restored
     */
    public function restoreIdGenerator($objectName)
    {
        // Nothing was saved for this object, so there is nothing to restore.
        if (!isset($this->formerIdGenerators[$objectName])) {
            return;
        }

        $objectManager = $this->getObjectManager();
        $repository = $objectManager->getRepository($objectName);
        /** @var ClassMetadata $metadata */
        $metadata = $objectManager->getClassMetadata($repository->getClassName());

        $former = $this->formerIdGenerators[$objectName];
        $metadata->setIdGeneratorType($former['type']);
        $metadata->setIdGenerator($former['generator']);
    }
42
43 3
    /**
     * Counts the rows of $objectName that have the given status.
     *
     * @param string      $objectName Entity to count
     * @param mixed       $status     Value compared against the entity's status field
     * @param string|null $workerName Only count jobs of this worker when not null
     * @param string|null $method     Only count jobs of this method when not null
     *
     * @return int Number of matching jobs (0 when there are none)
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        if (null !== $workerName) {
            $qb->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // Bind the method name itself; binding $workerName here made the
            // method filter compare against the wrong value.
            $qb->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        // The scalar result type depends on the database driver; always return an int.
        return intval($qb->getQuery()->getSingleScalarResult());
    }
73
74
    /**
     * Deletes jobs from the archive entity whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName Only prune jobs of this worker when not null
     * @param string|null $method     Only prune jobs of this method when not null
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        // Condition handed to pruneJobs(): restrict the delete to errored jobs.
        $onlyErrored = function ($qb) {
            /* @var QueryBuilder $qb */
            $qb->where('j.status = :status');
            $qb->setParameter(':status', BaseJob::STATUS_ERROR);
        };

        return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), $onlyErrored);
    }
88
89
    /**
     * Prunes jobs according to a condition function.
     *
     * Builds a DQL DELETE on $objectName (aliased "j"), lets $conditionFunc add
     * its condition, then narrows the delete by worker name and/or method.
     *
     * $workerName and $method carry no default value: they precede required
     * parameters, so a default could never be used, and declaring one there is
     * deprecated as of PHP 8.0. Pass null explicitly to skip a filter.
     *
     * @param string|null $workerName    Only prune jobs of this worker when not null
     * @param string|null $method        Only prune jobs of this method when not null
     * @param string      $objectName    Entity to delete from
     * @param callable    $conditionFunc Called with the QueryBuilder to add the WHERE condition
     *
     * @return int Count of jobs pruned
     */
    protected function pruneJobs($workerName, $method, $objectName, $conditionFunc)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder()->delete($objectName, 'j');
        $conditionFunc($qb);

        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $query = $qb->getQuery();

        return intval($query->execute());
    }
110
111 7
    /**
     * Adds "j.workerName" and/or "j.method" equality filters to a query builder.
     *
     * A filter is only added for an argument that is not null. The query is
     * expected to use "j" as the job alias.
     *
     * @param QueryBuilder $queryBuilder Builder to add the conditions to
     * @param string|null  $workerName   Worker name to match, or null for no filter
     * @param string|null  $method       Method name to match, or null for no filter
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $filters = ['workerName' => $workerName, 'method' => $method];

        foreach ($filters as $field => $value) {
            if (null === $value) {
                continue;
            }

            $queryBuilder->andWhere('j.'.$field.' = :'.$field);
            $queryBuilder->setParameter(':'.$field, $value);
        }
    }
121
122 1
    /**
     * Marks overdue new jobs as expired.
     *
     * Sets the status to Job::STATUS_EXPIRED on every job that is still in
     * BaseJob::STATUS_NEW and whose expiresAt is at or before the current time,
     * optionally limited to a worker and/or method.
     *
     * @param string|null $workerName Only update jobs of this worker when not null
     * @param string|null $method     Only update jobs of this method when not null
     *
     * @return int Result of executing the update, converted with intval()
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $update = $entityManager->createQueryBuilder()
            ->update($this->getObjectName(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->andWhere('j.status = :status');

        $update->setParameter(':expiresAt', new \DateTime());
        $update->setParameter(':status', BaseJob::STATUS_NEW);
        $update->setParameter(':newStatus', Job::STATUS_EXPIRED);

        $this->addWorkerNameCriterion($update, $workerName, $method);

        return intval($update->getQuery()->execute());
    }
139
140
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Deletes every row of the archive entity whose updatedAt is strictly
     * before the given time.
     *
     * @param \DateTime $olderThan Cut-off time; jobs updated before it are deleted
     *
     * @return mixed Result of executing the DELETE query (presumably the number of deleted rows)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $delete = $entityManager->createQueryBuilder();
        $delete->delete($this->getArchiveObjectName(), 'j');
        $delete->where('j.updatedAt < :updatedAt');
        $delete->setParameter(':updatedAt', $olderThan);

        return $delete->getQuery()->execute();
    }
158
159 1
    /**
     * Counts the jobs that are currently waiting to be run.
     *
     * A job is counted when its whenAt is null or not in the future, its
     * expiresAt is null or still in the future, and it is not locked. The count
     * can be limited to a worker and/or method.
     *
     * @param string|null $workerName Only count jobs of this worker when not null
     * @param string|null $method     Only count jobs of this method when not null
     *
     * @return int Number of matching jobs
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder();
        $dateTime = new \DateTime();

        $qb->select('count(j)')
            ->from($this->getObjectName(), 'j')
            ->where($qb->expr()->orX(
                $qb->expr()->isNull('j.whenAt'),
                $qb->expr()->lte('j.whenAt', ':whenAt')
            ))
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.expiresAt'),
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->andWhere('j.locked is NULL')
            ->setParameter(':whenAt', $dateTime)
            ->setParameter(':expiresAt', $dateTime);

        // Same worker/method filter as getJob() and pruneJobs(), instead of a
        // hand-rolled where/andWhere switch.
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        // The scalar result type depends on the database driver; always return an int.
        return intval($qb->getQuery()->getSingleScalarResult());
    }
204
205
    /**
     * Get Jobs statuses.
     *
     * Collects per-status job counts from both the job entity and the archive
     * entity. The second lookup previously queried the job entity again, which
     * doubled its counts and left archived jobs out.
     *
     * @return array Map of "workerName->method" to an array of status => count, sorted by status
     */
    public function getStatus()
    {
        $result = [];
        $this->getStatusByEntityName($this->getObjectName(), $result);
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
222
223
    /**
     * Adds the per-status job counts of one entity to $result.
     *
     * Rows are grouped by worker name, method and status. Each
     * "workerName->method" entry starts with zero counts for the new, running,
     * success and error statuses; any other status found in the data (for
     * example the expired status written by updateExpired()) is initialised on
     * first sight instead of triggering an undefined-index notice.
     *
     * @param string $entityName Entity to aggregate
     * @param array  $result     Accumulator, modified in place: "workerName->method" => [status => count]
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();

        foreach ($result1 as $item) {
            $method = $item['workerName'].'->'.$item['method'];
            if (!isset($result[$method])) {
                $result[$method] = [BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0, ];
            }

            $status = $item['status'];
            if (!isset($result[$method][$status])) {
                // Status outside the four defaults: start its counter at zero.
                $result[$method][$status] = 0;
            }
            $result[$method][$status] += intval($item['c']);
        }
    }
244
245
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Selects one new, unlocked job that is due (whenAt null or not in the
     * future) and not expired, under a pessimistic write lock, then marks it
     * locked and running. The changes are flushed before the transaction is
     * committed so the row lock is held until the new state is written;
     * committing first would let another worker select the same job.
     *
     * @param string|null $workerName Only consider jobs of this worker when not null
     * @param string|null $methodName Only consider jobs of this method when not null
     * @param bool        $prioritize Order by priority, then whenAt, instead of whenAt only
     * @param mixed       $runId      Value stored on the job through setRunId()
     *
     * @return Job|null The claimed job, or null when no job is available
     *
     * @throws \Exception Anything raised while querying or flushing; the transaction is rolled back first
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $objectManager->beginTransaction();

        try {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $qb = $repository->createQueryBuilder('j');
            $dateTime = new \DateTime();
            $qb
                ->select('j')
                ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
                ->andWhere('j.locked is NULL')
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.whenAt'),
                    $qb->expr()->lte('j.whenAt', ':whenAt')
                ))
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.expiresAt'),
                    $qb->expr()->gt('j.expiresAt', ':expiresAt')
                ))
                ->setParameter(':whenAt', $dateTime)
                ->setParameter(':expiresAt', $dateTime);

            $this->addWorkerNameCriterion($qb, $workerName, $methodName);

            if ($prioritize) {
                $qb->orderBy('j.priority', 'ASC')->addOrderBy('j.whenAt', 'ASC');
            } else {
                $qb->orderBy('j.whenAt', 'ASC');
            }
            $qb->setMaxResults(1);

            $query = $qb->getQuery();
            $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
            $jobs = $query->getResult();

            /** @var Job|null $job */
            $job = $jobs ? $jobs[0] : null;

            if (null !== $job) {
                $job->setLocked(true);
                $job->setLockedAt(new \DateTime());
                $job->setStatus(BaseJob::STATUS_RUNNING);
                $job->setRunId($runId);

                // Write the claim while the row lock is still held.
                $objectManager->flush();
            }
        } catch (\Exception $exception) {
            // Do not leave the transaction (and its lock) open on failure.
            $objectManager->rollback();

            throw $exception;
        }

        if (null === $job) {
            $objectManager->rollback();

            return null;
        }

        $objectManager->commit();

        return $job;
    }
317
}
318