Completed
Push — master ( d4f4f0...01653d )
by Matthew
04:03
created

JobManager   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 300
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 90.12%

Importance

Changes 0
Metric Value
wmc 29
lcom 1
cbo 8
dl 0
loc 300
ccs 146
cts 162
cp 0.9012
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
B countJobsByStatus() 0 30 4
A pruneErroneousJobs() 0 13 1
B resetSaveOk() 0 23 4
A addWorkerNameCriterion() 0 10 3
A updateExpired() 0 17 1
A pruneArchivedJobs() 0 10 1
B getJobCount() 0 45 4
A getStatus() 0 20 4
A getStatusByEntityName() 0 22 3
B getJob() 0 63 4
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\QueryBuilder;
9
use Dtc\QueueBundle\Doctrine\BaseJobManager;
10
use Dtc\QueueBundle\Entity\Job;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
14
class JobManager extends BaseJobManager
15
{
16
    use CommonTrait;
17
    protected $formerIdGenerators;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21
    /**
     * Counts the rows of the given entity that are in the given status,
     * optionally restricted to one worker and/or one method.
     *
     * @param string      $objectName Entity class name to count rows of
     * @param string      $status     Status value the rows must have
     * @param string|null $workerName When not null, only rows with this workerName are counted
     * @param string|null $method     When not null, only rows with this method are counted
     *
     * @return int|string The scalar count returned by the query, or 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status');

        if (null !== $workerName) {
            $qb->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // The method filter must be bound to $method, not to the worker name.
            $qb->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $qb->setParameter(':status', $status)
            ->getQuery()->getSingleScalarResult();

        if (!$count) {
            return 0;
        }

        return $count;
    }
51
52
    /**
     * Deletes rows of the archive entity whose status is BaseJob::STATUS_ERROR,
     * optionally restricted to one worker and/or one method.
     *
     * @param string|null $workerName Only delete rows for this worker when not null
     * @param string|null $method     Only delete rows for this method when not null
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder
            ->delete($this->getArchiveObjectName(), 'j')
            ->where('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_ERROR);

        // Optional worker/method restrictions are appended by the shared helper.
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return (int) $deleted;
    }
71 3
72 1
    /**
     * Guards against mixing "save" and the other (reset) operation on the same
     * object manager instance.
     *
     * The spl_object_hash() of the current object manager is remembered in a
     * static property per operation. If the opposite operation has already been
     * recorded for the very same manager instance, an exception is thrown.
     *
     * @param string $function 'save' for the save operation; any other value is
     *                         treated as the reset operation
     *
     * @throws \Exception When the opposite operation was already recorded for this object manager
     */
    protected function resetSaveOk($function)
    {
        $managerHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // Compare against the marker left by the *other* operation.
        $oppositeMarker = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

        if ($managerHash === $oppositeMarker) {
            // Insert SQL is cached...
            throw new \Exception("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        // Leave this operation's own marker for later calls.
        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
95 1
96
    /**
     * Appends optional worker-name and method restrictions to a query builder
     * whose root alias is "j". A null argument adds no restriction.
     *
     * @param QueryBuilder $queryBuilder Builder to extend (modified in place)
     * @param string|null  $workerName   Value for j.workerName, or null to skip
     * @param string|null  $method       Value for j.method, or null to skip
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $filterByWorker = null !== $workerName;
        $filterByMethod = null !== $method;

        if ($filterByWorker) {
            $queryBuilder->andWhere('j.workerName = :workerName');
            $queryBuilder->setParameter(':workerName', $workerName);
        }

        if ($filterByMethod) {
            $queryBuilder->andWhere('j.method = :method');
            $queryBuilder->setParameter(':method', $method);
        }
    }
110
111
    /**
     * Sets the status of jobs to Job::STATUS_EXPIRED when they are still in
     * BaseJob::STATUS_NEW and their expiresAt is at or before the current time,
     * optionally restricted to one worker and/or one method.
     *
     * @param string|null $workerName Only update jobs for this worker when not null
     * @param string|null $method     Only update jobs for this method when not null
     *
     * @return int Result of executing the update query, cast to int
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder
            ->update($this->getObjectName(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->setParameter(':expiresAt', new \DateTime())
            ->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        // Optional worker/method restrictions are appended by the shared helper.
        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $affected = $updateBuilder->getQuery()->execute();

        return (int) $affected;
    }
128 8
129 3
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan(), comparing the archive entity's
     * "updatedAt" field against the given date.
     *
     * @param \DateTime $olderThan Cut-off date passed to removeOlderThan()
     *
     * @return mixed Whatever removeOlderThan() returns (presumably the number of removed rows - TODO confirm)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $archiveEntity = $this->getArchiveObjectName();

        return $this->removeOlderThan($objectManager, $archiveEntity, 'updatedAt', $olderThan);
    }
144 1
145 1
    /**
     * Counts the jobs that are not locked, are due (whenAt is null or not in
     * the future) and have not expired (expiresAt is null or in the future),
     * optionally restricted to one worker and/or one method.
     *
     * NOTE(review): unlike getJob(), no status filter is applied here - confirm
     * that this is intended.
     *
     * @param string|null $workerName Only count jobs for this worker when not null
     * @param string|null $method     Only count jobs for this method when not null
     *
     * @return mixed The scalar count returned by the query
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager->createQueryBuilder()
            ->select('count(j)')
            ->from($this->getObjectName(), 'j');

        // Same worker/method filter as the other queries in this class.
        // andWhere() on a builder without a WHERE part simply starts one, so
        // no where/andWhere switching is needed.
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        $dateTime = new \DateTime();
        $qb
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.whenAt'),
                $qb->expr()->lte('j.whenAt', ':whenAt')
            ))
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.expiresAt'),
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->andWhere('j.locked is NULL')
            ->setParameter(':whenAt', $dateTime)
            ->setParameter(':expiresAt', $dateTime);

        return $qb->getQuery()->getSingleScalarResult();
    }
190
191
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from both the live and the archive entity
     * (getStatusByEntityName() accumulates both into one array) and returns
     * them with each entry's statuses sorted by key.
     *
     * @return array Map of "workerName->method()" label to a map of status to count
     */
    public function getStatus()
    {
        $result = [];
        $this->getStatusByEntityName($this->getObjectName(), $result);
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            // Keys of $result and of $item are unique, so every key/status pair
            // is visited exactly once: a plain assignment is enough.
            foreach ($item as $status => $count) {
                $finalResult[$key][$status] = $count;
            }
        }

        return $finalResult;
    }
214 2
215
    /**
     * Adds the per-status job counts of one entity to $result.
     *
     * Rows are grouped by workerName, method and status. Each
     * "workerName->method()" label gets an entry seeded with zero counts for
     * the known statuses, to which the grouped counts are added.
     *
     * @param string $entityName Entity class name to query
     * @param array  $result     Accumulator keyed by "workerName->method()", modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();

        foreach ($result1 as $item) {
            $method = $item['workerName'].'->'.$item['method'].'()';
            if (!isset($result[$method])) {
                $result[$method] = [BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    RetryableJob::STATUS_EXPIRED => 0,
                    RetryableJob::STATUS_MAX_ERROR => 0,
                    RetryableJob::STATUS_MAX_STALLED => 0,
                    RetryableJob::STATUS_MAX_RETRIES => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0, ];
            }

            $status = $item['status'];
            // A status outside the seeded list would otherwise hit "+=" on an
            // undefined key and raise a notice/warning.
            if (!isset($result[$method][$status])) {
                $result[$method][$status] = 0;
            }
            $result[$method][$status] += intval($item['c']);
        }
    }
240
241
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Selects one job that is new, not locked, due (whenAt is null or not in
     * the future) and not expired, using a pessimistic write lock inside a
     * transaction, then marks it locked and running and stores the run id.
     *
     * @param string|null $workerName Only consider jobs for this worker when not null
     * @param string|null $methodName Only consider jobs for this method when not null
     * @param bool        $prioritize Order by priority (descending) before whenAt when true
     * @param mixed       $runId      Value passed to the job's setRunId()
     *
     * @return Job|null The claimed job, or null when no job matches
     *
     * @throws \Exception When the query or the update fails (the transaction is rolled back first)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
        $hash = hash('sha256', $uniqid);

        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $objectManager->beginTransaction();

        try {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $qb = $repository->createQueryBuilder('j');
            $dateTime = new \DateTime();
            $qb
                ->select('j')
                ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
                ->andWhere('j.locked is NULL')
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.whenAt'),
                    $qb->expr()->lte('j.whenAt', ':whenAt')
                ))
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.expiresAt'),
                    $qb->expr()->gt('j.expiresAt', ':expiresAt')
                ))
                ->setParameter(':whenAt', $dateTime)
                ->setParameter(':expiresAt', $dateTime);

            $this->addWorkerNameCriterion($qb, $workerName, $methodName);

            if ($prioritize) {
                // Same ordering as before, expressed through the typed API
                // instead of passing a raw string to add('orderBy', ...).
                $qb->orderBy('j.priority', 'DESC')
                    ->addOrderBy('j.whenAt', 'ASC');
            } else {
                $qb->orderBy('j.whenAt', 'ASC');
            }
            $qb->setMaxResults(1);

            /** @var QueryBuilder $qb */
            $query = $qb->getQuery();
            $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
            $jobs = $query->getResult();

            if ($jobs) {
                /** @var Job $job */
                $job = $jobs[0];
                if (!$job) {
                    throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
                }
                $job->setLocked(true);
                $job->setLockedAt(new \DateTime());
                $job->setStatus(BaseJob::STATUS_RUNNING);
                $job->setRunId($runId);

                // Write the lock fields BEFORE committing: the row lock taken by
                // the select only lasts until the transaction ends, so flushing
                // after the commit would let another worker claim the same job.
                $objectManager->flush();
                $objectManager->commit();

                return $job;
            }
        } catch (\Exception $exception) {
            // Never leave the transaction open when something fails.
            $objectManager->rollback();

            throw $exception;
        }

        $objectManager->rollback();

        return null;
    }
313
}
314