Completed
Branch master (45c65c)
by Matthew
02:05
created

JobManager   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 276
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 72.07%

Importance

Changes 1
Bugs 1 Features 0
Metric Value
wmc 25
lcom 1
cbo 9
dl 0
loc 276
ccs 80
cts 111
cp 0.7207
rs 10
c 1
b 1
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A countJobsByStatus() 0 21 3
A stopIdGenerator() 0 8 1
A pruneErroneousJobs() 0 7 1
B pruneJobs() 0 24 4
A pruneExpiredJobs() 0 7 1
A pruneArchivedJobs() 0 17 2
B getJobCount() 0 28 3
A getStatusByDocument() 0 47 2
A getStatus() 0 18 4
B getJob() 0 46 4
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Doctrine\ODM\MongoDB\Mapping\ClassMetadata;
7
use Dtc\QueueBundle\Doctrine\BaseJobManager;
8
use Doctrine\ODM\MongoDB\DocumentManager;
9
use Dtc\QueueBundle\Document\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
12
class JobManager extends BaseJobManager
13
{
14 3
    /**
     * Counts the documents of $objectName that carry the given status,
     * optionally narrowed down to one worker and/or one method.
     *
     * @param string      $objectName document class to query
     * @param string      $status     value the "status" field must equal
     * @param string|null $workerName when not null, "workerName" must equal this value
     * @param string|null $method     when not null, "method" must equal this value
     *
     * @return int the result of the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional criteria are only added when the caller supplied a value.
        $optionalCriteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];
        foreach ($optionalCriteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }
            $builder->field($fieldName)->equals($expected);
        }

        return $builder->getQuery()->count();
    }
35
36
    /**
     * Sets the ID generator type of the given object's class metadata to
     * ClassMetadata::GENERATOR_TYPE_NONE.
     *
     * The class name is resolved through the repository registered for
     * $objectName, then the metadata of that class is modified in place.
     *
     * @param string $objectName
     */
    public function stopIdGenerator($objectName)
    {
        $className = $this->getObjectManager()->getRepository($objectName)->getClassName();

        /** @var ClassMetadata $classMetadata */
        $classMetadata = $this->getObjectManager()->getClassMetadata($className);
        $classMetadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
    }
47
48
    /**
     * Removes documents of the archive object whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName when not null, only jobs of this worker are removed
     * @param string|null $method     when not null, only jobs of this method are removed
     *
     * @return int the value returned by pruneJobs()
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        // Criterion handed to pruneJobs(): restrict the remove query to errored jobs.
        $onlyErrored = function ($builder) {
            /* @var Builder $builder */
            return $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        };

        return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), $onlyErrored);
    }
59
60
    /**
     * Prunes jobs according to a condition function.
     *
     * Builds a remove query on $objectName, lets $conditionFunc add its own
     * criteria, then optionally narrows the query by worker name and method
     * before executing it.
     *
     * The first two parameters intentionally have no default value: they are
     * followed by required parameters, so a default could never be used by a
     * positional caller and PHP 8 reports such signatures as deprecated.
     * Pass null explicitly to skip a filter.
     *
     * @param string|null $workerName    when not null, "workerName" must equal this value
     * @param string|null $method        when not null, "method" must equal this value
     * @param string      $objectName    document class to remove from
     * @param callable    $conditionFunc receives the query builder and must return the
     *                                   builder that the rest of the query is built on
     *
     * @return int the "n" entry of the query result, or 0 when the result has none
     */
    protected function pruneJobs($workerName, $method, $objectName, $conditionFunc)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb = $qb->remove();
        // The callback decides which jobs qualify; its return value replaces the builder.
        $qb = $conditionFunc($qb);

        if (null !== $workerName) {
            $qb->field('workerName')->equals($workerName);
        }

        if (null !== $method) {
            $qb->field('method')->equals($method);
        }

        $query = $qb->getQuery();
        $result = $query->execute();
        if (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }
93
94
    /**
     * Removes jobs whose "expiresAt" is at or before the current time.
     *
     * @param string|null $workerName when not null, only jobs of this worker are removed
     * @param string|null $method     when not null, only jobs of this method are removed
     *
     * @return int the value returned by pruneJobs()
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        // The reference time is taken when pruneJobs() invokes the callback.
        $alreadyExpired = function ($builder) {
            /* @var Builder $builder */
            return $builder->field('expiresAt')->lte(new \DateTime());
        };

        return $this->pruneJobs($workerName, $method, $this->getObjectName(), $alreadyExpired);
    }
107
108
    /**
     * Removes archived jobs whose "updatedAt" is earlier than $olderThan.
     *
     * @param \DateTime $olderThan exclusive upper bound for "updatedAt"
     *
     * @return int the "n" entry of the query result, or 0 when the result has none
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName());
        $builder->remove();
        $builder->field('updatedAt')->lt($olderThan);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
130
131 1
    /**
     * Counts the jobs that are not locked and not expired, optionally
     * restricted to one worker and/or one method.
     *
     * A job counts as not expired when "expiresAt" is null or later than now.
     *
     * @param string|null $workerName when not null, "workerName" must equal this value
     * @param string|null $method     when not null, "method" must equal this value
     *
     * @return int the result of the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        if (null !== $workerName) {
            $builder->field('workerName')->equals($workerName);
        }

        if (null !== $method) {
            $builder->field('method')->equals($method);
        }

        // Keep only jobs without an expiry date or whose expiry lies in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $builder->addAnd($builder->expr()->addOr($neverExpires, $expiresLater));

        // Locked jobs are excluded from the count.
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
159
160
    /**
     * Builds a per-worker/method breakdown of job counts by status for one
     * document class, using a map/reduce query.
     *
     * Each result key has the form "<worker>-><method>()" and maps to an array
     * of status => count. The four statuses ERROR, NEW, RUNNING and SUCCESS
     * are always present (0 when the query reported none).
     *
     * @param string $documentName document class to aggregate
     *
     * @return array status counts keyed by "<worker>-><method>()"
     */
    protected function getStatusByDocument($documentName)
    {
        // Map step: emit {status: 1} under a key built from worker name and method.
        // NOTE(review): this reads `this.worker_name`, while the query builder
        // calls in this class use the field name 'workerName' — presumably the
        // stored field is named worker_name; confirm against the mapping.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce step: sum the per-status counters emitted for the same key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs)->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        // Defaults merged into every row so each known status has an entry.
        $zeroCounts = array(
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        );

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counts from the query win, missing statuses fall back to 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
212
213
    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * The per-worker/method status counts of the job object and of the archive
     * object are added together, and each group's statuses are sorted by key.
     *
     * Groups or statuses that only occur in the archive are initialised before
     * being summed, so merging never reads an undefined array key.
     *
     * @return array status => count arrays keyed by "<worker>-><method>()"
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            if (!isset($result[$key])) {
                // Group exists only in the archive: take its counts as they are.
                $result[$key] = $counts;
                continue;
            }

            foreach ($counts as $k => $v) {
                if (isset($result[$key][$k])) {
                    $result[$key][$k] += $v;
                } else {
                    // Status not reported for the live jobs of this group.
                    $result[$key][$k] = $v;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
231
232
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Issues one findAndUpdate query that selects a job with status NEW that is
     * not locked, is due (whenAt null or not in the future) and has not expired
     * (expiresAt null or in the future), and marks it as locked and RUNNING.
     *
     * @param string|null $workerName when not null, "workerName" must equal this value
     * @param string|null $methodName when not null, "method" must equal this value
     * @param bool        $prioritize sort by "priority" ascending when true, otherwise by "whenAt" ascending
     * @param mixed       $runId      value stored in the job's "runId" field
     *
     * @return \Dtc\QueueBundle\Model\Job the result of the query (the updated document, as returnNew() is requested);
     *                                    presumably null when no job matches — confirm against the driver
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        if (null !== $workerName) {
            $builder->field('workerName')->equals($workerName);
        }

        if (null !== $methodName) {
            $builder->field('method')->equals($methodName);
        }

        $builder->sort($prioritize ? 'priority' : 'whenAt', 'asc');

        // Selection criteria: due, not expired, new and unlocked.
        $now = new \DateTime();
        $noSchedule = $builder->expr()->field('whenAt')->equals(null);
        $alreadyDue = $builder->expr()->field('whenAt')->lte($now);
        $noExpiry = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);

        $builder->addAnd(
            $builder->expr()->addOr($noSchedule, $alreadyDue),
            $builder->expr()->addOr($noExpiry, $expiresLater)
        );
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Modifications applied to the selected job.
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
287
}
288