Test Setup Failed
Push — master ( a87626...531aa8 )
by Matthew
02:42
created

JobManager   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 276
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 0
Metric Value
wmc 25
lcom 1
cbo 9
dl 0
loc 276
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A pruneArchivedJobs() 0 17 2
B getJobCount() 0 28 3
A getStatusByDocument() 0 47 2
A getStatus() 0 18 4
A countJobsByStatus() 0 21 3
A stopIdGenerator() 0 8 1
A pruneErroneousJobs() 0 7 1
B pruneJobs() 0 24 4
A pruneExpiredJobs() 0 7 1
B getJob() 0 46 4
1
<?php
2
3
namespace Dtc\QueueBundle\ODM;
4
5
use Doctrine\MongoDB\Query\Builder;
6
use Doctrine\ODM\MongoDB\Mapping\ClassMetadata;
7
use Dtc\QueueBundle\Doctrine\BaseJobManager;
8
use Doctrine\ODM\MongoDB\DocumentManager;
9
use Dtc\QueueBundle\Document\Job;
10
11
/**
 * MongoDB ODM job manager.
 *
 * Uses the Doctrine MongoDB query builder against the job document
 * (getObjectName()) and the archive document (getArchiveObjectName()) to
 * count, prune, summarise and claim jobs.
 */
class JobManager extends BaseJobManager
{
    /**
     * Counts the documents of $objectName whose "status" field equals $status.
     *
     * @param string      $objectName Document class to query
     * @param string      $status     Status value to match
     * @param string|null $workerName Only count this worker's jobs when not null
     * @param string|null $method     Only count jobs for this method when not null
     *
     * @return int
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb
            ->find()
            ->field('status')->equals($status);

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        return $qb->getQuery()->count();
    }

    /**
     * Switches the id generator of the given document class to "none".
     *
     * @param string $objectName
     */
    public function stopIdGenerator($objectName)
    {
        $objectManager = $this->getObjectManager();
        $repository = $objectManager->getRepository($objectName);
        /** @var ClassMetadata $metadata */
        $metadata = $objectManager->getClassMetadata($repository->getClassName());
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
    }

    /**
     * Removes archived jobs whose status is Job::STATUS_ERROR.
     *
     * @param string|null $workerName Only prune this worker's jobs when not null
     * @param string|null $method     Only prune jobs for this method when not null
     *
     * @return int Number of removed documents as reported by the driver
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), function ($qb) {
            /* @var Builder $qb */
            return $qb->field('status')->equals(Job::STATUS_ERROR);
        });
    }

    /**
     * Prunes jobs according to a condition function.
     *
     * The first two parameters used to carry "= null" defaults, but they were
     * never omittable because required parameters follow them; PHP 8 reports
     * that form as deprecated, so the defaults were dropped.
     *
     * @param string|null $workerName    Only prune this worker's jobs when not null
     * @param string|null $method        Only prune jobs for this method when not null
     * @param string      $objectName    Document class to remove from
     * @param callable    $conditionFunc Receives the remove() builder and must return
     *                                   the builder with its criteria applied
     *
     * @return int Number of removed documents ("n" of the driver result), 0 if not reported
     */
    protected function pruneJobs($workerName, $method, $objectName, $conditionFunc)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($objectName);
        $qb = $qb->remove();
        $qb = $conditionFunc($qb);

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        $result = $qb->getQuery()->execute();

        return isset($result['n']) ? $result['n'] : 0;
    }

    /**
     * Prunes expired jobs (expiresAt at or before the current time).
     *
     * @param string|null $workerName Only prune this worker's jobs when not null
     * @param string|null $method     Only prune jobs for this method when not null
     *
     * @return int Number of removed documents as reported by the driver
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        return $this->pruneJobs($workerName, $method, $this->getObjectName(), function ($qb) {
            /* @var Builder $qb */
            return $qb->field('expiresAt')->lte(new \DateTime());
        });
    }

    /**
     * Removes archived jobs older than $olderThan.
     *
     * @param \DateTime $olderThan Archived jobs with updatedAt before this are removed
     *
     * @return int Number of removed documents as reported by the driver
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        // Same remove/execute/count path as the other prune methods, without
        // any worker or method restriction.
        return $this->pruneJobs(null, null, $this->getArchiveObjectName(), function ($qb) use ($olderThan) {
            /* @var Builder $qb */
            return $qb->field('updatedAt')->lt($olderThan);
        });
    }

    /**
     * Counts jobs that are not locked and not expired.
     *
     * @param string|null $workerName Only count this worker's jobs when not null
     * @param string|null $method     Only count jobs for this method when not null
     *
     * @return int
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb
            ->find();

        $this->applyWorkerMethodFilter($qb, $workerName, $method);

        // Filter: no expiry or expiry still in the future, and not locked
        $date = new \DateTime();
        $qb
            ->addAnd(
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
            )
            ->field('locked')->equals(null);

        $query = $qb->getQuery();

        return $query->count(true);
    }

    /**
     * Builds a per-"worker->method()" breakdown of job counts by status
     * for one document class, using a map/reduce query.
     *
     * @param string $documentName
     *
     * @return array Map of "worker->method()" key to an array of status => count;
     *               the four standard Job::STATUS_* keys are always present
     */
    protected function getStatusByDocument($documentName)
    {
        // Run a map reduce function get worker and status break down
        $mapFunc = "function() {
            var result = {};
            result[this.status] = 1;
            var key = this.worker_name + '->' + this.method + '()';
            emit(key, result);
        }";
        $reduceFunc = 'function(k, vals) {
            var result = {};
            for (var index in vals) {
                var val =  vals[index];
                for (var i in val) {
                    if (result.hasOwnProperty(i)) {
                        result[i] += val[i];
                    }
                    else {
                        result[i] = val[i];
                    }
                }
            }
            return result;
        }';
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($documentName);
        $qb->map($mapFunc)
            ->reduce($reduceFunc);
        $query = $qb->getQuery();
        $results = $query->execute();

        $allStatus = [
            Job::STATUS_ERROR => 0,
            Job::STATUS_NEW => 0,
            Job::STATUS_RUNNING => 0,
            Job::STATUS_SUCCESS => 0,
        ];

        $status = [];

        foreach ($results as $info) {
            // Array union keeps the reduced counts and fills missing statuses with 0
            $status[$info['_id']] = $info['value'] + $allStatus;
        }

        return $status;
    }

    /**
     * Returns the combined status breakdown of live and archived jobs.
     *
     * @return array Map of "worker->method()" key to an array of status => count,
     *               with each inner array sorted by status key
     */
    public function getStatus()
    {
        $result = $this->getStatusByDocument($this->getObjectName());
        $archived = $this->getStatusByDocument($this->getArchiveObjectName());

        foreach ($archived as $key => $counts) {
            // A worker/method may only have archived jobs left; adopt its counts
            // as-is instead of adding onto an undefined entry.
            if (!isset($result[$key])) {
                $result[$key] = $counts;
                continue;
            }

            foreach ($counts as $k => $v) {
                $current = isset($result[$key][$k]) ? $result[$key][$k] : 0;
                $result[$key][$k] = $current + $v;
            }
        }

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }

    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Issues a single findAndUpdate that selects a job with status
     * Job::STATUS_NEW that is unlocked, due (whenAt null or not in the future)
     * and unexpired, and marks it locked/running in the same operation.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize Sort by priority when true, by whenAt otherwise
     * @param mixed       $runId      Value stored in the job's runId field
     *
     * @return \Dtc\QueueBundle\Model\Job|null The updated job; presumably null when nothing matches (confirm against the ODM)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var DocumentManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder($this->getObjectName());
        $qb
            ->findAndUpdate()
            ->returnNew();

        $this->applyWorkerMethodFilter($qb, $workerName, $methodName);

        if ($prioritize) {
            $qb->sort('priority', 'asc');
        } else {
            $qb->sort('whenAt', 'asc');
        }

        // Filter
        $date = new \DateTime();
        $qb
            ->addAnd(
                $qb->expr()->addOr($qb->expr()->field('whenAt')->equals(null), $qb->expr()->field('whenAt')->lte($date)),
                $qb->expr()->addOr($qb->expr()->field('expiresAt')->equals(null), $qb->expr()->field('expiresAt')->gt($date))
            )
            ->field('status')->equals(Job::STATUS_NEW)
            ->field('locked')->equals(null);

        // Update
        $qb
            ->field('lockedAt')->set($date) // Set started
            ->field('locked')->set(true)
            ->field('status')->set(Job::STATUS_RUNNING)
            ->field('runId')->set($runId);

        return $qb->getQuery()->execute();
    }

    /**
     * Adds "workerName" and/or "method" equality criteria to a query builder;
     * a null value leaves the corresponding field unrestricted.
     *
     * @param Builder     $qb
     * @param string|null $workerName
     * @param string|null $method
     */
    private function applyWorkerMethodFilter($qb, $workerName, $method)
    {
        if (null !== $workerName) {
            $qb->field('workerName')->equals($workerName);
        }

        if (null !== $method) {
            $qb->field('method')->equals($method);
        }
    }
}
287