Completed
Push — master ( d4f4f0...01653d )
by Matthew
04:03
created

JobManager::stopIdGenerator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\RetryableJob;
11
12
class JobManager extends BaseJobManager
13
{
14
    use CommonTrait;
15
16 3
    /**
     * Counts the documents of the given class whose 'status' field equals $status,
     * optionally narrowed down by worker name and/or method.
     *
     * @param string      $objectName class name the query builder is created for
     * @param string      $status     value the 'status' field has to equal
     * @param string|null $workerName when not null, 'workerName' has to equal this value
     * @param string|null $method     when not null, 'method' has to equal this value
     *
     * @return int whatever the query's count() reports
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
30
31
    /**
     * Removes documents of the archive object class whose status is BaseJob::STATUS_ERROR.
     *
     * @param string|null $workerName when not null, only documents with this 'workerName' are removed
     * @param string|null $method     when not null, only documents with this 'method' are removed
     *
     * @return int the 'n' entry of the executed query's result, or 0 when the result has no such entry
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName())->remove();
        $builder->field('status')->equals(BaseJob::STATUS_ERROR);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
52 1
53 1
    /**
     * Adds equality criteria on 'workerName' and 'method' to the given builder.
     * A criterion is only added when the corresponding argument is not null.
     *
     * @param Builder     $builder    query builder that receives the criteria
     * @param string|null $workerName value 'workerName' has to equal, or null to skip
     * @param string|null $method     value 'method' has to equal, or null to skip
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Field name => requested value; order matches the order the criteria are applied in.
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
68
69
    /**
     * Flags expired jobs: every document of the object class with status
     * BaseJob::STATUS_NEW and an 'expiresAt' at or before the current time
     * gets its status set to STATUS_EXPIRED.
     *
     * @param string|null $workerName when not null, only documents with this 'workerName' are updated
     * @param string|null $method     when not null, only documents with this 'method' are updated
     *
     * @return int the 'n' entry of the executed query's result, or 0 when the result has no such entry
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName())->updateMany();

        // Selection criteria
        $builder->field('expiresAt')->lte(new \DateTime());
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Modification applied to the selected documents
        $builder->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $outcome = $builder->getQuery()->execute();

        return isset($outcome['n']) ? $outcome['n'] : 0;
    }
93 2
94 2
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the archive object class, comparing
     * against the 'updatedAt' field.
     *
     * @param \DateTime $olderThan cut-off handed to removeOlderThan()
     *
     * @return int result of removeOlderThan() (presumably the number of removed documents - confirm)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var DocumentManager $manager */
        $manager = $this->getObjectManager();
        $archiveClass = $this->getArchiveObjectName();

        return $this->removeOlderThan($manager, $archiveClass, 'updatedAt', $olderThan);
    }
107
108
    /**
     * Counts documents of the object class that are not locked ('locked' equals null)
     * and whose 'expiresAt' is either null or later than the current time,
     * optionally narrowed down by worker name and/or method.
     *
     * @param string|null $workerName when not null, 'workerName' has to equal this value
     * @param string|null $method     when not null, 'method' has to equal this value
     *
     * @return int whatever the query's count(true) reports
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Expiry filter: no expiry set, or expiry strictly after "now".
        $now = new \DateTime();
        $neverExpires = $builder->expr()->field('expiresAt')->equals(null);
        $expiresLater = $builder->expr()->field('expiresAt')->gt($now);
        $notExpired = $builder->expr()->addOr($neverExpires, $expiresLater);

        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
130
131
    /**
     * Builds a per-worker/method status breakdown for one document class.
     *
     * Runs a map/reduce query: the map step emits, per document, a
     * "<worker_name>-><method>()" key with a {status: 1} value; the reduce step
     * sums the values per key. Each resulting row is then completed with a 0
     * entry for every known status that did not occur.
     *
     * @param string $documentName class name the query builder is created for
     *
     * @return array map of "<worker_name>-><method>()" => array of status => count
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: one {status: 1} record per document, keyed by worker and method.
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";

        // Reduce: add up the per-status counters of all records sharing a key.
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapFunc);
        $builder->reduce($reduceFunc);
        $rows = $builder->getQuery()->execute();

        // Zero entry for every known status, used to fill the gaps of each row.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            RetryableJob::STATUS_EXPIRED => 0,
            RetryableJob::STATUS_MAX_ERROR => 0,
            RetryableJob::STATUS_MAX_RETRIES => 0,
            RetryableJob::STATUS_MAX_STALLED => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $breakdown = [];
        foreach ($rows as $row) {
            // Array union: counted statuses win, missing ones are taken from $zeroCounts.
            $breakdown[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $breakdown;
    }
189 2
190 2
    /**
     * Returns the combined status breakdown of the live and the archive object class.
     *
     * The per-key status counts reported by getStatusByDocument() for both classes
     * are added together; the statuses of every key are then sorted by status name.
     *
     * @return array map of worker/method key => array of status => count (sorted by status)
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $statusName => $count) {
                // A key (or a status) may be present in the archive only; start it
                // at 0 so that "+=" never reads an undefined index.
                if (!isset($combined[$key][$statusName])) {
                    $combined[$key][$statusName] = 0;
                }

                $combined[$key][$statusName] += $count;
            }
        }

        $finalResult = [];
        foreach ($combined as $key => $counts) {
            ksort($counts);
            $finalResult[$key] = $counts;
        }

        return $finalResult;
    }
208 1
209 1
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Builds a findAndUpdate()/returnNew() query on the object class that selects a
     * document with status BaseJob::STATUS_NEW, 'locked' equal to null, 'whenAt'
     * null or not after the current time and 'expiresAt' null or after the current
     * time. The selected document gets 'lockedAt' set to the current time, 'locked'
     * set to true, status set to BaseJob::STATUS_RUNNING and 'runId' set to $runId.
     *
     * @param string|null $workerName when not null, 'workerName' has to equal this value
     * @param string|null $methodName when not null, 'method' has to equal this value
     * @param bool        $prioritize sort by 'priority' descending when true, by 'whenAt' ascending otherwise
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job result of executing the query
     *                                    (presumably null when nothing matches - confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        list($sortField, $sortOrder) = $prioritize ? ['priority', 'desc'] : ['whenAt', 'asc'];
        $builder->sort($sortField, $sortOrder);

        // Selection criteria
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Modifications applied to the selected job
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
257
}
258