Completed
Push — master ( 089f73...dc3342 )
by Matthew
06:38
created

JobManager::updateNearestBatch()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 43
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 29
CRAP Score 4.0005

Importance

Changes 0
Metric Value
dl 0
loc 43
ccs 29
cts 30
cp 0.9667
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 28
nc 5
nop 1
crap 4.0005
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
15 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
16
    {
17
        /** @var DocumentManager $objectManager */
18 3
        $objectManager = $this->getObjectManager();
19 3
        $qb = $objectManager->createQueryBuilder($objectName);
20
        $qb
21 3
            ->find()
22 3
            ->field('status')->equals($status);
23
24 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
25 3
        $query = $qb->getQuery();
26
27 3
        return $query->count();
28
    }
29
30
    /**
31
     * @param string|null $workerName
32
     * @param string|null $method
33
     */
34 1
    public function pruneErroneousJobs($workerName = null, $method = null)
35
    {
36
        /** @var DocumentManager $objectManager */
37 1
        $objectManager = $this->getObjectManager();
38 1
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
39 1
        $qb = $qb->remove();
40 1
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
41 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
42
43 1
        $query = $qb->getQuery();
44 1
        $result = $query->execute();
45 1
        if (isset($result['n'])) {
46 1
            return $result['n'];
47
        }
48
49
        return 0;
50
    }
51
52
    /**
53
     * @param Builder     $builder
54
     * @param string|null $workerName
55
     * @param string|null $method
56
     */
57 15
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
58
    {
59 15
        if (null !== $workerName) {
60 4
            $builder->field('workerName')->equals($workerName);
61 4
        }
62
63 15
        if (null !== $method) {
64 3
            $builder->field('method')->equals($method);
65 3
        }
66 15
    }
67
68
    /**
69
     * @param null $workerName
70
     * @param null $method
71
     *
72
     * @return int
73
     */
74 2
    protected function updateExpired($workerName = null, $method = null)
75
    {
76
        /** @var DocumentManager $objectManager */
77 2
        $objectManager = $this->getObjectManager();
78 2
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
79 2
        $qb = $qb->updateMany();
80 2
        $qb->field('expiresAt')->lte(new \DateTime());
81 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
82 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
83 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
84 2
        $query = $qb->getQuery();
85 2
        $result = $query->execute();
86 2
        if (isset($result['n'])) {
87 2
            return $result['n'];
88
        }
89
90
        return 0;
91
    }
92
93
    /**
94
     * Removes archived jobs older than $olderThan.
95
     *
96
     * @param \DateTime $olderThan
97
     *                             return int
98
     */
99 1
    public function pruneArchivedJobs(\DateTime $olderThan)
100
    {
101 1
        return $this->removeOlderThan($this->getArchiveObjectName(), 'updatedAt', $olderThan);
102
    }
103
104 2
    public function getJobCount($workerName = null, $method = null)
105
    {
106
        /** @var DocumentManager $objectManager */
107 2
        $objectManager = $this->getObjectManager();
108 2
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
109
        $qb
110 2
            ->find();
111
112 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
113
114
        // Filter
115 2
        $date = new \DateTime();
116
        $qb
117 2
            ->addAnd(
118 2
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
119 2
            )
120 2
            ->field('locked')->equals(null);
121
122 2
        $query = $qb->getQuery();
123
124 2
        return $query->count(true);
125
    }
126
127
    /**
128
     * Get Status Jobs.
129
     *
130
     * @param string $documentName
131
     *
132
     * @return array
133
     */
134 2
    protected function getStatusByDocument($documentName)
135
    {
136
        // Run a map reduce function get worker and status break down
137
        $mapFunc = "function() {
138
            var result = {};
139
            result[this.status] = 1;
140
            var key = this.worker_name + '->' + this.method + '()';
141
            emit(key, result);
142 2
        }";
143
        $reduceFunc = 'function(k, vals) {
144
            var result = {};
145
            for (var index in vals) {
146
                var val =  vals[index];
147
                for (var i in val) {
148
                    if (result.hasOwnProperty(i)) {
149
                        result[i] += val[i];
150
                    }
151
                    else {
152
                        result[i] = val[i];
153
                    }
154
                }
155
            }
156
            return result;
157 2
        }';
158
        /** @var DocumentManager $objectManager */
159 2
        $objectManager = $this->getObjectManager();
160 2
        $qb = $objectManager->createQueryBuilder($documentName);
161 2
        $qb->map($mapFunc)
162 2
            ->reduce($reduceFunc);
163 2
        $query = $qb->getQuery();
164 2
        $results = $query->execute();
165
166
        $allStatus = array(
167 2
            BaseJob::STATUS_ERROR => 0,
168 2
            BaseJob::STATUS_NEW => 0,
169 2
            RetryableJob::STATUS_EXPIRED => 0,
170 2
            RetryableJob::STATUS_MAX_ERROR => 0,
171 2
            RetryableJob::STATUS_MAX_RETRIES => 0,
172 2
            RetryableJob::STATUS_MAX_STALLED => 0,
173 2
            BaseJob::STATUS_RUNNING => 0,
174 2
            BaseJob::STATUS_SUCCESS => 0,
175 2
        );
176
177 2
        $status = [];
178
179 2
        foreach ($results as $info) {
180 1
            $status[$info['_id']] = $info['value'] + $allStatus;
181 2
        }
182
183 2
        return $status;
184
    }
185
186 2
    public function getStatus()
187
    {
188 2
        $result = $this->getStatusByDocument($this->getObjectName());
189 2
        $status2 = $this->getStatusByDocument($this->getArchiveObjectName());
190 2
        foreach ($status2 as $key => $value) {
191 1
            foreach ($value as $k => $v) {
192 1
                if (isset($result[$key][$k])) {
193 1
                    $result[$key][$k] += $v;
194 1
                } else {
195
                    $result[$key][$k] = $v;
196
                }
197 1
            }
198 2
        }
199
200 2
        $finalResult = [];
201 2
        foreach ($result as $key => $item) {
202 1
            ksort($item);
203 1
            $finalResult[$key] = $item;
204 2
        }
205
206 2
        return $finalResult;
207
    }
208
209
    /**
210
     * Get the next job to run (can be filtered by workername and method name).
211
     *
212
     * @param string $workerName
213
     * @param string $methodName
214
     * @param bool   $prioritize
215
     *
216
     * @return \Dtc\QueueBundle\Model\Job
217
     */
218 8
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
219
    {
220 8
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 218 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
Bug introduced by
It seems like $methodName defined by parameter $methodName on line 218 can also be of type string; however, Dtc\QueueBundle\ODM\JobM...r::getJobQueryBuilder() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
221
        $builder
222 8
            ->findAndUpdate()
223 8
            ->returnNew();
224
225 8
        $date = new \DateTime();
226
        // Update
227
        $builder
228 8
            ->field('lockedAt')->set($date) // Set started
229 8
            ->field('locked')->set(true)
230 8
            ->field('status')->set(BaseJob::STATUS_RUNNING)
231 8
            ->field('runId')->set($runId);
232
233 8
        $query = $builder->getQuery();
234
235 8
        $job = $query->execute();
236
237 8
        return $job;
238
    }
239
240
    /**
241
     * @param null $workerName
242
     * @param null $methodName
243
     * @param bool $prioritize
244
     *
245
     * @return Builder
246
     */
247 8
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
248
    {
249
        /** @var DocumentManager $objectManager */
250 8
        $objectManager = $this->getObjectManager();
251 8
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
252
253 8
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
254 8
        if ($prioritize) {
255 8
            $builder->sort([
256 8
                'priority' => 'desc',
257 8
                'whenAt' => 'asc',
258 8
            ]);
259 8
        } else {
260 1
            $builder->sort('whenAt', 'asc');
261
        }
262
263
        // Filter
264 8
        $date = new \DateTime();
265
        $builder
266 8
            ->addAnd(
267 8
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
268 8
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
269 8
            )
270 8
            ->field('status')->equals(BaseJob::STATUS_NEW)
271 8
            ->field('locked')->equals(null);
272
273 8
        return $builder;
274
    }
275
276 1
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
277
    {
278
        /** @var DocumentManager $objectManager */
279 1
        $objectManager = $this->getObjectManager();
280 1
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
281 1
        $builder->find();
282
283 1
        $builder->sort('whenAt', 'asc');
284 1
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
285 1
            ->field('crcHash')->equals($job->getCrcHash())
286 1
            ->field('locked')->equals(null);
287 1
        $oldJob = $builder->getQuery()->getSingleResult();
288
289 1
        if (!$oldJob) {
290
            return null;
291
        }
292
293
        // Update priority or whenAt
294
        //  This makes sure if someone else is updating at the same time
295
        //  that we don't trounce their changes.
296 1
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
297 1
        $builder->findAndUpdate();
298 1
        $builder->field('_id')->equals($oldJob->getId());
299 1
        $builder->field('priority')->lt($job->getPriority());
300 1
        $builder->field('priority')->set($job->getPriority());
301 1
        $builder->getQuery()->execute();
302
303 1
        $builder = $objectManager->createQueryBuilder($this->getObjectName());
304 1
        $builder->findAndUpdate();
305 1
        $builder->field('_id')->equals($oldJob->getId());
306 1
        $builder->field('whenAt')->gt($job->getWhenAt());
307 1
        $builder->field('whenAt')->set($job->getWhenAt());
308 1
        $builder->getQuery()->execute();
309
310 1
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
311 1
            $oldJob->setWhenAt($job->getWhenAt());
312 1
        }
313 1
        if ($job->getPriority() > $oldJob->getPriority()) {
314 1
            $oldJob->setPriority($job->getPriority());
315 1
        }
316
317 1
        return $oldJob;
318
    }
319
}
320