Completed
Push — master ( 239355...d39289 )
by Matthew
05:26
created

JobManager::stopIdGenerator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14 3
    /**
     * Counts the documents of a given class that carry a particular status.
     *
     * @param string      $objectName document class handed to the query builder
     * @param string      $status     status value the 'status' field must equal
     * @param string|null $workerName when not null, restricts the count to this worker name
     * @param string|null $method     when not null, restricts the count to this method
     *
     * @return mixed whatever the built query's count() reports
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Optional worker / method restrictions.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
28
29
    /**
     * Deliberate no-op: the ODM implementation has nothing to do here.
     *
     * @param string $objectName ignored by this implementation
     */
    public function stopIdGenerator($objectName)
    {
        // Intentionally empty - not needed for ODM.
    }
36
37 1
    /**
     * Deliberate no-op: the ODM implementation has nothing to do here.
     *
     * @param string $objectName ignored by this implementation
     */
    public function restoreIdGenerator($objectName)
    {
        // Intentionally empty - not needed for ODM.
    }
41
42
    /**
     * Removes archived documents whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName when not null, only documents of this worker are removed
     * @param string|null $method     when not null, only documents for this method are removed
     *
     * @return mixed the 'n' entry of the remove result, or 0 when the result has no such entry
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
63
64 10
    /**
     * Adds equality criteria on 'workerName' and/or 'method' to a query builder.
     *
     * A criterion is only added when the corresponding argument is not null.
     *
     * @param Builder     $builder    query builder that receives the criteria
     * @param string|null $workerName value the 'workerName' field must equal, or null to skip
     * @param string|null $method     value the 'method' field must equal, or null to skip
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Field name => required value; order matches the original (workerName first).
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
74
75 1
    /**
     * Marks jobs as expired: every document with status BaseJob::STATUS_NEW whose
     * 'expiresAt' is at or before the current time gets its status set to STATUS_EXPIRED.
     *
     * @param string|null $workerName when not null, only documents of this worker are updated
     * @param string|null $method     when not null, only documents for this method are updated
     *
     * @return mixed the 'n' entry of the update result, or 0 when the result has no such entry
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection criteria.
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification applied to the selected documents.
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
93
94
    /**
     * Deletes archived jobs whose 'updatedAt' is strictly earlier than $olderThan.
     *
     * @param \DateTime $olderThan cut-off; documents updated before it are removed
     *
     * @return mixed the 'n' entry of the remove result, or 0 when the result has no such entry
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName());
        $builder->remove();
        $builder->field('updatedAt')->lt($olderThan);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
116
117 1
    /**
     * Counts documents that are not locked ('locked' equals null) and have not
     * expired ('expiresAt' is null or later than the current time).
     *
     * @param string|null $workerName when not null, only documents of this worker are counted
     * @param string|null $method     when not null, only documents for this method are counted
     *
     * @return mixed whatever the built query's count(true) reports
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Expiry filter: no expiry date at all, or one that is still in the future.
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
139
140
    /**
     * Builds a per-worker/method breakdown of job counts by status for one document class.
     *
     * A map/reduce query groups documents under the key "<worker_name>-><method>()"
     * and sums, per key, how many documents carry each status. Every status listed
     * in the defaults below that the query did not report is filled in with 0.
     *
     * @param string $documentName document class to run the map/reduce against
     *
     * @return array map of "<worker_name>-><method>()" => array of status => count
     */
    protected function getStatusByDocument($documentName)
    {
        // Map step: emit {<status>: 1} under the worker/method key.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce step: sum the per-status counters of all values sharing a key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc);
        $builder->reduce($reduceFunc);
        $rows = $builder->getQuery()->execute();

        // Zero defaults so every known status appears in each entry.
        $defaults = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: reported counts win, defaults only fill the gaps.
            $breakdown[$row['_id']] = $row['value'] + $defaults;
        }

        return $breakdown;
    }
196
197 1
    /**
     * Returns the combined status breakdown of the live and the archive collections.
     *
     * The per-worker/method counts of both collections are added together and each
     * entry's statuses are sorted by key.
     *
     * @return array map of worker/method key => array of status => count
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A worker/method key (or a status) may exist only in the archive,
                // e.g. when all of its jobs have already been archived. Adding with
                // += onto a missing entry would read an undefined index, so such
                // entries are initialised instead.
                if (isset($result[$key][$statusName])) {
                    $result[$key][$statusName] += $count;
                } else {
                    $result[$key][$statusName] = $count;
                }
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
215
216
    /**
     * Claims the next runnable job with a single find-and-update query.
     *
     * A document qualifies when its status is BaseJob::STATUS_NEW, 'locked' equals
     * null, 'whenAt' is null or not later than now, and 'expiresAt' is null or later
     * than now. The selected document gets 'lockedAt' set to now, 'locked' set to
     * true, its status set to BaseJob::STATUS_RUNNING and 'runId' set to $runId.
     *
     * @param string|null $workerName when not null, only jobs of this worker are considered
     * @param string|null $methodName when not null, only jobs for this method are considered
     * @param bool        $prioritize sort ascending by 'priority' when truthy, by 'whenAt' otherwise
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job the updated document as returned by the query
     *                                    (presumably null when nothing matches - confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        $builder->sort($prioritize ? 'priority' : 'whenAt', 'asc');

        // Selection: due, not expired, new and unlocked.
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Modification: lock the job and flag it as running for this run.
        $builder->field('lockedAt')->set($now);
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
264
}
265