Completed
Pull Request — master (#30)
by Matthew
16:57
created

JobManager::getJobCount()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 11
cts 11
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 12
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
use Dtc\QueueBundle\Model\StallableJob;
12
13
class JobManager extends DoctrineJobManager
14
{
15
    use CommonTrait;
16
    /**
     * JavaScript reduce function used by the status map/reduce.
     *
     * Merges the emitted value objects for one key by summing, property by
     * property, every value found in `vals`.
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val = vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
31
32
    /**
     * Counts the documents of a class whose 'status' field equals $status.
     *
     * @param string      $objectName document class to build the query for
     * @param mixed       $status     value the 'status' field must equal
     * @param string|null $workerName when not null, also match this worker name
     * @param string|null $method     when not null, also match this method
     *
     * @return mixed whatever the query's count() reports
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find()->field('status')->equals($status);

        // Narrow by worker / method only when the caller supplied them.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
46
47
    /**
     * Removes documents of the job archive class whose status is
     * BaseJob::STATUS_EXCEPTION.
     *
     * @param string|null $workerName when not null, only remove jobs of this worker
     * @param string|null $method     when not null, only remove jobs of this method
     *
     * @return mixed the 'n' entry of the remove result, or 0 when it is not set
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getJobArchiveClass());
        $removal = $removal->remove();
        $removal->field('status')->equals(BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
68
69
    /**
     * Adds equality criteria on 'workerName' and 'method' to a query builder.
     *
     * A criterion is only added when the corresponding argument is not null.
     *
     * @param Builder     $builder    builder that receives the criteria
     * @param string|null $workerName value the 'workerName' field must equal
     * @param string|null $method     value the 'method' field must equal
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
84
85
    /**
     * Marks jobs as expired: every document of the job class with status
     * BaseJob::STATUS_NEW and an 'expiresAt' at or before the current time
     * gets its status set to Job::STATUS_EXPIRED.
     *
     * @param string|null $workerName when not null, only update jobs of this worker
     * @param string|null $method     when not null, only update jobs of this method
     *
     * @return mixed the 'n' entry of the update result, or 0 when it is not set
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $update = $documentManager->createQueryBuilder($this->getJobClass());
        $update = $update->updateMany();

        // Selection criteria.
        $update->field('expiresAt')->lte(new \DateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // Modification applied to every selected document.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
109
110
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the job archive class, comparing
     * against the 'updatedAt' field.
     *
     * @param \DateTime $olderThan cut-off passed on to removeOlderThan()
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
121
122 1
    /**
     * Counts documents of the job class that have not expired, i.e. whose
     * 'expiresAt' is null or later than the current time.
     *
     * @param string|null $workerName when not null, only count jobs of this worker
     * @param string|null $method     when not null, only count jobs of this method
     *
     * @return mixed whatever the query's count(true) reports
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getJobClass());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Expiry filter: no expiry date at all, or one that is still in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $builder->addAnd($builder->expr()->addOr($neverExpires, $expiresLater));

        return $builder->getQuery()->count(true);
    }
142
143
    /**
     * Builds a per-worker/method status breakdown for one document class.
     *
     * Runs a map/reduce: the map step emits, for each document, the key
     * "<worker_name>-><method>()" with an object holding 1 under the
     * document's status; the reduce step is self::REDUCE_FUNCTION.
     *
     * @param string $documentName document class to run the map/reduce on
     *
     * @return array one entry per emitted key; each entry is the reduced
     *               value completed with a 0 for every known status it lacks
     */
    protected function getStatusByDocument($documentName)
    {
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc)
            ->reduce(self::REDUCE_FUNCTION);
        $rows = $builder->getQuery()->execute();

        // Zero baseline so every known status shows up in each entry.
        $zeroCounts = array_fill_keys([
            BaseJob::STATUS_NEW,
            BaseJob::STATUS_RUNNING,
            BaseJob::STATUS_SUCCESS,
            BaseJob::STATUS_FAILURE,
            BaseJob::STATUS_EXCEPTION,
            \Dtc\QueueBundle\Model\Job::STATUS_EXPIRED,
            RetryableJob::STATUS_MAX_FAILURES,
            RetryableJob::STATUS_MAX_EXCEPTIONS,
            RetryableJob::STATUS_MAX_RETRIES,
            StallableJob::STATUS_MAX_STALLS,
        ], 0);

        $statusByKey = [];
        foreach ($rows as $row) {
            // Array union keeps the reduced counts and only fills in missing statuses.
            $statusByKey[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $statusByKey;
    }
189
190 1
    /**
     * Returns the combined status breakdown of the job class and the job
     * archive class.
     *
     * Counts found under the same key and status in both breakdowns are
     * added together; each entry's statuses are sorted by key.
     *
     * @return array
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($combined[$key][$statusName])) {
                    $combined[$key][$statusName] = $count;
                    continue;
                }

                $combined[$key][$statusName] += $count;
            }
        }

        // array_map() with a single array keeps the original keys.
        return array_map(function ($counts) {
            ksort($counts);

            return $counts;
        }, $combined);
    }
212
213
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Issues a find-and-update on the query from getJobQueryBuilder() that
     * sets 'startedAt' to the current time, 'status' to
     * BaseJob::STATUS_RUNNING and 'runId' to $runId, asking for the updated
     * document to be returned.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize passed through to getJobQueryBuilder()
     * @param string|null $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job result of executing the query as-is
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $claim = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $claim
            ->findAndUpdate()
            ->returnNew();

        $startedAt = new \DateTime();
        $claim
            ->field('startedAt')->set($startedAt)
            ->field('status')->set(BaseJob::STATUS_RUNNING)
            ->field('runId')->set($runId);

        return $claim->getQuery()->execute();
    }
243
244
    /**
     * Creates a query builder on the job class with the worker/method
     * criteria, a sort order and the standard predicates applied.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize when truthy, sort by 'priority' descending
     *                                and then 'whenAt' ascending; otherwise
     *                                by 'whenAt' ascending only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $jobQuery = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($jobQuery, $workerName, $methodName);

        if (!$prioritize) {
            $jobQuery->sort('whenAt', 'asc');
        } else {
            $jobQuery->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        $this->addStandardPredicates($jobQuery);

        return $jobQuery;
    }
272
273
    /**
     * Finds the earliest ('whenAt' ascending) job with status
     * BaseJob::STATUS_NEW and the same crcHash as $job, then raises its
     * priority and/or moves its whenAt earlier to match $job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job supplying crcHash, priority and whenAt
     *
     * @return \Dtc\QueueBundle\Model\Job|null the matched job with the new
     *                                         values applied in memory, or
     *                                         null when no job matched
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('crcHash')->equals($job->getCrcHash());
        $existing = $lookup->getQuery()->getSingleResult();

        if (!$existing) {
            return null;
        }

        // Each update carries its own comparison in the query criteria, so a
        // concurrent writer's change is not overwritten with a worse value.
        $raisePriority = $documentManager->createQueryBuilder($this->getJobClass());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $moveEarlier = $documentManager->createQueryBuilder($this->getJobClass());
        $moveEarlier->findAndUpdate();
        $moveEarlier->field('_id')->equals($existing->getId());
        $moveEarlier->field('whenAt')->gt($job->getWhenAt());
        $moveEarlier->field('whenAt')->set($job->getWhenAt());
        $moveEarlier->getQuery()->execute();

        // Apply the same changes to the in-memory object that is returned.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
315
316
    /**
     * Adds the standard job predicates to a builder:
     *  - 'whenAt' is null or at/before the current time,
     *  - 'expiresAt' is null or after the current time,
     *  - 'status' equals BaseJob::STATUS_NEW.
     *
     * @param mixed $builder object offering expr(), addAnd() and field();
     *                       called in this class with a query builder and
     *                       with an aggregation match stage
     */
    protected function addStandardPredicates($builder)
    {
        $now = new \DateTime();

        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder
            ->addAnd($isDue, $notExpired)
            ->field('status')->equals(BaseJob::STATUS_NEW);
    }
329
330
    /**
     * Lists the methods of each worker that have jobs matching the standard
     * predicates, using an aggregation grouped by worker name and method.
     *
     * Returns an empty array when the object manager has no
     * createAggregationBuilder() method or the aggregation yields nothing.
     *
     * @return array worker name => list of method names
     */
    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        if (!method_exists($documentManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregation = $documentManager->createAggregationBuilder($this->getJobClass());
        $this->addStandardPredicates($aggregation->match());

        $groupId = $aggregation->group()->field('id');
        $groupId->expression(
            $aggregation->expr()
                ->field('workerName')->expression('$workerName')
                ->field('method')->expression('$method')
        );

        $rows = $aggregation->execute()->toArray();
        if (!$rows) {
            return [];
        }

        // NOTE(review): the group key is declared with 'workerName' but read
        // back as 'worker_name' — presumably the ODM translates the property
        // name to the stored field name; confirm against the mapping.
        $workersMethods = [];
        foreach ($rows as $row) {
            if (!isset($row['_id'])) {
                continue;
            }

            $workersMethods[$row['_id']['worker_name']][] = $row['_id']['method'];
        }

        return $workersMethods;
    }
365
366
    /**
     * Counts documents of the job class that satisfy the standard
     * predicates (see addStandardPredicates()).
     *
     * @param string|null $workerName when not null, only count jobs of this worker
     * @param string|null $methodName when not null, only count jobs of this method
     *
     * @return mixed whatever the query's count() reports
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $liveJobs = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($liveJobs, $workerName, $methodName);
        $this->addStandardPredicates($liveJobs);

        $query = $liveJobs->getQuery();

        return $query->count();
    }
382
383
    /**
     * Archives every job matched by getJobQueryBuilder(), one at a time.
     *
     * Each iteration runs a find-and-update that sets the job's status to
     * Job::STATUS_ARCHIVE and then removes the returned document through the
     * document manager. Changes are flushed, and progress reported, after
     * every 10 jobs and once more at the end.
     *
     * $progressCallback now defaults to null: a required parameter after
     * optional ones is deprecated as of PHP 8.0. Existing callers that pass
     * all three arguments are unaffected.
     *
     * @param string|null   $workerName       when not null, only archive jobs of this worker
     * @param string|null   $methodName       when not null, only archive jobs of this method
     * @param callable|null $progressCallback invoked with the running count; skipped when null
     */
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
        $builder
            ->findAndUpdate()
            ->returnNew();
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $builder->getQuery();

        $count = 0;
        // The same query is re-executed until it no longer yields a job.
        while ($job = $query->execute()) {
            $documentManager->remove($job);
            ++$count;

            if (0 === $count % 10) {
                $this->flush();
                if (null !== $progressCallback) {
                    $progressCallback($count);
                }
            }
        }

        $this->flush();
        if (null !== $progressCallback) {
            $progressCallback($count);
        }
    }
415
}
416