Passed
Push — master ( 4d8734...991786 )
by Matthew
09:35 queued 06:44
created

JobManager::updateNearestBatch()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 41
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 4.0008

Importance

Changes 0
Metric Value
cc 4
eloc 26
nc 5
nop 1
dl 0
loc 41
ccs 26
cts 27
cp 0.963
crap 4.0008
rs 9.504
c 0
b 0
f 0
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Exception\ResultException;
6
use Doctrine\MongoDB\Query\Builder;
7
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
8
use Doctrine\ODM\MongoDB\DocumentManager;
9
use Dtc\QueueBundle\Document\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Util\Util;
12
13
class JobManager extends DoctrineJobManager
14
{
15
    use CommonTrait;
16
    const REDUCE_FUNCTION = 'function(k, vals) {
17
            var result = {};
18
            for (var index in vals) {
19
                var val = vals[index];
20
                for (var i in val) {
21
                    if (result.hasOwnProperty(i)) {
22
                        result[i] += val[i];
23
                    }
24
                    else {
25
                        result[i] = val[i];
26
                    }
27
                }
28
            }
29
            return result;
30
        }';
31
32 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
33
    {
34
        /** @var DocumentManager $objectManager */
35 3
        $objectManager = $this->getObjectManager();
36 3
        $qb = $objectManager->createQueryBuilder($objectName);
37
        $qb
38 3
            ->find()
39 3
            ->field('status')->equals($status);
40
41 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
42 3
        $query = $qb->getQuery();
43
44 3
        return $this->runQuery($query, 'count', [], 0);
45
    }
46
47
    /**
48
     * @param string|null $workerName
49
     * @param string|null $method
50
     */
51 1
    public function pruneExceptionJobs($workerName = null, $method = null)
52
    {
53
        /** @var DocumentManager $objectManager */
54 1
        $objectManager = $this->getObjectManager();
55 1
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
56 1
        $qb = $qb->remove();
57 1
        $qb->field('status')->equals(BaseJob::STATUS_EXCEPTION);
58 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
59
60 1
        $query = $qb->getQuery();
61 1
        $result = $this->runQuery($query, 'execute');
62 1
        if (isset($result['n'])) {
63 1
            return $result['n'];
64
        }
65
66
        return 0;
67
    }
68
69
    /**
70
     * @param Builder     $builder
71
     * @param string|null $workerName
72
     * @param string|null $method
73
     */
74 20
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
75
    {
76 20
        if (null !== $workerName) {
77 6
            $builder->field('workerName')->equals($workerName);
78
        }
79
80 20
        if (null !== $method) {
81 5
            $builder->field('method')->equals($method);
82
        }
83 20
    }
84
85
    /**
86
     * @param null $workerName
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $workerName is correct as it would always require null to be passed?
Loading history...
87
     * @param null $method
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $method is correct as it would always require null to be passed?
Loading history...
88
     *
89
     * @return int
90
     */
91 2
    protected function updateExpired($workerName = null, $method = null)
92
    {
93
        /** @var DocumentManager $objectManager */
94 2
        $objectManager = $this->getObjectManager();
95 2
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
96 2
        $qb = $qb->updateMany();
97 2
        $qb->field('expiresAt')->lte(Util::getMicrotimeDateTime());
98 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
99 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
100 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
101 2
        $query = $qb->getQuery();
102 2
        $result = $this->runQuery($query, 'execute');
103 2
        if (isset($result['n'])) {
104 2
            return $result['n'];
105
        }
106
107
        return 0;
108
    }
109
110
    /**
111
     * Removes archived jobs older than $olderThan.
112
     *
113
     * @param \DateTime $olderThan
114
     *
115
     * @return int
116
     */
117 1
    public function pruneArchivedJobs(\DateTime $olderThan)
118
    {
119 1
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
120
    }
121
122 2
    public function getWaitingJobCount($workerName = null, $method = null)
123
    {
124
        /** @var DocumentManager $objectManager */
125 2
        $objectManager = $this->getObjectManager();
126 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
127
        $builder
128 2
            ->find();
129
130 2
        $this->addWorkerNameCriterion($builder, $workerName, $method);
131 2
        $this->addStandardPredicates($builder);
132
133 2
        $query = $builder->getQuery();
134
135 2
        return $this->runQuery($query, 'count', [true], 0);
136
    }
137
138
    /**
139
     * Get Status Jobs.
140
     *
141
     * @param string $documentName
142
     *
143
     * @return array
144
     */
145 3
    protected function getStatusByDocument($documentName)
146
    {
147
        // Run a map reduce function get worker and status break down
148 3
        $mapFunc = "function() {
149
            var result = {};
150
            result[this.status] = 1;
151
            var key = this.worker_name + '->' + this.method + '()';
152
            emit(key, result);
153
        }";
154 3
        $reduceFunc = self::REDUCE_FUNCTION;
155
        /** @var DocumentManager $objectManager */
156 3
        $objectManager = $this->getObjectManager();
157 3
        $builder = $objectManager->createQueryBuilder($documentName);
158 3
        $builder->map($mapFunc)
159 3
            ->reduce($reduceFunc);
160 3
        $query = $builder->getQuery();
161 3
        $results = $this->runQuery($query, 'execute', [], []);
162 3
        $allStatus = static::getAllStatuses();
163
164 3
        $status = [];
165
166 3
        foreach ($results as $info) {
167 1
            $status[$info['_id']] = $info['value'] + $allStatus;
168
        }
169
170 3
        return $status;
171
    }
172
173 3
    public function getStatus()
174
    {
175 3
        $result = $this->getStatusByDocument($this->getJobClass());
176 3
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
177 3
        foreach ($status2 as $key => $value) {
178 1
            foreach ($value as $k => $v) {
179 1
                if (isset($result[$key][$k])) {
180 1
                    $result[$key][$k] += $v;
181
                } else {
182 1
                    $result[$key][$k] = $v;
183
                }
184
            }
185
        }
186
187 3
        $finalResult = [];
188 3
        foreach ($result as $key => $item) {
189 1
            ksort($item);
190 1
            $finalResult[$key] = $item;
191
        }
192
193 3
        return $finalResult;
194
    }
195
196
    /**
197
     * Get the next job to run (can be filtered by workername and method name).
198
     *
199
     * @param string      $workerName
200
     * @param string      $methodName
201
     * @param bool        $prioritize
202
     * @param string|null $runId
203
     *
204
     * @return \Dtc\QueueBundle\Model\Job
205
     */
206 14
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
207
    {
208 14
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
209
        $builder
210 14
            ->findAndUpdate()
211 14
            ->returnNew();
212
213 14
        $date = Util::getMicrotimeDateTime();
214
        // Update
215
        $builder
216 14
            ->field('startedAt')->set($date)
217 14
            ->field('status')->set(BaseJob::STATUS_RUNNING)
218 14
            ->field('runId')->set($runId);
219
220 14
        $query = $builder->getQuery();
221
222 14
        $job = $this->runQuery($query, 'execute');
223
224 14
        return $job;
225
    }
226
227 23
    protected function runQuery(\Doctrine\MongoDB\Query\Query $query, $method, array $arguments = [], $resultIfNamespaceError = null)
228
    {
229
        try {
230 23
            $result = call_user_func_array([$query, $method], $arguments);
231
        } catch (ResultException $resultException) {
232
            if (false === strpos($resultException->getMessage(), 'namespace does not exist') && false === strpos($resultException->getMessage(), 'ns doesn\'t exist')) {
233
                throw $resultException;
234
            }
235
            $result = $resultIfNamespaceError;
236
        }
237
238 23
        return $result;
239
    }
240
241
    /**
242
     * @param string|null $workerName
243
     * @param string|null $methodName
244
     * @param bool        $prioritize
245
     *
246
     * @return Builder
247
     */
248 14
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
249
    {
250
        /** @var DocumentManager $objectManager */
251 14
        $objectManager = $this->getObjectManager();
252 14
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
253
254 14
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
255 14
        if ($prioritize) {
256 14
            $builder->sort([
257 14
                'priority' => 'desc',
258
                'whenAt' => 'asc',
259
            ]);
260
        } else {
261 1
            $builder->sort('whenAt', 'asc');
262
        }
263
264
        // Filter
265 14
        $this->addStandardPredicates($builder);
266
267 14
        return $builder;
268
    }
269
270 1
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
271
    {
272
        /** @var DocumentManager $objectManager */
273 1
        $objectManager = $this->getObjectManager();
274 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
275 1
        $builder->find();
276
277 1
        $builder->sort('whenAt', 'asc');
278 1
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
279 1
            ->field('crcHash')->equals($job->getCrcHash());
280 1
        $oldJob = $this->runQuery($builder->getQuery(), 'getSingleResult');
281
282 1
        if (!$oldJob) {
283
            return null;
284
        }
285
286
        // Update priority or whenAt
287
        //  This makes sure if someone else is updating at the same time
288
        //  that we don't trounce their changes.
289 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
290 1
        $builder->findAndUpdate();
291 1
        $builder->field('_id')->equals($oldJob->getId());
292 1
        $builder->field('priority')->lt($job->getPriority());
293 1
        $builder->field('priority')->set($job->getPriority());
294 1
        $this->runQuery($builder->getQuery(), 'execute');
295
296 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
297 1
        $builder->findAndUpdate();
298 1
        $builder->field('_id')->equals($oldJob->getId());
299 1
        $builder->field('whenAt')->gt($job->getWhenAt());
300 1
        $builder->field('whenAt')->set($job->getWhenAt());
301 1
        $this->runQuery($builder->getQuery(), 'execute');
302
303 1
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
304 1
            $oldJob->setWhenAt($job->getWhenAt());
305
        }
306 1
        if ($job->getPriority() > $oldJob->getPriority()) {
307 1
            $oldJob->setPriority($job->getPriority());
308
        }
309
310 1
        return $oldJob;
311
    }
312
313
    /**
314
     * @param mixed $builder
315
     */
316 16
    protected function addStandardPredicates($builder)
317
    {
318 16
        $date = Util::getMicrotimeDateTime();
319
        $builder
320 16
            ->addAnd(
321 16
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
322 16
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
323
            )
324 16
            ->field('status')->equals(BaseJob::STATUS_NEW);
325 16
    }
326
327 1
    public function getWorkersAndMethods()
328
    {
329
        /** @var DocumentManager $documentManager */
330 1
        $documentManager = $this->getObjectManager();
331
332 1
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
333
            return [];
334
        }
335
336 1
        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());
337
338 1
        $this->addStandardPredicates($aggregationBuilder->match());
339
340 1
        $aggregationBuilder->group()
341 1
            ->field('id')
342 1
            ->expression(
343 1
                $aggregationBuilder->expr()
344 1
                ->field('workerName')->expression('$workerName')
345 1
                ->field('method')->expression('$method')
346
            );
347 1
        $results = $aggregationBuilder->execute()->toArray();
348
349 1
        if (!$results) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $results of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
350 1
            return [];
351
        }
352
353
        $workersMethods = [];
354
        foreach ($results as $result) {
355
            if (isset($result['_id'])) {
356
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
357
            }
358
        }
359
360
        return $workersMethods;
361
    }
362
363
    /**
364
     * @param string $workerName
365
     * @param string $methodName
366
     */
367 2
    public function countLiveJobs($workerName = null, $methodName = null)
368
    {
369
        /** @var DocumentManager $objectManager */
370 2
        $objectManager = $this->getObjectManager();
371 2
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
372
373 2
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
374
        // Filter
375 2
        $this->addStandardPredicates($builder);
376
377 2
        return $this->runQuery($builder->getQuery(), 'count', [], 0);
378
    }
379
380
    /**
381
     * @param string        $workerName
382
     * @param string        $methodName
383
     * @param callable|null $progressCallback
384
     */
385 1
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
386
    {
387
        /** @var DocumentManager $documentManager */
388 1
        $documentManager = $this->getObjectManager();
389 1
        $count = 0;
390 1
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
391
        $builder
392 1
            ->findAndUpdate()
393 1
            ->returnNew();
394
395 1
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
396 1
        $query = $builder->getQuery();
397
        do {
398 1
            $job = $this->runQuery($query, 'execute');
399 1
            if ($job) {
400 1
                $documentManager->remove($job);
401 1
                ++$count;
402
403 1
                if (0 == $count % 10) {
404
                    $this->flush();
405
                    $this->updateProgress($progressCallback, $count);
406
                }
407
            }
408 1
        } while ($job);
409 1
        $this->flush();
410 1
        $this->updateProgress($progressCallback, $count);
411 1
    }
412
}
413