Completed
Push — master ( 8a247e...4109b7 )
by Matthew
05:12
created

JobManager::updateNearestBatch()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 43
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 4.0007

Importance

Changes 0
Metric Value
dl 0
loc 43
ccs 27
cts 28
cp 0.9643
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 28
nc 5
nop 1
crap 4.0007
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
15 3
    /**
     * Counts the documents of the given class whose "status" field equals $status.
     *
     * The count can optionally be narrowed to one worker and/or one method.
     *
     * @param string      $objectName document class name handed to the query builder
     * @param string      $status     value the "status" field must equal
     * @param string|null $workerName only count jobs of this worker when not null
     * @param string|null $method     only count jobs of this method when not null
     *
     * @return int
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
29
30
    /**
     * Removes archived jobs whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName restrict the removal to this worker when not null
     * @param string|null $method     restrict the removal to this method when not null
     *
     * @return int the "n" entry of the remove result, or 0 when the result has no such entry
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
51
52
    /**
     * Adds equality criteria on "workerName" and "method" to the given builder.
     *
     * A criterion is only added for a value that is not null, so passing
     * nothing leaves the builder untouched.
     *
     * @param Builder     $builder    query builder to narrow
     * @param string|null $workerName value the "workerName" field must equal, or null to skip
     * @param string|null $method     value the "method" field must equal, or null to skip
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Field name => required value, in the order the criteria are applied.
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
67
68
    /**
     * Marks new jobs whose "expiresAt" is at or before the current time as expired.
     *
     * @param string|null $workerName restrict the update to this worker when not null
     * @param string|null $method     restrict the update to this method when not null
     *
     * @return int the "n" entry of the update result, or 0 when the result has no such entry
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Criteria: still-new jobs that have reached their expiry time.
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Update: flip the status of every match.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
92
93
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the archive document class, comparing
     * against the "updatedAt" field.
     *
     * @param \DateTime $olderThan cut-off passed through to removeOlderThan()
     *
     * @return int whatever removeOlderThan() returns (documented here as a count)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveObjectName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveObjectName, 'updatedAt', $olderThan);
    }
103
104 2
    /**
     * Counts job documents that are unlocked and have not expired.
     *
     * A job matches when "locked" is null and "expiresAt" is either null or
     * later than the current time. No status criterion is applied here.
     *
     * @param string|null $workerName only count jobs of this worker when not null
     * @param string|null $method     only count jobs of this method when not null
     *
     * @return int
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        // NOTE(review): the `true` argument is passed straight through to the
        // query's count(); presumably it makes the count honour limit/skip — confirm.
        return $builder->getQuery()->count(true);
    }
126
127 1
    /**
     * Reads the stored "status" of the given job straight from the database.
     *
     * Only the status field is selected and hydration is disabled, so the
     * result is read as a plain array rather than a document object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job whose id is looked up
     *
     * @return mixed|null the stored status, or null when no status is found
     */
    protected function getJobCurrentStatus(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->select('status');
        $builder->field('_id')->equals($job->getId());

        $query = $builder->getQuery();
        $query->setHydrate(false);
        $row = $query->getSingleResult();

        if (isset($row['status'])) {
            return $row['status'];
        }

        return null;
    }
138
139
    /**
     * Builds a per-worker/method status breakdown for one document class.
     *
     * Runs a map/reduce over the collection: each document emits a count of 1
     * for its status under the key "<worker>-><method>()", and the reducer
     * sums the counts per status for each key.
     *
     * @param string $documentName document class name to run the map/reduce on
     *
     * @return array map of "<worker>-><method>()" => (status => count); every
     *               known status is present, defaulting to 0
     */
    protected function getStatusByDocument($documentName)
    {
        // Every status gets a zero entry so the breakdown always has the same keys.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        // Map step: emit {status: 1} keyed by worker and method.
        // NOTE(review): this reads this.worker_name, while query criteria elsewhere
        // in this class use the "workerName" field; presumably worker_name is the
        // stored field name — confirm against the document mapping.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce step: sum the per-status counts emitted for one key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs);
        $builder->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union keeps the reduced counts and fills in missing statuses with 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
197
198 2
    /**
     * Builds the combined status breakdown of live and archived jobs.
     *
     * The per-status counts from the job collection and the archive collection
     * are summed for each "<worker>-><method>()" key, and each key's counts are
     * sorted by status name.
     *
     * @return array map of "<worker>-><method>()" => (status => count)
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            if (!isset($result[$key])) {
                // The key exists only in the archive: there is nothing to add to,
                // so adopt its counts instead of reading an undefined index.
                $result[$key] = $counts;
                continue;
            }

            foreach ($counts as $status => $count) {
                $existing = isset($result[$key][$status]) ? $result[$key][$status] : 0;
                $result[$key][$status] = $existing + $count;
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
216
217
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * The lookup and the lock happen in one find-and-update query: the matched
     * job is new, unlocked, due ("whenAt" null or not in the future) and not
     * expired ("expiresAt" null or in the future). It is then marked as locked
     * and running, and stamped with the lock time and the run id.
     *
     * @param string|null $workerName only consider jobs of this worker when not null
     * @param string|null $methodName only consider jobs of this method when not null
     * @param bool        $prioritize sort by priority (descending) before "whenAt" when true,
     *                                by "whenAt" only when false
     * @param mixed|null  $runId      value stored in the job's "runId" field
     *
     * @return \Dtc\QueueBundle\Model\Job the result of executing the query, requested as the
     *                                    updated document (presumably null when nothing matches)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
268
269 1
    /**
     * Folds $job into the earliest matching job that is already queued.
     *
     * Looks up the new, unlocked job with the same crcHash and the earliest
     * "whenAt". If one exists, its stored priority is raised to $job's priority
     * when that is higher, and its stored "whenAt" is moved to $job's when that
     * is earlier. The same adjustments are applied to the returned object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job supplying the crcHash, priority and whenAt
     *
     * @return \Dtc\QueueBundle\Model\Job|null the existing job, or null when none matches
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $finder = $documentManager->createQueryBuilder($this->getObjectName());
        $finder->find();
        $finder->sort('whenAt', 'asc');
        $finder->field('status')->equals(BaseJob::STATUS_NEW);
        $finder->field('crcHash')->equals($job->getCrcHash());
        $finder->field('locked')->equals(null);
        $existing = $finder->getQuery()->getSingleResult();

        if (!$existing) {
            return null;
        }

        // Each write below carries its own comparison in the query criteria, so
        // it only applies while the stored value is still lower (priority) or
        // later (whenAt). A concurrent update is therefore not trounced.
        $priorityUpdate = $documentManager->createQueryBuilder($this->getObjectName());
        $priorityUpdate->findAndUpdate();
        $priorityUpdate->field('_id')->equals($existing->getId());
        $priorityUpdate->field('priority')->lt($job->getPriority())->set($job->getPriority());
        $priorityUpdate->getQuery()->execute();

        $whenAtUpdate = $documentManager->createQueryBuilder($this->getObjectName());
        $whenAtUpdate->findAndUpdate();
        $whenAtUpdate->field('_id')->equals($existing->getId());
        $whenAtUpdate->field('whenAt')->gt($job->getWhenAt())->set($job->getWhenAt());
        $whenAtUpdate->getQuery()->execute();

        // Mirror the same adjustments on the in-memory object that is returned.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
312
}
313