Completed
Branch feature/new-scrutinizer (53aa9f)
by Matthew
03:01
created

JobManager::updateNearestBatch()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 42
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 4.0006

Importance

Changes 0
Metric Value
dl 0
loc 42
rs 8.5806
c 0
b 0
f 0
ccs 28
cts 29
cp 0.9655
cc 4
eloc 27
nc 5
nop 1
crap 4.0006
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Exception\ResultException;
6
use Doctrine\MongoDB\Query\Builder;
7
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
8
use Doctrine\ODM\MongoDB\DocumentManager;
9
use Dtc\QueueBundle\Document\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Util\Util;
12
13
class JobManager extends DoctrineJobManager
14
{
15
    use CommonTrait;
16
    const REDUCE_FUNCTION = 'function(k, vals) {
17
            var result = {};
18
            for (var index in vals) {
19
                var val = vals[index];
20
                for (var i in val) {
21
                    if (result.hasOwnProperty(i)) {
22
                        result[i] += val[i];
23
                    }
24
                    else {
25
                        result[i] = val[i];
26
                    }
27
                }
28
            }
29
            return result;
30
        }';
31
32 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
33
    {
34
        /** @var DocumentManager $objectManager */
35 3
        $objectManager = $this->getObjectManager();
36 3
        $qb = $objectManager->createQueryBuilder($objectName);
37
        $qb
38 3
            ->find()
39 3
            ->field('status')->equals($status);
40
41 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
42 3
        $query = $qb->getQuery();
43
44 3
        return $this->runQuery($query, 'count', [], 0);
45
    }
46
47
    /**
48
     * @param string|null $workerName
49
     * @param string|null $method
50
     */
51 1
    public function pruneExceptionJobs($workerName = null, $method = null)
52
    {
53
        /** @var DocumentManager $objectManager */
54 1
        $objectManager = $this->getObjectManager();
55 1
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
56 1
        $qb = $qb->remove();
57 1
        $qb->field('status')->equals(BaseJob::STATUS_EXCEPTION);
58 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
59
60 1
        $query = $qb->getQuery();
61 1
        $result = $this->runQuery($query, 'execute');
62 1
        if (isset($result['n'])) {
63 1
            return $result['n'];
64
        }
65
66
        return 0;
67
    }
68
69
    /**
70
     * @param Builder     $builder
71
     * @param string|null $workerName
72
     * @param string|null $method
73
     */
74 20
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
75
    {
76 20
        if (null !== $workerName) {
77 6
            $builder->field('workerName')->equals($workerName);
78 6
        }
79
80 20
        if (null !== $method) {
81 5
            $builder->field('method')->equals($method);
82 5
        }
83 20
    }
84
85
    /**
86
     * @param null $workerName
87
     * @param null $method
88
     *
89
     * @return int
90
     */
91 2
    protected function updateExpired($workerName = null, $method = null)
92
    {
93
        /** @var DocumentManager $objectManager */
94 2
        $objectManager = $this->getObjectManager();
95 2
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
96 2
        $qb = $qb->updateMany();
97 2
        $qb->field('expiresAt')->lte(Util::getMicrotimeDateTime());
98 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
99 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
100 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
101 2
        $query = $qb->getQuery();
102 2
        $result = $this->runQuery($query, 'execute');
103 2
        if (isset($result['n'])) {
104 2
            return $result['n'];
105
        }
106
107
        return 0;
108
    }
109
110
    /**
111
     * Removes archived jobs older than $olderThan.
112
     *
113
     * @param \DateTime $olderThan
114
     *
115
     * @return int
116
     */
117 1
    public function pruneArchivedJobs(\DateTime $olderThan)
118
    {
119 1
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
120
    }
121
122 2 View Code Duplication
    public function getWaitingJobCount($workerName = null, $method = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
123
    {
124
        /** @var DocumentManager $objectManager */
125 2
        $objectManager = $this->getObjectManager();
126 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
127
        $builder
128 2
            ->find();
129
130 2
        $this->addWorkerNameCriterion($builder, $workerName, $method);
131 2
        $this->addStandardPredicates($builder);
132
133 2
        $query = $builder->getQuery();
134
135 2
        return $this->runQuery($query, 'count', [true], 0);
136
    }
137
138
    /**
139
     * Get Status Jobs.
140
     *
141
     * @param string $documentName
142
     *
143
     * @return array
144
     */
145 3
    protected function getStatusByDocument($documentName)
146
    {
147
        // Run a map reduce function get worker and status break down
148
        $mapFunc = "function() {
149
            var result = {};
150
            result[this.status] = 1;
151
            var key = this.worker_name + '->' + this.method + '()';
152
            emit(key, result);
153 3
        }";
154 3
        $reduceFunc = self::REDUCE_FUNCTION;
155
        /** @var DocumentManager $objectManager */
156 3
        $objectManager = $this->getObjectManager();
157 3
        $builder = $objectManager->createQueryBuilder($documentName);
158 3
        $builder->map($mapFunc)
159 3
            ->reduce($reduceFunc);
160 3
        $query = $builder->getQuery();
161 3
        $results = $this->runQuery($query, 'execute', [], []);
162 3
        $allStatus = static::getAllStatuses();
163
164 3
        $status = [];
165
166 3
        foreach ($results as $info) {
167 1
            $status[$info['_id']] = $info['value'] + $allStatus;
168 3
        }
169
170 3
        return $status;
171
    }
172
173 3
    public function getStatus()
174
    {
175 3
        $result = $this->getStatusByDocument($this->getJobClass());
176 3
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
177 3
        foreach ($status2 as $key => $value) {
178 1
            foreach ($value as $k => $v) {
179 1
                if (isset($result[$key][$k])) {
180 1
                    $result[$key][$k] += $v;
181 1
                } else {
182
                    $result[$key][$k] = $v;
183
                }
184 1
            }
185 3
        }
186
187 3
        $finalResult = [];
188 3
        foreach ($result as $key => $item) {
189 1
            ksort($item);
190 1
            $finalResult[$key] = $item;
191 3
        }
192
193 3
        return $finalResult;
194
    }
195
196
    /**
197
     * Get the next job to run (can be filtered by workername and method name).
198
     *
199
     * @param string      $workerName
200
     * @param string      $methodName
201
     * @param bool        $prioritize
202
     * @param string|null $runId
203
     *
204
     * @return \Dtc\QueueBundle\Model\Job
205
     */
206 14
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
207
    {
208 14
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
209
        $builder
210 14
            ->findAndUpdate()
211 14
            ->returnNew();
212
213 14
        $date = Util::getMicrotimeDateTime();
214
        // Update
215
        $builder
216 14
            ->field('startedAt')->set($date)
217 14
            ->field('status')->set(BaseJob::STATUS_RUNNING)
218 14
            ->field('runId')->set($runId);
219
220 14
        $query = $builder->getQuery();
221
222 14
        $job = $this->runQuery($query, 'execute');
223
224 14
        return $job;
225
    }
226
227 23
    protected function runQuery(\Doctrine\MongoDB\Query\Query $query, $method, array $arguments = [], $resultIfNamespaceError = null)
228
    {
229
        try {
230 23
            $result = call_user_func_array([$query, $method], $arguments);
231 23
        } catch (ResultException $resultException) {
232
            if (false === strpos($resultException->getMessage(), 'namespace does not exist') && false === strpos($resultException->getMessage(), 'ns doesn\'t exist')) {
233
                throw $resultException;
234
            }
235
            $result = $resultIfNamespaceError;
236
        }
237
238 23
        return $result;
239
    }
240
241
    /**
242
     * @param string|null $workerName
243
     * @param string|null $methodName
244
     * @param bool        $prioritize
245
     *
246
     * @return Builder
247
     */
248 14
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
249
    {
250
        /** @var DocumentManager $objectManager */
251 14
        $objectManager = $this->getObjectManager();
252 14
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
253
254 14
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
255 14
        if ($prioritize) {
256 14
            $builder->sort([
257 14
                'priority' => 'desc',
258 14
                'whenAt' => 'asc',
259 14
            ]);
260 14
        } else {
261 1
            $builder->sort('whenAt', 'asc');
262
        }
263
264
        // Filter
265 14
        $this->addStandardPredicates($builder);
266
267 14
        return $builder;
268
    }
269
270 1
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
271
    {
272
        /** @var DocumentManager $objectManager */
273 1
        $objectManager = $this->getObjectManager();
274 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
275 1
        $builder->find();
276
277 1
        $builder->sort('whenAt', 'asc');
278 1
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
279 1
            ->field('crcHash')->equals($job->getCrcHash());
280 1
        $oldJob = $this->runQuery($builder->getQuery(), 'getSingleResult');
281
282 1
        if (!$oldJob) {
283
            return null;
284
        }
285
286
        // Update priority or whenAt
287
        //  This makes sure if someone else is updating at the same time
288
        //  that we don't trounce their changes.
289 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
290 1
        $builder->findAndUpdate();
291 1
        $builder->field('_id')->equals($oldJob->getId());
292 1
        $builder->field('priority')->lt($job->getPriority());
293 1
        $builder->field('priority')->set($job->getPriority());
294 1
        $this->runQuery($builder->getQuery(), 'execute');
295
296 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
297 1
        $builder->findAndUpdate();
298 1
        $builder->field('_id')->equals($oldJob->getId());
299 1
        $builder->field('whenAt')->gt($job->getWhenAt());
300 1
        $builder->field('whenAt')->set($job->getWhenAt());
301 1
        $this->runQuery($builder->getQuery(), 'execute');
302
303 1
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
304 1
            $oldJob->setWhenAt($job->getWhenAt());
305 1
        }
306 1
        if ($job->getPriority() > $oldJob->getPriority()) {
307 1
            $oldJob->setPriority($job->getPriority());
308 1
        }
309
310 1
        return $oldJob;
311
    }
312
313
    /**
314
     * @param mixed $builder
315
     */
316 16
    protected function addStandardPredicates($builder)
317
    {
318 16
        $date = Util::getMicrotimeDateTime();
319
        $builder
320 16
            ->addAnd(
321 16
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
322 16
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
323 16
            )
324 16
            ->field('status')->equals(BaseJob::STATUS_NEW);
325 16
    }
326
327 1
    public function getWorkersAndMethods()
328
    {
329
        /** @var DocumentManager $documentManager */
330 1
        $documentManager = $this->getObjectManager();
331
332 1
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
333
            return [];
334
        }
335
336 1
        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());
337
338 1
        $this->addStandardPredicates($aggregationBuilder->match());
339
340 1
        $aggregationBuilder->group()
341 1
            ->field('id')
342 1
            ->expression(
343 1
                $aggregationBuilder->expr()
344 1
                ->field('workerName')->expression('$workerName')
345 1
                ->field('method')->expression('$method')
346 1
            );
347 1
        $results = $aggregationBuilder->execute()->toArray();
348
349 1
        if (!$results) {
350 1
            return [];
351
        }
352
353
        $workersMethods = [];
354
        foreach ($results as $result) {
355
            if (isset($result['_id'])) {
356
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
357
            }
358
        }
359
360
        return $workersMethods;
361
    }
362
363
    /**
364
     * @param string $workerName
365
     * @param string $methodName
366
     */
367 2
    public function countLiveJobs($workerName = null, $methodName = null)
368
    {
369
        /** @var DocumentManager $objectManager */
370 2
        $objectManager = $this->getObjectManager();
371 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
372
373 2
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
374
        // Filter
375 2
        $this->addStandardPredicates($builder);
376
377 2
        return $this->runQuery($builder->getQuery(), 'count', [], 0);
378
    }
379
380
    /**
381
     * @param string        $workerName
382
     * @param string        $methodName
383
     * @param callable|null $progressCallback
384
     */
385 2
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
386
    {
387
        /** @var DocumentManager $documentManager */
388 1
        $documentManager = $this->getObjectManager();
389 1
        $count = 0;
390 1
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
391 1
        $builder
392 2
            ->findAndUpdate()
393 1
            ->returnNew();
394
395 1
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
396 1
        $query = $builder->getQuery();
397
        do {
398 1
            $job = $this->runQuery($query, 'execute');
399 1
            if ($job) {
400 1
                $documentManager->remove($job);
401 1
                ++$count;
402
403 1
                if (0 == $count % 10) {
404
                    $this->flush();
405
                    $this->updateProgress($progressCallback, $count);
406
                }
407 1
            }
408 1
        } while ($job);
409 1
        $this->flush();
410 1
        $this->updateProgress($progressCallback, $count);
411 1
    }
412
}
413