Test Setup Failed
Push — master ( a87626...531aa8 )
by Matthew
02:42
created

JobManager::pruneJobs()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 19
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 10
nc 4
nop 4
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\DBAL\LockMode;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\ORM\EntityRepository;
8
use Doctrine\ORM\Id\AssignedGenerator;
9
use Doctrine\ORM\Mapping\ClassMetadata;
10
use Doctrine\ORM\QueryBuilder;
11
use Dtc\QueueBundle\Doctrine\BaseJobManager;
12
use Dtc\QueueBundle\Entity\Job;
13
14
class JobManager extends BaseJobManager
15
{
16
    public function stopIdGenerator($objectName)
17
    {
18
        $objectManager = $this->getObjectManager();
19
        $repository = $objectManager->getRepository($objectName);
20
        /** @var ClassMetadata $metadata */
21
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
22
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
23
        $metadata->setIdGenerator(new AssignedGenerator());
24
    }
25
26
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
27
    {
28
        /** @var EntityManager $objectManager */
29
        $objectManager = $this->getObjectManager();
30
31
        $qb = $objectManager
32
            ->createQueryBuilder()
33
            ->select('count(a.id)')
34
            ->from($objectName, 'a')
35
            ->where('a.status = :status');
36
37
        if (null !== $workerName) {
38
            $qb->andWhere('a.workerName = :workerName')
39
                ->setParameter(':workerName', $workerName);
40
        }
41
42
        if (null !== $method) {
43
            $qb->andWhere('a.method = :method')
44
                ->setParameter(':method', $workerName);
45
        }
46
47
        $count = $qb->setParameter(':status', $status)
48
            ->getQuery()->getSingleScalarResult();
49
50
        if (!$count) {
51
            return 0;
52
        }
53
54
        return $count;
55
    }
56
57
    /**
58
     * @param string $workerName
59
     * @param string $method
60
     *
61
     * @return int Count of jobs pruned
62
     */
63
    public function pruneErroneousJobs($workerName = null, $method = null)
64
    {
65
        return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), function ($qb) {
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 63 can also be of type string; however, Dtc\QueueBundle\ORM\JobManager::pruneJobs() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $method defined by parameter $method on line 63 can also be of type string; however, Dtc\QueueBundle\ORM\JobManager::pruneJobs() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
66
            /* @var QueryBuilder $qb */
67
            $qb->where('j.status = :status')
68
                ->setParameter(':status', Job::STATUS_ERROR);
69
        });
70
    }
71
72
    /**
73
     * Prunes jobs according to a condition function.
74
     *
75
     * @param null $workerName
76
     * @param null $method
77
     * @param $conditionFunc
78
     *
79
     * @return int Count of jobs pruned
80
     */
81
    protected function pruneJobs($workerName = null, $method = null, $objectName, $conditionFunc)
82
    {
83
        /** @var EntityManager $objectManager */
84
        $objectManager = $this->getObjectManager();
85
        $qb = $objectManager->createQueryBuilder()->delete($objectName, 'j');
86
        $conditionFunc($qb);
87
88
        if (null !== $workerName) {
89
            $qb->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
90
        }
91
92
        if (null !== $method) {
93
            $qb->andWhere('j.method = :method')->setParameter(':method', $method);
94
        }
95
96
        $query = $qb->getQuery();
97
98
        return intval($query->execute());
99
    }
100
101
    /**
102
     * @param string $workerName
103
     * @param string $method
104
     *
105
     * @return int Count of jobs pruned
106
     */
107
    public function pruneExpiredJobs($workerName = null, $method = null)
108
    {
109
        return $this->pruneJobs($workerName, $method, $this->getObjectName(), function ($qb) {
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 107 can also be of type string; however, Dtc\QueueBundle\ORM\JobManager::pruneJobs() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $method defined by parameter $method on line 107 can also be of type string; however, Dtc\QueueBundle\ORM\JobManager::pruneJobs() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
110
            /* @var QueryBuilder $qb */
111
            $qb->where('j.expiresAt <= :expiresAt')
112
                ->setParameter(':expiresAt', new \DateTime());
113
        });
114
    }
115
116
    /**
117
     * Removes archived jobs older than $olderThan.
118
     *
119
     * @param \DateTime $olderThan
120
     */
121
    public function pruneArchivedJobs(\DateTime $olderThan)
122
    {
123
        /** @var EntityManager $objectManager */
124
        $objectManager = $this->getObjectManager();
125
        $qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j');
126
        $qb = $qb
127
            ->where('j.updatedAt < :updatedAt')
128
            ->setParameter(':updatedAt', $olderThan);
129
130
        $query = $qb->getQuery();
131
132
        return $query->execute();
133
    }
134
135
    public function getJobCount($workerName = null, $method = null)
136
    {
137
        /** @var EntityManager $objectManager */
138
        $objectManager = $this->getObjectManager();
139
        $qb = $objectManager->createQueryBuilder();
140
141
        $qb = $qb->select('count(j)')->from($this->getObjectName(), 'j');
142
143
        $where = 'where';
144
        if (null !== $workerName) {
145
            if (null !== $method) {
146
                $qb->where($qb->expr()->andX(
147
                    $qb->expr()->eq('j.workerName', ':workerName'),
148
                                                $qb->expr()->eq('j.method', ':method')
149
                ))
150
                    ->setParameter(':method', $method);
151
            } else {
152
                $qb->where('j.workerName = :workerName');
153
            }
154
            $qb->setParameter(':workerName', $workerName);
155
            $where = 'andWhere';
156
        } elseif (null !== $method) {
157
            $qb->where('j.method = :method')->setParameter(':method', $method);
158
            $where = 'andWhere';
159
        }
160
161
        $dateTime = new \DateTime();
162
        // Filter
163
        $qb
164
            ->$where($qb->expr()->orX(
165
                $qb->expr()->isNull('j.whenAt'),
166
                                        $qb->expr()->lte('j.whenAt', ':whenAt')
167
            ))
168
            ->andWhere($qb->expr()->orX(
169
                $qb->expr()->isNull('j.expiresAt'),
170
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
171
            ))
172
            ->andWhere('j.locked is NULL')
173
            ->setParameter(':whenAt', $dateTime)
174
            ->setParameter(':expiresAt', $dateTime);
175
176
        $query = $qb->getQuery();
177
178
        return $query->getSingleScalarResult();
179
    }
180
181
    /**
182
     * Get Jobs statuses.
183
     */
184
    public function getStatus()
185
    {
186
        $result = [];
187
        $this->getStatusByEntityName($this->getObjectName(), $result);
188
        $this->getStatusByEntityName($this->getObjectName(), $result);
189
190
        $finalResult = [];
191
        foreach ($result as $key => $item) {
192
            ksort($item);
193
            $finalResult[$key] = $item;
194
        }
195
196
        return $finalResult;
197
    }
198
199
    /**
200
     * @param string $entityName
201
     */
202
    protected function getStatusByEntityName($entityName, array &$result)
203
    {
204
        /** @var EntityManager $objectManager */
205
        $objectManager = $this->getObjectManager();
206
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
207
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
208
209
        foreach ($result1 as $item) {
210
            $method = $item['workerName'].'->'.$item['method'];
211
            if (!isset($result[$method])) {
212
                $result[$method] = [Job::STATUS_NEW => 0,
213
                                    Job::STATUS_RUNNING => 0,
214
                                    Job::STATUS_SUCCESS => 0,
215
                                    Job::STATUS_ERROR => 0, ];
216
            }
217
            $result[$method][$item['status']] += intval($item['c']);
218
        }
219
    }
220
221
    /**
222
     * Get the next job to run (can be filtered by workername and method name).
223
     *
224
     * @param string $workerName
225
     * @param string $methodName
226
     * @param bool   $prioritize
227
     *
228
     * @return Job|null
229
     */
230
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
231
    {
232
        $uniqid = uniqid(gethostname().'-'.getmypid(), true);
233
        $hash = hash('sha256', $uniqid);
234
235
        /** @var EntityManager $objectManager */
236
        $objectManager = $this->getObjectManager();
237
238
        $objectManager->beginTransaction();
239
240
        /** @var EntityRepository $repository */
241
        $repository = $this->getRepository();
242
        $qb = $repository->createQueryBuilder('j');
243
        $dateTime = new \DateTime();
244
        $qb
245
            ->select('j')
246
            ->where('j.status = :status')->setParameter(':status', Job::STATUS_NEW)
247
            ->andWhere('j.locked is NULL')
248
            ->andWhere($qb->expr()->orX(
249
                $qb->expr()->isNull('j.whenAt'),
250
                        $qb->expr()->lte('j.whenAt', ':whenAt')
251
            ))
252
            ->andWhere($qb->expr()->orX(
253
                $qb->expr()->isNull('j.expiresAt'),
254
                        $qb->expr()->gt('j.expiresAt', ':expiresAt')
255
            ))
256
            ->setParameter(':whenAt', $dateTime)
257
            ->setParameter(':expiresAt', $dateTime);
258
259
        if (null !== $workerName) {
260
            $qb->andWhere('j.workerName = :workerName')
261
                ->setParameter(':workerName', $workerName);
262
        }
263
264
        if (null !== $methodName) {
265
            $qb->andWhere('j.method = :method')
266
                ->setParameter(':method', $methodName);
267
        }
268
269
        if ($prioritize) {
270
            $qb->add('orderBy', 'j.priority ASC, j.whenAt ASC');
0 ignored issues
show
Documentation introduced by
'j.priority ASC, j.whenAt ASC' is of type string, but the function expects a object<Doctrine\ORM\Query\Expr\Base>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
271
        } else {
272
            $qb->orderBy('j.whenAt', 'ASC');
273
        }
274
        $qb->setMaxResults(1);
275
276
        /** @var QueryBuilder $qb */
277
        $query = $qb->getQuery();
278
        $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
279
        $jobs = $query->getResult();
280
281
        if ($jobs) {
282
            /** @var Job $job */
283
            $job = $jobs[0];
284
            if (!$job) {
285
                throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
286
            }
287
            $job->setLocked(true);
288
            $job->setLockedAt(new \DateTime());
289
            $job->setStatus(Job::STATUS_RUNNING);
290
            $job->setRunId($runId);
291
            $objectManager->commit();
292
            $objectManager->flush();
293
294
            return $job;
295
        }
296
297
        $objectManager->rollback();
298
299
        return null;
300
    }
301
}
302