Completed
Push — master ( 74ff85...ce55af )
by Matthew
06:18
created

JobManager::getJobCount()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 22
ccs 11
cts 11
cp 1
rs 9.2
c 1
b 1
f 0
cc 1
eloc 13
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
15 3
    /**
     * Counts the documents of a given class that carry a given status.
     *
     * @param string      $objectName document class handed to the query builder
     * @param string      $status     value the 'status' field must equal
     * @param string|null $workerName optional worker name filter (skipped when null)
     * @param string|null $method     optional method name filter (skipped when null)
     *
     * @return int result of the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional narrowing by worker / method.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
29
30
    /**
     * Removes archived jobs whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName restrict the removal to this worker when not null
     * @param string|null $method     restrict the removal to this method when not null
     *
     * @return int the 'n' entry of the remove result, or 0 when that entry is not set
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $removal->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
51
52
    /**
     * Adds equality filters on 'workerName' and 'method' to a query builder.
     *
     * A filter is only added when the corresponding argument is not null, so
     * calling this with both arguments null leaves the builder untouched.
     *
     * @param Builder     $builder    query builder to constrain (modified in place)
     * @param string|null $workerName value the 'workerName' field must equal
     * @param string|null $method     value the 'method' field must equal
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
67
68
    /**
     * Flags jobs that are still new but whose expiry time has passed as expired.
     *
     * @param string|null $workerName restrict the update to this worker when not null
     * @param string|null $method     restrict the update to this method when not null
     *
     * @return int the 'n' entry of the update result, or 0 when that entry is not set
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $update = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection: expiresAt is now or earlier, and the job is still in the NEW state.
        $update->field('expiresAt')->lte(new \DateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // Modification: move the selected jobs to the EXPIRED state.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
92
93
    /**
     * Removes archived jobs older than $olderThan.
     *
     * The cut-off is applied to the 'updatedAt' field of the archive documents.
     *
     * @param \DateTime $olderThan cut-off timestamp
     *
     * @return int whatever removeOlderThan() reports (presumably the number of removed jobs)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveName, 'updatedAt', $olderThan);
    }
103
104 2
    /**
     * Counts jobs that are not locked and have not expired.
     *
     * A job counts as "not expired" when its 'expiresAt' field is null or lies
     * after the current time.
     *
     * @param string|null $workerName only count jobs of this worker when not null
     * @param string|null $method     only count jobs of this method when not null
     *
     * @return int result of the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
126
127
    /**
     * Builds a status breakdown for one document class via map/reduce.
     *
     * The result is keyed by the map/reduce '_id' (built in the map function as
     * "<worker_name>-><method>()"); each entry maps a status to the number of
     * documents in that status, with every known status present (0 when unseen).
     *
     * @param string $documentName document class to run the map/reduce against
     *
     * @return array
     */
    protected function getStatusByDocument($documentName)
    {
        // Map step: emit one {status: 1} object per document, keyed by worker and method.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce step: add up the per-status counters emitted for the same key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc)
            ->reduce($reduceFunc);
        $rows = $builder->getQuery()->execute();

        // Zero defaults so that every status shows up in each entry.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: reduced counts win, defaults only fill in missing statuses.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
185
186 2
    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * Counts from the archive document class are added onto the counts from the
     * live document class; each entry's statuses are then sorted by key.
     *
     * @return array keyed like the result of getStatusByDocument()
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $workerMethod => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($combined[$workerMethod][$statusName])) {
                    // First time this worker/status pair is seen.
                    $combined[$workerMethod][$statusName] = $count;
                    continue;
                }

                $combined[$workerMethod][$statusName] += $count;
            }
        }

        $sorted = [];
        foreach ($combined as $workerMethod => $counts) {
            ksort($counts);
            $sorted[$workerMethod] = $counts;
        }

        return $sorted;
    }
209
210
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Uses a find-and-update query built on getJobQueryBuilder(): the matched
     * job gets 'lockedAt' set to the current time, 'locked' set to true,
     * 'status' set to BaseJob::STATUS_RUNNING and 'runId' set to $runId, and
     * the updated document is requested back via returnNew().
     *
     * @param string|null $workerName only consider jobs of this worker when not null
     * @param string|null $methodName only consider jobs of this method when not null
     * @param bool        $prioritize forwarded to getJobQueryBuilder() to choose the sort order
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job result of executing the query (presumably null when nothing matches)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $claim = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $claim->findAndUpdate();
        $claim->returnNew();

        // Update
        $lockedAt = new \DateTime();
        $claim->field('lockedAt')->set($lockedAt); // Set started
        $claim->field('locked')->set(true);
        $claim->field('status')->set(BaseJob::STATUS_RUNNING);
        $claim->field('runId')->set($runId);

        return $claim->getQuery()->execute();
    }
240
241
    /**
     * Builds the query that selects jobs eligible to run right now.
     *
     * Eligible means: status is BaseJob::STATUS_NEW, 'locked' is null, 'whenAt'
     * is null or not in the future, and 'expiresAt' is null or in the future.
     *
     * @param string|null $workerName only select jobs of this worker when not null
     * @param string|null $methodName only select jobs of this method when not null
     * @param bool        $prioritize when true sort by priority (desc) then whenAt (asc);
     *                                otherwise sort by whenAt (asc) only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        return $builder;
    }
276
277 1
    /**
     * Merges $job into an already queued job that has the same CRC hash.
     *
     * Looks up the earliest (by 'whenAt') unlocked job in the NEW state with the
     * same 'crcHash'. If one exists, its stored priority is raised and its stored
     * 'whenAt' is moved earlier when $job asks for that, and the same adjustments
     * are applied to the returned in-memory object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job whose priority / whenAt should be folded in
     *
     * @return \Dtc\QueueBundle\Model\Job|null the matched existing job, or null when none was found
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getObjectName());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $lookup->field('locked')->equals(null);

        $existing = $lookup->getQuery()->getSingleResult();
        if (!$existing) {
            return null;
        }

        // Update priority or whenAt
        //  Each update carries its own comparison in the query, which makes sure
        //  that if someone else is updating at the same time we don't trounce
        //  their changes.
        $raisePriority = $documentManager->createQueryBuilder($this->getObjectName());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $moveEarlier = $documentManager->createQueryBuilder($this->getObjectName());
        $moveEarlier->findAndUpdate();
        $moveEarlier->field('_id')->equals($existing->getId());
        $moveEarlier->field('whenAt')->gt($job->getWhenAt());
        $moveEarlier->field('whenAt')->set($job->getWhenAt());
        $moveEarlier->getQuery()->execute();

        // Mirror the same adjustments on the object handed back to the caller.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
320
}
321