Passed
Push — master ( 82017d...f35517 )
by Matthew
08:19
created

JobManager::pruneExceptionJobs()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 18
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.1105

Importance

Changes 0
Metric Value
cc 3
eloc 12
c 0
b 0
f 0
nc 3
nop 2
dl 0
loc 18
ccs 10
cts 13
cp 0.7692
crap 3.1105
rs 9.8666
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Exception\ResultException;
0 ignored issues
show
Bug introduced by
The type Doctrine\MongoDB\Exception\ResultException was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
6
use Doctrine\MongoDB\Query\Builder;
0 ignored issues
show
Bug introduced by
The type Doctrine\MongoDB\Query\Builder was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Document\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Util\Util;
12
use MongoDB\DeleteResult;
13
use MongoDB\UpdateResult;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    /**
     * JavaScript reduce function used as the reduce step of the map-reduce
     * built in getStatusByDocumentMapReduce().
     *
     * It folds all emitted objects for one key into a single object by summing
     * the values property by property (a property seen for the first time is
     * copied as-is).
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
19
            var result = {};
20
            for (var index in vals) {
21
                var val = vals[index];
22
                for (var i in val) {
23
                    if (result.hasOwnProperty(i)) {
24
                        result[i] += val[i];
25
                    }
26
                    else {
27
                        result[i] = val[i];
28
                    }
29
                }
30
            }
31
            return result;
32
        }';
33
34 3
    /**
     * Counts the documents of $objectName whose status equals $status.
     *
     * @param string      $objectName document name handed to createQueryBuilder()
     * @param mixed       $status     status value to match
     * @param string|null $workerName optional workerName filter
     * @param string|null $method     optional method filter
     *
     * @return mixed the count query result, or 0 when runQuery() hits a missing-namespace error
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find()->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // When the builder itself offers count(), the query is switched to a
        // count and run via execute(); otherwise count() is called on the query.
        $builderCanCount = method_exists($builder, 'count');
        if ($builderCanCount) {
            $builder->count();
        }
        $queryMethod = $builderCanCount ? 'execute' : 'count';

        return $this->runQuery($builder->getQuery(), $queryMethod, [], 0);
    }
53
54
    /**
     * Removes documents of the job archive class whose status is
     * BaseJob::STATUS_EXCEPTION, optionally narrowed by worker and method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed deleted count taken from the result (DeleteResult or an 'n' entry), 0 when neither is available
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $removal = $documentManager->createQueryBuilder($this->getJobArchiveClass());
        $removal = $removal->remove();
        $removal->field('status')->equals(BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $this->runQuery($removal->getQuery(), 'execute');
        if ($outcome instanceof DeleteResult) {
            return $outcome->getDeletedCount();
        }

        // Otherwise fall back to an 'n' entry when the result exposes one.
        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
77
78
    /**
     * Adds equality criteria on 'workerName' and 'method' to the builder for
     * every one of the two values that is not null.
     *
     * @param mixed       $builder    object exposing field()->equals()
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion($builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }
            $builder->field($fieldName)->equals($expected);
        }
    }
93
94
    /**
     * Sets the status to Job::STATUS_EXPIRED on all documents of the job class
     * that are still in BaseJob::STATUS_NEW and whose expiresAt is less than or
     * equal to the current time, optionally narrowed by worker and method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int modified count taken from the result (UpdateResult or an 'n' entry), 0 when neither is available
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $update = $documentManager->createQueryBuilder($this->getJobClass());
        $update = $update->updateMany();

        // Criteria.
        $update->field('expiresAt')->lte(Util::getMicrotimeDateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // New value.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $this->runQuery($update->getQuery(), 'execute');
        if ($outcome instanceof UpdateResult) {
            return $outcome->getModifiedCount();
        }

        // Otherwise fall back to an 'n' entry when the result exposes one.
        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
120
121
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the job archive class, using the
     * 'updatedAt' field.
     *
     * @param \DateTime $olderThan
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
130
131 2
    /**
     * Counts the documents of the job class that satisfy the standard
     * predicates (see addStandardPredicates()), optionally narrowed by worker
     * and method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed the count query result, or 0 when runQuery() hits a missing-namespace error
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $counter = $documentManager->createQueryBuilder($this->getJobClass());
        $counter->find();

        $this->addWorkerNameCriterion($counter, $workerName, $method);
        $this->addStandardPredicates($counter);

        // When the builder itself offers count(), the query is switched to a
        // count and run via execute(); otherwise count() is called on the query.
        $builderCanCount = method_exists($counter, 'count');
        if ($builderCanCount) {
            $counter->count();
        }
        $queryMethod = $builderCanCount ? 'execute' : 'count';

        return $this->runQuery($counter->getQuery(), $queryMethod, [true], 0);
    }
150
151
    /**
     * Builds the per-worker/method status breakdown with a map-reduce query.
     *
     * The map step emits, per document, an object holding a 1 under the
     * document's status, keyed by "<worker_name>-><method>()"; the reduce step
     * is self::REDUCE_FUNCTION. Each result row is completed with the entries
     * of getAllStatuses() for statuses it does not contain.
     *
     * @param mixed  $builder      query builder exposing map()/reduce()
     * @param string $documentName not used by this method
     *
     * @return array status counts indexed by "<worker_name>-><method>()"
     */
    protected function getStatusByDocumentMapReduce($builder, $documentName)
    {
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
            }";
        $builder->map($mapFunc)->reduce(self::REDUCE_FUNCTION);

        $rows = $this->runQuery($builder->getQuery(), 'execute', [], []);
        $defaults = static::getAllStatuses();

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: values from the row win, missing statuses come from the defaults.
            $breakdown[$row['_id']] = $row['value'] + $defaults;
        }

        return $breakdown;
    }
174
175
    /**
     * Builds the status breakdown per worker/method for one document class.
     *
     * Uses the map-reduce variant when the query builder exposes map();
     * otherwise groups by worker_name, method and status through the
     * aggregation builder and counts the documents of each group.
     *
     * @param string $documentName
     *
     * @return array status counts indexed by "<worker_name>-><method>()"
     */
    protected function getStatusByDocument($documentName)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $queryBuilder = $documentManager->createQueryBuilder($documentName);
        if (method_exists($queryBuilder, 'map')) {
            return $this->getStatusByDocumentMapReduce($queryBuilder, $documentName);
        }

        $aggregation = $documentManager->createAggregationBuilder($documentName);
        $groupKey = $aggregation->expr()
            ->field('worker_name')->expression('$worker_name')
            ->field('method')->expression('$method')
            ->field('status')->expression('$status');
        $aggregation->group()
            ->field('id')->expression($groupKey)
            ->field('value')->sum(1);

        $rows = $this->runQuery($aggregation, 'execute', [], []);
        $defaults = static::getAllStatuses();

        $breakdown = [];
        foreach ($rows as $row) {
            $id = $row['_id'];
            $label = $id['worker_name'].'->'.$id['method'].'()';
            if (!isset($breakdown[$label])) {
                // First row for this worker/method: start from the full status list.
                $breakdown[$label] = $defaults;
            }
            $breakdown[$label][$id['status']] = $row['value'];
        }

        return $breakdown;
    }
220
221 3
    /**
     * Returns the combined status breakdown of the job class and the job
     * archive class.
     *
     * Counts present in both breakdowns are added together; the statuses of
     * every worker/method entry are sorted by key.
     *
     * @return array
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $label => $counts) {
            foreach ($counts as $statusName => $count) {
                $combined[$label][$statusName] = isset($combined[$label][$statusName])
                    ? $combined[$label][$statusName] + $count
                    : $count;
            }
        }

        $sorted = [];
        foreach ($combined as $label => $counts) {
            ksort($counts);
            $sorted[$label] = $counts;
        }

        return $sorted;
    }
243
244
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Runs a find-and-update on the query from getJobQueryBuilder() that sets
     * startedAt to the current time, status to BaseJob::STATUS_RUNNING and
     * runId to $runId, asking for the updated document to be returned.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param string|null $runId
     *
     * @return \Dtc\QueueBundle\Model\Job
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $claim = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $claim->findAndUpdate()->returnNew();

        $startedAt = Util::getMicrotimeDateTime();
        $claim->field('startedAt')->set($startedAt);
        $claim->field('status')->set(BaseJob::STATUS_RUNNING);
        $claim->field('runId')->set($runId);

        return $this->runQuery($claim->getQuery(), 'execute');
    }
274
275 23
    /**
     * Calls $method on $query with $arguments and returns its result.
     *
     * A ResultException whose message contains 'namespace does not exist' or
     * 'ns doesn't exist' is swallowed and $resultIfNamespaceError is returned
     * instead; any other ResultException is rethrown.
     *
     * @param object     $query
     * @param string     $method
     * @param array      $arguments
     * @param mixed|null $resultIfNamespaceError
     *
     * @return mixed
     */
    protected function runQuery($query, $method, array $arguments = [], $resultIfNamespaceError = null)
    {
        try {
            return call_user_func_array([$query, $method], $arguments);
        } catch (ResultException $resultException) {
            $message = $resultException->getMessage();
            $isNamespaceError = false !== strpos($message, 'namespace does not exist')
                || false !== strpos($message, 'ns doesn\'t exist');
            if (!$isNamespaceError) {
                throw $resultException;
            }
        }

        return $resultIfNamespaceError;
    }
288
289
    /**
     * Creates a query builder on the job class, filtered by worker/method and
     * the standard predicates, and sorted for picking the next job.
     *
     * With $prioritize the sort is priority descending, then whenAt ascending;
     * without it only whenAt ascending.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return \Doctrine\ODM\MongoDB\Query\Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if ($prioritize) {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        } else {
            $builder->sort('whenAt', 'asc');
        }

        // Filter
        $this->addStandardPredicates($builder);

        return $builder;
    }
317
318 1
    /**
     * Finds the earliest (by whenAt) job in BaseJob::STATUS_NEW with the same
     * crcHash as $job and raises its priority / pulls its whenAt forward to
     * match $job where $job's values are higher / earlier.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return mixed|null the matched job with the same adjustments applied in memory, or null when none matched
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash());
        $existing = $this->runQuery($lookup->getQuery(), 'getSingleResult');

        if (!$existing) {
            return null;
        }

        // Update priority or whenAt
        //  Each update carries its own comparison in the criteria, so a
        //  concurrent writer's changes are not overwritten.
        $priorityUpdate = $documentManager->createQueryBuilder($this->getJobClass());
        $priorityUpdate->findAndUpdate();
        $priorityUpdate->field('_id')->equals($existing->getId());
        $priorityUpdate->field('priority')->lt($job->getPriority());
        $priorityUpdate->field('priority')->set($job->getPriority());
        $this->runQuery($priorityUpdate->getQuery(), 'execute');

        $whenAtUpdate = $documentManager->createQueryBuilder($this->getJobClass());
        $whenAtUpdate->findAndUpdate();
        $whenAtUpdate->field('_id')->equals($existing->getId());
        $whenAtUpdate->field('whenAt')->gt($job->getWhenAt());
        $whenAtUpdate->field('whenAt')->set($job->getWhenAt());
        $this->runQuery($whenAtUpdate->getQuery(), 'execute');

        // Apply the same adjustments to the object that is handed back.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
360
361
    /**
     * Restricts the builder to jobs that can be run now: status is
     * BaseJob::STATUS_NEW, whenAt is null or not after the current time, and
     * expiresAt is null or after the current time.
     *
     * @param mixed $builder object exposing expr(), addAnd() and field()
     */
    protected function addStandardPredicates($builder)
    {
        $now = Util::getMicrotimeDateTime();

        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $isNotExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder
            ->addAnd($isDue, $isNotExpired)
            ->field('status')->equals(BaseJob::STATUS_NEW);
    }
374
375 1
    /**
     * Lists the methods that currently have runnable jobs, grouped by worker.
     *
     * Runs an aggregation on the job class that matches the standard
     * predicates (see addStandardPredicates()) and groups by worker name and
     * method. Returns an empty array when the object manager has no
     * createAggregationBuilder() method or when nothing matches.
     *
     * @return array method names indexed by worker name
     */
    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        if (!method_exists($documentManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());

        $this->addStandardPredicates($aggregationBuilder->match());

        $aggregationBuilder->group()
            ->field('id')
            ->expression(
                $aggregationBuilder->expr()
                ->field('workerName')->expression('$workerName')
                ->field('method')->expression('$method')
            );
        $results = $aggregationBuilder->execute()->toArray();

        if (empty($results)) {
            return [];
        }

        $workersMethods = [];
        foreach ($results as $result) {
            if (!isset($result['_id'])) {
                continue;
            }
            $id = $result['_id'];

            // The group key is declared as 'workerName' above but was read back
            // only as 'worker_name'. NOTE(review): which spelling the result
            // carries presumably depends on whether the aggregation builder
            // maps field names -- confirm; both are accepted here.
            $workerName = null;
            if (isset($id['worker_name'])) {
                $workerName = $id['worker_name'];
            } elseif (isset($id['workerName'])) {
                $workerName = $id['workerName'];
            }

            $workersMethods[$workerName][] = isset($id['method']) ? $id['method'] : null;
        }

        return $workersMethods;
    }
410
411
    /**
     * Counts the documents of the job class that satisfy the standard
     * predicates (see addStandardPredicates()), optionally narrowed by worker
     * and method.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return mixed the count query result, or 0 when runQuery() hits a missing-namespace error
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $counter = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($counter, $workerName, $methodName);
        // Filter
        $this->addStandardPredicates($counter);

        // When the builder itself offers count(), the query is switched to a
        // count and run via execute(); otherwise count() is called on the query.
        $builderCanCount = method_exists($counter, 'count');
        if ($builderCanCount) {
            $counter->count();
        }
        $queryMethod = $builderCanCount ? 'execute' : 'count';

        return $this->runQuery($counter->getQuery(), $queryMethod, [], 0);
    }
432
433
    /**
     * Archives every job matched by getJobQueryBuilder(), one at a time.
     *
     * Each iteration runs a find-and-update that sets the status to
     * Job::STATUS_ARCHIVE and hands the returned job to the document manager's
     * remove(). Changes are flushed and progress is reported after every 10th
     * job and once more at the end.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback passed to updateProgress() together with the running count
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $archiver = $this->getJobQueryBuilder($workerName, $methodName, true);
        $archiver->findAndUpdate()->returnNew();
        $archiver->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $archiver->getQuery();

        $archived = 0;
        while ($job = $this->runQuery($query, 'execute')) {
            $documentManager->remove($job);
            ++$archived;

            if (0 === $archived % 10) {
                $this->flush();
                $this->updateProgress($progressCallback, $archived);
            }
        }

        $this->flush();
        $this->updateProgress($progressCallback, $archived);
    }
464
}
465