Completed
Branch master (38bcb9)
by Matthew
02:36
created

JobManager::getStatusByDocument()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 37
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 2.0014

Importance

Changes 0
Metric Value
dl 0
loc 37
ccs 13
cts 14
cp 0.9286
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 22
nc 2
nop 1
crap 2.0014
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
    const REDUCE_FUNCTION = 'function(k, vals) {
16
            var result = {};
17
            for (var index in vals) {
18
                var val = vals[index];
19
                for (var i in val) {
20
                    if (result.hasOwnProperty(i)) {
21
                        result[i] += val[i];
22
                    }
23
                    else {
24
                        result[i] = val[i];
25
                    }
26
                }
27
            }
28
            return result;
29
        }';
30
31
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
32
    {
33
        /** @var DocumentManager $objectManager */
34
        $objectManager = $this->getObjectManager();
35
        $qb = $objectManager->createQueryBuilder($objectName);
36
        $qb
37
            ->find()
38
            ->field('status')->equals($status);
39
40
        $this->addWorkerNameCriterion($qb, $workerName, $method);
41
        $query = $qb->getQuery();
42
43
        return $query->count();
44
    }
45
46
    /**
47
     * @param string|null $workerName
48
     * @param string|null $method
49
     */
50
    public function pruneErroneousJobs($workerName = null, $method = null)
51
    {
52
        /** @var DocumentManager $objectManager */
53
        $objectManager = $this->getObjectManager();
54
        $qb = $objectManager->createQueryBuilder($this->getJobArchiveClass());
55
        $qb = $qb->remove();
56
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
57
        $this->addWorkerNameCriterion($qb, $workerName, $method);
58
59
        $query = $qb->getQuery();
60
        $result = $query->execute();
61
        if (isset($result['n'])) {
62
            return $result['n'];
63
        }
64
65
        return 0;
66
    }
67
68
    /**
69
     * @param Builder     $builder
70
     * @param string|null $workerName
71
     * @param string|null $method
72
     */
73 2
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
74
    {
75 2
        if (null !== $workerName) {
76
            $builder->field('workerName')->equals($workerName);
77
        }
78
79 2
        if (null !== $method) {
80
            $builder->field('method')->equals($method);
81
        }
82 2
    }
83
84
    /**
85
     * @param null $workerName
86
     * @param null $method
87
     *
88
     * @return int
89
     */
90 1
    protected function updateExpired($workerName = null, $method = null)
91
    {
92
        /** @var DocumentManager $objectManager */
93 1
        $objectManager = $this->getObjectManager();
94 1
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
95 1
        $qb = $qb->updateMany();
96 1
        $qb->field('expiresAt')->lte(new \DateTime());
97 1
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
98 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
99 1
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
100 1
        $query = $qb->getQuery();
101 1
        $result = $query->execute();
102 1
        if (isset($result['n'])) {
103 1
            return $result['n'];
104
        }
105
106
        return 0;
107
    }
108
109
    /**
110
     * Removes archived jobs older than $olderThan.
111
     *
112
     * @param \DateTime $olderThan
113
     *
114
     * @return int
115
     */
116
    public function pruneArchivedJobs(\DateTime $olderThan)
117
    {
118
        return $this->removeOlderThan($this->getJobArchiveClass(), 'updatedAt', $olderThan);
119
    }
120
121 1
    public function getJobCount($workerName = null, $method = null)
122
    {
123
        /** @var DocumentManager $objectManager */
124 1
        $objectManager = $this->getObjectManager();
125 1
        $qb = $objectManager->createQueryBuilder($this->getJobClass());
126
        $qb
127 1
            ->find();
128
129 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
130
131
        // Filter
132 1
        $date = new \DateTime();
133
        $qb
134 1
            ->addAnd(
135 1
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
136
            )
137 1
            ->field('locked')->equals(null);
138
139 1
        $query = $qb->getQuery();
140
141 1
        return $query->count(true);
142
    }
143
144
    /**
145
     * Get Status Jobs.
146
     *
147
     * @param string $documentName
148
     *
149
     * @return array
150
     */
151 1
    protected function getStatusByDocument($documentName)
152
    {
153
        // Run a map reduce function get worker and status break down
154 1
        $mapFunc = "function() {
155
            var result = {};
156
            result[this.status] = 1;
157
            var key = this.worker_name + '->' + this.method + '()';
158
            emit(key, result);
159
        }";
160 1
        $reduceFunc = self::REDUCE_FUNCTION;
161
        /** @var DocumentManager $objectManager */
162 1
        $objectManager = $this->getObjectManager();
163 1
        $qb = $objectManager->createQueryBuilder($documentName);
164 1
        $qb->map($mapFunc)
165 1
            ->reduce($reduceFunc);
166 1
        $query = $qb->getQuery();
167 1
        $results = $query->execute();
168
169
        $allStatus = array(
170 1
            BaseJob::STATUS_ERROR => 0,
171
            BaseJob::STATUS_NEW => 0,
172
            RetryableJob::STATUS_EXPIRED => 0,
173
            RetryableJob::STATUS_MAX_ERROR => 0,
174
            RetryableJob::STATUS_MAX_RETRIES => 0,
175
            RetryableJob::STATUS_MAX_STALLED => 0,
176
            BaseJob::STATUS_RUNNING => 0,
177
            BaseJob::STATUS_SUCCESS => 0,
178
        );
179
180 1
        $status = [];
181
182 1
        foreach ($results as $info) {
183
            $status[$info['_id']] = $info['value'] + $allStatus;
184
        }
185
186 1
        return $status;
187
    }
188
189 1
    public function getStatus()
190
    {
191 1
        $result = $this->getStatusByDocument($this->getJobClass());
192 1
        $status2 = $this->getStatusByDocument($this->getJobArchiveClass());
193 1
        foreach ($status2 as $key => $value) {
194
            foreach ($value as $k => $v) {
195
                if (isset($result[$key][$k])) {
196
                    $result[$key][$k] += $v;
197
                } else {
198
                    $result[$key][$k] = $v;
199
                }
200
            }
201
        }
202
203 1
        $finalResult = [];
204 1
        foreach ($result as $key => $item) {
205
            ksort($item);
206
            $finalResult[$key] = $item;
207
        }
208
209 1
        return $finalResult;
210
    }
211
212
    /**
213
     * Get the next job to run (can be filtered by workername and method name).
214
     *
215
     * @param string      $workerName
216
     * @param string      $methodName
217
     * @param bool        $prioritize
218
     * @param string|null $runId
219
     *
220
     * @return \Dtc\QueueBundle\Model\Job
221
     */
222
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
223
    {
224
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
225
        $builder
226
            ->findAndUpdate()
227
            ->returnNew();
228
229
        $date = new \DateTime();
230
        // Update
231
        $builder
232
            ->field('lockedAt')->set($date) // Set started
233
            ->field('locked')->set(true)
234
            ->field('status')->set(BaseJob::STATUS_RUNNING)
235
            ->field('runId')->set($runId);
236
237
        $query = $builder->getQuery();
238
239
        $job = $query->execute();
240
241
        return $job;
242
    }
243
244
    /**
245
     * @param string|null $workerName
246
     * @param string|null $methodName
247
     * @param bool $prioritize
248
     *
249
     * @return Builder
250
     */
251
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
252
    {
253
        /** @var DocumentManager $objectManager */
254
        $objectManager = $this->getObjectManager();
255
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
256
257
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
258
        if ($prioritize) {
259
            $builder->sort([
260
                'priority' => 'desc',
261
                'whenAt' => 'asc',
262
            ]);
263
        } else {
264
            $builder->sort('whenAt', 'asc');
265
        }
266
267
        // Filter
268
        $this->addStandardPredicates($builder);
269
270
        return $builder;
271
    }
272
273
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
274
    {
275
        /** @var DocumentManager $objectManager */
276
        $objectManager = $this->getObjectManager();
277
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
278
        $builder->find();
279
280
        $builder->sort('whenAt', 'asc');
281
        $builder->field('status')->equals(BaseJob::STATUS_NEW)
282
            ->field('crcHash')->equals($job->getCrcHash())
283
            ->field('locked')->equals(null);
284
        $oldJob = $builder->getQuery()->getSingleResult();
285
286
        if (!$oldJob) {
287
            return null;
288
        }
289
290
        // Update priority or whenAt
291
        //  This makes sure if someone else is updating at the same time
292
        //  that we don't trounce their changes.
293
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
294
        $builder->findAndUpdate();
295
        $builder->field('_id')->equals($oldJob->getId());
296
        $builder->field('priority')->lt($job->getPriority());
297
        $builder->field('priority')->set($job->getPriority());
298
        $builder->getQuery()->execute();
299
300
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
301
        $builder->findAndUpdate();
302
        $builder->field('_id')->equals($oldJob->getId());
303
        $builder->field('whenAt')->gt($job->getWhenAt());
304
        $builder->field('whenAt')->set($job->getWhenAt());
305
        $builder->getQuery()->execute();
306
307
        if ($job->getWhenAt() < $oldJob->getWhenAt()) {
308
            $oldJob->setWhenAt($job->getWhenAt());
309
        }
310
        if ($job->getPriority() > $oldJob->getPriority()) {
311
            $oldJob->setPriority($job->getPriority());
312
        }
313
314
        return $oldJob;
315
    }
316
317
    /**
318
     * @param mixed $builder
319
     */
320
    protected function addStandardPredicates($builder)
321
    {
322
        $date = new \DateTime();
323
        $builder
324
            ->addAnd(
325
                $builder->expr()->addOr($builder->expr()->field('whenAt')->equals(null), $builder->expr()->field('whenAt')->lte($date)),
326
                $builder->expr()->addOr($builder->expr()->field('expiresAt')->equals(null), $builder->expr()->field('expiresAt')->gt($date))
327
            )
328
            ->field('status')->equals(BaseJob::STATUS_NEW)
329
            ->field('locked')->equals(null);
330
    }
331
332
    public function getWorkersAndMethods()
333
    {
334
        /** @var DocumentManager $documentManager */
335
        $documentManager = $this->getObjectManager();
336
337
        if (!method_exists($documentManager, 'createAggregationBuilder')) {
338
            return [];
339
        }
340
341
        $aggregationBuilder = $documentManager->createAggregationBuilder($this->getJobClass());
342
343
        $this->addStandardPredicates($aggregationBuilder->match());
344
345
        $aggregationBuilder->group()
346
            ->field('id')
347
            ->expression(
348
                $aggregationBuilder->expr()
349
                ->field('workerName')->expression('$workerName')
350
                ->field('method')->expression('$method')
351
            );
352
        $results = $aggregationBuilder->execute()->toArray();
353
354
        if (!$results) {
355
            return [];
356
        }
357
358
        $workersMethods = [];
359
        foreach ($results as $result) {
360
            if (isset($result['_id'])) {
361
                $workersMethods[$result['_id']['worker_name']][] = $result['_id']['method'];
362
            }
363
        }
364
365
        return $workersMethods;
366
    }
367
368
    /**
369
     * @param string $workerName
370
     * @param string $methodName
371
     */
372
    public function countLiveJobs($workerName = null, $methodName = null)
373
    {
374
        /** @var DocumentManager $objectManager */
375
        $objectManager = $this->getObjectManager();
376
        $builder = $objectManager->createQueryBuilder($this->getJobClass());
377
378
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
379
        // Filter
380
        $this->addStandardPredicates($builder);
381
382
        return $builder->getQuery()->count();
383
    }
384
385
    /**
386
     * @param string   $workerName
387
     * @param string   $methodName
388
     * @param \Closure $progressCallback
389
     */
390
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback)
391
    {
392
        /** @var DocumentManager $documentManager */
393
        $documentManager = $this->getObjectManager();
394
        $count = 0;
395
        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
396
        $builder
397
            ->findAndUpdate()
398
            ->returnNew();
399
400
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
401
        $query = $builder->getQuery();
402
        do {
403
            $job = $query->execute();
404
            if ($job) {
405
                $documentManager->remove($job);
406
                ++$count;
407
408
                if (0 == $count % 10) {
409
                    $this->flush();
410
                    $progressCallback($count);
411
                }
412
            }
413
        } while ($job);
414
        $this->flush();
415
        $progressCallback($count);
416
    }
417
}
418