Completed
Push — master ( 16d212...b7b31c )
by Matthew
07:34 queued 05:11
created

JobManager::addWorkerNameCriterion()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
15 3
    /**
     * Counts the documents of the given class whose 'status' field equals $status.
     *
     * @param string      $objectName document class handed to createQueryBuilder()
     * @param string      $status     status value to match (e.g. a BaseJob::STATUS_* constant)
     * @param string|null $workerName when not null, only count jobs with this 'workerName'
     * @param string|null $method     when not null, only count jobs with this 'method'
     *
     * @return int result of the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional worker / method narrowing.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
29
30
    /**
     * Removes documents of the archive object class whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName when not null, only remove jobs with this 'workerName'
     * @param string|null $method     when not null, only remove jobs with this 'method'
     *
     * @return int the 'n' entry of the remove result, or 0 when the result carries none
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $removal->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
51
52
    /**
     * Adds equality criteria on 'workerName' and 'method' to the builder.
     *
     * A criterion is only added for an argument that is not null, so passing
     * nothing leaves the builder untouched.
     *
     * @param Builder     $builder    query builder to narrow (modified in place)
     * @param string|null $workerName value the 'workerName' field must equal
     * @param string|null $method     value the 'method' field must equal
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Field name => requested value; order matches the parameter order.
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
67
68
    /**
     * Marks jobs as expired: every job still in BaseJob::STATUS_NEW whose
     * 'expiresAt' is at or before the current time gets its status set to
     * Job::STATUS_EXPIRED.
     *
     * @param string|null $workerName when not null, only touch jobs with this 'workerName'
     * @param string|null $method     when not null, only touch jobs with this 'method'
     *
     * @return int the 'n' entry of the update result, or 0 when the result carries none
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $update = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection criteria.
        $update->field('expiresAt')->lte(new \DateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // New value.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
92
93
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the archive object class, using the
     * 'updatedAt' field as the age reference.
     *
     * @param \DateTime $olderThan cut-off date
     *
     * @return int whatever removeOlderThan() returns (presumably the number of
     *             removed jobs; that method is defined outside this file view)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
103
104 2
    /**
     * Counts jobs that are not locked and have not expired.
     *
     * A job qualifies when 'locked' is null and 'expiresAt' is either null or
     * later than the current time.
     *
     * @param string|null $workerName when not null, only count jobs with this 'workerName'
     * @param string|null $method     when not null, only count jobs with this 'method'
     *
     * @return int result of the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: no expiry set, or expiry still in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
126
127
    /**
     * Builds a per worker/method breakdown of job counts by status for one
     * document class, using a map-reduce query.
     *
     * The result is keyed by "<worker_name>-><method>()"; each entry maps a
     * status to its count. Statuses from the fixed list below that did not
     * occur are present with a count of 0.
     *
     * @param string $documentName document class to run the map-reduce on
     *
     * @return array
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: emit {status: 1} under the worker/method key of each document.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce: sum the per-status counters emitted for one key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs);
        $builder->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        // Zero count for every known status; used to fill gaps in each row.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union keeps the counted values and adds only the missing statuses.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
185
186 2
    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * The breakdowns of the job class and the archive class are fetched via
     * getStatusByDocument() and their counts added together per
     * worker/method key and status. Each entry's statuses are sorted by key.
     *
     * @return array worker/method key => (status => count)
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            if (!isset($result[$key])) {
                // This worker/method only has archived jobs. Adopt its counts
                // directly instead of doing "+=" on an undefined index, which
                // raises a notice (a warning on PHP 8).
                $result[$key] = $counts;
                continue;
            }

            foreach ($counts as $statusName => $count) {
                if (!isset($result[$key][$statusName])) {
                    $result[$key][$statusName] = 0;
                }
                $result[$key][$statusName] += $count;
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
204
205
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * The candidate query comes from getJobQueryBuilder(); it is run as a
     * find-and-update that sets 'lockedAt' to the current time, 'locked' to
     * true, 'status' to BaseJob::STATUS_RUNNING and 'runId' to $runId, and
     * asks for the updated document to be returned.
     *
     * @param string|null $workerName when not null, only consider jobs with this 'workerName'
     * @param string|null $methodName when not null, only consider jobs with this 'method'
     * @param bool        $prioritize passed through to getJobQueryBuilder() to pick the sort order
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job result of executing the query
     *                                    (presumably null when no job matches; confirm against the driver)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $claim = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $claim->findAndUpdate();
        $claim->returnNew();

        // Update: lock the job and flag it as running.
        $now = new \DateTime();
        $claim->field('lockedAt')->set($now);
        $claim->field('locked')->set(true);
        $claim->field('status')->set(BaseJob::STATUS_RUNNING);
        $claim->field('runId')->set($runId);

        return $claim->getQuery()->execute();
    }
235
236
    /**
     * Builds the query that selects jobs eligible to run.
     *
     * Eligible means: status is BaseJob::STATUS_NEW, 'locked' is null,
     * 'whenAt' is null or not after the current time, and 'expiresAt' is null
     * or after the current time.
     *
     * @param string|null $workerName when not null, only match jobs with this 'workerName'
     * @param string|null $methodName when not null, only match jobs with this 'method'
     * @param bool        $prioritize true: sort by 'priority' descending, then 'whenAt' ascending;
     *                                false: sort by 'whenAt' ascending only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $query = $documentManager->createQueryBuilder($this->getObjectName());

        $this->addWorkerNameCriterion($query, $workerName, $methodName);

        if (!$prioritize) {
            $query->sort('whenAt', 'asc');
        } else {
            $query->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $now = new \DateTime();
        $isDue = $query->expr()->addOr(
            $query->expr()->field('whenAt')->equals(null),
            $query->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $query->expr()->addOr(
            $query->expr()->field('expiresAt')->equals(null),
            $query->expr()->field('expiresAt')->gt($now)
        );

        $query->addAnd($isDue, $notExpired);
        $query->field('status')->equals(BaseJob::STATUS_NEW);
        $query->field('locked')->equals(null);

        return $query;
    }
271
272 1
    /**
     * Folds $job into an already queued job with the same crcHash.
     *
     * Looks up the unlocked BaseJob::STATUS_NEW job with the same 'crcHash'
     * and the earliest 'whenAt'. If one exists, its stored priority is raised
     * to $job's priority when lower, and its stored 'whenAt' is moved to
     * $job's when later. The same adjustments are applied to the returned
     * object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job supplying crcHash, priority and whenAt
     *
     * @return \Dtc\QueueBundle\Model\Job|null the existing job, or null when none matched
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getObjectName());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $lookup->field('locked')->equals(null);

        $existing = $lookup->getQuery()->getSingleResult();
        if (!$existing) {
            return null;
        }

        // Update priority or whenAt.
        // Each change is a conditional find-and-update, so a concurrent
        // writer's changes are not overwritten.
        $raisePriority = $documentManager->createQueryBuilder($this->getObjectName());
        $raisePriority
            ->findAndUpdate()
            ->field('_id')->equals($existing->getId())
            ->field('priority')->lt($job->getPriority())
            ->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $pullForward = $documentManager->createQueryBuilder($this->getObjectName());
        $pullForward
            ->findAndUpdate()
            ->field('_id')->equals($existing->getId())
            ->field('whenAt')->gt($job->getWhenAt())
            ->field('whenAt')->set($job->getWhenAt());
        $pullForward->getQuery()->execute();

        // Mirror the same adjustments on the object handed back to the caller.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
315
}
316