Completed
Push — master ( ed304f...e36e8e )
by Matthew
05:35
created

JobManager::restoreIdGenerator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
11
class JobManager extends BaseJobManager
12
{
13 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
14
    {
15
        /** @var DocumentManager $objectManager */
16 3
        $objectManager = $this->getObjectManager();
17 3
        $qb = $objectManager->createQueryBuilder($objectName);
18
        $qb
19 3
            ->find()
20 3
            ->field('status')->equals($status);
21
22 3
        $this->addWorkerNameCriterion($qb, $workerName, $method);
23 3
        $query = $qb->getQuery();
24
25 3
        return $query->count();
26
    }
27
28
    /**
29
     * @param string $objectName
30
     */
31 8
    public function stopIdGenerator($objectName)
32
    {
33
        // Not needed for ODM
34 8
    }
35
36 1
    public function restoreIdGenerator($objectName)
37
    {
38
        // Not needed for ODM
39 1
    }
40
41
    /**
42
     * @param string|null $workerName
43
     * @param string|null $method
44
     */
45 1
    public function pruneErroneousJobs($workerName = null, $method = null)
46
    {
47
        /** @var DocumentManager $objectManager */
48 1
        $objectManager = $this->getObjectManager();
49 1
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
50 1
        $qb = $qb->remove();
51 1
        $qb->field('status')->equals(BaseJob::STATUS_ERROR);
52 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
53
54 1
        $query = $qb->getQuery();
55 1
        $result = $query->execute();
56 1
        if (isset($result['n'])) {
57 1
            return $result['n'];
58
        }
59
60
        return 0;
61
    }
62
63 10
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
64
    {
65 10
        if (null !== $workerName) {
66 4
            $builder->field('workerName')->equals($workerName);
67
        }
68
69 10
        if (null !== $method) {
70 3
            $builder->field('method')->equals($method);
71
        }
72 10
    }
73
74 1
    protected function updateExpired($workerName = null, $method = null)
75
    {
76
        /** @var DocumentManager $objectManager */
77 1
        $objectManager = $this->getObjectManager();
78 1
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
79 1
        $qb = $qb->updateMany();
80 1
        $qb->field('expiresAt')->lte(new \DateTime());
81 1
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
82 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
83 1
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);
84 1
        $query = $qb->getQuery();
85 1
        $result = $query->execute();
86 1
        if (isset($result['n'])) {
87 1
            return $result['n'];
88
        }
89
90
        return 0;
91
    }
92
93
    /**
94
     * Removes archived jobs older than $olderThan.
95
     *
96
     * @param \DateTime $olderThan
97
     */
98 1
    public function pruneArchivedJobs(\DateTime $olderThan)
99
    {
100
        /** @var DocumentManager $objectManager */
101 1
        $objectManager = $this->getObjectManager();
102 1
        $qb = $objectManager->createQueryBuilder($this->getArchiveObjectName());
103
        $qb
104 1
            ->remove()
105 1
            ->field('updatedAt')->lt($olderThan);
106
107 1
        $query = $qb->getQuery();
108 1
        $result = $query->execute();
109 1
        if (isset($result['n'])) {
110 1
            return $result['n'];
111
        }
112
113
        return 0;
114
    }
115
116 1
    public function getJobCount($workerName = null, $method = null)
117
    {
118
        /** @var DocumentManager $objectManager */
119 1
        $objectManager = $this->getObjectManager();
120 1
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
121
        $qb
122 1
            ->find();
123
124 1
        $this->addWorkerNameCriterion($qb, $workerName, $method);
125
126
        // Filter
127 1
        $date = new \DateTime();
128
        $qb
129 1
            ->addAnd(
130 1
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
131
            )
132 1
            ->field('locked')->equals(null);
133
134 1
        $query = $qb->getQuery();
135
136 1
        return $query->count(true);
137
    }
138
139
    /**
140
     * Get Status Jobs.
141
     *
142
     * @param string $documentName
143
     */
144
    protected function getStatusByDocument($documentName)
145
    {
146
        // Run a map reduce function get worker and status break down
147
        $mapFunc = "function() {
148
            var result = {};
149
            result[this.status] = 1;
150
            var key = this.worker_name + '->' + this.method + '()';
151
            emit(key, result);
152
        }";
153
        $reduceFunc = 'function(k, vals) {
154
            var result = {};
155
            for (var index in vals) {
156
                var val =  vals[index];
157
                for (var i in val) {
158
                    if (result.hasOwnProperty(i)) {
159
                        result[i] += val[i];
160
                    }
161
                    else {
162
                        result[i] = val[i];
163
                    }
164
                }
165
            }
166
            return result;
167
        }';
168
        /** @var DocumentManager $objectManager */
169
        $objectManager = $this->getObjectManager();
170
        $qb = $objectManager->createQueryBuilder($documentName);
171
        $qb->map($mapFunc)
172
            ->reduce($reduceFunc);
173
        $query = $qb->getQuery();
174
        $results = $query->execute();
175
176
        $allStatus = array(
177
            BaseJob::STATUS_ERROR => 0,
178
            BaseJob::STATUS_NEW => 0,
179
            BaseJob::STATUS_RUNNING => 0,
180
            BaseJob::STATUS_SUCCESS => 0,
181
        );
182
183
        $status = [];
184
185
        foreach ($results as $info) {
186
            $status[$info['_id']] = $info['value'] + $allStatus;
187
        }
188
189
        return $status;
190
    }
191
192
    public function getStatus()
193
    {
194
        $result = $this->getStatusByDocument($this->getObjectName());
195
        $status2 = $this->getStatusByDocument($this->getArchiveObjectName());
196
        foreach ($status2 as $key => $value) {
197
            foreach ($value as $k => $v) {
198
                $result[$key][$k] += $v;
199
            }
200
        }
201
202
        $finalResult = [];
203
        foreach ($result as $key => $item) {
204
            ksort($item);
205
            $finalResult[$key] = $item;
206
        }
207
208
        return $finalResult;
209
    }
210
211
    /**
212
     * Get the next job to run (can be filtered by workername and method name).
213
     *
214
     * @param string $workerName
215
     * @param string $methodName
216
     * @param bool   $prioritize
217
     *
218
     * @return \Dtc\QueueBundle\Model\Job
219
     */
220 5
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
221
    {
222
        /** @var DocumentManager $objectManager */
223 5
        $objectManager = $this->getObjectManager();
224 5
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
225
        $qb
226 5
            ->findAndUpdate()
227 5
            ->returnNew();
228
229 5
        $this->addWorkerNameCriterion($qb, $workerName, $methodName);
230 5
        if ($prioritize) {
231 5
            $qb->sort('priority', 'asc');
232
        } else {
233
            $qb->sort('whenAt', 'asc');
234
        }
235
236
        // Filter
237 5
        $date = new \DateTime();
238
        $qb
239 5
            ->addAnd(
240 5
                $qb->expr()->addOr($qb->expr()->field('whenAt')->equals(null), $qb->expr()->field('whenAt')->lte($date)),
241 5
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
242
            )
243 5
            ->field('status')->equals(BaseJob::STATUS_NEW)
244 5
            ->field('locked')->equals(null);
245
246
        // Update
247
        $qb
248 5
            ->field('lockedAt')->set($date) // Set started
249 5
            ->field('locked')->set(true)
250 5
            ->field('status')->set(BaseJob::STATUS_RUNNING)
251 5
            ->field('runId')->set($runId);
252
253 5
        $query = $qb->getQuery();
254
255 5
        $job = $query->execute();
256
257 5
        return $job;
258
    }
259
}
260