Completed
Pull Request — master (#40)
by Matthew
07:31
created

JobManager   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 387
Duplicated Lines 3.88 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 93.71%

Importance

Changes 3
Bugs 2 Features 1
Metric Value
wmc 35
lcom 1
cbo 9
dl 15
loc 387
ccs 164
cts 175
cp 0.9371
rs 9
c 3
b 2
f 1

15 Methods

Rating   Name   Duplication   Size   Complexity  
A updateExpired() 0 18 2
A countJobsByStatus() 0 14 1
A addWorkerNameCriterion() 0 10 3
A pruneArchivedJobs() 0 4 1
A pruneExceptionJobs() 0 17 2
A getWaitingJobCount() 15 15 1
A getJob() 0 20 1
A countLiveJobs() 0 12 1
B getStatusByDocument() 0 28 2
B getStatus() 0 22 5
A getJobQueryBuilder() 0 21 2
B updateNearestBatch() 0 42 4
A addStandardPredicates() 0 10 1
B getWorkersAndMethods() 0 35 5
B archiveAllJobs() 0 27 4

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Util\Util;
11
12
class JobManager extends DoctrineJobManager
13
{
14
    use CommonTrait;
15
    const REDUCE_FUNCTION = 'function(k, vals) {
16
            var result = {};
17
            for (var index in vals) {
18
                var val = vals[index];
19
                for (var i in val) {
20
                    if (result.hasOwnProperty(i)) {
21
                        result[i] += val[i];
22
                    }
23
                    else {
24
                        result[i] = val[i];
25
                    }
26
                }
27
            }
28
            return result;
29
        }';
30
31 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
32
    {
33
        /** @var DocumentManager $objectManager */
34 3
        $objectManager = $this->getObjectManager();
35 3
        $qb = $objectManager->createQueryBuilder($objectName);
36
        $qb
37 3
            ->find()
38 3
            ->field('status')->equals($status);
39
40 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
41 3
        $query = $qb->getQuery();
42
43 3
        return $query->count();
44
    }
45
46
    /**
47
     * @param string|null $workerName
48
     * @param string|null $method
49
     */
50 1
    public function pruneExceptionJobs($workerName = null, $method = null)
51
    {
52
        /** @var DocumentManager $objectManager */
53 1
        $objectManager = $this->getObjectManager();
54 1
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
55 1
        $qb = $qb->remove();
56 1
        $qb->field('status')->equals(BaseJob::STATUS_EXCEPTION);
57 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
58
59 1
        $query = $qb->getQuery();
60 1
        $result = $query->execute();
61 1
        if (isset($result['n'])) {
62 1
            return $result['n'];
63
        }
64
65
        return 0;
66
    }
67
68
    /**
69
     * @param Builder     $builder
70
     * @param string|null $workerName
71
     * @param string|null $method
72
     */
73 18
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
74
    {
75 18
        if (null !== $workerName) {
76 6
            $builder->field('workerName')->equals($workerName);
77
        }
78
79 18
        if (null !== $method) {
80 5
            $builder->field('method')->equals($method);
81
        }
82 18
    }
83
84
    /**
85
     * @param null $workerName
86
     * @param null $method
87
     *
88
     * @return int
89
     */
90 2
    protected function updateExpired($workerName = null, $method = null)
91
    {
92
        /** @var DocumentManager $objectManager */
93 2
        $objectManager = $this->getObjectManager();
94 2
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
95 2
        $qb = $qb->updateMany();
96 2
        $qb->field('expiresAt')->lte(Util::getMicrotimeDateTime());
97 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
98 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
99 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
100 2
        $query = $qb->getQuery();
101 2
        $result = $query->execute();
102 2
        if (isset($result['n'])) {
103 2
            return $result['n'];
104
        }
105
106
        return 0;
107
    }
108
109
    /**
110
     * Removes archived jobs older than $olderThan.
111
     *
112
     * @param \DateTime $olderThan
113
     *
114
     * @return int
115
     */
116 1
    public function pruneArchivedJobs(\DateTime $olderThan)
117
    {
118 1
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
119
    }
120
121 2 View Code Duplication
    public function getWaitingJobCount($workerName = null, $method = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
122
    {
123
        /** @var DocumentManager $objectManager */
124 2
        $objectManager = $this->getObjectManager();
125 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
126
        $builder
127 2
            ->find();
128
129 2
        $this->addWorkerNameCriterion($builder, $workerName, $method);
130 2
        $this->addStandardPredicates($builder);
131
132 2
        $query = $builder->getQuery();
133
134 2
        return $query->count(true);
135
    }
136
137
    /**
138
     * Get Status Jobs.
139
     *
140
     * @param string $documentName
141
     *
142
     * @return array
143
     */
144 3
    protected function getStatusByDocument($documentName)
145
    {
146
        // Run a map reduce function get worker and status break down
147 3
        $mapFunc = "function() {
148
            var result = {};
149
            result[this.status] = 1;
150
            var key = this.worker_name + '->' + this.method + '()';
151
            emit(key, result);
152
        }";
153 3
        $reduceFunc = self::REDUCE_FUNCTION;
154
        /** @var DocumentManager $objectManager */
155 3
        $objectManager = $this->getObjectManager();
156 3
        $builder = $objectManager->createQueryBuilder($documentName);
157 3
        $builder->map($mapFunc)
158 3
            ->reduce($reduceFunc);
159 3
        $query = $builder->getQuery();
160 3
        $results = $query->execute();
161
162 3
        $allStatus = static::getAllStatuses();
163
164 3
        $status = [];
165
166 3
        foreach ($results as $info) {
167 1
            $status[$info['_id']] = $info['value'] + $allStatus;
168
        }
169
170 3
        return $status;
171
    }
172
173 3
    public function getStatus()
174
    {
175 3
        $result = $this->getStatusByDocument($this->getJobClass());
176 3
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
177 3
        foreach ($status2 as $key => $value) {
178 1
            foreach ($value as $k => $v) {
179 1
                if (isset($result[$key][$k])) {
180 1
                    $result[$key][$k] += $v;
181
                } else {
182 1
                    $result[$key][$k] = $v;
183
                }
184
            }
185
        }
186
187 3
        $finalResult = [];
188 3
        foreach ($result as $key => $item) {
189 1
            ksort($item);
190 1
            $finalResult[$key] = $item;
191
        }
192
193 3
        return $finalResult;
194
    }
195
196
    /**
197
     * Get the next job to run (can be filtered by workername and method name).
198
     *
199
     * @param string      $workerName
200
     * @param string      $methodName
201
     * @param bool        $prioritize
202
     * @param string|null $runId
203
     *
204
     * @return \Dtc\QueueBundle\Model\Job
205
     */
206 12
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
207
    {
208 12
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
209
        $builder
210 12
            ->findAndUpdate()
211 12
            ->returnNew();
212
213 12
        $date = Util::getMicrotimeDateTime();
214
        // Update
215
        $builder
216 12
            ->field('startedAt')->set($date)
217 12
            ->field('status')->set(BaseJob::STATUS_RUNNING)
218 12
            ->field('runId')->set($runId);
219
220 12
        $query = $builder->getQuery();
221
222 12
        $job = $query->execute();
223
224 12
        return $job;
225
    }
226
227
    /**
228
     * @param string|null $workerName
229
     * @param string|null $methodName
230
     * @param bool        $prioritize
231
     *
232
     * @return Builder
233
     */
234 12
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
235
    {
236
        /** @var DocumentManager $objectManager */
237 12
        $objectManager = $this->getObjectManager();
238 12
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
239
240 12
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
241 12
        if ($prioritize) {
242 12
            $builder->sort([
243 12
                'priority' => 'desc',
244
                'whenAt' => 'asc',
245
            ]);
246
        } else {
247 1
            $builder->sort('whenAt', 'asc');
248
        }
249
250
        // Filter
251 12
        $this->addStandardPredicates($builder);
252
253 12
        return $builder;
254
    }
255
256 1
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
257
    {
258
        /** @var DocumentManager $objectManager */
259 1
        $objectManager = $this->getObjectManager();
260 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
261 1
        $builder->find();
262
263 1
        $builder->sort('whenAt', 'asc');
264 1
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
265 1
            ->field('crcHash')->equals($job->getCrcHash());
266 1
        $oldJob = $builder->getQuery()->getSingleResult();
267
268 1
        if (!$oldJob) {
269
            return null;
270
        }
271
272
        // Update priority or whenAt
273
        //  This makes sure if someone else is updating at the same time
274
        //  that we don't trounce their changes.
275 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
276 1
        $builder->findAndUpdate();
277 1
        $builder->field('_id')->equals($oldJob->getId());
278 1
        $builder->field('priority')->lt($job->getPriority());
279 1
        $builder->field('priority')->set($job->getPriority());
280 1
        $builder->getQuery()->execute();
281
282 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
283 1
        $builder->findAndUpdate();
284 1
        $builder->field('_id')->equals($oldJob->getId());
285 1
        $builder->field('whenAt')->gt($job->getWhenAt());
286 1
        $builder->field('whenAt')->set($job->getWhenAt());
287 1
        $builder->getQuery()->execute();
288
289 1
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
290 1
            $oldJob->setWhenAt($job->getWhenAt());
291
        }
292 1
        if ($job->getPriority() > $oldJob->getPriority()) {
293 1
            $oldJob->setPriority($job->getPriority());
294
        }
295
296 1
        return $oldJob;
297
    }
298
299
    /**
300
     * @param mixed $builder
301
     */
302 14
    protected function addStandardPredicates($builder)
303
    {
304 14
        $date = Util::getMicrotimeDateTime();
305
        $builder
306 14
            ->addAnd(
307 14
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
308 14
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
309
            )
310 14
            ->field('status')->equals(BaseJob::STATUS_NEW);
311 14
    }
312
313 1
    public function getWorkersAndMethods()
314
    {
315
        /** @var DocumentManager $documentManager */
316 1
        $documentManager = $this->getObjectManager();
317
318 1
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
319
            return [];
320
        }
321
322 1
        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());
323
324 1
        $this->addStandardPredicates($aggregationBuilder->match());
325
326 1
        $aggregationBuilder->group()
327 1
            ->field('id')
328 1
            ->expression(
329 1
                $aggregationBuilder->expr()
330 1
                ->field('workerName')->expression('$workerName')
331 1
                ->field('method')->expression('$method')
332
            );
333 1
        $results = $aggregationBuilder->execute()->toArray();
334
335 1
        if (!$results) {
336 1
            return [];
337
        }
338
339
        $workersMethods = [];
340
        foreach ($results as $result) {
341
            if (isset($result['_id'])) {
342
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
343
            }
344
        }
345
346
        return $workersMethods;
347
    }
348
349
    /**
350
     * @param string $workerName
351
     * @param string $methodName
352
     */
353 2
    public function countLiveJobs($workerName = null, $methodName = null)
354
    {
355
        /** @var DocumentManager $objectManager */
356 2
        $objectManager = $this->getObjectManager();
357 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
358
359 2
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
360
        // Filter
361 2
        $this->addStandardPredicates($builder);
362
363 2
        return $builder->getQuery()->count();
364
    }
365
366
    /**
367
     * @param string        $workerName
368
     * @param string        $methodName
369
     * @param callable|null $progressCallback
370
     */
371 1
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
372
    {
373
        /** @var DocumentManager $documentManager */
374 1
        $documentManager = $this->getObjectManager();
375 1
        $count = 0;
376 1
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
377
        $builder
378 1
            ->findAndUpdate()
379 1
            ->returnNew();
380
381 1
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
382 1
        $query = $builder->getQuery();
383
        do {
384 1
            $job = $query->execute();
385 1
            if ($job) {
386 1
                $documentManager->remove($job);
387 1
                ++$count;
388
389 1
                if (0 == $count % 10) {
390
                    $this->flush();
391
                    $this->updateProgress($progressCallback, $count);
392
                }
393
            }
394 1
        } while ($job);
395 1
        $this->flush();
396 1
        $this->updateProgress($progressCallback, $count);
397 1
    }
398
}
399