Completed
Branch master (45c65c)
by Matthew
02:05
created

BaseJobManager::findRunningJobs()   B

Complexity

Conditions 5
Paths 3

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5

Importance

Changes 0
Metric Value
dl 0
loc 19
ccs 10
cts 10
cp 1
rs 8.8571
c 0
b 0
f 0
cc 5
eloc 10
nc 3
nop 2
crap 5
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\AbstractJobManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\Job;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
15
abstract class BaseJobManager extends AbstractJobManager
16
{
17
    /** Number of jobs to prune / reset / gather at a time; also the batch size between flushes */
18
    const FETCH_COUNT = 100;
19
20
    /**
     * Number of seconds that must have passed since a job's archived run ended
     * before that job is considered stalled (1800 seconds = 30 minutes).
     */
21
    const STALLED_SECONDS = 1800;
22
    /** @var ObjectManager Object manager used for repository lookups, persist, remove and flush */
    protected $objectManager;
23
    /** @var string Name used to look up the job repository */
    protected $objectName;
24
    /** @var string Name used to look up the job archive repository */
    protected $archiveObjectName;
25
    /** @var string Name used to look up the run repository */
    protected $runClass;
26
    /** @var string Name used to look up the run archive repository */
    protected $runArchiveClass;
27
28
    /**
     * Stores the object manager and the four names this manager uses for
     * repository lookups.
     *
     * @param ObjectManager $objectManager     Manager used for lookups, persist, remove and flush
     * @param string        $objectName        Name used to look up the job repository
     * @param string        $archiveObjectName Name used to look up the job archive repository
     * @param string        $runClass          Name used to look up the run repository
     * @param string        $runArchiveClass   Name used to look up the run archive repository
     */
    public function __construct(
        ObjectManager $objectManager,
        $objectName,
        $archiveObjectName,
        $runClass,
        $runArchiveClass
    ) {
        // Run-related names
        $this->runArchiveClass = $runArchiveClass;
        $this->runClass = $runClass;

        // Job-related names
        $this->archiveObjectName = $archiveObjectName;
        $this->objectName = $objectName;

        $this->objectManager = $objectManager;
    }
46
47
    /**
     * Returns the object manager that was handed to the constructor.
     *
     * @return ObjectManager
     */
    public function getObjectManager()
    {
        return $this->objectManager;
    }
54
55
    /**
     * Returns the name used to look up the job repository.
     *
     * @return string
     */
    public function getObjectName()
    {
        return $this->objectName;
    }
62
63
    /**
     * Returns the name used to look up the job archive repository.
     *
     * @return string
     */
    public function getArchiveObjectName()
    {
        return $this->archiveObjectName;
    }
70
71
    /**
     * Returns the name used to look up the run repository.
     *
     * @return string
     */
    public function getRunClass()
    {
        return $this->runClass;
    }
78
79
    /**
     * Returns the name used to look up the run archive repository.
     *
     * @return string
     */
    public function getRunArchiveClass()
    {
        return $this->runArchiveClass;
    }
86
87
    /**
     * Looks up the repository registered under the job object name.
     *
     * @return ObjectRepository
     */
    public function getRepository()
    {
        $manager = $this->getObjectManager();
        $jobObjectName = $this->getObjectName();

        return $manager->getRepository($jobObjectName);
    }
94
95
    /**
     * Counts the jobs of one object type that are in the given status.
     *
     * Callers in this class use the result as the upper bound of their paging
     * loops, so implementations are presumably expected to return an integer
     * (TODO confirm against the concrete managers).
     *
     * @param string      $objectName Object name to count in (callers pass the job or the job archive name)
     * @param mixed       $status     Status value to match (callers pass Job::STATUS_* constants)
     * @param string|null $workerName Optional worker name filter; callers pass null when not filtering
     * @param string|null $method     Optional method filter; callers pass null when not filtering
     */
98
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
99
100 2
    /**
     * Moves archived jobs that are in the error status back into the job
     * collection as new jobs.
     *
     * The work is done in batches of FETCH_COUNT, with a flush after every batch.
     *
     * @param string|null $workerName Only handle jobs of this worker when not null
     * @param string|null $method     Only handle jobs of this method when not null
     *
     * @return int Number of jobs that were reset
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        if (null !== $workerName) {
            $criterion['workerName'] = $workerName;
        }
        if (null !== $method) {
            $criterion['method'] = $method;
        }

        $countProcessed = 0;
        $objectManager = $this->getObjectManager();
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Each batch removes the archive rows it handled and the flush below
            // deletes them, so the next unprocessed rows are always at offset 0.
            // Advancing the offset would skip FETCH_COUNT rows on every pass.
            $processed = $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
            $objectManager->flush();

            if (0 === $processed) {
                // Nothing left to reset (fewer rows than counted).
                break;
            }
            $countProcessed += $processed;
        }

        return $countProcessed;
    }
122
123 4
    /**
     * Collects the running jobs, optionally narrowed by worker name and
     * method, and returns those that extractStalledJobs() reports as stalled.
     *
     * @param string|null $workerName Only consider jobs of this worker when not null
     * @param string|null $method     Only consider jobs of this method when not null
     *
     * @return array List of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningCount = $this->countJobsByStatus($this->getObjectName(), Job::STATUS_RUNNING, $workerName, $method);

        // Keep only the filters that were actually supplied.
        $optionalFilters = array_filter(
            ['workerName' => $workerName, 'method' => $method],
            function ($value) {
                return null !== $value;
            }
        );
        $criterion = ['status' => Job::STATUS_RUNNING] + $optionalFilters;

        $jobsByRunId = $this->findRunningJobs($criterion, $runningCount);

        return $this->extractStalledJobs($jobsByRunId);
    }
139
140 4
    /**
     * Fetches the jobs matching $criterion page by page and groups the ones
     * that carry a run id by that id. Jobs without a run id are left out.
     *
     * @param array $criterion Field => value criteria handed to the repository's findBy()
     * @param int   $count     Number of matching jobs expected; bounds how many pages are requested
     *
     * @return array Map of run id => list of jobs with that run id
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();

        // Page on the identifier so every page is a distinct, stable slice.
        // Without order/limit/offset each pass would reload every matching job
        // and append it again, duplicating jobs whenever $count > FETCH_COUNT.
        $metadata = $this->getObjectManager()->getClassMetadata($repository->getClassName());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $runningJobsById = [];
        for ($offset = 0; $offset < $count; $offset += static::FETCH_COUNT) {
            $runningJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], static::FETCH_COUNT, $offset);
            if (empty($runningJobs)) {
                // Fewer jobs than counted; no further pages to read.
                break;
            }

            foreach ($runningJobs as $job) {
                /** @var Job $job */
                if (null !== $runId = $job->getRunId()) {
                    $runningJobsById[$runId][] = $job;
                }
            }
        }

        return $runningJobsById;
    }
159
160 4
    /**
     * Picks the stalled jobs out of a run-id-indexed map of running jobs.
     *
     * The jobs of a run are reported when the run can no longer be found in
     * the run repository, it is found in the run archive repository with an
     * end time, and that end time lies more than STALLED_SECONDS in the past.
     *
     * @param array $runningJobsById Map of run id => list of jobs
     *
     * @return array Flat list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $manager = $this->getObjectManager();
        $activeRuns = $manager->getRepository($this->runClass);
        /** @var EntityRepository|DocumentRepository $archivedRuns */
        $archivedRuns = $manager->getRepository($this->runArchiveClass);

        $stalled = [];
        foreach ($runningJobsById as $runId => $jobsOfRun) {
            // The run still exists, so its jobs are not stalled.
            if ($activeRuns->find($runId)) {
                continue;
            }

            /** @var Run $archivedRun */
            $archivedRun = $archivedRuns->find($runId);
            if (!$archivedRun) {
                continue;
            }

            $endedAt = $archivedRun->getEndedAt();
            if (!$endedAt) {
                continue;
            }

            // Stalled only when the run ended more than STALLED_SECONDS ago.
            $secondsSinceEnd = time() - $endedAt->getTimestamp();
            if ($secondsSinceEnd > static::STALLED_SECONDS) {
                foreach ($jobsOfRun as $stalledJob) {
                    $stalled[] = $stalledJob;
                }
            }
        }

        return $stalled;
    }
185
186 2
    /**
     * Sets every stalled job back to BaseJob::STATUS_NEW and clears its lock
     * fields, flushing after each batch of FETCH_COUNT jobs.
     *
     * @param string|null $workerName Only handle jobs of this worker when not null
     * @param string|null $method     Only handle jobs of this method when not null
     *
     * @return int Number of jobs that were reset
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $manager = $this->getObjectManager();

        $resetCount = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $stalledJob) {
                /** @var Job $stalledJob */
                $stalledJob->setStatus(BaseJob::STATUS_NEW);
                $stalledJob->setLocked(null);
                $stalledJob->setLockedAt(null);
                $manager->persist($stalledJob);
                ++$resetCount;
            }
            // One flush per batch.
            $manager->flush();
        }

        return $resetCount;
    }
208
209
    /**
     * Removes every stalled job, flushing after each batch of FETCH_COUNT jobs.
     *
     * @param string $workerName Only handle jobs of this worker when not null
     * @param string $method     Only handle jobs of this method when not null
     *
     * @return int Number of jobs that were removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $manager = $this->getObjectManager();

        $prunedCount = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $stalledJob) {
                $manager->remove($stalledJob);
                ++$prunedCount;
            }
            // One flush per batch.
            $manager->flush();
        }

        return $prunedCount;
    }
230
231 6
    /**
     * Removes the job through the object manager and flushes right away.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job to remove
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $manager = $this->getObjectManager();

        $manager->remove($job);
        $manager->flush();
    }
237
238
    /**
     * Hands the job to deleteJob().
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job whose history should be kept
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        // Deleting is expected to get the job archived; the archiving itself
        // does not happen in this class.
        $this->deleteJob($job);
    }
242
243 16
    /**
     * Persists a job, or folds it into an equal job that is still waiting.
     *
     * A hash over class name, method, worker name and args is stored on the
     * job. When the job is flagged as batch and a job with the same hash and
     * status BaseJob::STATUS_NEW already exists, that existing job is updated
     * (higher priority, earlier whenAt, batch flag, updatedAt) and returned
     * instead of storing the new one.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job to store
     *
     * @return \Dtc\QueueBundle\Model\Job The stored job: $job itself, or the existing job that absorbed it
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        // Todo: Serialize args

        // Fingerprint of the job; stored as "crcHash" but computed with SHA-256.
        $fingerprintParts = [$job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($fingerprintParts));
        $job->setCrcHash($crcHash);
        $manager = $this->getObjectManager();

        // Only batch jobs are merged into an equal job that has not run yet.
        $waitingTwin = null;
        if (true === $job->getBatch()) {
            $waitingTwin = $this->getRepository()->findOneBy(
                ['crcHash' => $crcHash, 'status' => BaseJob::STATUS_NEW]
            );
        }

        if (!$waitingTwin) {
            // No merge candidate: store the job as given.
            $manager->persist($job);
            $manager->flush();

            return $job;
        }

        // Merge: keep the higher priority and the earlier scheduled time.
        $waitingTwin->setPriority(max($job->getPriority(), $waitingTwin->getPriority()));
        $waitingTwin->setWhenAt(min($job->getWhenAt(), $waitingTwin->getWhenAt()));
        $waitingTwin->setBatch(true);
        $waitingTwin->setUpdatedAt(new \DateTime());
        $manager->persist($waitingTwin);
        $manager->flush();

        return $waitingTwin;
    }
277
278
    /**
     * Hook implemented by the concrete managers; this class calls it once,
     * with the archive object name, before archived jobs are copied into new
     * job objects.
     *
     * NOTE(review): presumably switches off automatic id generation so that
     * copied ids are kept — confirm against the implementations.
     *
     * @param string $objectName
     */
281
    abstract protected function stopIdGenerator($objectName);
282
283
    /**
     * Turns one page of archived jobs back into new jobs.
     *
     * Each archived job found is copied into a fresh job object, its status,
     * lock, message, finish time and elapsed time are reset, the new job is
     * persisted and the archived one is marked for removal. Nothing is flushed
     * here; flushing is left to the caller.
     *
     * @param array $criterion Field => value criteria for the archive lookup
     * @param int   $limit     Maximum number of archived jobs to handle
     * @param int   $offset    Number of matching archived jobs to skip
     *
     * @return int Number of archived jobs handled
     */
    private function resetJobsByCriterion($criterion, $limit, $offset)
    {
        $manager = $this->getObjectManager();
        $jobName = $this->getObjectName();
        $archiveName = $this->getArchiveObjectName();
        $jobRepository = $manager->getRepository($jobName);
        $archiveRepository = $manager->getRepository($archiveName);
        $jobClass = $jobRepository->getClassName();
        $jobMetadata = $manager->getClassMetadata($jobClass);

        // NOTE(review): the generator is stopped for the archive name although
        // the objects persisted below are of the job class — confirm intent.
        $this->stopIdGenerator($archiveName);

        // Sort on the first identifier field of the job class, or "id" if none is reported.
        $identifierFields = $jobMetadata->getIdentifier();
        if (isset($identifierFields[0])) {
            $sortField = $identifierFields[0];
        } else {
            $sortField = 'id';
        }

        $archivedJobs = $archiveRepository->findBy($criterion, [$sortField => 'ASC'], $limit, $offset);

        $handled = 0;
        foreach ($archivedJobs as $archivedJob) {
            /** @var Job $revivedJob */
            $revivedJob = new $jobClass();
            Util::copy($archivedJob, $revivedJob);

            // Wipe everything that belongs to the previous execution.
            $revivedJob->setStatus(BaseJob::STATUS_NEW);
            $revivedJob->setLocked(null);
            $revivedJob->setLockedAt(null);
            $revivedJob->setMessage(null);
            $revivedJob->setUpdatedAt(new \DateTime());
            $revivedJob->setFinishedAt(null);
            $revivedJob->setElapsed(null);

            $manager->persist($revivedJob);
            $manager->remove($archivedJob);
            ++$handled;
        }

        return $handled;
    }
325
}
326