Test Setup Failed
Push — master ( 298fd3...56daff )
by Matthew
17:14
created

BaseJobManager   B

Complexity

Total Complexity 39

Size/Duplication

Total Lines 278
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 0
Metric Value
wmc 39
lcom 1
cbo 6
dl 0
loc 278
rs 8.2857
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 1
A getObjectManager() 0 4 1
A getObjectName() 0 4 1
A getArchiveObjectName() 0 4 1
A getRunClass() 0 4 1
A getRunArchiveClass() 0 4 1
A getRepository() 0 4 1
countJobsByStatus() 0 1 ?
B resetErroneousJobs() 0 22 4
C getStalledJobs() 0 49 12
B resetStalledJobs() 0 22 4
A pruneStalledJobs() 0 17 4
A deleteJob() 0 6 1
A saveHistory() 0 4 1
B save() 0 34 3
stopIdGenerator() 0 1 ?
B resetJobsByCriterion() 0 38 3
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Dtc\QueueBundle\Model\AbstractJobManager;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\Run;
10
use Dtc\QueueBundle\Util\Util;
11
12
abstract class BaseJobManager extends AbstractJobManager
13
{
14
    /** Number of jobs to prune / reset / gather at a time */
15
    const FETCH_COUNT = 100;
16
17
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
18
    const STALLED_SECONDS = 1800;
19
    protected $objectManager;
20
    protected $objectName;
21
    protected $archiveObjectName;
22
    protected $runClass;
23
    protected $runArchiveClass;
24
25
    public function __construct(ObjectManager $objectManager,
26
        $objectName,
27
        $archiveObjectName,
28
        $runClass,
29
        $runArchiveClass)
30
    {
31
        $this->objectManager = $objectManager;
32
        $this->objectName = $objectName;
33
        $this->archiveObjectName = $archiveObjectName;
34
        $this->runClass = $runClass;
35
        $this->runArchiveClass = $runArchiveClass;
36
    }
37
38
    /**
39
     * @return ObjectManager
40
     */
41
    public function getObjectManager()
42
    {
43
        return $this->objectManager;
44
    }
45
46
    /**
47
     * @return string
48
     */
49
    public function getObjectName()
50
    {
51
        return $this->objectName;
52
    }
53
54
    /**
55
     * @return string
56
     */
57
    public function getArchiveObjectName()
58
    {
59
        return $this->archiveObjectName;
60
    }
61
62
    /**
63
     * @return string
64
     */
65
    public function getRunClass()
66
    {
67
        return $this->runClass;
68
    }
69
70
    /**
71
     * @return string
72
     */
73
    public function getRunArchiveClass()
74
    {
75
        return $this->runArchiveClass;
76
    }
77
78
    /**
79
     * @return ObjectRepository
80
     */
81
    public function getRepository()
82
    {
83
        return $this->getObjectManager()->getRepository($this->getObjectName());
84
    }
85
86
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
87
88
    public function resetErroneousJobs($workerName = null, $method = null)
89
    {
90
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);
91
92
        $criterion = ['status' => Job::STATUS_ERROR];
93
        if ($workerName) {
94
            $criterion['workerName'] = $workerName;
95
        }
96
        if ($method) {
97
            $criterion['method'] = $method;
98
        }
99
100
        $countProcessed = 0;
101
        $objectManager = $this->getObjectManager();
102
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
103
            $countProcessed += $this->resetJobsByCriterion(
104
                $criterion, static::FETCH_COUNT, $i);
105
            $objectManager->flush();
106
        }
107
108
        return $countProcessed;
109
    }
110
111
    protected function getStalledJobs($workerName = null, $method = null)
112
    {
113
        $count = $this->countJobsByStatus($this->getObjectName(), Job::STATUS_RUNNING, $workerName, $method);
114
115
        $criterion = ['status' => Job::STATUS_RUNNING];
116
        if ($workerName) {
117
            $criterion['workerName'] = $workerName;
118
        }
119
        if ($method) {
120
            $criterion['method'] = $method;
121
        }
122
123
        $runningJobsById = [];
124
125
        $objectManager = $this->getObjectManager();
126
        $runRepository = $objectManager->getRepository($this->runClass);
127
        $runArchiveRepository = $objectManager->getRepository($this->runArchiveClass);
128
        $repository = $this->getRepository();
129
130
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
131
            $runningJobs = $repository->findBy($criterion);
132
            if ($runningJobs) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $runningJobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
133
                foreach ($runningJobs as $job) {
134
                    /** Job $job */
135
                    if (null !== $runId = $job->getRunId()) {
136
                        $runningJobsById[$runId][] = $job;
137
                    }
138
                }
139
            }
140
        }
141
142
        $stalledJobs = [];
143
        foreach (array_keys($runningJobsById) as $runId) {
144
            if ($runRepository->find($runId)) {
145
                continue;
146
            }
147
            /** @var Run $run */
148
            if ($run = $runArchiveRepository->find($runId)) {
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $run is correct as $runArchiveRepository->find($runId) (which targets Doctrine\Common\Persiste...bjectRepository::find()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
149
                if ($endTime = $run->getEndedAt()) {
150
                    // Did it end over an hour ago
151
                    if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
152
                        $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
153
                    }
154
                }
155
            }
156
        }
157
158
        return $stalledJobs;
159
    }
160
161
    public function resetStalledJobs($workerName = null, $method = null)
162
    {
163
        $stalledJobs = $this->getStalledJobs($workerName, $method);
164
165
        $objectManager = $this->getObjectManager();
166
167
        $countProcessed = 0;
168
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
169
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j ) {
170
                $job = $stalledJobs[$j];
171
                /* Job $job */
172
                $job->setStatus(Job::STATUS_NEW);
173
                $job->setLocked(null);
174
                $job->setLockedAt(null);
175
                $objectManager->persist($job);
176
                ++$countProcessed;
177
            }
178
            $objectManager->flush();
179
        }
180
181
        return $countProcessed;
182
    }
183
184
    public function pruneStalledJobs($workerName = null, $method = null)
185
    {
186
        $stalledJobs = $this->getStalledJobs($workerName, $method);
187
        $objectManager = $this->getObjectManager();
188
189
        $countProcessed = 0;
190
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
191
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j ) {
192
                $job = $stalledJobs[$j];
193
                $objectManager->remove($job);
194
                ++$countProcessed;
195
            }
196
            $objectManager->flush();
197
        }
198
199
        return $countProcessed;
200
    }
201
202
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
203
    {
204
        $objectManager = $this->getObjectManager();
205
        $objectManager->remove($job);
206
        $objectManager->flush();
207
    }
208
209
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
210
    {
211
        $this->deleteJob($job); // Should cause job to be archived
212
    }
213
214
    public function save(\Dtc\QueueBundle\Model\Job $job)
215
    {
216
        // Todo: Serialize args
217
218
        // Generate crc hash for the job
219
        $hashValues = array($job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
220
        $crcHash = hash('sha256', serialize($hashValues));
221
        $job->setCrcHash($crcHash);
222
        $objectManager = $this->getObjectManager();
223
224
        if (true === $job->getBatch()) {
225
            // See if similar job that hasn't run exists
226
            $criteria = array('crcHash' => $crcHash, 'status' => Job::STATUS_NEW);
227
            $oldJob = $this->getRepository()->findOneBy($criteria);
228
229
            if ($oldJob) {
230
                // Old job exists - just override fields Set higher priority
231
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
232
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
233
                $oldJob->setBatch(true);
234
                $oldJob->setUpdatedAt(new \DateTime());
235
                $objectManager->persist($oldJob);
236
                $objectManager->flush();
237
238
                return $oldJob;
239
            }
240
        }
241
242
        // Just save a new job
243
        $objectManager->persist($job);
244
        $objectManager->flush();
245
246
        return $job;
247
    }
248
249
    abstract protected function stopIdGenerator($objectName);
250
251
    private function resetJobsByCriterion(
252
        $criterion,
253
        $limit,
254
        $offset)
255
    {
256
        $objectManager = $this->getObjectManager();
257
        $objectName = $this->getObjectName();
258
        $archiveObjectName = $this->getArchiveObjectName();
259
        $jobRepository = $objectManager->getRepository($objectName);
260
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
261
        $className = $jobRepository->getClassName();
262
        $metadata = $objectManager->getClassMetadata($className);
263
264
        $this->stopIdGenerator($archiveObjectName);
265
        $identifierData = $metadata->getIdentifier();
266
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
267
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
268
        $countProcessed = 0;
269
270
        foreach ($results as $jobArchive) {
271
            /** @var Job $job */
272
            $job = new $className();
273
            Util::copy($jobArchive, $job);
274
            $job->setStatus(Job::STATUS_NEW);
275
            $job->setLocked(null);
276
            $job->setLockedAt(null);
277
            $job->setMessage(null);
278
            $job->setUpdatedAt(new \DateTime());
279
            $job->setFinishedAt(null);
0 ignored issues
show
Documentation introduced by
null is of type null, but the function expects a object<DateTime>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
280
            $job->setElapsed(null);
281
282
            $objectManager->persist($job);
283
            $objectManager->remove($jobArchive);
284
            ++$countProcessed;
285
        }
286
287
        return $countProcessed;
288
    }
289
}
290