Passed
Push — master ( 26ea1a...43abdf )
by Matthew
07:12
created

JobManager   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 449
Duplicated Lines 0 %

Test Coverage

Coverage 85%

Importance

Changes 3
Bugs 2 Features 1
Metric Value
eloc 215
c 3
b 2
f 1
dl 0
loc 449
ccs 187
cts 220
cp 0.85
rs 8.5599
wmc 48

17 Methods

Rating  Name                            Duplication  Size  Complexity
A       getWorkersAndMethods()          0            34    5
A       updateNearestBatch()            0            41    4
A       archiveAllJobs()                0            26    4
A       pruneArchivedJobs()             0            3     1
A       getStatus()                     0            21    5
A       runQuery()                      0            12    4
A       getJob()                        0            19    1
A       pruneExceptionJobs()            0            18    3
A       countLiveJobs()                 0            15    2
A       getStatusByDocumentMapReduce()  0            21    2
A       getStatusByDocument()           0            36    4
A       updateExpired()                 0            20    3
A       countJobsByStatus()             0            18    2
A       addWorkerNameCriterion()        0            8     3
A       getJobQueryBuilder()            0            20    2
A       getWaitingJobCount()            0            18    2
A       addStandardPredicates()         0            9     1

How to fix   Complexity   

Complex Class

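Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or suffix. In JobManager, for example, pruneArchivedJobs() and pruneExceptionJobs() share the prune prefix, and countJobsByStatus() and countLiveJobs() share the count prefix.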

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.
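
As a rough illustration of Extract Class combined with Extract Interface, the sketch below moves a pruning method into its own component and has a slimmed-down manager delegate to it. This is not the bundle's code: JobPrunerInterface, InMemoryJobPruner, and SlimJobManager are hypothetical names, the in-memory array stands in for the Doctrine-backed archive collection, and the string 'exception' stands in for BaseJob::STATUS_EXCEPTION.

```php
<?php

// Hypothetical interface: the part of the manager that callers of pruning actually need.
interface JobPrunerInterface
{
    public function pruneExceptionJobs(?string $workerName = null, ?string $method = null): int;
}

// Hypothetical extracted class. An in-memory array replaces the archive collection
// so the sketch runs without Doctrine; a real version would wrap the query builder.
final class InMemoryJobPruner implements JobPrunerInterface
{
    /** @var array<int, array{status: string, workerName: string, method: string}> */
    private array $archive;

    public function __construct(array $archive)
    {
        $this->archive = $archive;
    }

    public function pruneExceptionJobs(?string $workerName = null, ?string $method = null): int
    {
        $before = count($this->archive);
        $this->archive = array_values(array_filter(
            $this->archive,
            static function (array $job) use ($workerName, $method): bool {
                // 'exception' is an assumed stand-in for BaseJob::STATUS_EXCEPTION.
                $matches = 'exception' === $job['status']
                    && (null === $workerName || $job['workerName'] === $workerName)
                    && (null === $method || $job['method'] === $method);

                return !$matches; // keep only the jobs that do not match
            }
        ));

        return $before - count($this->archive);
    }
}

// The manager keeps its public method but delegates the work to the extracted component.
final class SlimJobManager
{
    private JobPrunerInterface $pruner;

    public function __construct(JobPrunerInterface $pruner)
    {
        $this->pruner = $pruner;
    }

    public function pruneExceptionJobs(?string $workerName = null, ?string $method = null): int
    {
        return $this->pruner->pruneExceptionJobs($workerName, $method);
    }
}

$manager = new SlimJobManager(new InMemoryJobPruner([
    ['status' => 'exception', 'workerName' => 'fibonacci', 'method' => 'fibonacci'],
    ['status' => 'exception', 'workerName' => 'mailer', 'method' => 'send'],
    ['status' => 'success', 'workerName' => 'mailer', 'method' => 'send'],
]));
$removed = $manager->pruneExceptionJobs('mailer');
```

Because the manager depends only on the interface, the pruning component can be tested or replaced on its own, which is the point of pairing the two refactorings.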

<?php

namespace Dtc\QueueBundle\ODM;

use Doctrine\MongoDB\Exception\ResultException;
Bug: The type Doctrine\MongoDB\Exception\ResultException was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

use Doctrine\MongoDB\Query\Builder;
Bug: The type Doctrine\MongoDB\Query\Builder was not found. Maybe you did not declare it correctly or list all dependencies? The dependency_paths note above applies here as well.

use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
use Doctrine\ODM\MongoDB\DocumentManager;
use Dtc\QueueBundle\Document\Job;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Util\Util;
use MongoDB\DeleteResult;
use MongoDB\UpdateResult;

class JobManager extends DoctrineJobManager
{
    use CommonTrait;
    const REDUCE_FUNCTION = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val = vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb
            ->find()
            ->field('status')->equals($status);

        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $method = 'count';
        if (method_exists($qb, 'count')) {
            $method = 'execute';
            $qb->count();
        }
        $query = $qb->getQuery();

        return $this->runQuery($query, $method, [], 0);
    }

    /**
     * @param string|null $workerName
     * @param string|null $method
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
        $qb = $qb->remove();
        $qb->field('status')->equals(BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        $query = $qb->getQuery();
        $result = $this->runQuery($query, 'execute');
        if ($result instanceof DeleteResult) {
            return $result->getDeletedCount();
        } elseif (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }

    /**
     * @param $builder
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion($builder, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $builder->field('workerName')->equals($workerName);
        }

        if (null !== $method) {
            $builder->field('method')->equals($method);
        }
    }

    /**
     * @param null $workerName
Documentation Bug: Are you sure the doc-type for parameter $workerName is correct as it would always require null to be passed?
     * @param null $method
Documentation Bug: Are you sure the doc-type for parameter $method is correct as it would always require null to be passed?
     *
     * @return int
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
        $qb = $qb->updateMany();
        $qb->field('expiresAt')->lte(Util::getMicrotimeDateTime());
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
        $query = $qb->getQuery();
        $result = $this->runQuery($query, 'execute');
        if ($result instanceof UpdateResult) {
            return $result->getModifiedCount();
        } elseif (isset($result['n'])) {
            return $result['n'];
        }

        return 0;
    }

    /**
     * Removes archived jobs older than $olderThan.
     *
     * @param \DateTime $olderThan
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
    }

    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder
            ->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);
        $this->addStandardPredicates($builder);
        $method = 'count';
        if (method_exists($builder, 'count')) {
            $builder->count();
            $method = 'execute';
        }
        $query = $builder->getQuery();

        return $this->runQuery($query, $method, [true], 0);
    }

    protected function getStatusByDocumentMapReduce($builder, $documentName)
    {
        $reduceFunc = self::REDUCE_FUNCTION;
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
            }";
        $builder->map($mapFunc)
            ->reduce($reduceFunc);
        $query = $builder->getQuery();
        $results = $this->runQuery($query, 'execute', [], []);
        $allStatus = static::getAllStatuses();

        $status = [];

        foreach ($results as $info) {
            $status[$info['_id']] = $info['value'] + $allStatus;
        }

        return $status;
    }

    /**
     * Get Status Jobs.
     *
     * @param string $documentName
     *
     * @return array
     */
    protected function getStatusByDocument($documentName)
    {
        // Run a map reduce function get worker and status break down
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($documentName);
        if (method_exists($builder, 'map')) {
            return $this->getStatusByDocumentMapReduce($builder, $documentName);
        }

        $aggregation = $objectManager->createAggregationBuilder($documentName);
        $aggregation->group()
            ->field('id')->expression($aggregation->expr()
                                                ->field('worker_name')
                                                ->expression('$worker_name')
                                                ->field('method')
                                                ->expression('$method')
                                                ->field('status')
                                                ->expression('$status')
            )
            ->field('value')
            ->sum(1);
        $results = $this->runQuery($aggregation, 'execute', [], []);
        $allStatus = static::getAllStatuses();

        $status = [];

        foreach ($results as $info) {
            $key = $info['_id']['worker_name'] . '->' .$info['_id']['method'] . '()';
            if (!isset($status[$key])) {
                $status[$key] = $allStatus;
            }
            $status[$key][$info['_id']['status']] = $info['value'];
        }

        return $status;
    }

    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getJobClass());
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
        foreach ($status2 as $key => $value) {
            foreach ($value as $k => $v) {
                if (isset($result[$key][$k])) {
                    $result[$key][$k] += $v;
                } else {
                    $result[$key][$k] = $v;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }

    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * @param string      $workerName
     * @param string      $methodName
     * @param bool        $prioritize
     * @param string|null $runId
     *
     * @return \Dtc\QueueBundle\Model\Job
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $builder
            ->findAndUpdate()
            ->returnNew();

        $date = Util::getMicrotimeDateTime();
        // Update
        $builder
            ->field('startedAt')->set($date)
            ->field('status')->set(BaseJob::STATUS_RUNNING)
            ->field('runId')->set($runId);

        $query = $builder->getQuery();

        $job = $this->runQuery($query, 'execute');

        return $job;
    }

    protected function runQuery($query, $method, array $arguments = [], $resultIfNamespaceError = null)
    {
        try {
            $result = call_user_func_array([$query, $method], $arguments);
        } catch (ResultException $resultException) {
            if (false === strpos($resultException->getMessage(), 'namespace does not exist') && false === strpos($resultException->getMessage(), 'ns doesn\'t exist')) {
                throw $resultException;
            }
            $result = $resultIfNamespaceError;
        }

        return $result;
    }

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        if ($prioritize) {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        } else {
            $builder->sort('whenAt', 'asc');
        }

        // Filter
        $this->addStandardPredicates($builder);

        return $builder;
Bug Best Practice: The expression return $builder returns the type Doctrine\ODM\MongoDB\Query\Builder which is incompatible with the documented return type Doctrine\MongoDB\Query\Builder.
    }

    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder->find();

        $builder->sort('whenAt', 'asc');
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash());
        $oldJob = $this->runQuery($builder->getQuery(), 'getSingleResult');

        if (!$oldJob) {
            return null;
        }

        // Update priority or whenAt
        //  This makes sure if someone else is updating at the same time
        //  that we don't trounce their changes.
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder->findAndUpdate();
        $builder->field('_id')->equals($oldJob->getId());
        $builder->field('priority')->lt($job->getPriority());
        $builder->field('priority')->set($job->getPriority());
        $this->runQuery($builder->getQuery(), 'execute');

        $builder = $objectManager->createQueryBuilder($this->getJobClass());
        $builder->findAndUpdate();
        $builder->field('_id')->equals($oldJob->getId());
        $builder->field('whenAt')->gt($job->getWhenAt());
        $builder->field('whenAt')->set($job->getWhenAt());
        $this->runQuery($builder->getQuery(), 'execute');

        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
            $oldJob->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $oldJob->getPriority()) {
            $oldJob->setPriority($job->getPriority());
        }

        return $oldJob;
    }

    /**
     * @param mixed $builder
     */
    protected function addStandardPredicates($builder)
    {
        $date = Util::getMicrotimeDateTime();
        $builder
            ->addAnd(
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
            )
            ->field('status')->equals(BaseJob::STATUS_NEW);
    }

    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        if (!method_exists($documentManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());

        $this->addStandardPredicates($aggregationBuilder->match());

        $aggregationBuilder->group()
            ->field('id')
            ->expression(
                $aggregationBuilder->expr()
                ->field('workerName')->expression('$workerName')
                ->field('method')->expression('$method')
            );
        $results = $aggregationBuilder->execute()->toArray();

        if (!$results) {
Bug Best Practice: The expression $results of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

            return [];
        }

        $workersMethods = [];
        foreach ($results as $result) {
            if (isset($result['_id'])) {
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
            }
        }

        return $workersMethods;
    }

    /**
     * @param string $workerName
     * @param string $methodName
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $builder = $objectManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        // Filter
        $this->addStandardPredicates($builder);
        $method = 'count';
        if (method_exists($builder, 'count')) {
            $builder->count();
            $method = 'execute';
        }
        return $this->runQuery($builder->getQuery(), $method, [], 0);
    }

    /**
     * @param string        $workerName
     * @param string        $methodName
     * @param callable|null $progressCallback
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $count = 0;
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
        $builder
            ->findAndUpdate()
            ->returnNew();

        $builder->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $builder->getQuery();
        do {
            $job = $this->runQuery($query, 'execute');
            if ($job) {
                $documentManager->remove($job);
                ++$count;

                if (0 == $count % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $count);
                }
            }
        } while ($job);
        $this->flush();
        $this->updateProgress($progressCallback, $count);
    }
}
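
The implicit array-to-boolean conversion flagged in getWorkersAndMethods() can be made explicit with empty(), as the check suggests. The sketch below is a standalone illustration, not the bundle's code: groupMethodsByWorker() is a hypothetical helper, and the row shape (an _id array with workerName and method keys) is an assumption made for the example.

```php
<?php

// Hypothetical helper: groups aggregation-style rows into [workerName => [methods]].
function groupMethodsByWorker(array $results): array
{
    // Explicit emptiness check instead of relying on !$results.
    if (empty($results)) {
        return [];
    }

    $workersMethods = [];
    foreach ($results as $result) {
        // Rows without an _id are skipped, mirroring the isset() guard in the listing.
        if (isset($result['_id'])) {
            $workersMethods[$result['_id']['workerName']][] = $result['_id']['method'];
        }
    }

    return $workersMethods;
}

$grouped = groupMethodsByWorker([
    ['_id' => ['workerName' => 'mailer', 'method' => 'send']],
    ['_id' => ['workerName' => 'mailer', 'method' => 'retry']],
    ['value' => 1],
]);
```

For a variable that is known to be an array, empty($results) behaves the same as !$results; the gain is only that the intent is visible to the reader and to static analysis.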