Completed
Push — master ( 2db41f...aa6ae6 )
by Matthew
15:06 queued 36s
created

JobManager::getJobQueryBuilder()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 28
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 28
ccs 15
cts 15
cp 1
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 18
nc 2
nop 3
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
    /**
     * JavaScript reduce function handed to the map-reduce query built in getStatusByDocument().
     *
     * For one key it folds every value object in `vals` into a single object:
     * a property seen for the first time is copied, a property already present
     * has the new value added to it.
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
15
            var result = {};
16
            for (var index in vals) {
17
                var val =  vals[index];
18
                for (var i in val) {
19
                    if (result.hasOwnProperty(i)) {
20
                        result[i] += val[i];
21
                    }
22
                    else {
23
                        result[i] = val[i];
24
                    }
25
                }
26
            }
27
            return result;
28
        }';
29
30 3
    /**
     * Counts the documents of a class whose 'status' field equals $status.
     *
     * @param string      $objectName document class handed to the query builder
     * @param mixed       $status     value the 'status' field must equal
     * @param string|null $workerName when not null, only documents with this workerName are counted
     * @param string|null $method     when not null, only documents with this method are counted
     *
     * @return mixed whatever the query's count() reports for the matching documents
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional worker / method restriction.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
44
45
    /**
     * Removes documents of the job archive class whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are removed
     * @param string|null $method     when not null, only jobs with this method are removed
     *
     * @return mixed the 'n' entry of the remove query's result, or 0 when the result has no such entry
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getJobArchiveClass())->remove();
        $removal->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
66
67
    /**
     * Adds equality criteria for 'workerName' and 'method' to the builder.
     *
     * A criterion is added only for an argument that is not null; with both
     * arguments null the builder is left untouched.
     *
     * @param Builder     $builder    query builder to extend
     * @param string|null $workerName value the 'workerName' field must equal, or null for no restriction
     * @param string|null $method     value the 'method' field must equal, or null for no restriction
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
82
83
    /**
     * Sets the status of new jobs whose expiresAt is at or before the current time to expired.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are updated
     * @param string|null $method     when not null, only jobs with this method are updated
     *
     * @return int the 'n' entry of the update query's result, or 0 when the result has no such entry
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $update = $documentManager->createQueryBuilder($this->getJobClass())->updateMany();

        // Selection: still-new jobs whose expiry time has been reached.
        $update->field('expiresAt')->lte(new \DateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // Modification applied to every selected job.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
107
108
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the job archive class, using the
     * 'updatedAt' field as the date to compare.
     *
     * @param \DateTime $olderThan cut-off passed through to removeOlderThan()
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
119
120 2
    /**
     * Counts jobs of the job class that are unlocked and not yet expired.
     *
     * A job qualifies when its 'locked' field equals null and its 'expiresAt'
     * is either null or later than the current time.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are counted
     * @param string|null $method     when not null, only jobs with this method are counted
     *
     * @return mixed whatever the query's count(true) reports
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobClass());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Expiry filter: no expiry set, or expiry still in the future.
        $now = new \DateTime();
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
142
143
    /**
     * Builds a per-worker/method status breakdown for one document class.
     *
     * A map-reduce query emits, for every document, the key
     * "<worker_name>-><method>()" with an object holding 1 under the
     * document's status; REDUCE_FUNCTION then sums those objects per key.
     * Each reduced value is completed with a 0 entry for every known status
     * it does not already contain.
     *
     * @param string $documentName document class to run the map-reduce against
     *
     * @return array status counts indexed by the emitted key
     */
    protected function getStatusByDocument($documentName)
    {
        // Map step: one {status: 1} object per document, keyed by worker and method.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc)->reduce(self::REDUCE_FUNCTION);
        $rows = $builder->getQuery()->execute();

        // Defaults for statuses a key has no documents for.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $statusByKey = [];
        foreach ($rows as $row) {
            // Array union: counts from the query win, defaults only fill the gaps.
            $statusByKey[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $statusByKey;
    }
187
188 2
    /**
     * Returns the combined status breakdown of the job class and the job archive class.
     *
     * The breakdown of the archive class is merged into that of the job class:
     * a count that already exists for a key/status pair is increased, a missing
     * one is copied. Every per-key list is returned sorted by status key.
     *
     * @return array status counts indexed by the key produced in getStatusByDocument()
     */
    public function getStatus()
    {
        $merged = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($merged[$key][$statusName])) {
                    $merged[$key][$statusName] = $count;
                    continue;
                }

                $merged[$key][$statusName] += $count;
            }
        }

        // array_map() with a single array keeps the original keys.
        return array_map(
            static function ($counts) {
                ksort($counts);

                return $counts;
            },
            $merged
        );
    }
210
211
    /**
212
     * Get the next job to run (can be filtered by worker name and method name).
213
     *
     * The selection comes from getJobQueryBuilder(); it is then turned into a
     * findAndUpdate query that sets lockedAt, locked, status and runId on the
     * selected job and requests the updated document (returnNew).
     *
214
     * @param string|null $workerName
215
     * @param string|null $methodName
216
     * @param bool        $prioritize
217
     * @param string|null $runId
218
     *
219
     * @return \Dtc\QueueBundle\Model\Job result of the query's execute(); presumably null when no job matches - TODO confirm
220
     */
221 8
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
222
    {
223 8
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 221 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $methodName defined by parameter $methodName on line 221 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
224
        $builder
225 8
            ->findAndUpdate()
226 8
            ->returnNew();
227
228 8
        $date = new \DateTime();
229
        // Update: mark the selected job as locked and running for this run
230
        $builder
231 8
            ->field('lockedAt')->set($date) // Set started
232 8
            ->field('locked')->set(true)
233 8
            ->field('status')->set(BaseJob::STATUS_RUNNING)
234 8
            ->field('runId')->set($runId);
235
236 8
        $query = $builder->getQuery();
237
238 8
        $job = $query->execute();
239
240 8
        return $job;
241
    }
242
243
    /**
     * Builds the query that selects jobs which are ready to run.
     *
     * A job is selected when its status is BaseJob::STATUS_NEW, its 'locked'
     * field equals null, its 'whenAt' is null or not later than now, and its
     * 'expiresAt' is null or later than now.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are selected
     * @param string|null $methodName when not null, only jobs with this method are selected
     * @param bool        $prioritize true sorts by priority (desc) then whenAt (asc); false sorts by whenAt (asc) only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Time filter: already due and not yet expired.
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        return $builder;
    }
278
279 1
    /**
     * Folds $job into an already stored job that has the same crcHash.
     *
     * Looks up the unlocked new job with the same crcHash and the earliest
     * whenAt. When one exists, its stored priority is raised to that of $job if
     * lower, and its stored whenAt is moved to that of $job if later; the same
     * adjustments are applied to the returned object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job supplying the crcHash, priority and whenAt
     *
     * @return mixed|null the stored job that was found, or null when the lookup yields nothing
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup
            ->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash())
            ->field('locked')->equals(null);

        $existing = $lookup->getQuery()->getSingleResult();
        if (!$existing) {
            return null;
        }

        // Each update carries its own comparison in the query criteria, so a
        // concurrent writer's better value is not overwritten.
        $raisePriority = $documentManager->createQueryBuilder($this->getJobClass());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $advanceWhenAt = $documentManager->createQueryBuilder($this->getJobClass());
        $advanceWhenAt->findAndUpdate();
        $advanceWhenAt->field('_id')->equals($existing->getId());
        $advanceWhenAt->field('whenAt')->gt($job->getWhenAt());
        $advanceWhenAt->field('whenAt')->set($job->getWhenAt());
        $advanceWhenAt->getQuery()->execute();

        // Mirror the same adjustments on the object handed back to the caller.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
322
}
323