Completed
Pull Request — master (#24)
by Matthew
12:26
created

JobManager::pruneErroneousJobs()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.003

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 17
ccs 10
cts 11
cp 0.9091
rs 9.4285
c 1
b 1
f 0
cc 2
eloc 11
nc 2
nop 2
crap 2.003
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
16 3
    /**
     * Counts the documents of the given class that currently have the given status.
     *
     * @param string      $objectName Document class the query builder is created for
     * @param string      $status     Value the 'status' field must equal
     * @param string|null $workerName Restrict the count to this worker when not null
     * @param string|null $method     Restrict the count to this method when not null
     *
     * @return int Result of the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
30
31
    /**
     * Removes archived jobs whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName Restrict the removal to this worker when not null
     * @param string|null $method     Restrict the removal to this method when not null
     *
     * @return int The 'n' entry of the executed query's result, or 0 when it is not set
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
52
53
    /**
     * Narrows a query to a specific worker and/or method.
     *
     * A null argument leaves the corresponding field unconstrained.
     *
     * @param Builder     $builder    Query builder that receives the equality criteria
     * @param string|null $workerName Value the 'workerName' field must equal
     * @param string|null $method     Value the 'method' field must equal
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = ['workerName' => $workerName, 'method' => $method];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
68
69
    /**
     * Switches new jobs whose 'expiresAt' is now or earlier to the expired status.
     *
     * @param string|null $workerName Only touch jobs of this worker when not null
     * @param string|null $method     Only touch jobs of this method when not null
     *
     * @return int The 'n' entry of the executed query's result, or 0 when it is not set
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Criteria: the job is still new and its expiry time has been reached.
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Update: replace the status with the expired marker.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
93
94
    /**
     * Removes archived jobs older than $olderThan, judged by their 'updatedAt' field.
     *
     * @param \DateTime $olderThan Cut-off handed to removeOlderThan()
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveName, 'updatedAt', $olderThan);
    }
104
105 2
    /**
     * Counts the unlocked jobs that have no expiry or whose expiry is still in the future.
     *
     * @param string|null $workerName Restrict the count to this worker when not null
     * @param string|null $method     Restrict the count to this method when not null
     *
     * @return int Result of the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: expiresAt is either unset or later than the current time.
        $now = new \DateTime();
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
127
128 1
    /**
     * Reads the stored 'status' of a job, selected by its id, with hydration turned off.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job whose id is used for the lookup
     *
     * @return mixed|null The 'status' entry of the single result, or null when it is not set
     */
    protected function getJobCurrentStatus(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->select('status');
        $builder->field('_id')->equals($job->getId());

        $query = $builder->getQuery();
        // Ask for the raw result rather than a hydrated document.
        $query->setHydrate(false);
        $row = $query->getSingleResult();

        if (!isset($row['status'])) {
            return null;
        }

        return $row['status'];
    }
139
140
    /**
     * Builds a breakdown of job counts by status for each worker/method pair.
     *
     * A map-reduce is run over the given document class: the map step emits a
     * {status: 1} object keyed by "worker_name->method()", and the reduce step
     * sums those objects per key.
     *
     * @param string $documentName Document class the map-reduce is run against
     *
     * @return array Keyed by the map-reduce '_id'; each value is the reduced status => count
     *               map, completed with a 0 entry for every known status it lacks
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: one {status: 1} object per document, keyed by worker and method.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce: add up the per-status counters emitted for the same key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc);
        $builder->reduce($reduceFunc);
        $rows = $builder->getQuery()->execute();

        // Zero defaults so every row reports every known status.
        $statusDefaults = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union keeps the reduced counts and only fills in missing statuses.
            $breakdown[$row['_id']] = $row['value'] + $statusDefaults;
        }

        return $breakdown;
    }
198
199 2
    /**
     * Combines the status breakdown of the job documents with that of the archive documents.
     *
     * Counts for the same worker/method key and status are added together; entries that
     * exist only in the archive breakdown are copied over as they are.
     *
     * @return array Keyed by worker/method; each value maps a status to its combined count,
     *               sorted by status key
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A worker/method key (or a status) may be present only in the archive;
                // seed the slot instead of adding to an undefined index.
                if (isset($result[$key][$statusName])) {
                    $result[$key][$statusName] += $count;
                } else {
                    $result[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
217
218
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * A single find-and-update query selects an unlocked new job that is due (no 'whenAt'
     * or 'whenAt' not after now) and not expired (no 'expiresAt' or 'expiresAt' after now),
     * and in the same operation marks it locked and running.
     *
     * @param string|null $workerName Restrict the search to this worker when not null
     * @param string|null $methodName Restrict the search to this method when not null
     * @param bool        $prioritize Sort by priority (descending) before 'whenAt' when true;
     *                                sort by 'whenAt' only otherwise
     * @param mixed       $runId      Value written to the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job Result of executing the query, with the updated
     *                                    document requested via returnNew()
     *                                    (NOTE(review): presumably null when nothing matches — confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
269
270 1
    /**
     * Folds $job into an existing job with the same CRC hash, if there is one.
     *
     * The candidate is the unlocked new job with the same 'crcHash' that comes first when
     * sorted by 'whenAt' ascending. Its stored priority is raised to $job's priority when
     * lower, and its stored 'whenAt' is moved to $job's 'whenAt' when later; the returned
     * object is adjusted the same way.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job supplying the CRC hash, priority and whenAt
     *
     * @return \Dtc\QueueBundle\Model\Job|null The existing job, or null when none matches
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $finder = $documentManager->createQueryBuilder($this->getObjectName());
        $finder->find();
        $finder->sort('whenAt', 'asc');
        $finder->field('status')->equals(BaseJob::STATUS_NEW);
        $finder->field('crcHash')->equals($job->getCrcHash());
        $finder->field('locked')->equals(null);
        $existing = $finder->getQuery()->getSingleResult();

        if (!$existing) {
            return null;
        }

        // Update priority or whenAt
        //  Each update carries its own comparison in the query criteria, so a concurrent
        //  writer's changes are not trounced.
        $priorityUpdate = $documentManager->createQueryBuilder($this->getObjectName());
        $priorityUpdate->findAndUpdate();
        $priorityUpdate->field('_id')->equals($existing->getId());
        $priorityUpdate->field('priority')->lt($job->getPriority());
        $priorityUpdate->field('priority')->set($job->getPriority());
        $priorityUpdate->getQuery()->execute();

        $whenAtUpdate = $documentManager->createQueryBuilder($this->getObjectName());
        $whenAtUpdate->findAndUpdate();
        $whenAtUpdate->field('_id')->equals($existing->getId());
        $whenAtUpdate->field('whenAt')->gt($job->getWhenAt());
        $whenAtUpdate->field('whenAt')->set($job->getWhenAt());
        $whenAtUpdate->getQuery()->execute();

        // Mirror the same adjustments on the in-memory object that is returned.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
313
}
314