Completed
Pull Request — master (#40)
by Matthew
17:20
created

JobManager::getJob()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 20
ccs 7
cts 7
cp 1
rs 9.4285
c 1
b 1
f 0
cc 1
eloc 13
nc 1
nop 4
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Util\Util;
11
12
class JobManager extends DoctrineJobManager
13
{
14
    use CommonTrait;
15
    const REDUCE_FUNCTION = 'function(k, vals) {
16
            var result = {};
17
            for (var index in vals) {
18
                var val = vals[index];
19
                for (var i in val) {
20
                    if (result.hasOwnProperty(i)) {
21
                        result[i] += val[i];
22
                    }
23
                    else {
24
                        result[i] = val[i];
25
                    }
26
                }
27
            }
28
            return result;
29
        }';
30
31
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
32
    {
33 3
        /** @var DocumentManager $objectManager */
34
        $objectManager = $this->getObjectManager();
35
        $qb = $objectManager->createQueryBuilder($objectName);
36 3
        $qb
37 3
            ->find()
38
            ->field('status')->equals($status);
39 3
40 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
41
        $query = $qb->getQuery();
42 3
43 3
        return $query->count();
44
    }
45 3
46
    /**
47
     * @param string|null $workerName
48
     * @param string|null $method
49
     */
50
    public function pruneExceptionJobs($workerName = null, $method = null)
51
    {
52 1
        /** @var DocumentManager $objectManager */
53
        $objectManager = $this->getObjectManager();
54
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
55 1
        $qb = $qb->remove();
56 1
        $qb->field('status')->equals(BaseJob::STATUS_EXCEPTION);
57 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
58 1
59 1
        $query = $qb->getQuery();
60
        $result = $query->execute();
61 1
        if (isset($result['n'])) {
62 1
            return $result['n'];
63 1
        }
64 1
65
        return 0;
66
    }
67
68
    /**
69
     * @param Builder     $builder
70
     * @param string|null $workerName
71
     * @param string|null $method
72
     */
73
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
74
    {
75 18
        if (null !== $workerName) {
76
            $builder->field('workerName')->equals($workerName);
77 18
        }
78 6
79 6
        if (null !== $method) {
80
            $builder->field('method')->equals($method);
81 18
        }
82 5
    }
83 5
84 18
    /**
85
     * @param null $workerName
86
     * @param null $method
87
     *
88
     * @return int
89
     */
90
    protected function updateExpired($workerName = null, $method = null)
91
    {
92 2
        /** @var DocumentManager $objectManager */
93
        $objectManager = $this->getObjectManager();
94
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
95 2
        $qb = $qb->updateMany();
96 2
        $qb->field('expiresAt')->lte(Util::getMicrotimeDateTime());
97 2
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
98 2
        $this->addWorkerNameCriterion($qb, $workerName, $method);
99 2
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
100 2
        $query = $qb->getQuery();
101 2
        $result = $query->execute();
102 2
        if (isset($result['n'])) {
103 2
            return $result['n'];
104 2
        }
105 2
106
        return 0;
107
    }
108
109
    /**
110
     * Removes archived jobs older than $olderThan.
111
     *
112
     * @param \DateTime $olderThan
113
     *
114
     * @return int
115
     */
116
    public function pruneArchivedJobs(\DateTime $olderThan)
117
    {
118 1
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
119
    }
120 1
121 View Code Duplication
    public function getWaitingJobCount($workerName = null, $method = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
122
    {
123 2
        /** @var DocumentManager $objectManager */
124
        $objectManager = $this->getObjectManager();
125
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
126 2
        $builder
127 2
            ->find();
128
129 2
        $this->addWorkerNameCriterion($builder, $workerName, $method);
130
        $this->addStandardPredicates($builder);
131 2
132
        $query = $builder->getQuery();
133
134 2
        return $query->count(true);
135
    }
136 2
137 2
    /**
138 2
     * Get Status Jobs.
139 2
     *
140
     * @param string $documentName
141 2
     *
142
     * @return array
143
     */
144
    protected function getStatusByDocument($documentName)
145
    {
146
        // Run a map reduce function get worker and status break down
147
        $mapFunc = "function() {
148
            var result = {};
149
            result[this.status] = 1;
150
            var key = this.worker_name + '->' + this.method + '()';
151 3
            emit(key, result);
152
        }";
153
        $reduceFunc = self::REDUCE_FUNCTION;
154
        /** @var DocumentManager $objectManager */
155
        $objectManager = $this->getObjectManager();
156
        $builder = $objectManager->createQueryBuilder($documentName);
157
        $builder->map($mapFunc)
158
            ->reduce($reduceFunc);
159 3
        $query = $builder->getQuery();
160 3
        $results = $query->execute();
161
162 3
        $allStatus = static::getAllStatuses();
163 3
164 3
        $status = [];
165 3
166 3
        foreach ($results as $info) {
167 3
            $status[$info['_id']] = $info['value'] + $allStatus;
168
        }
169
170 3
        return $status;
171 3
    }
172 3
173 3
    public function getStatus()
174 3
    {
175 3
        $result = $this->getStatusByDocument($this->getJobClass());
176 3
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
177 3
        foreach ($status2 as $key => $value) {
178 3
            foreach ($value as $k => $v) {
179 3
                if (isset($result[$key][$k])) {
180 3
                    $result[$key][$k] += $v;
181 3
                } else {
182
                    $result[$key][$k] = $v;
183 3
                }
184
            }
185 3
        }
186 1
187 3
        $finalResult = [];
188
        foreach ($result as $key => $item) {
189 3
            ksort($item);
190
            $finalResult[$key] = $item;
191
        }
192 3
193
        return $finalResult;
194 3
    }
195 3
196 3
    /**
197 1
     * Get the next job to run (can be filtered by workername and method name).
198 1
     *
199 1
     * @param string      $workerName
200 1
     * @param string      $methodName
201
     * @param bool        $prioritize
202
     * @param string|null $runId
203 1
     *
204 3
     * @return \Dtc\QueueBundle\Model\Job
205
     */
206 3
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
207 3
    {
208 1
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
209 1
        $builder
210 3
            ->findAndUpdate()
211
            ->returnNew();
212 3
213
        $date = Util::getMicrotimeDateTime();
214
        // Update
215
        $builder
216
            ->field('startedAt')->set($date)
217
            ->field('status')->set(BaseJob::STATUS_RUNNING)
218
            ->field('runId')->set($runId);
219
220
        $query = $builder->getQuery();
221
222
        $job = $query->execute();
223
224
        return $job;
225 11
    }
226
227 11
    /**
228
     * @param string|null $workerName
229 11
     * @param string|null $methodName
230 11
     * @param bool        $prioritize
231
     *
232 11
     * @return Builder
233
     */
234
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
235 11
    {
236 11
        /** @var DocumentManager $objectManager */
237 11
        $objectManager = $this->getObjectManager();
238
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
239 11
240
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
241 11
        if ($prioritize) {
242
            $builder->sort([
243 11
                'priority' => 'desc',
244
                'whenAt' => 'asc',
245
            ]);
246
        } else {
247
            $builder->sort('whenAt', 'asc');
248
        }
249
250
        // Filter
251
        $this->addStandardPredicates($builder);
252
253 11
        return $builder;
254
    }
255
256 11
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
257 11
    {
258
        /** @var DocumentManager $objectManager */
259 11
        $objectManager = $this->getObjectManager();
260 11
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
261 11
        $builder->find();
262 11
263 11
        $builder->sort('whenAt', 'asc');
264 11
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
265 11
            ->field('crcHash')->equals($job->getCrcHash());
266 1
        $oldJob = $builder->getQuery()->getSingleResult();
267
268
        if (!$oldJob) {
269
            return null;
270 11
        }
271
272 11
        // Update priority or whenAt
273
        //  This makes sure if someone else is updating at the same time
274
        //  that we don't trounce their changes.
275 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
276
        $builder->findAndUpdate();
277
        $builder->field('_id')->equals($oldJob->getId());
278 1
        $builder->field('priority')->lt($job->getPriority());
279 1
        $builder->field('priority')->set($job->getPriority());
280 1
        $builder->getQuery()->execute();
281
282 1
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
283 1
        $builder->findAndUpdate();
284 1
        $builder->field('_id')->equals($oldJob->getId());
285 1
        $builder->field('whenAt')->gt($job->getWhenAt());
286
        $builder->field('whenAt')->set($job->getWhenAt());
287 1
        $builder->getQuery()->execute();
288
289
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
290
            $oldJob->setWhenAt($job->getWhenAt());
291
        }
292
        if ($job->getPriority() > $oldJob->getPriority()) {
293
            $oldJob->setPriority($job->getPriority());
294 1
        }
295 1
296 1
        return $oldJob;
297 1
    }
298 1
299 1
    /**
300
     * @param mixed $builder
301 1
     */
302 1
    protected function addStandardPredicates($builder)
303 1
    {
304 1
        $date = Util::getMicrotimeDateTime();
305 1
        $builder
306 1
            ->addAnd(
307
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
308 1
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
309 1
            )
310 1
            ->field('status')->equals(BaseJob::STATUS_NEW);
311 1
    }
312 1
313 1
    public function getWorkersAndMethods()
314
    {
315 1
        /** @var DocumentManager $documentManager */
316
        $documentManager = $this->getObjectManager();
317
318
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
319
            return [];
320
        }
321 12
322
        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());
323 12
324
        $this->addStandardPredicates($aggregationBuilder->match());
325 12
326 12
        $aggregationBuilder->group()
327 12
            ->field('id')
328 12
            ->expression(
329 12
                $aggregationBuilder->expr()
330 12
                ->field('workerName')->expression('$workerName')
331
                ->field('method')->expression('$method')
332 1
            );
333
        $results = $aggregationBuilder->execute()->toArray();
334
335 1
        if (!$results) {
336
            return [];
337 1
        }
338
339
        $workersMethods = [];
340
        foreach ($results as $result) {
341 1
            if (isset($result['_id'])) {
342
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
343 1
            }
344
        }
345 1
346 1
        return $workersMethods;
347 1
    }
348 1
349 1
    /**
350 1
     * @param string $workerName
351 1
     * @param string $methodName
352 1
     */
353
    public function countLiveJobs($workerName = null, $methodName = null)
354 1
    {
355 1
        /** @var DocumentManager $objectManager */
356
        $objectManager = $this->getObjectManager();
357
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
358
359
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
360
        // Filter
361
        $this->addStandardPredicates($builder);
362
363
        return $builder->getQuery()->count();
364
    }
365
366
    /**
367
     * @param string        $workerName
368
     * @param string        $methodName
369
     * @param callable|null $progressCallback
370
     */
371
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
372 3
    {
373
        /** @var DocumentManager $documentManager */
374
        $documentManager = $this->getObjectManager();
375 2
        $count = 0;
376 2
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
377
        $builder
378 2
            ->findAndUpdate()
379
            ->returnNew();
380 3
381
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
382 2
        $query = $builder->getQuery();
383
        do {
384
            $job = $query->execute();
385
            if ($job) {
386
                $documentManager->remove($job);
387
                ++$count;
388
389
                if (0 == $count % 10) {
390 1
                    $this->flush();
391 1
                    $this->updateProgress($progressCallback, $count);
392
                }
393 1
            }
394 1
        } while ($job);
395 1
        $this->flush();
396
        $this->updateProgress($progressCallback, $count);
397 1
    }
398
}
399