Completed
Pull Request — master (#43)
by Matthew
11:10
created

JobManager::addStandardPredicates()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Exception\ResultException;
6
use Doctrine\MongoDB\Query\Builder;
7
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
8
use Doctrine\ODM\MongoDB\DocumentManager;
9
use Dtc\QueueBundle\Document\Job;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Util\Util;
12
13
class JobManager extends DoctrineJobManager
14
{
15
    use CommonTrait;
16
    /**
     * JavaScript reduce function used by the map/reduce query in getStatusByDocument().
     *
     * For every value object it receives, each property is added to the
     * same-named property of a single result object (or copied over when the
     * result does not have that property yet); the result object is returned.
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
17
            var result = {};
18
            for (var index in vals) {
19
                var val = vals[index];
20
                for (var i in val) {
21
                    if (result.hasOwnProperty(i)) {
22
                        result[i] += val[i];
23
                    }
24
                    else {
25
                        result[i] = val[i];
26
                    }
27
                }
28
            }
29
            return result;
30
        }';
31
32 3
    /**
     * Counts the documents of the given class whose 'status' field equals $status.
     *
     * @param string      $objectName document class the query builder is created for
     * @param string      $status     value the 'status' field must equal
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $method     optional 'method' filter (skipped when null)
     *
     * @return mixed result of the query's count(), or 0 when the query fails
     *               with a "namespace does not exist" error
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $this->runQuery($builder->getQuery(), 'count', [], 0);
    }
46
47
    /**
     * Issues a remove query against the job archive class for every entry
     * whose status is BaseJob::STATUS_EXCEPTION.
     *
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $method     optional 'method' filter (skipped when null)
     *
     * @return mixed the 'n' entry of the query result when it is set, otherwise 0
     *               (presumably the number of removed documents - confirm against the driver)
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobArchiveClass())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $this->runQuery($builder->getQuery(), 'execute');

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
68
69
    /**
     * Narrows a query to a worker name and/or method.
     *
     * Each non-null argument adds an equality criterion on the matching field
     * ('workerName' first, then 'method'); null arguments add nothing.
     *
     * @param Builder     $builder    query builder to add the criteria to
     * @param string|null $workerName value the 'workerName' field must equal
     * @param string|null $method     value the 'method' field must equal
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
84
85
    /**
     * Marks jobs as expired: an updateMany query on the job class that sets
     * 'status' to Job::STATUS_EXPIRED for entries whose 'expiresAt' is at or
     * before the current time and whose 'status' is BaseJob::STATUS_NEW.
     *
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $method     optional 'method' filter (skipped when null)
     *
     * @return mixed the 'n' entry of the query result when it is set, otherwise 0
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobClass())->updateMany();

        // Selection criteria.
        $builder->field('expiresAt')->lte(Util::getMicrotimeDateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification applied to the selected entries.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $this->runQuery($builder->getQuery(), 'execute');
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
109
110
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() with the job archive class and the
     * 'updatedAt' field.
     *
     * @param \DateTime $olderThan cut-off passed through to removeOlderThan()
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
121
122 2 View Code Duplication
    /**
     * Counts the jobs of the job class that satisfy the standard predicates
     * (see addStandardPredicates()), optionally narrowed by worker and method.
     *
     * NOTE(review): a static-analysis pass flagged this method as duplicated
     * code; countLiveJobs() builds the same criteria but without find() and
     * with different count() arguments. Consider sharing the code once the
     * intended difference is confirmed.
     *
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $method     optional 'method' filter (skipped when null)
     *
     * @return mixed result of the query's count(true), or 0 when the query
     *               fails with a "namespace does not exist" error
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $qb = $documentManager->createQueryBuilder($this->getJobClass());
        $qb->find();
        $this->addWorkerNameCriterion($qb, $workerName, $method);
        $this->addStandardPredicates($qb);

        return $this->runQuery($qb->getQuery(), 'count', [true], 0);
    }
136
137
    /**
     * Builds a per-worker/method status breakdown for one document class
     * using a map/reduce query.
     *
     * @param string $documentName document class to run the map/reduce on
     *
     * @return array keyed by the map/reduce '_id' ("<worker_name>-><method>()");
     *               each entry is the reduced value merged (array union) with
     *               getAllStatuses(), so keys missing from the reduced value
     *               are filled in from there
     */
    protected function getStatusByDocument($documentName)
    {
        // Map step: emit {<status>: 1} under a "<worker_name>-><method>()" key.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs)->reduce(self::REDUCE_FUNCTION);

        // An empty list stands in for the result when the collection does not exist.
        $rows = $this->runQuery($builder->getQuery(), 'execute', [], []);
        $defaults = static::getAllStatuses();

        $breakdown = [];
        foreach ($rows as $row) {
            $breakdown[$row['_id']] = $row['value'] + $defaults;
        }

        return $breakdown;
    }
171
172 3
    /**
     * Combines the status breakdowns of the job class and the job archive class.
     *
     * Counts present in both breakdowns are added together; the per-key
     * status arrays are then sorted by status name.
     *
     * @return array breakdown keyed like getStatusByDocument()'s result, each
     *               entry being a key-sorted array of status => count
     */
    public function getStatus()
    {
        $merged = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $workerKey => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($merged[$workerKey][$statusName])) {
                    $merged[$workerKey][$statusName] = $count;
                    continue;
                }

                $merged[$workerKey][$statusName] += $count;
            }
        }

        $sorted = [];
        foreach ($merged as $workerKey => $counts) {
            ksort($counts);
            $sorted[$workerKey] = $counts;
        }

        return $sorted;
    }
194
195
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Runs a findAndUpdate query (returning the updated document) that sets
     * 'startedAt' to the current time, 'status' to BaseJob::STATUS_RUNNING
     * and 'runId' to $runId on the selected job.
     *
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $methodName optional 'method' filter (skipped when null)
     * @param bool        $prioritize passed to getJobQueryBuilder() to pick the sort order
     * @param string|null $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job|null result of the query; null when the query
     *                                         fails with a "namespace does not exist" error
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $builder->findAndUpdate()->returnNew();

        // Fields written on the job that gets picked up.
        $startedAt = Util::getMicrotimeDateTime();
        $builder->field('startedAt')->set($startedAt);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $this->runQuery($builder->getQuery(), 'execute');
    }
224
225 21
    /**
     * Invokes $method on the query, tolerating "namespace does not exist" errors.
     *
     * @param \Doctrine\MongoDB\Query\Query $query                  query to run
     * @param string                        $method                 name of the query method to call
     * @param array                         $arguments              arguments passed to that method
     * @param mixed                         $resultIfNamespaceError value returned instead of a result when a
     *                                                              ResultException whose message contains
     *                                                              'namespace does not exist' is caught
     *
     * @return mixed
     *
     * @throws ResultException any other ResultException is rethrown unchanged
     */
    protected function runQuery(\Doctrine\MongoDB\Query\Query $query, $method, array $arguments = [], $resultIfNamespaceError = null)
    {
        try {
            return call_user_func_array([$query, $method], $arguments);
        } catch (ResultException $resultException) {
            if (false !== strpos($resultException->getMessage(), 'namespace does not exist')) {
                return $resultIfNamespaceError;
            }

            throw $resultException;
        }
    }
237
238
    /**
     * Creates a query builder on the job class with the worker/method
     * criteria, a sort order and the standard predicates applied.
     *
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $methodName optional 'method' filter (skipped when null)
     * @param bool        $prioritize when truthy, sort by 'priority' descending then
     *                                'whenAt' ascending; otherwise by 'whenAt' ascending only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getJobClass());
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->sort('whenAt', 'asc');
        } else {
            $builder->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $this->addStandardPredicates($builder);

        return $builder;
    }
266
267 1
    /**
     * Folds $job into the earliest STATUS_NEW job that has the same crcHash.
     *
     * The stored job's priority is raised to $job's priority when that is
     * higher, and its whenAt is moved to $job's whenAt when that is earlier.
     * The same adjustments are then applied to the returned object.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job supplying the crcHash, priority and whenAt
     *
     * @return mixed|null the matching existing job, or null when the lookup returns nothing
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());

        $existing = $this->runQuery($lookup->getQuery(), 'getSingleResult');
        if (!$existing) {
            return null;
        }

        // Update priority or whenAt
        //  Each change is a conditional findAndUpdate, which makes sure that if
        //  someone else is updating at the same time we don't trounce their changes.
        $raisePriority = $documentManager->createQueryBuilder($this->getJobClass());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $this->runQuery($raisePriority->getQuery(), 'execute');

        $pullForward = $documentManager->createQueryBuilder($this->getJobClass());
        $pullForward->findAndUpdate();
        $pullForward->field('_id')->equals($existing->getId());
        $pullForward->field('whenAt')->gt($job->getWhenAt());
        $pullForward->field('whenAt')->set($job->getWhenAt());
        $this->runQuery($pullForward->getQuery(), 'execute');

        // Apply the same adjustments to the object handed back to the caller.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
309
310
    /**
     * Adds the predicates shared by the job queries in this class:
     * 'whenAt' is null or at/before the current time, 'expiresAt' is null or
     * after the current time, and 'status' equals BaseJob::STATUS_NEW.
     *
     * @param mixed $builder object offering expr(), addAnd() and field(); called in this
     *                       class with a query Builder and with an aggregation match stage
     */
    protected function addStandardPredicates($builder)
    {
        $now = Util::getMicrotimeDateTime();

        $dueNow = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder
            ->addAnd($dueNow, $notExpired)
            ->field('status')->equals(BaseJob::STATUS_NEW);
    }
323
324 1
    /**
     * Lists the methods per worker among the jobs matching the standard
     * predicates, using an aggregation grouped on worker name and method.
     *
     * @return array worker name => list of method names; an empty array when the
     *               object manager has no createAggregationBuilder() method or the
     *               aggregation yields no results
     */
    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        // Aggregation support is probed at runtime; without it there is nothing to report.
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregation = $documentManager->createAggregationBuilder($this->getJobClass());
        $this->addStandardPredicates($aggregation->match());

        $groupId = $aggregation->group()->field('id');
        $groupId->expression(
            $aggregation->expr()
                ->field('workerName')->expression('$workerName')
                ->field('method')->expression('$method')
        );

        $rows = $aggregation->execute()->toArray();
        if (!$rows) {
            return [];
        }

        $workersMethods = [];
        foreach ($rows as $row) {
            if (!isset($row['_id'])) {
                continue;
            }

            // NOTE(review): the group key is built with 'workerName' but read back as
            // 'worker_name' - presumably the ODM maps the field name; confirm.
            $workersMethods[$row['_id']['worker_name']][] = $row['_id']['method'];
        }

        return $workersMethods;
    }
359
360
    /**
     * Counts the jobs of the job class that satisfy the standard predicates
     * (see addStandardPredicates()), optionally narrowed by worker and method.
     *
     * @param string|null $workerName optional 'workerName' filter (skipped when null)
     * @param string|null $methodName optional 'method' filter (skipped when null)
     *
     * @return mixed result of the query's count(), or 0 when the query fails
     *               with a "namespace does not exist" error
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $qb = $documentManager->createQueryBuilder($this->getJobClass());
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
        $this->addStandardPredicates($qb);

        $query = $qb->getQuery();

        return $this->runQuery($query, 'count', [], 0);
    }
376
377
    /**
     * Archives every job matched by getJobQueryBuilder(), one at a time.
     *
     * Each pass runs a findAndUpdate that sets the job's status to
     * Job::STATUS_ARCHIVE and then removes the returned job through the
     * document manager. Flushing and progress reporting happen after every
     * tenth job and once more when no further job is returned.
     *
     * @param string|null   $workerName       optional 'workerName' filter (skipped when null)
     * @param string|null   $methodName       optional 'method' filter (skipped when null)
     * @param callable|null $progressCallback handed to updateProgress() together with the running count
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $archived = 0;

        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
        $builder->findAndUpdate()->returnNew();
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $builder->getQuery();

        // The same query is executed repeatedly until it stops returning a job.
        while ($job = $this->runQuery($query, 'execute')) {
            $documentManager->remove($job);
            ++$archived;

            if (0 == $archived % 10) {
                $this->flush();
                $this->updateProgress($progressCallback, $archived);
            }
        }

        $this->flush();
        $this->updateProgress($progressCallback, $archived);
    }
409
}
410