Completed
Push — master ( 5fe2ea...bba9f0 )
by Matthew
13:23
created

JobManager   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 397
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 25.97%

Importance

Changes 4
Bugs 3 Features 1
Metric Value
wmc 35
lcom 1
cbo 9
dl 0
loc 397
ccs 47
cts 181
cp 0.2597
rs 9
c 4
b 3
f 1

15 Methods

Rating   Name   Duplication   Size   Complexity  
A getJobCount() 0 22 1
A countJobsByStatus() 0 14 1
A pruneErroneousJobs() 0 17 2
A addWorkerNameCriterion() 0 10 3
A updateExpired() 0 18 2
A pruneArchivedJobs() 0 4 1
B getStatusByDocument() 0 37 2
B getStatus() 0 22 5
A getJob() 0 21 1
A getJobQueryBuilder() 0 21 2
B updateNearestBatch() 0 43 4
A addStandardPredicates() 0 11 1
B getWorkersAndMethods() 0 35 5
A countLiveJobs() 0 12 1
B archiveAllJobs() 0 27 4
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
    /**
     * JavaScript reduce function paired with the map function built in
     * getStatusByDocument().
     *
     * For a single key it folds all emitted objects into one object by summing
     * the values stored under each property name.
     */
    const REDUCE_FUNCTION = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
30
31
    /**
     * Counts the documents of the given class whose "status" field equals $status.
     *
     * @param string      $objectName document class to query
     * @param mixed       $status     value the "status" field must equal
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
45
46
    /**
     * Deletes documents of the job archive class whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int the "n" entry of the remove result, or 0 when that entry is absent
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobArchiveClass())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
67
68
    /**
     * Restricts a query to a worker name and/or a method.
     *
     * A null argument adds no criterion for the corresponding field.
     *
     * @param Builder     $builder
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
83
84
    /**
     * Sets the status of new jobs whose expiresAt is at or before the current
     * time to STATUS_EXPIRED.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int the "n" entry of the update result, or 0 when that entry is absent
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobClass())->updateMany();

        // Selection criteria.
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
108
109
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() with the job archive class and the
     * "updatedAt" field.
     *
     * @param \DateTime $olderThan
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
120
121 1
    /**
     * Counts jobs that are not locked and whose expiresAt is either unset or
     * still in the future.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getJobClass());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: expiresAt is null OR expiresAt > now.
        $now = new \DateTime();
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
143
144
    /**
     * Builds a per "worker->method()" breakdown of job counts by status for
     * one document class, using a map-reduce query.
     *
     * @param string $documentName document class to run the map-reduce against
     *
     * @return array map-reduce key => (status => count); every status listed
     *               below is present, defaulting to 0
     */
    protected function getStatusByDocument($documentName)
    {
        // Emits one {status: 1} object per document, keyed by "worker->method()".
        $mapFunction = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunction);
        $builder->reduce(self::REDUCE_FUNCTION);
        $rows = $builder->getQuery()->execute();

        // Zero-filled defaults so every known status shows up in each entry.
        $zeroCounts = array_fill_keys(
            [
                BaseJob::STATUS_ERROR,
                BaseJob::STATUS_NEW,
                RetryableJob::STATUS_EXPIRED,
                RetryableJob::STATUS_MAX_ERROR,
                RetryableJob::STATUS_MAX_RETRIES,
                RetryableJob::STATUS_MAX_STALLED,
                BaseJob::STATUS_RUNNING,
                BaseJob::STATUS_SUCCESS,
            ],
            0
        );

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counted values win, missing statuses fall back to 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
188
189 1
    /**
     * Returns the combined status breakdown of the job class and the job
     * archive class.
     *
     * Counts found under the same key and status in both breakdowns are added
     * together; each entry's statuses are sorted by key.
     *
     * @return array
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getJobClass());
        $archived = $this->getStatusByDocument($this->getJobArchiveClass());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                if (!isset($combined[$key][$statusName])) {
                    $combined[$key][$statusName] = $count;
                    continue;
                }

                $combined[$key][$statusName] += $count;
            }
        }

        foreach (array_keys($combined) as $key) {
            ksort($combined[$key]);
        }

        return $combined;
    }
211
212
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * The matching job is updated in the same operation: lockedAt is set to
     * the current time, locked to true, status to BaseJob::STATUS_RUNNING and
     * runId to $runId. The updated document is what gets returned.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize passed through to getJobQueryBuilder()
     * @param string|null $runId
     *
     * @return \Dtc\QueueBundle\Model\Job presumably null when nothing matches — confirm against the driver
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $query = $this->getJobQueryBuilder($workerName, $methodName, $prioritize)
            ->findAndUpdate()
            ->returnNew()
            ->field('lockedAt')->set(new \DateTime()) // Set started
            ->field('locked')->set(true)
            ->field('status')->set(BaseJob::STATUS_RUNNING)
            ->field('runId')->set($runId)
            ->getQuery();

        return $query->execute();
    }
243
244
    /**
     * Creates a query builder on the job class with the worker/method filter,
     * the sort order and the standard "runnable" predicates applied.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize when true, sort by priority (desc) before whenAt (asc);
     *                                otherwise sort by whenAt (asc) only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $queryBuilder = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);

        $ordering = $prioritize
            ? ['priority' => 'desc', 'whenAt' => 'asc']
            : ['whenAt' => 'asc'];
        $queryBuilder->sort($ordering);

        // Filter
        $this->addStandardPredicates($queryBuilder);

        return $queryBuilder;
    }
272
273
    /**
     * Finds the earliest (by whenAt) unlocked new job with the same crcHash as
     * $job and merges $job's priority and whenAt into it.
     *
     * The stored priority is only raised and the stored whenAt is only moved
     * earlier; each change is a separate conditional find-and-update.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the existing job with the merged values applied
     *                                         to the in-memory object, or null when none was found
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $lookup = $documentManager->createQueryBuilder($this->getJobClass());
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $lookup->field('locked')->equals(null);

        $existing = $lookup->getQuery()->getSingleResult();
        if (!$existing) {
            return null;
        }

        // Update priority or whenAt
        //  The comparison is part of each update's criteria, so a concurrent
        //  writer's changes are not trounced.
        $raisePriority = $documentManager->createQueryBuilder($this->getJobClass());
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $pullEarlier = $documentManager->createQueryBuilder($this->getJobClass());
        $pullEarlier->findAndUpdate();
        $pullEarlier->field('_id')->equals($existing->getId());
        $pullEarlier->field('whenAt')->gt($job->getWhenAt());
        $pullEarlier->field('whenAt')->set($job->getWhenAt());
        $pullEarlier->getQuery()->execute();

        // Mirror the same merge on the object handed back to the caller.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
316
317
    /**
     * Adds the predicates that select jobs which are new, unlocked, due
     * (whenAt unset or not in the future) and not expired (expiresAt unset or
     * in the future).
     *
     * @param mixed $builder a query builder or an aggregation match stage; it must offer
     *                       expr(), addAnd() and field()
     */
    protected function addStandardPredicates($builder)
    {
        $now = new \DateTime();

        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );

        $builder
            ->addAnd($isDue, $notExpired)
            ->field('status')->equals(BaseJob::STATUS_NEW)
            ->field('locked')->equals(null);
    }
331
332
    /**
     * Lists the methods of runnable jobs grouped by worker, using an
     * aggregation over the job class.
     *
     * @return array worker name => list of method names; empty when the object manager has no
     *               createAggregationBuilder() or the aggregation yields nothing
     */
    public function getWorkersAndMethods()
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        if (!method_exists($documentManager, 'createAggregationBuilder')) {
            return [];
        }

        $aggregation = $documentManager->createAggregationBuilder($this->getJobClass());
        $this->addStandardPredicates($aggregation->match());

        $groupStage = $aggregation->group();
        $groupKey = $aggregation->expr()
            ->field('workerName')->expression('$workerName')
            ->field('method')->expression('$method');
        $groupStage->field('id')->expression($groupKey);

        $rows = $aggregation->execute()->toArray();
        if (!$rows) {
            return [];
        }

        $methodsByWorker = [];
        foreach ($rows as $row) {
            if (!isset($row['_id'])) {
                continue;
            }

            // NOTE(review): the group key is declared as 'workerName' but read back as
            // 'worker_name' — presumably the ODM translates the field name to its stored
            // name; confirm against the Job document mapping.
            $methodsByWorker[$row['_id']['worker_name']][] = $row['_id']['method'];
        }

        return $methodsByWorker;
    }
367
368
    /**
     * Counts the jobs that satisfy the standard predicates, optionally limited
     * to a worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return int
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $liveJobs = $documentManager->createQueryBuilder($this->getJobClass());

        $this->addWorkerNameCriterion($liveJobs, $workerName, $methodName);
        // Filter
        $this->addStandardPredicates($liveJobs);

        $query = $liveJobs->getQuery();

        return $query->count();
    }
380
381
    /**
     * Archives every job matched by getJobQueryBuilder() for the given worker
     * and method.
     *
     * Each matching job is fetched one at a time with a find-and-update that
     * sets its status to Job::STATUS_ARCHIVE, then removed through the document
     * manager. Removals are flushed after every 10th job and once at the end.
     *
     * $progressCallback now defaults to null: a required parameter after
     * optional ones is deprecated since PHP 8.0 and made the earlier defaults
     * unusable. Existing three-argument calls behave as before.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback called with the running count after each
     *                                        intermediate flush and after the final flush;
     *                                        skipped when null
     */
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $this->getJobQueryBuilder($workerName, $methodName, true);
        $builder
            ->findAndUpdate()
            ->returnNew();
        $builder->field('status')->set(Job::STATUS_ARCHIVE);
        $query = $builder->getQuery();

        // Only null is skipped, so a non-callable argument still fails loudly as before.
        $reportProgress = null !== $progressCallback;

        $count = 0;
        // Each execute() returns the next matching job; a falsy result ends the loop.
        while ($job = $query->execute()) {
            $documentManager->remove($job);
            ++$count;

            if (0 === $count % 10) {
                $this->flush();
                if ($reportProgress) {
                    $progressCallback($count);
                }
            }
        }

        $this->flush();
        if ($reportProgress) {
            $progressCallback($count);
        }
    }
408
}
409