Completed
Push — master ( cf2de2...e02161 )
by Matthew
10:11
created

JobManager   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 290
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 97.76%

Importance

Changes 1
Bugs 1 Features 0
Metric Value
wmc 22
lcom 1
cbo 8
dl 0
loc 290
ccs 131
cts 134
cp 0.9776
rs 10
c 1
b 1
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A updateExpired() 0 18 2
A getStatusByDocument() 0 51 2
A getStatus() 0 18 4
A countJobsByStatus() 0 14 1
A pruneErroneousJobs() 0 17 2
A pruneArchivedJobs() 0 4 1
A getJobCount() 0 22 1
A addWorkerNameCriterion() 0 10 3
B updateNearestBatch() 0 43 4
B getJob() 0 42 2
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
15 3
    /**
     * Counts the documents of the given class whose 'status' field equals $status.
     *
     * @param string      $objectName Document class handed to createQueryBuilder()
     * @param string      $status     Status value to match
     * @param string|null $workerName When not null, only documents for this worker are counted
     * @param string|null $method     When not null, only documents for this method are counted
     *
     * @return int Value reported by the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional worker / method narrowing.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
29
30
    /**
     * Removes documents from the archive class whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName When not null, only jobs for this worker are removed
     * @param string|null $method     When not null, only jobs for this method are removed
     *
     * @return int The 'n' entry of the remove result, or 0 when the result carries none
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
51
52
    /**
     * Adds equality criteria on 'workerName' and 'method' to the builder.
     *
     * A criterion is only added for a value that is not null, so calling this
     * with both values null leaves the builder untouched.
     *
     * @param Builder     $builder    Query builder to narrow
     * @param string|null $workerName Value the 'workerName' field must equal
     * @param string|null $method     Value the 'method' field must equal
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Same order as the fields are documented: worker first, then method.
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
67
68
    /**
     * Sets the status of new jobs whose 'expiresAt' is now or earlier to expired.
     *
     * @param string|null $workerName When not null, only jobs for this worker are updated
     * @param string|null $method     When not null, only jobs for this method are updated
     *
     * @return int The 'n' entry of the update result, or 0 when the result carries none
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Match: expiry time reached and the job is still in the "new" state.
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Change: overwrite the status.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
92
93
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the archive document class, using the
     * 'updatedAt' field for the comparison.
     *
     * @param \DateTime $olderThan Cut-off handed to removeOlderThan()
     *
     * @return int Whatever removeOlderThan() returns (presumably the number of
     *             removed documents; confirm against removeOlderThan())
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveObjectName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveObjectName, 'updatedAt', $olderThan);
    }
103
104 2
    /**
     * Counts jobs that are not locked and whose 'expiresAt' is either null or
     * later than the current time.
     *
     * @param string|null $workerName When not null, only jobs for this worker are counted
     * @param string|null $method     When not null, only jobs for this method are counted
     *
     * @return int Value reported by the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Keep jobs without an expiry, or with an expiry strictly after now.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
126
127
    /**
     * Builds a per-worker/method status breakdown for one document class.
     *
     * Runs a map/reduce over $documentName: the map step emits a {status: 1}
     * counter keyed by "<worker_name>-><method>()", the reduce step sums the
     * counters per key. Every known status is then filled in with 0 where the
     * reduce output has no entry for it.
     *
     * NOTE(review): the map function reads this.worker_name while the query
     * criteria in this class use 'workerName' - presumably the stored field
     * name differs from the mapped property name; confirm against the mapping.
     *
     * @param string $documentName Document class to aggregate
     *
     * @return array Map of "<worker>-><method>()" => (status => count)
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: one counter object per document, keyed by worker and method.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce: add up the per-status counters emitted for the same key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs);
        $builder->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        // Defaults for statuses that the reduce output does not mention.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counted values win, missing statuses fall back to 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
185
186 2
    /**
     * Returns the combined status breakdown of the job class and the archive class.
     *
     * The per-key status counts from getStatusByDocument() for the job class
     * and for the archive class are added together, and each key's statuses
     * are sorted by status name.
     *
     * @return array Map of worker/method key => (status => count)
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $status => $count) {
                // A key (or status) may be present only in the archive; seed it
                // rather than applying "+=" to an undefined index.
                if (isset($result[$key][$status])) {
                    $result[$key][$status] += $count;
                } else {
                    $result[$key][$status] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
204
205
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Issues a single find-and-update: it selects an unlocked job in the "new"
     * state whose 'whenAt' is null or not later than now and whose 'expiresAt'
     * is null or later than now, and in the same operation sets 'lockedAt',
     * 'locked', 'status' (running) and 'runId' on it.
     *
     * @param string|null $workerName When not null, only jobs for this worker are considered
     * @param string|null $methodName When not null, only jobs for this method are considered
     * @param bool        $prioritize True: sort by priority (desc) then whenAt (asc);
     *                                false: sort by whenAt (asc) only
     * @param mixed       $runId      Value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job Result of executing the query (presumably
     *                                    null when nothing matches; confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
256
257 1
    /**
     * Folds $job into the earliest matching job that is already stored.
     *
     * Looks up the unlocked "new" job with the same CRC hash and the smallest
     * 'whenAt'. If one exists, its stored priority is raised to $job's priority
     * when lower, and its stored 'whenAt' is moved to $job's 'whenAt' when
     * later. The returned object gets the same adjustments applied in memory.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job carrying the new priority / whenAt
     *
     * @return mixed|null The existing job, or null when no matching job was found
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $finder = $documentManager->createQueryBuilder($this->getObjectName());
        $finder->find()
            ->sort('whenAt', 'asc')
            ->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash())
            ->field('locked')->equals(null);
        $existing = $finder->getQuery()->getSingleResult();

        if (!$existing) {
            return null;
        }

        // Each change is a conditional find-and-update, so a concurrent writer
        // that already stored a better value is not overwritten.
        $priorityUpdate = $documentManager->createQueryBuilder($this->getObjectName());
        $priorityUpdate->findAndUpdate()
            ->field('_id')->equals($existing->getId())
            ->field('priority')->lt($job->getPriority())
            ->field('priority')->set($job->getPriority());
        $priorityUpdate->getQuery()->execute();

        $whenAtUpdate = $documentManager->createQueryBuilder($this->getObjectName());
        $whenAtUpdate->findAndUpdate()
            ->field('_id')->equals($existing->getId())
            ->field('whenAt')->gt($job->getWhenAt())
            ->field('whenAt')->set($job->getWhenAt());
        $whenAtUpdate->getQuery()->execute();

        // Mirror the stored changes on the object handed back to the caller.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
300
}
301