Completed
Push — master ( 32ab89...d4f4f0 )
by Matthew
09:35
created

JobManager   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 259
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 97.27%

Importance

Changes 1
Bugs 1 Features 0
Metric Value
wmc 20
lcom 1
cbo 7
dl 0
loc 259
ccs 107
cts 110
cp 0.9727
rs 10
c 1
b 1
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A countJobsByStatus() 0 14 1
A stopIdGenerator() 0 4 1
A restoreIdGenerator() 0 4 1
A pruneErroneousJobs() 0 17 2
A addWorkerNameCriterion() 0 10 3
A updateExpired() 0 18 2
A pruneArchivedJobs() 0 7 1
A getJobCount() 0 22 1
A getStatus() 0 18 4
B getJob() 0 39 2
A getStatusByDocument() 0 51 2
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
16 3
    /**
     * Counts the documents of $objectName whose "status" field equals $status,
     * optionally narrowed to one worker and/or one method.
     *
     * @param string      $objectName document class handed to createQueryBuilder()
     * @param mixed       $status     value the "status" field must equal
     * @param string|null $workerName when not null, only match this workerName
     * @param string|null $method     when not null, only match this method
     *
     * @return int whatever the query's count() reports
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
30
31
    /**
     * Deliberate no-op in this ODM manager.
     *
     * @param string $objectName ignored
     */
    public function stopIdGenerator($objectName)
    {
        // Nothing to do: not needed for ODM.
    }
38
39 1
    /**
     * Deliberate no-op in this ODM manager (counterpart of stopIdGenerator()).
     *
     * @param string $objectName ignored
     */
    public function restoreIdGenerator($objectName)
    {
        // Nothing to do: not needed for ODM.
    }
43
44
    /**
     * Removes documents with status BaseJob::STATUS_ERROR from the archive
     * collection, optionally restricted to one worker and/or method.
     *
     * @param string|null $workerName when not null, only remove this worker's jobs
     * @param string|null $method     when not null, only remove jobs for this method
     *
     * @return int the "n" entry of the remove result, or 0 when that entry is absent
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
65
66
    /**
     * Adds equality criteria on "workerName" and "method" to the builder.
     * A criterion is only added when its value is not null.
     *
     * @param Builder     $builder    query builder to constrain (modified in place)
     * @param string|null $workerName value "workerName" must equal, or null to skip
     * @param string|null $method     value "method" must equal, or null to skip
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = array(
            'workerName' => $workerName,
            'method' => $method,
        );

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
81
82
    /**
     * Marks jobs as expired: every document with status BaseJob::STATUS_NEW whose
     * expiresAt is less than or equal to the current time gets its status set to
     * STATUS_EXPIRED. Can be restricted to one worker and/or method.
     *
     * @param string|null $workerName when not null, only update this worker's jobs
     * @param string|null $method     when not null, only update jobs for this method
     *
     * @return int the "n" entry of the update result, or 0 when that entry is absent
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection criteria.
        $builder
            ->field('expiresAt')->lte(new \DateTime())
            ->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification applied to every matched document.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
106
107
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the archive collection, comparing
     * against the "updatedAt" field.
     *
     * @param \DateTime $olderThan cut-off passed through to removeOlderThan()
     *
     * @return int result of removeOlderThan() (presumably the number of removed
     *             documents - confirm against that helper)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        return $this->removeOlderThan(
            $this->getObjectManager(),
            $this->getArchiveObjectName(),
            'updatedAt',
            $olderThan
        );
    }
120
121 2
    /**
     * Counts jobs in the live collection that are not locked ("locked" is null)
     * and have not expired ("expiresAt" is null or later than now), optionally
     * restricted to one worker and/or method.
     *
     * @param string|null $workerName when not null, only count this worker's jobs
     * @param string|null $method     when not null, only count jobs for this method
     *
     * @return int whatever the query's count(true) reports
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: keep jobs that never expire or expire in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
143
144
    /**
     * Builds a per-worker/method status breakdown for one document class.
     *
     * Runs a map/reduce over $documentName: each document emits a
     * "<worker_name>-><method>()" key with a count of 1 for its status, and the
     * reducer sums the counts per status. Every returned row is padded with a
     * zero entry for each known status that did not occur.
     *
     * @param string $documentName document class handed to createQueryBuilder()
     *
     * @return array map of "<worker_name>-><method>()" => (status => count)
     */
    protected function getStatusByDocument($documentName)
    {
        // Zero defaults, added to every row so all known statuses are present.
        $zeroCounts = array(
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        );

        // Map step: one {status: 1} record per document, keyed by worker and method.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce step: sum the per-status counters of all records sharing a key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs);
        $builder->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        $breakdown = array();
        foreach ($rows as $row) {
            // Array union: counted statuses win, missing ones fall back to 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
202
203 2
    /**
     * Returns the combined status breakdown of the live and archive collections.
     *
     * The per-status counts from getStatusByDocument() for both collections are
     * summed per worker/method key, and each row is sorted by status key.
     *
     * @return array map of worker/method key => (status => count), rows ksort()ed
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A worker/method key (or a status) may exist only in the archive;
                // start from the archived count instead of reading an undefined offset.
                if (isset($result[$key][$statusName])) {
                    $result[$key][$statusName] += $count;
                } else {
                    $result[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
221
222
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Issues a single find-and-update: the matched job must have status
     * BaseJob::STATUS_NEW, be unlocked, be due ("whenAt" null or not later than
     * now) and be unexpired ("expiresAt" null or later than now). The match is
     * locked, stamped with lockedAt and $runId, and set to
     * BaseJob::STATUS_RUNNING; the updated document is requested via returnNew().
     *
     * @param string|null $workerName when not null, only consider this worker's jobs
     * @param string|null $methodName when not null, only consider jobs for this method
     * @param bool        $prioritize true sorts by priority descending,
     *                                false by whenAt ascending
     * @param mixed       $runId      value stored in the job's "runId" field
     *
     * @return \Dtc\QueueBundle\Model\Job result of executing the query
     *                                    (presumably null when nothing matches - confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        list($sortField, $sortOrder) = $prioritize ? array('priority', 'desc') : array('whenAt', 'asc');
        $builder->sort($sortField, $sortOrder);

        // Filter: due, unexpired, new and unlocked.
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update: claim the job for this run.
        $builder->field('lockedAt')->set($now);
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
270
}
271