Completed
Branch master (45c65c)
by Matthew
02:05
created

JobManager   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 288
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 67.61%

Importance

Changes 0
Metric Value
wmc 26
lcom 1
cbo 10
dl 0
loc 288
ccs 96
cts 142
cp 0.6761
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A stopIdGenerator() 0 9 1
B countJobsByStatus() 0 30 4
A pruneErroneousJobs() 0 8 1
A pruneJobs() 0 19 3
A pruneExpiredJobs() 0 8 1
A pruneArchivedJobs() 0 13 1
B getJobCount() 0 45 4
A getStatus() 0 14 2
A getStatusByEntityName() 0 18 3
B getJob() 0 71 6
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
use Dtc\QueueBundle\Model\BaseJob;
14
15
class JobManager extends BaseJobManager
16
{
17
    /**
     * Disables automatic id generation for an entity so ids can be assigned manually.
     *
     * Looks up the class metadata of the entity behind $objectName and replaces
     * its generator with an AssignedGenerator (generator type NONE).
     *
     * @param string $objectName Entity name whose repository/metadata is looked up
     */
    public function stopIdGenerator($objectName)
    {
        $manager = $this->getObjectManager();
        $className = $manager->getRepository($objectName)->getClassName();

        /** @var ClassMetadata $classMetadata */
        $classMetadata = $manager->getClassMetadata($className);
        $classMetadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
        $classMetadata->setIdGenerator(new AssignedGenerator());
    }
26
27 3
    /**
     * Counts the jobs of an entity that are in the given status.
     *
     * @param string      $objectName Entity class to count from
     * @param mixed       $status     Value compared against the status field
     * @param string|null $workerName Only count jobs of this worker (null = any)
     * @param string|null $method     Only count jobs of this method (null = any)
     *
     * @return int|string The scalar count as returned by Doctrine, or 0 when it is empty/zero
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        if (null !== $workerName) {
            $qb->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // The method filter must be bound to $method, not to the worker name.
            $qb->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $qb->getQuery()->getSingleScalarResult();

        if (!$count) {
            return 0;
        }

        return $count;
    }
57
58
    /**
     * Deletes jobs whose status is BaseJob::STATUS_ERROR from the archive entity.
     *
     * @param string|null $workerName Only prune jobs of this worker (null = any)
     * @param string|null $method     Only prune jobs of this method (null = any)
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        $onlyErrored = function ($qb) {
            /* @var QueryBuilder $qb */
            $qb->where('j.status = :status');
            $qb->setParameter(':status', BaseJob::STATUS_ERROR);
        };

        return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), $onlyErrored);
    }
72
73
    /**
     * Prunes jobs according to a condition function.
     *
     * Builds a DQL DELETE on $objectName (alias "j"), invokes $conditionFunc so
     * it can add the base condition, then narrows the delete by worker name
     * and/or method when those are given.
     *
     * The first two parameters carry no default value: a default placed before
     * the required $objectName/$conditionFunc could never be used by a caller
     * and is deprecated as of PHP 8.0. Passing null still means "no filter".
     *
     * @param string|null $workerName    Only prune jobs of this worker (null = any)
     * @param string|null $method        Only prune jobs of this method (null = any)
     * @param string      $objectName    Entity class to delete from
     * @param callable    $conditionFunc Called with the QueryBuilder before the optional filters are added
     *
     * @return int Count of jobs pruned
     */
    protected function pruneJobs($workerName, $method, $objectName, $conditionFunc)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder()->delete($objectName, 'j');
        $conditionFunc($qb);

        if (null !== $workerName) {
            $qb->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            $qb->andWhere('j.method = :method')->setParameter(':method', $method);
        }

        return intval($qb->getQuery()->execute());
    }
101
102
    /**
     * Deletes jobs of the main job entity whose expiresAt is at or before the current time.
     *
     * @param string|null $workerName Only prune jobs of this worker (null = any)
     * @param string|null $method     Only prune jobs of this method (null = any)
     *
     * @return int Count of jobs pruned
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $onlyExpired = function ($qb) {
            /* @var QueryBuilder $qb */
            $qb->where('j.expiresAt <= :expiresAt');
            $qb->setParameter(':expiresAt', new \DateTime());
        };

        return $this->pruneJobs($workerName, $method, $this->getObjectName(), $onlyExpired);
    }
116
117
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Issues a DQL DELETE against the archive entity for every row whose
     * updatedAt is strictly before the given date.
     *
     * @param \DateTime $olderThan Cut-off date; rows updated before it are deleted
     *
     * @return mixed Result of executing the DELETE query
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        return $entityManager
            ->createQueryBuilder()
            ->delete($this->getArchiveObjectName(), 'j')
            ->where('j.updatedAt < :updatedAt')
            ->setParameter(':updatedAt', $olderThan)
            ->getQuery()
            ->execute();
    }
135
136 1
    /**
     * Counts the jobs that are currently waiting to be picked up.
     *
     * A job is counted when its whenAt is null or not in the future, its
     * expiresAt is null or still in the future, and it is not locked. The
     * count can be narrowed to a worker name and/or a method.
     *
     * @param string|null $workerName Only count jobs of this worker (null = any)
     * @param string|null $method     Only count jobs of this method (null = any)
     *
     * @return mixed Scalar count as returned by Doctrine
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $builder = $entityManager->createQueryBuilder();
        $builder = $builder->select('count(j)')->from($this->getObjectName(), 'j');

        $hasWorker = null !== $workerName;
        $hasMethod = null !== $method;

        // Install the first where-clause depending on which filters were supplied.
        if ($hasWorker && $hasMethod) {
            $builder->where($builder->expr()->andX(
                $builder->expr()->eq('j.workerName', ':workerName'),
                $builder->expr()->eq('j.method', ':method')
            ));
        } elseif ($hasWorker) {
            $builder->where('j.workerName = :workerName');
        } elseif ($hasMethod) {
            $builder->where('j.method = :method');
        }

        if ($hasMethod) {
            $builder->setParameter(':method', $method);
        }
        if ($hasWorker) {
            $builder->setParameter(':workerName', $workerName);
        }

        // Once a filter occupies the where-clause, the next condition has to be appended.
        $combine = ($hasWorker || $hasMethod) ? 'andWhere' : 'where';

        $now = new \DateTime();
        $dueNow = $builder->expr()->orX(
            $builder->expr()->isNull('j.whenAt'),
            $builder->expr()->lte('j.whenAt', ':whenAt')
        );
        $builder->$combine($dueNow);

        $notExpired = $builder->expr()->orX(
            $builder->expr()->isNull('j.expiresAt'),
            $builder->expr()->gt('j.expiresAt', ':expiresAt')
        );
        $builder->andWhere($notExpired);
        $builder->andWhere('j.locked is NULL');
        $builder->setParameter(':whenAt', $now);
        $builder->setParameter(':expiresAt', $now);

        return $builder->getQuery()->getSingleScalarResult();
    }
181
182
    /**
     * Get Jobs statuses.
     *
     * Aggregates the per-status counts of both the live job entity and the
     * archive entity. The result is keyed by "<workerName>-><method>", and
     * each entry maps a status to its count, sorted by status key.
     *
     * @return array
     */
    public function getStatus()
    {
        $result = [];
        $this->getStatusByEntityName($this->getObjectName(), $result);
        // Second pass covers archived jobs; reading the live entity twice would double its counts.
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
199
200
    /**
     * Adds the per-status job counts of one entity to $result.
     *
     * Rows are grouped by worker name, method and status. Each group is added
     * to $result under the key "<workerName>-><method>", which is seeded with
     * zero counts for the NEW, RUNNING, SUCCESS and ERROR statuses.
     *
     * @param string $entityName Entity class to aggregate
     * @param array  $result     Accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $rows = $objectManager->getRepository($entityName)
            ->createQueryBuilder('j')
            ->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')
            ->getQuery()
            ->getArrayResult();

        foreach ($rows as $row) {
            $key = $row['workerName'].'->'.$row['method'];
            if (!isset($result[$key])) {
                $result[$key] = [
                    BaseJob::STATUS_NEW => 0,
                    BaseJob::STATUS_RUNNING => 0,
                    BaseJob::STATUS_SUCCESS => 0,
                    BaseJob::STATUS_ERROR => 0,
                ];
            }

            // A status outside the four seeded ones would otherwise hit an undefined key on "+=".
            $status = $row['status'];
            if (!isset($result[$key][$status])) {
                $result[$key][$status] = 0;
            }
            $result[$key][$status] += intval($row['c']);
        }
    }
221
222
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * The candidate row is selected with a pessimistic write lock inside a
     * transaction and then marked as locked/running. The change is flushed
     * BEFORE the commit so that it is written while the row lock is still
     * held; committing first would release the lock and let a concurrent
     * worker select the same job before its status is updated.
     *
     * @param string|null $workerName Only consider jobs of this worker (null = any)
     * @param string|null $methodName Only consider jobs of this method (null = any)
     * @param bool        $prioritize Order by priority, then whenAt; otherwise by whenAt only
     * @param mixed       $runId      Value passed to setRunId() on the claimed job
     *
     * @return Job|null The claimed job, or null when no job matches
     *
     * @throws \Exception Re-thrown after the transaction is rolled back when selecting or claiming fails
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
        $hash = hash('sha256', $uniqid);

        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $objectManager->beginTransaction();

        try {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            /** @var QueryBuilder $qb */
            $qb = $repository->createQueryBuilder('j');
            $dateTime = new \DateTime();
            $qb
                ->select('j')
                ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
                ->andWhere('j.locked is NULL')
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.whenAt'),
                    $qb->expr()->lte('j.whenAt', ':whenAt')
                ))
                ->andWhere($qb->expr()->orX(
                    $qb->expr()->isNull('j.expiresAt'),
                    $qb->expr()->gt('j.expiresAt', ':expiresAt')
                ))
                ->setParameter(':whenAt', $dateTime)
                ->setParameter(':expiresAt', $dateTime);

            if (null !== $workerName) {
                $qb->andWhere('j.workerName = :workerName')
                    ->setParameter(':workerName', $workerName);
            }

            if (null !== $methodName) {
                $qb->andWhere('j.method = :method')
                    ->setParameter(':method', $methodName);
            }

            if ($prioritize) {
                // Same ordering as before, expressed through the typed orderBy API
                // instead of handing a raw string to add().
                $qb->orderBy('j.priority', 'ASC')
                    ->addOrderBy('j.whenAt', 'ASC');
            } else {
                $qb->orderBy('j.whenAt', 'ASC');
            }
            $qb->setMaxResults(1);

            $query = $qb->getQuery();
            $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
            $jobs = $query->getResult();

            if ($jobs) {
                /** @var Job $job */
                $job = $jobs[0];
                if (!$job) {
                    throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
                }
                $job->setLocked(true);
                $job->setLockedAt(new \DateTime());
                $job->setStatus(BaseJob::STATUS_RUNNING);
                $job->setRunId($runId);

                // Write the claim while the row lock is held, then release it.
                $objectManager->flush();
                $objectManager->commit();

                return $job;
            }
        } catch (\Exception $exception) {
            // Never leave the transaction open when selecting or claiming fails.
            $objectManager->rollback();

            throw $exception;
        }

        // Nothing to claim: close the (empty) transaction.
        $objectManager->rollback();

        return null;
    }
302
}
303