Completed
Pull Request — master (#27)
by Matthew
16:48
created

JobManager::getStatusByDocument()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 36
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 36
rs 8.8571
c 0
b 0
f 0
ccs 21
cts 21
cp 1
cc 2
eloc 21
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
    /**
     * JavaScript reduce function used by the status map-reduce.
     *
     * For one key it merges all emitted value objects into a single object,
     * summing the values of properties that occur more than once.
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
29
30
    /**
     * Counts the documents of the given class that have the given status.
     *
     * @param string      $objectName Document class to query
     * @param string      $status     Status value the 'status' field must equal
     * @param string|null $workerName Restrict to this worker name when not null
     * @param string|null $method     Restrict to this method when not null
     *
     * @return int
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb
            ->find()
            ->field('status')->equals($status);

        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $query = $qb->getQuery();

        return $query->count();
    }
44 1
45 1
    /**
     * Removes documents of the job archive class whose status is
     * BaseJob::STATUS_ERROR, optionally restricted by worker name and method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int The 'n' entry of the remove result, or 0 when it is absent
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
        $qb = $qb->remove();
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        $query = $qb->getQuery();
        $result = $query->execute();
        if (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }
66 15
67
    /**
     * Adds equality criteria on 'workerName' and/or 'method' to the builder.
     * A criterion is only added for an argument that is not null.
     *
     * @param Builder     $builder
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $builder->field('workerName')->equals($workerName);
        }

        if (null !== $method) {
            $builder->field('method')->equals($method);
        }
    }
82 2
83 2
    /**
     * Sets the status to Job::STATUS_EXPIRED on every document of the job
     * class that has status BaseJob::STATUS_NEW and an 'expiresAt' at or
     * before the current time, optionally restricted by worker name and method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int The 'n' entry of the update result, or 0 when it is absent
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
        $qb = $qb->updateMany();
        $qb->field('expiresAt')->lte(new \DateTime());
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
        $query = $qb->getQuery();
        $result = $query->execute();
        if (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }
107 2
108 2
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the job archive class, comparing
     * against the 'updatedAt' field.
     *
     * @param \DateTime $olderThan
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
    }
119 2
120 2
    /**
     * Counts documents of the job class that are not locked and whose
     * 'expiresAt' is either null or later than the current time, optionally
     * restricted by worker name and method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
        $qb
            ->find();

        $this->addWorkerNameCriterion($qb, $workerName, $method);

        // Filter: not expired (no expiry, or expiry in the future) and not locked
        $date = new \DateTime();
        $qb
            ->addAnd(
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
            )
            ->field('locked')->equals(null);

        $query = $qb->getQuery();

        return $query->count(true);
    }
142 2
143
    /**
     * Builds a per-worker/method breakdown of job counts by status for one
     * document class, using a map-reduce query.
     *
     * The map step emits one {status: 1} object per document under the key
     * "<worker_name>-><method>()"; REDUCE_FUNCTION sums them per key. Every
     * known status missing from a result row is filled in with 0.
     *
     * @param string $documentName
     *
     * @return array Map of "<worker_name>-><method>()" => [status => count]
     */
    protected function getStatusByDocument($documentName)
    {
        // Run a map reduce function get worker and status break down
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        $reduceFunc = self::REDUCE_FUNCTION;
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($documentName);
        $qb->map($mapFunc)
            ->reduce($reduceFunc);
        $query = $qb->getQuery();
        $results = $query->execute();

        $allStatus = array(
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        );

        $status = [];

        foreach ($results as $info) {
            // Array union: counted statuses win, absent ones default to 0.
            $status[$info['_id']] = $info['value'] + $allStatus;
        }

        return $status;
    }
187
188 2
    /**
     * Returns the combined status breakdown of the job class and the job
     * archive class.
     *
     * Counts from both classes are added together per key and status, and
     * each key's status map is sorted by status name.
     *
     * @return array Map of key => [status => count], as produced by getStatusByDocument()
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getJobClass());
        $archiveStatus = $this->getStatusByDocument($this->getJobArchiveClass());
        foreach ($archiveStatus as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                if (isset($result[$key][$statusName])) {
                    $result[$key][$statusName] += $count;
                } else {
                    $result[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
210
211
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Uses a find-and-update on the query from getJobQueryBuilder(), so the
     * selected job is marked as locked and running (with lockedAt and runId
     * set) in the same operation, and the updated document is returned.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param string|null $runId
     *
     * @return \Dtc\QueueBundle\Model\Job|null Presumably null when no job matches; confirm against the ODM's find-and-update behaviour
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $builder
            ->findAndUpdate()
            ->returnNew();

        $date = new \DateTime();
        // Update
        $builder
            ->field('lockedAt')->set($date) // Set started
            ->field('locked')->set(true)
            ->field('status')->set(BaseJob::STATUS_RUNNING)
            ->field('runId')->set($runId);

        $query = $builder->getQuery();

        $job = $query->execute();

        return $job;
    }
242
243
    /**
     * Builds the query that selects runnable jobs of the job class.
     *
     * A job matches when its status is BaseJob::STATUS_NEW, it is not locked,
     * its 'whenAt' is null or not later than now, and its 'expiresAt' is null
     * or later than now. Worker name and method criteria are added when given.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize When true, sort by priority (desc) then whenAt (asc);
     *                                otherwise by whenAt (asc) only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        if ($prioritize) {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        } else {
            $builder->sort('whenAt', 'asc');
        }

        // Filter
        $date = new \DateTime();
        $builder
            ->addAnd(
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
            )
            ->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('locked')->equals(null);

        return $builder;
    }
278
279 1
    /**
     * Finds the earliest unlocked NEW job with the same crcHash as $job and
     * folds $job's priority and whenAt into it.
     *
     * The stored job's priority is raised to $job's priority when lower, and
     * its whenAt is moved to $job's whenAt when later. The same adjustments
     * are then applied to the returned in-memory object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job|null The existing job, or null when none matches
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder->find();

        $builder->sort('whenAt', 'asc');
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash())
            ->field('locked')->equals(null);
        $oldJob = $builder->getQuery()->getSingleResult();

        if (!$oldJob) {
            return null;
        }

        // Update priority or whenAt
        //  This makes sure if someone else is updating at the same time
        //  that we don't trounce their changes.
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder->findAndUpdate();
        $builder->field('_id')->equals($oldJob->getId());
        $builder->field('priority')->lt($job->getPriority());
        $builder->field('priority')->set($job->getPriority());
        $builder->getQuery()->execute();

        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder->findAndUpdate();
        $builder->field('_id')->equals($oldJob->getId());
        $builder->field('whenAt')->gt($job->getWhenAt());
        $builder->field('whenAt')->set($job->getWhenAt());
        $builder->getQuery()->execute();

        // Mirror the conditional updates on the object handed back to the caller.
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
            $oldJob->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $oldJob->getPriority()) {
            $oldJob->setPriority($job->getPriority());
        }

        return $oldJob;
    }
322
}
323