Completed
Push — master ( ae48f4...bf1f19 )
by Matthew
08:18 queued 06:11
created

JobManager::pruneErroneousJobs()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.003

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 17
ccs 10
cts 11
cp 0.9091
rs 9.4285
c 1
b 1
f 0
cc 2
eloc 11
nc 2
nop 2
crap 2.003
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
16 3
    /**
     * Counts the documents of $objectName whose 'status' field equals $status.
     *
     * @param string      $objectName document class handed to the query builder
     * @param mixed       $status     status value to match
     * @param string|null $workerName restrict to this worker name when not null
     * @param string|null $method     restrict to this method when not null
     *
     * @return mixed whatever the query's count() yields
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional worker / method narrowing.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
30
31
    /**
     * Removes documents of the archive object name whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName restrict the removal to this worker name when not null
     * @param string|null $method     restrict the removal to this method when not null
     *
     * @return int the 'n' entry of the executed remove query's result, or 0 when that entry is absent
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $removal->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
52
53
    /**
     * Adds equality criteria on 'workerName' and 'method' to the builder,
     * skipping whichever of the two values is null.
     *
     * @param Builder     $builder    query builder to add the criteria to
     * @param string|null $workerName value to match against 'workerName'
     * @param string|null $method     value to match against 'method'
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $filters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($filters as $fieldName => $expected) {
            if (null === $expected) {
                // A null value means "do not filter on this field".
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
68
69
    /**
     * Marks jobs as expired: every document of the object name with status
     * BaseJob::STATUS_NEW and an 'expiresAt' at or before the current time
     * gets its status set to Job::STATUS_EXPIRED.
     *
     * @param string|null $workerName restrict the update to this worker name when not null
     * @param string|null $method     restrict the update to this method when not null
     *
     * @return int the 'n' entry of the executed update query's result, or 0 when that entry is absent
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $update = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection criteria.
        $update->field('expiresAt')->lte(new \DateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // Modification applied to the matched documents.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
93
94
    /**
     * Removes archived jobs older than $olderThan, compared on 'updatedAt'.
     *
     * @param \DateTime $olderThan cut-off passed on to removeOlderThan()
     *
     * @return int as reported by removeOlderThan()
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveName, 'updatedAt', $olderThan);
    }
104
105 2
    /**
     * Counts documents of the object name that are unlocked ('locked' equals null)
     * and not expired ('expiresAt' is null or later than the current time).
     *
     * @param string|null $workerName restrict the count to this worker name when not null
     * @param string|null $method     restrict the count to this method when not null
     *
     * @return mixed whatever the query's count(true) yields
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: no expiry set, or expiry still in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder
            ->addAnd($notExpired)
            ->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
127
128
    /**
     * Builds a per-worker/method breakdown of job counts by status for one document class.
     *
     * A map/reduce query emits one key per "worker->method()" pair and sums a
     * counter per status. Each result row is then padded with a zero for every
     * known status it did not report.
     *
     * @param string $documentName document class handed to the query builder
     *
     * @return array map of "worker->method()" key => (status => count)
     */
    protected function getStatusByDocument($documentName)
    {
        // Map step: emit {<status>: 1} under the "worker->method()" key.
        // NOTE(review): this reads this.worker_name while the query builder criteria in this
        // class use 'workerName' — presumably the mapping stores it as worker_name; confirm.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce step: add up the per-status counters emitted for one key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs)->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        // Zero defaults for every known status.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union keeps the reported counts and fills in the missing statuses.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
186
187 2
    /**
     * Returns the combined status breakdown of the live and the archive documents.
     *
     * The per-key status counts from getStatusByDocument() for the object name
     * and for the archive object name are added together, and each key's status
     * map is sorted by status name.
     *
     * @return array map of key => (status => count), each inner map sorted by key
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            // A key may exist only in the archive; start it from an empty map
            // instead of reading an undefined array key.
            if (!isset($result[$key])) {
                $result[$key] = [];
            }

            foreach ($counts as $statusName => $count) {
                if (!isset($result[$key][$statusName])) {
                    $result[$key][$statusName] = 0;
                }
                $result[$key][$statusName] += $count;
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
205
206
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * A single findAndUpdate query selects a document with status
     * BaseJob::STATUS_NEW, 'locked' equal to null, 'whenAt' null or not after
     * the current time and 'expiresAt' null or after the current time. On that
     * document it sets 'lockedAt', 'locked', 'status' (BaseJob::STATUS_RUNNING)
     * and 'runId', and asks for the updated document back.
     *
     * @param string|null $workerName restrict to this worker name when not null
     * @param string|null $methodName restrict to this method when not null
     * @param bool        $prioritize sort by priority (desc) then whenAt (asc) when truthy, by whenAt (asc) only otherwise
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job the result of the findAndUpdate query
     *                                    (presumably null when nothing matches — TODO confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate()->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort(['priority' => 'desc', 'whenAt' => 'asc']);
        }

        // Filter: due now and not expired, still new and unlocked.
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update: lock the job and mark it as running.
        $builder->field('lockedAt')->set($now);
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
257
258 1
    /**
     * Folds $job into the earliest matching queued job, if there is one.
     *
     * Looks up the first document (by 'whenAt' ascending) that has status
     * BaseJob::STATUS_NEW, is unlocked and has the same crcHash as $job. When
     * found, its stored priority is raised to $job's priority if lower, and its
     * stored 'whenAt' is moved to $job's 'whenAt' if later. The same two
     * adjustments are then applied to the fetched object before it is returned.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job whose crcHash, priority and whenAt are used
     *
     * @return mixed the matched existing job, or null when none was found
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $documentName = $this->getObjectName();

        $lookup = $documentManager->createQueryBuilder($documentName);
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $lookup->field('locked')->equals(null);
        $existing = $lookup->getQuery()->getSingleResult();

        if (!$existing) {
            return null;
        }

        // Update priority or whenAt.
        //  The comparison is part of each update's criteria, so a concurrent
        //  writer's change is not overwritten by a stale value.
        $raisePriority = $documentManager->createQueryBuilder($documentName);
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $moveEarlier = $documentManager->createQueryBuilder($documentName);
        $moveEarlier->findAndUpdate();
        $moveEarlier->field('_id')->equals($existing->getId());
        $moveEarlier->field('whenAt')->gt($job->getWhenAt());
        $moveEarlier->field('whenAt')->set($job->getWhenAt());
        $moveEarlier->getQuery()->execute();

        // Apply the same adjustments to the fetched object.
        $whenAt = $job->getWhenAt();
        if ($whenAt < $existing->getWhenAt()) {
            $existing->setWhenAt($whenAt);
        }

        $priority = $job->getPriority();
        if ($priority > $existing->getPriority()) {
            $existing->setPriority($priority);
        }

        return $existing;
    }
301
}
302