Completed
Push — master ( 122dc2...fa7a2a )
by Matthew
09:15
created

JobManager::updateNearestBatch()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 43
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 4.0007

Importance

Changes 0
Metric Value
dl 0
loc 43
ccs 27
cts 28
cp 0.9643
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 28
nc 5
nop 1
crap 4.0007
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
16 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
17
    {
18
        /** @var DocumentManager $objectManager */
19 3
        $objectManager = $this->getObjectManager();
20 3
        $qb = $objectManager->createQueryBuilder($objectName);
21
        $qb
22 3
            ->find()
23 3
            ->field('status')->equals($status);
24
25 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
26 3
        $query = $qb->getQuery();
27
28 3
        return $query->count();
29
    }
30
31
    /**
32
     * @param string|null $workerName
33
     * @param string|null $method
34
     */
35 1
    public function pruneErroneousJobs($workerName = null, $method = null)
36
    {
37
        /** @var DocumentManager $objectManager */
38 1
        $objectManager = $this->getObjectManager();
39 1
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
40 1
        $qb = $qb->remove();
41 1
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
42 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
43
44 1
        $query = $qb->getQuery();
45 1
        $result = $query->execute();
46 1
        if (isset($result['n'])) {
47 1
            return $result['n'];
48
        }
49
50
        return 0;
51
    }
52
53
    /**
54
     * @param Builder     $builder
55
     * @param string|null $workerName
56
     * @param string|null $method
57
     */
58 15
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
59
    {
60 15
        if (null !== $workerName) {
61 4
            $builder->field('workerName')->equals($workerName);
62
        }
63
64 15
        if (null !== $method) {
65 3
            $builder->field('method')->equals($method);
66
        }
67 15
    }
68
69
    /**
70
     * @param null $workerName
71
     * @param null $method
72
     *
73
     * @return int
74
     */
75 2
    protected function updateExpired($workerName = null, $method = null)
76
    {
77
        /** @var DocumentManager $objectManager */
78 2
        $objectManager = $this->getObjectManager();
79 2
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
80 2
        $qb = $qb->updateMany();
81 2
        $qb->field('expiresAt')->lte(new \DateTime());
82 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
83 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
84 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
85 2
        $query = $qb->getQuery();
86 2
        $result = $query->execute();
87 2
        if (isset($result['n'])) {
88 2
            return $result['n'];
89
        }
90
91
        return 0;
92
    }
93
94
    /**
95
     * Removes archived jobs older than $olderThan.
96
     *
97
     * @param \DateTime $olderThan
98
     *                             return int
99
     */
100 1
    public function pruneArchivedJobs(\DateTime $olderThan)
101
    {
102
        /** @var DocumentManager $documentManager */
103 1
        $documentManager = $this->getObjectManager();
104
105 1
        return $this->removeOlderThan($documentManager, $this->getArchiveObjectName(), 'updatedAt', $olderThan);
106
    }
107
108 2
    public function getJobCount($workerName = null, $method = null)
109
    {
110
        /** @var DocumentManager $objectManager */
111 2
        $objectManager = $this->getObjectManager();
112 2
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
113
        $qb
114 2
            ->find();
115
116 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
117
118
        // Filter
119 2
        $date = new \DateTime();
120
        $qb
121 2
            ->addAnd(
122 2
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
123
            )
124 2
            ->field('locked')->equals(null);
125
126 2
        $query = $qb->getQuery();
127
128 2
        return $query->count(true);
129
    }
130
131
    /**
132
     * Get Status Jobs.
133
     *
134
     * @param string $documentName
135
     *
136
     * @return array
137
     */
138 2
    protected function getStatusByDocument($documentName)
139
    {
140
        // Run a map reduce function get worker and status break down
141 2
        $mapFunc = "function() {
142
            var result = {};
143
            result[this.status] = 1;
144
            var key = this.worker_name + '->' + this.method + '()';
145
            emit(key, result);
146
        }";
147 2
        $reduceFunc = 'function(k, vals) {
148
            var result = {};
149
            for (var index in vals) {
150
                var val =  vals[index];
151
                for (var i in val) {
152
                    if (result.hasOwnProperty(i)) {
153
                        result[i] += val[i];
154
                    }
155
                    else {
156
                        result[i] = val[i];
157
                    }
158
                }
159
            }
160
            return result;
161
        }';
162
        /** @var DocumentManager $objectManager */
163 2
        $objectManager = $this->getObjectManager();
164 2
        $qb = $objectManager->createQueryBuilder($documentName);
165 2
        $qb->map($mapFunc)
166 2
            ->reduce($reduceFunc);
167 2
        $query = $qb->getQuery();
168 2
        $results = $query->execute();
169
170
        $allStatus = array(
171 2
            BaseJob::STATUS_ERROR => 0,
172 2
            BaseJob::STATUS_NEW => 0,
173 2
            RetryableJob::STATUS_EXPIRED => 0,
174 2
            RetryableJob::STATUS_MAX_ERROR => 0,
175 2
            RetryableJob::STATUS_MAX_RETRIES => 0,
176 2
            RetryableJob::STATUS_MAX_STALLED => 0,
177 2
            BaseJob::STATUS_RUNNING => 0,
178 2
            BaseJob::STATUS_SUCCESS => 0,
179
        );
180
181 2
        $status = [];
182
183 2
        foreach ($results as $info) {
184 1
            $status[$info['_id']] = $info['value'] + $allStatus;
185
        }
186
187 2
        return $status;
188
    }
189
190 2
    public function getStatus()
191
    {
192 2
        $result = $this->getStatusByDocument($this->getObjectName());
193 2
        $status2 = $this->getStatusByDocument($this->getArchiveObjectName());
194 2
        foreach ($status2 as $key => $value) {
195 1
            foreach ($value as $k => $v) {
196 1
                $result[$key][$k] += $v;
197
            }
198
        }
199
200 2
        $finalResult = [];
201 2
        foreach ($result as $key => $item) {
202 1
            ksort($item);
203 1
            $finalResult[$key] = $item;
204
        }
205
206 2
        return $finalResult;
207
    }
208
209
    /**
210
     * Get the next job to run (can be filtered by workername and method name).
211
     *
212
     * @param string $workerName
213
     * @param string $methodName
214
     * @param bool   $prioritize
215
     *
216
     * @return \Dtc\QueueBundle\Model\Job
217
     */
218 8
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
219
    {
220
        /** @var DocumentManager $objectManager */
221 8
        $objectManager = $this->getObjectManager();
222 8
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
223
        $qb
224 8
            ->findAndUpdate()
225 8
            ->returnNew();
226
227 8
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
228 8
        if ($prioritize) {
229 8
            $qb->sort([
230 8
                'priority' => 'desc',
231
                'whenAt' => 'asc',
232
            ]);
233
        } else {
234
            $qb->sort('whenAt', 'asc');
235
        }
236
237
        // Filter
238 8
        $date = new \DateTime();
239
        $qb
240 8
            ->addAnd(
241 8
                $qb->expr()->addOr($qb->expr()->field('whenAt')->equals(null), $qb->expr()->field('whenAt')->lte($date)),
242 8
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
243
            )
244 8
            ->field('status')->equals(BaseJob::STATUS_NEW)
245 8
            ->field('locked')->equals(null);
246
247
        // Update
248
        $qb
249 8
            ->field('lockedAt')->set($date) // Set started
250 8
            ->field('locked')->set(true)
251 8
            ->field('status')->set(BaseJob::STATUS_RUNNING)
252 8
            ->field('runId')->set($runId);
253
254 8
        $query = $qb->getQuery();
255
256 8
        $job = $query->execute();
257
258 8
        return $job;
259
    }
260
261 1
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
262
    {
263
        /** @var DocumentManager $objectManager */
264 1
        $objectManager = $this->getObjectManager();
265 1
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
266 1
        $qb->find();
267
268 1
        $qb->sort('whenAt', 'asc');
269 1
        $qb->field('status')->equals(BaseJob::STATUS_NEW)
270 1
            ->field('crcHash')->equals($job->getCrcHash())
271 1
            ->field('locked')->equals(null);
272 1
        $oldJob = $qb->getQuery()->getSingleResult();
273
274 1
        if (!$oldJob) {
275
            return null;
276
        }
277
278
        // Update priority or whenAt
279
        //  This makes sure if someone else is updating at the same time
280
        //  that we don't trounce their changes.
281 1
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
282 1
        $qb->findAndUpdate();
283 1
        $qb->field('_id')->equals($oldJob->getId());
284 1
        $qb->field('priority')->lt($job->getPriority());
285 1
        $qb->field('priority')->set($job->getPriority());
286 1
        $qb->getQuery()->execute();
287
288 1
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
289 1
        $qb->findAndUpdate();
290 1
        $qb->field('_id')->equals($oldJob->getId());
291 1
        $qb->field('whenAt')->gt($job->getWhenAt());
292 1
        $qb->field('whenAt')->set($job->getWhenAt());
293 1
        $qb->getQuery()->execute();
294
295 1
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
296 1
            $oldJob->setWhenAt($job->getWhenAt());
297
        }
298 1
        if ($job->getPriority() > $oldJob->getPriority()) {
299 1
            $oldJob->setPriority($job->getPriority());
300
        }
301
302 1
        return $oldJob;
303
    }
304
}
305