Completed
Push — master ( 2f9634...58e792 )
by Matthew
14:33 queued 03:52
created

JobManager::stopIdGenerator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Dtc\QueueBundle\Doctrine\BaseJobManager;
7
use Doctrine\ODM\MongoDB\DocumentManager;
8
use Dtc\QueueBundle\Document\Job;
9
use Dtc\QueueBundle\Model\BaseJob;
10
11
class JobManager extends BaseJobManager
12
{
13 3 View Code Duplication
    /**
     * Counts the documents of the given class whose 'status' field equals $status,
     * optionally narrowed by worker name and method.
     *
     * @param string      $objectName document class the query builder is created for
     * @param mixed       $status     value the 'status' field must equal
     * @param string|null $workerName when not null, the 'workerName' field must equal it
     * @param string|null $method     when not null, the 'method' field must equal it
     *
     * @return int whatever the query's count() reports (presumably the match count)
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($objectName);
        $builder->find();
        $builder->field('status')->equals($status);

        // Worker/method filters are only added for non-null values.
        $this->addWorkerNameCriterion($builder, $workerName, $method);

        return $builder->getQuery()->count();
    }
27
28
    /**
     * Intentional no-op: this step is not needed for the ODM implementation.
     *
     * @param string $objectName unused here
     */
    public function stopIdGenerator($objectName)
    {
        // Nothing to do for ODM.
    }
35
36 1
    /**
     * Intentional no-op: this step is not needed for the ODM implementation.
     *
     * @param string $objectName unused here
     */
    public function restoreIdGenerator($objectName)
    {
        // Nothing to do for ODM.
    }
40
41
    /**
     * Removes documents of the archive class whose status is BaseJob::STATUS_ERROR,
     * optionally narrowed by worker name and method.
     *
     * @param string|null $workerName when not null, only jobs with this worker name
     * @param string|null $method     when not null, only jobs with this method
     *
     * @return int the value returned by pruneJobs()
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        $archiveName = $this->getArchiveObjectName();

        // Condition applied to the remove query: status must be "error".
        $onlyErrored = function ($qb) {
            /* @var Builder $qb */
            return $qb->field('status')->equals(BaseJob::STATUS_ERROR);
        };

        return $this->pruneJobs($workerName, $method, $archiveName, $onlyErrored);
    }
52
53
    /**
     * Prunes jobs according to a condition function.
     *
     * Builds a remove query for $objectName, lets $conditionFunc add its criteria,
     * then applies the optional worker name / method filters and executes it.
     *
     * $workerName and $method no longer declare "= null" defaults: they precede
     * required parameters, so the defaults could never be used, and declaring them
     * is deprecated since PHP 8.0. Null may still be passed explicitly.
     *
     * @param string|null $workerName    when not null, only jobs with this worker name
     * @param string|null $method        when not null, only jobs with this method
     * @param string      $objectName    document class to remove from
     * @param callable    $conditionFunc receives the remove query builder and must
     *                                   return the builder to continue with
     *
     * @return int the 'n' entry of the execution result, or 0 when it is absent
     */
    protected function pruneJobs($workerName, $method, $objectName, $conditionFunc)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb = $qb->remove();

        // The callback's return value replaces the builder, exactly as before.
        $qb = $conditionFunc($qb);
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        $result = $qb->getQuery()->execute();

        return isset($result['n']) ? $result['n'] : 0;
    }
79
80 10
    /**
     * Adds equality criteria on 'workerName' and/or 'method' to the builder,
     * skipping whichever of the two values is null.
     *
     * @param Builder     $builder    query builder to add the criteria to
     * @param string|null $workerName value for the 'workerName' field, or null to skip
     * @param string|null $method     value for the 'method' field, or null to skip
     */
    protected function addWorkerNameCriterion(Builder $builder, $workerName = null, $method = null)
    {
        // Field name => requested value, in the order the criteria are applied.
        $criteria = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($criteria as $fieldName => $expected) {
            if (null === $expected) {
                continue;
            }

            $builder->field($fieldName)->equals($expected);
        }
    }
90
91 1
    /**
     * Sets the status to Job::STATUS_EXPIRED on jobs whose status is
     * BaseJob::STATUS_NEW and whose 'expiresAt' is at or before the current time,
     * optionally narrowed by worker name and method.
     *
     * @param string|null $workerName when not null, only jobs with this worker name
     * @param string|null $method     when not null, only jobs with this method
     *
     * @return int the 'n' entry of the execution result, or 0 when it is absent
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());

        // updateMany() is called directly; Builder::update() is deprecated
        // since doctrine/mongodb-odm 1.2 in favour of updateOne()/updateMany().
        $qb = $qb->updateMany();

        // Selection criteria.
        $qb->field('expiresAt')->lte(new \DateTime());
        $qb->field('status')->equals(BaseJob::STATUS_NEW);
        $this->addWorkerNameCriterion($qb, $workerName, $method);

        // New value.
        $qb->field('status')->set(\Dtc\QueueBundle\Model\Job::STATUS_EXPIRED);

        $result = $qb->getQuery()->execute();

        return isset($result['n']) ? $result['n'] : 0;
    }
109
110
    /**
     * Removes archived jobs whose 'updatedAt' is earlier than $olderThan.
     *
     * @param \DateTime $olderThan cut-off; only documents with updatedAt < this are removed
     *
     * @return int the 'n' entry of the execution result, or 0 when it is absent
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getArchiveObjectName());
        $builder->remove();
        $builder->field('updatedAt')->lt($olderThan);

        $outcome = $builder->getQuery()->execute();
        if (!isset($outcome['n'])) {
            return 0;
        }

        return $outcome['n'];
    }
132
133 1
    /**
     * Counts jobs that are not locked and whose 'expiresAt' is either null or
     * later than the current time, optionally narrowed by worker name and method.
     *
     * @param string|null $workerName when not null, only jobs with this worker name
     * @param string|null $method     when not null, only jobs with this method
     *
     * @return int whatever the query's count(true) reports
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->find();

        $this->addWorkerNameCriterion($builder, $workerName, $method);

        // Filter: expiresAt is null or still in the future, and the job is not locked.
        $now = new \DateTime();
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($notExpired);
        $builder->field('locked')->equals(null);

        return $builder->getQuery()->count(true);
    }
155
156
    /**
     * Runs a map/reduce over the given document class and returns per-status counts.
     *
     * The map step emits one {status: 1} object under the key
     * "<worker_name>-><method>()"; the reduce step sums those objects per key.
     * Each result row is then padded with a 0 count for the error, new, running
     * and success statuses if the row does not already contain them.
     *
     * @param string $documentName document class to run the map/reduce on
     *
     * @return array per-status counts indexed by each result row's '_id'
     */
    protected function getStatusByDocument($documentName)
    {
        // Map: one counter object per job, keyed by worker and method.
        $mapJs = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        // Reduce: add up the per-status counters emitted for a key.
        $reduceJs = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';

        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();
        $builder = $documentManager->createQueryBuilder($documentName);
        $builder->map($mapJs);
        $builder->reduce($reduceJs);
        $rows = $builder->getQuery()->execute();

        // Zero counts used to fill in statuses a row does not report.
        $zeroCounts = [
            BaseJob::STATUS_ERROR => 0,
            BaseJob::STATUS_NEW => 0,
            BaseJob::STATUS_RUNNING => 0,
            BaseJob::STATUS_SUCCESS => 0,
        ];

        $countsByKey = [];
        foreach ($rows as $row) {
            // Array union: values from the row win, missing statuses get 0.
            $countsByKey[$row['_id']] = $row['value'] + $zeroCounts;
        }

        return $countsByKey;
    }
208
209
    /**
     * Returns per-status job counts, summed over the regular and the archive
     * document classes, with each entry's statuses sorted by key.
     *
     * Keys and statuses that appear only in the archive results are added to the
     * combined result instead of being summed onto a missing entry, which used to
     * raise an "undefined index" notice/warning.
     *
     * @return array per-status counts indexed by the keys getStatusByDocument() returns
     */
    public function getStatus()
    {
        $combined = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            foreach ($counts as $status => $count) {
                if (isset($combined[$key][$status])) {
                    $combined[$key][$status] += $count;
                } else {
                    // Only present in the archive: start from the archive count.
                    $combined[$key][$status] = $count;
                }
            }
        }

        // Sort every entry's statuses by key; the order of the entries is kept.
        foreach (array_keys($combined) as $key) {
            ksort($combined[$key]);
        }

        return $combined;
    }
227
228
    /**
     * Get the next job to run (can be filtered by worker name and method name).
     *
     * Issues a single findAndUpdate query that selects a job with status
     * BaseJob::STATUS_NEW, 'locked' equal to null, 'whenAt' null or not after now
     * and 'expiresAt' null or after now, and sets lockedAt, locked, status
     * (BaseJob::STATUS_RUNNING) and runId on it.
     *
     * @param string|null $workerName when not null, only jobs with this worker name
     * @param string|null $methodName when not null, only jobs with this method
     * @param bool        $prioritize sort by 'priority' ascending when truthy,
     *                                otherwise by 'whenAt' ascending
     * @param mixed       $runId      value stored in the job's 'runId' field
     *
     * @return \Dtc\QueueBundle\Model\Job the query's execution result
     *                                    (NOTE(review): presumably null when nothing matches — confirm)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $documentManager */
        $documentManager = $this->getObjectManager();

        $builder = $documentManager->createQueryBuilder($this->getObjectName());
        $builder->findAndUpdate();
        $builder->returnNew();

        $this->addWorkerNameCriterion($builder, $workerName, $methodName);
        $builder->sort($prioritize ? 'priority' : 'whenAt', 'asc');

        // Filter
        $now = new \DateTime();
        $isDue = $builder->expr()->addOr(
            $builder->expr()->field('whenAt')->equals(null),
            $builder->expr()->field('whenAt')->lte($now)
        );
        $notExpired = $builder->expr()->addOr(
            $builder->expr()->field('expiresAt')->equals(null),
            $builder->expr()->field('expiresAt')->gt($now)
        );
        $builder->addAnd($isDue, $notExpired);
        $builder->field('status')->equals(BaseJob::STATUS_NEW);
        $builder->field('locked')->equals(null);

        // Update
        $builder->field('lockedAt')->set($now); // Set started
        $builder->field('locked')->set(true);
        $builder->field('status')->set(BaseJob::STATUS_RUNNING);
        $builder->field('runId')->set($runId);

        return $builder->getQuery()->execute();
    }
276
}
277