Completed
Push — master ( e02161...971421 )
by Matthew
12:50
created

JobManager::updateNearestBatch()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 43
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 43
ccs 0
cts 30
cp 0
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 28
nc 5
nop 1
crap 20
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
11
class JobManager extends BaseJobManager
12
{
13
    use CommonTrait;
14
15
    /**
     * Counts the documents of the given class whose status matches $status.
     *
     * @param string      $objectName document class to build the query for
     * @param string      $status     status value to match (e.g. a BaseJob::STATUS_* constant)
     * @param string|null $workerName when not null, only documents with this workerName are counted
     * @param string|null $method     when not null, only documents with this method are counted
     *
     * @return int result of the query's count()
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
29
30
    /**
     * Removes archived jobs whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName when not null, restricts the removal to this workerName
     * @param string|null $method     when not null, restricts the removal to this method
     *
     * @return int the 'n' entry of the remove result, or 0 when the result has no such entry
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $removal = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $removal->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($removal, $workerName, $method);

        $outcome = $removal->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
51
52
    /**
     * Adds equality criteria on 'workerName' and/or 'method' to the given builder.
     *
     * A criterion is only added for a value that is not null.
     *
     * @param Builder     $builder    query builder to add the criteria to
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Field name => requested value, in the order the criteria are applied.
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
67
68
    /**
     * Marks jobs as expired: every job with status BaseJob::STATUS_NEW whose
     * expiresAt is at or before the current time gets its status set to
     * Job::STATUS_EXPIRED.
     *
     * @param string|null $workerName when not null, restricts the update to this workerName
     * @param string|null $method     when not null, restricts the update to this method
     *
     * @return int the 'n' entry of the update result, or 0 when the result has no such entry
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $update = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection criteria.
        $update->field('expiresAt')->lte(new \DateTime());
        $update->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($update, $workerName, $method);

        // New value.
        $update->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $update->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
92
93
    /**
     * Removes archived jobs whose 'updatedAt' is older than $olderThan.
     *
     * Delegates to removeOlderThan() on the archive object name.
     *
     * @param \DateTime $olderThan cut-off date
     *
     * @return int
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveName = $this->getArchiveObjectName();

        return $this->removeOlderThan($archiveName, 'updatedAt', $olderThan);
    }
103
104 1
    /**
     * Counts the jobs that are not locked and whose expiresAt is either null
     * or later than the current time.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are counted
     * @param string|null $method     when not null, only jobs with this method are counted
     *
     * @return int result of the query's count(true)
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: expiresAt is unset, or still in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
126
127
    /**
     * Builds a per-worker/method breakdown of job counts by status for one document class.
     *
     * A map/reduce query emits, for each document, the key
     * "<worker_name>-><method>()" with a count of 1 for the document's status;
     * the reduce step sums those counts per status. Every status listed in the
     * defaults below that the query did not report is filled in with 0.
     *
     * @param string $documentName document class to run the map/reduce on
     *
     * @return array map of "<worker_name>-><method>()" => (status => count)
     */
    protected function getStatusByDocument($documentName)
    {
        // Statuses that must always be present in each entry, defaulting to zero.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        // Map step: one {status: 1} record per document, keyed by worker and method.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce step: add up the per-status counts emitted for the same key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc);
        $builder->reduce($reduceFunc);
        $rows = $builder->getQuery()->execute();

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union keeps the reported counts and only adds the missing statuses.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
185
186 1
    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * The per-status counts from getStatusByDocument() for the job class and
     * for the archive class are added together per worker/method key, and each
     * entry is sorted by status key.
     *
     * @return array map of worker/method key => (status => count)
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A worker/method may exist only in the archive; start its
                // counter at zero instead of reading an undefined index.
                if (!isset($combined[$key][$statusName])) {
                    $combined[$key][$statusName] = 0;
                }

                $combined[$key][$statusName] += $count;
            }
        }

        $finalResult = [];
        foreach ($combined as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
204
205
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * The job is selected with the criteria from getJobQueryBuilder() and, in
     * the same findAndUpdate query, marked as locked: lockedAt is set to the
     * current time, locked to true, status to BaseJob::STATUS_RUNNING and
     * runId to $runId.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize passed through to getJobQueryBuilder()
     * @param mixed       $runId      value stored in the job's runId field
     *
     * @return \Dtc\QueueBundle\Model\Job|null result of the findAndUpdate query
     *                                         (presumably null when no job matches; verify against Doctrine)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $builder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $builder
            ->findAndUpdate()
            ->returnNew();

        // Timestamp recorded as the moment the job was locked. This variable
        // was previously undefined, so lockedAt was always written as null.
        $date = new \DateTime();

        // Update
        $builder
            ->field('lockedAt')->set($date) // Set started
            ->field('locked')->set(true)
            ->field('status')->set(BaseJob::STATUS_RUNNING)
            ->field('runId')->set($runId);

        $query = $builder->getQuery();

        return $query->execute();
    }
234
235
    /**
     * Builds the query selecting jobs that are ready to run.
     *
     * A job qualifies when its status is BaseJob::STATUS_NEW, it is not
     * locked, its whenAt is null or not later than now, and its expiresAt is
     * null or later than now.
     *
     * @param string|null $workerName when not null, only jobs with this workerName qualify
     * @param string|null $methodName when not null, only jobs with this method qualify
     * @param bool        $prioritize true sorts by priority (desc) then whenAt (asc);
     *                                false sorts by whenAt (asc) only
     *
     * @return Builder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $jobQuery = $documentManager->createQueryBuilder($this->getObjectName());
        $this->addWorkerNameCriterion($jobQuery, $workerName, $methodName);

        if (!$prioritize) {
            $jobQuery->sort('whenAt', 'asc');
        } else {
            $jobQuery->sort([
                'priority' => 'desc',
                'whenAt' => 'asc',
            ]);
        }

        // Filter
        $now = new \DateTime();
        $isDue = $jobQuery->expr()->addOr(
            $jobQuery->expr()->field('whenAt')->equals(null),
            $jobQuery->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $jobQuery->expr()->addOr(
            $jobQuery->expr()->field('expiresAt')->equals(null),
            $jobQuery->expr()->field('expiresAt')->gt($now)
        );

        $jobQuery->addAnd($isDue, $notExpired);
        $jobQuery->field('status')->equals(BaseJob::STATUS_NEW);
        $jobQuery->field('locked')->equals(null);

        return $jobQuery;
    }
270
271
    /**
     * Folds $job into the earliest matching queued job, if there is one.
     *
     * Looks up the unlocked job with status BaseJob::STATUS_NEW and the same
     * crcHash as $job that has the lowest whenAt. When found, its priority is
     * raised to $job's priority if that is higher, and its whenAt is moved to
     * $job's whenAt if that is earlier.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job carrying the new priority and whenAt
     *
     * @return \Dtc\QueueBundle\Model\Job|null the existing job with the changes applied
     *                                         to the in-memory object, or null when no job matched
     */
    protected function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $jobClass = $this->getObjectName();

        $lookup = $documentManager->createQueryBuilder($jobClass);
        $lookup->find();
        $lookup->sort('whenAt', 'asc');
        $lookup->field('status')->equals(BaseJob::STATUS_NEW);
        $lookup->field('crcHash')->equals($job->getCrcHash());
        $lookup->field('locked')->equals(null);

        $existing = $lookup->getQuery()->getSingleResult();
        if (!$existing) {
            return null;
        }

        // Update priority or whenAt
        //  Each update carries its own comparison in the query criteria, so a
        //  concurrent writer's changes are not trounced.
        $raisePriority = $documentManager->createQueryBuilder($jobClass);
        $raisePriority->findAndUpdate();
        $raisePriority->field('_id')->equals($existing->getId());
        $raisePriority->field('priority')->lt($job->getPriority());
        $raisePriority->field('priority')->set($job->getPriority());
        $raisePriority->getQuery()->execute();

        $moveEarlier = $documentManager->createQueryBuilder($jobClass);
        $moveEarlier->findAndUpdate();
        $moveEarlier->field('_id')->equals($existing->getId());
        $moveEarlier->field('whenAt')->gt($job->getWhenAt());
        $moveEarlier->field('whenAt')->set($job->getWhenAt());
        $moveEarlier->getQuery()->execute();

        // Apply the same two rules to the object already loaded in memory.
        if ($job->getWhenAt() < $existing->getWhenAt()) {
            $existing->setWhenAt($job->getWhenAt());
        }
        if ($job->getPriority() > $existing->getPriority()) {
            $existing->setPriority($job->getPriority());
        }

        return $existing;
    }
314
}
315