Completed
Push — master ( dd6df0...ad18c7 )
by Matthew
05:58 queued 03:58
created

JobManager::getJobCount()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 22
ccs 11
cts 11
cp 1
rs 9.2
c 1
b 1
f 0
cc 1
eloc 13
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
16 3
    /**
     * Counts the documents of the given class whose 'status' field equals $status.
     *
     * @param string      $objectName Document class name passed to the query builder
     * @param mixed       $status     Value the 'status' field has to equal
     * @param string|null $workerName When not null, restricts the count to this worker name
     * @param string|null $method     When not null, restricts the count to this method
     *
     * @return int Result of the query's count() call
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional worker / method restriction.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
30
31
    /**
     * Intentionally empty: the ODM implementation has nothing to do here.
     *
     * @param string $objectName Ignored by this implementation
     */
    public function stopIdGenerator($objectName)
    {
        // No-op: not needed for ODM.
    }
38
39 1
    /**
     * Intentionally empty: the ODM implementation has nothing to do here.
     *
     * @param string $objectName Ignored by this implementation
     */
    public function restoreIdGenerator($objectName)
    {
        // No-op: not needed for ODM.
    }
43
44
    /**
     * Removes the archived jobs whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName When not null, only jobs of this worker are removed
     * @param string|null $method     When not null, only jobs for this method are removed
     *
     * @return int|mixed The 'n' entry of the query result when present (presumably the
     *                   number of removed documents - TODO confirm), otherwise 0
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        // Remove query against the archive collection.
        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
65
66 14
    /**
     * Adds equality criteria on 'workerName' and/or 'method' to the given builder.
     *
     * A criterion is only added for an argument that is not null; 'workerName'
     * is applied before 'method'.
     *
     * @param Builder     $builder    Query builder that receives the criteria
     * @param string|null $workerName Worker name to match, or null for no restriction
     * @param string|null $method     Method to match, or null for no restriction
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
76
77 2
    /**
     * Sets the status to STATUS_EXPIRED on every job that still has
     * BaseJob::STATUS_NEW and whose 'expiresAt' is not later than the current time.
     *
     * @param string|null $workerName When not null, only jobs of this worker are updated
     * @param string|null $method     When not null, only jobs for this method are updated
     *
     * @return int|mixed The 'n' entry of the query result when present (presumably the
     *                   number of matched documents - TODO confirm), otherwise 0
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection: new jobs whose expiry time has been reached.
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification: flag the selected jobs as expired.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
95
96
    /**
     * Removes archived jobs older than $olderThan.
     *
     * The comparison is delegated to removeOlderThan() using the 'updatedAt' field
     * of the archive object.
     *
     * @param \DateTime $olderThan Cut-off passed to removeOlderThan()
     *
     * @return mixed Whatever removeOlderThan() returns (presumably the number of
     *               removed documents - TODO confirm against its definition)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        // getObjectManager() is declared as returning the generic ObjectManager
        // interface, while removeOlderThan() is used here with the MongoDB
        // DocumentManager. Narrow the type explicitly, as every other method
        // of this class does, so static analysis sees the intended type.
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();

        return $this->removeOlderThan($objectManager, $this->getArchiveObjectName(), 'updatedAt', $olderThan);
    }
105
106 2
    /**
     * Counts the jobs that are not locked and have not expired.
     *
     * A job is counted when its 'locked' field equals null and its 'expiresAt'
     * is either null or later than the current time.
     *
     * @param string|null $workerName When not null, only jobs of this worker are counted
     * @param string|null $method     When not null, only jobs for this method are counted
     *
     * @return int Result of the query's count(true) call
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: no expiry set, or expiry still in the future.
        $now = new \DateTime();
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
128
129
    /**
     * Builds a per-worker/method breakdown of job counts by status.
     *
     * A map/reduce query is run against the given document class: the map step
     * emits one {status: 1} object per document under the key
     * "<worker_name>-><method>()", and the reduce step sums those objects per
     * status. Every result entry is then completed with a zero count for each
     * known status it does not contain.
     *
     * @param string $documentName Document class name passed to the query builder
     *
     * @return array Map of "<worker_name>-><method>()" to an array of status => count
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: one counter object per document, keyed by worker and method.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce: add up the per-status counters emitted for the same key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs)->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        // Zero defaults for every known status.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counted values win, missing statuses get 0.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
185
186 2
    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * The counts produced by getStatusByDocument() for the job object and for
     * the archive object are added together per key and per status; each
     * entry's statuses are sorted by key before being returned.
     *
     * @return array Map of worker/method key to an array of status => count
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A worker/method key (or a status) may exist only in the
                // archive; adding onto a missing entry would read an undefined
                // array key, so start the counter from the archived value.
                if (isset($combined[$key][$statusName])) {
                    $combined[$key][$statusName] += $count;
                } else {
                    $combined[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($combined as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
204
205
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * A single find-and-update query selects a job with status
     * BaseJob::STATUS_NEW that is not locked, whose 'whenAt' is null or not
     * later than now and whose 'expiresAt' is null or later than now. The
     * same query sets 'lockedAt', 'locked', 'status' (to
     * BaseJob::STATUS_RUNNING) and 'runId' on it, and the updated document is
     * requested back.
     *
     * @param string|null $workerName When not null, only jobs of this worker are considered
     * @param string|null $methodName When not null, only jobs for this method are considered
     * @param bool        $prioritize Sort ascending by 'priority' when truthy, otherwise by 'whenAt'
     * @param mixed       $runId      Value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job|null Result of executing the query (presumably
     *                                         null when no job matches - TODO confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate()->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        $builder->sort($prioritize ? 'priority' : 'whenAt', 'asc');

        // Filter
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
253
}
254