Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
15 | class JobManager extends DoctrineJobManager |
||
16 | { |
||
17 | use CommonTrait; |
||
18 | protected static $saveInsertCalled = null; |
||
19 | protected static $resetInsertCalled = null; |
||
20 | |||
21 | 3 | public function countJobsByStatus($objectName, $status, $workerName = null, $method = null) |
|
51 | |||
/**
 * Deletes rows of the job archive class whose status is BaseJob::STATUS_EXCEPTION.
 *
 * The worker name and method are handed to addWorkerNameCriterion() so that
 * helper can narrow the delete further.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return int Count of jobs pruned
 */
public function pruneExceptionJobs($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $deleteBuilder = $entityManager->createQueryBuilder();
    $deleteBuilder->delete($this->getJobArchiveClass(), 'j');
    $deleteBuilder->where('j.status = :status');
    $deleteBuilder->setParameter(':status', BaseJob::STATUS_EXCEPTION);

    $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

    // execute() on a DELETE yields the affected row count; normalise it to int.
    return (int) $deleteBuilder->getQuery()->execute();
}
||
71 | |||
72 | 21 | protected function resetSaveOk($function) |
|
95 | |||
96 | /** |
||
97 | * @param string $workerName |
||
98 | * @param string $method |
||
99 | */ |
||
100 | 15 | protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null) |
|
110 | |||
111 | 1 | protected function updateExpired($workerName = null, $method = null) |
|
128 | |||
129 | /** |
||
130 | * Removes archived jobs older than $olderThan. |
||
131 | * |
||
132 | * @param \DateTime $olderThan |
||
133 | */ |
||
134 | 1 | public function pruneArchivedJobs(\DateTime $olderThan) |
|
142 | |||
143 | 2 | View Code Duplication | public function getWaitingJobCount($workerName = null, $method = null) |
158 | |||
/**
 * Get Jobs statuses.
 *
 * Gathers per-status counts for the live job class and the archive job class
 * into one map (filled in by getStatusByEntityName()), then returns a copy in
 * which every inner status map is ordered by its key.
 *
 * @return array
 */
public function getStatus()
{
    $countsByMethod = [];
    $this->getStatusByEntityName($this->getJobClass(), $countsByMethod);
    $this->getStatusByEntityName($this->getJobArchiveClass(), $countsByMethod);

    $sortedCounts = [];
    foreach ($countsByMethod as $methodKey => $statusCounts) {
        // Sort the per-status counts so they are copied over in key order.
        ksort($statusCounts);
        foreach ($statusCounts as $status => $count) {
            if (!isset($sortedCounts[$methodKey][$status])) {
                $sortedCounts[$methodKey][$status] = $count;
                continue;
            }
            $sortedCounts[$methodKey][$status] += $count;
        }
    }

    return $sortedCounts;
}
||
182 | |||
/**
 * Adds the job counts of one entity class to $result.
 *
 * Counts are grouped by worker name, method and status. Each worker/method
 * pair is stored under the key "<workerName>-><method>()", seeded with
 * static::getAllStatuses() the first time the key is seen.
 *
 * @param string $entityName
 * @param array  $result     Accumulator, modified in place
 */
protected function getStatusByEntityName($entityName, array &$result)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $countBuilder = $entityManager->getRepository($entityName)->createQueryBuilder('j');
    $countBuilder->select('j.workerName, j.method, j.status, count(j) as c');
    $countBuilder->groupBy('j.workerName, j.method, j.status');
    $rows = $countBuilder->getQuery()->getArrayResult();

    foreach ($rows as $row) {
        $methodKey = $row['workerName'].'->'.$row['method'].'()';
        if (!isset($result[$methodKey])) {
            $result[$methodKey] = static::getAllStatuses();
        }
        $result[$methodKey][$row['status']] += intval($row['c']);
    }
}
|
201 | |||
/**
 * Get the next job to run (can be filtered by workername and method name).
 *
 * Fetches candidate job ids in pages of up to 100 and tries to take each one
 * with takeJob(). The first job taken is returned. When a page yields no
 * takeable job the query is run again, until no candidates are left.
 *
 * @param string $workerName
 * @param string $methodName
 * @param bool   $prioritize
 * @param int    $runId
 *
 * @return Job|null
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    while (true) {
        /** @var QueryBuilder $candidateBuilder */
        $candidateBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $candidateBuilder->select('j.id');
        $candidateBuilder->setMaxResults(100);

        $candidates = $candidateBuilder->getQuery()->getResult();
        if (!$candidates) {
            // Nothing left to try.
            return null;
        }

        foreach ($candidates as $candidate) {
            $takenJob = $this->takeJob($candidate['id'], $runId);
            if ($takenJob) {
                return $takenJob;
            }
        }
    }
}
||
233 | |||
/**
 * Builds the query used to look up runnable jobs.
 *
 * Applies addStandardPredicate() and addWorkerNameCriterion(), then orders by
 * priority (descending) and whenUs (ascending) when $prioritize is true, or by
 * whenUs (ascending) alone otherwise.
 *
 * @param string|null $workerName
 * @param string|null $methodName
 * @param bool        $prioritize
 *
 * @return QueryBuilder
 */
public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
{
    /** @var EntityRepository $jobRepository */
    $jobRepository = $this->getRepository();
    $builder = $jobRepository->createQueryBuilder('j');

    $this->addStandardPredicate($builder);
    $this->addWorkerNameCriterion($builder, $workerName, $methodName);

    if (!$prioritize) {
        $builder->orderBy('j.whenUs', 'ASC');

        return $builder;
    }

    $builder->addOrderBy('j.priority', 'DESC');
    $builder->addOrderBy('j.whenUs', 'ASC');

    return $builder;
}
||
258 | |||
/**
 * Restricts a job query to rows with the given status that are due and not expired.
 *
 * "Due" means whenUs is NULL or <= the current time in the decimal format
 * produced by Util::getMicrotimeDecimalFormat(). "Not expired" means
 * expiresAt is NULL or later than the current time.
 *
 * Note: this calls where(), so it replaces any WHERE clause already set on the builder.
 *
 * @param QueryBuilder $queryBuilder
 * @param string       $status
 */
protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
{
    $now = Util::getMicrotimeDateTime();
    $nowDecimal = Util::getMicrotimeDecimalFormat($now);

    $expr = $queryBuilder->expr();
    $isDue = $expr->orX(
        $expr->isNull('j.whenUs'),
        $expr->lte('j.whenUs', ':whenUs')
    );
    $isNotExpired = $expr->orX(
        $expr->isNull('j.expiresAt'),
        $expr->gt('j.expiresAt', ':expiresAt')
    );

    $queryBuilder->where('j.status = :status');
    $queryBuilder->setParameter(':status', $status);
    $queryBuilder->andWhere($isDue);
    $queryBuilder->andWhere($isNotExpired);
    $queryBuilder->setParameter(':whenUs', $nowDecimal);
    $queryBuilder->setParameter(':expiresAt', $now);
}
|
277 | |||
/**
 * Tries to claim a job by switching it from BaseJob::STATUS_NEW to
 * BaseJob::STATUS_RUNNING in a single UPDATE statement.
 *
 * The UPDATE only matches while the row is still in STATUS_NEW. If another
 * worker claimed the same id between the candidate SELECT in getJob() and
 * this call, zero rows are affected and null is returned, so the caller
 * moves on to the next candidate instead of running the job twice.
 *
 * @param mixed    $jobId Identifier of the job to claim
 * @param int|null $runId Stored on the job when not null
 *
 * @return Job|null The claimed job, or null when it could not be claimed
 */
protected function takeJob($jobId, $runId = null)
{
    /** @var EntityRepository $repository */
    $repository = $this->getRepository();
    /** @var QueryBuilder $queryBuilder */
    $queryBuilder = $repository->createQueryBuilder('j');
    $queryBuilder
        ->update()
        ->set('j.status', ':status')
        ->setParameter(':status', BaseJob::STATUS_RUNNING);
    if (null !== $runId) {
        $queryBuilder
            ->set('j.runId', ':runId')
            ->setParameter(':runId', $runId);
    }
    $queryBuilder->set('j.startedAt', ':startedAt')
        ->setParameter(':startedAt', Util::getMicrotimeDateTime());

    // Compare-and-set: matching on the id alone would let every concurrent
    // worker "win" the same job, because each UPDATE would affect one row.
    $queryBuilder->where('j.id = :id')
        ->andWhere('j.status = :claimableStatus')
        ->setParameter(':id', $jobId)
        ->setParameter(':claimableStatus', BaseJob::STATUS_NEW);
    $resultCount = $queryBuilder->getQuery()->execute();

    if (1 === $resultCount) {
        return $repository->find($jobId);
    }

    return null;
}
||
308 | |||
/**
 * Tries to update the nearest job as a batch.
 *
 * Looks up the STATUS_NEW job with the same crcHash and the lowest whenUs.
 * If one exists, it is updated to the higher of the two priorities and the
 * earlier of the two whenUs values (compared with bccomp()).
 *
 * @param \Dtc\QueueBundle\Model\Job $job
 *
 * @return null|Job The existing job that absorbed $job, or null when none matched
 *
 * @throws UnsupportedException when $job is not an instance of Job
 */
public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
{
    if (!$job instanceof Job) {
        throw new UnsupportedException('$job must be instance of '.Job::class);
    }

    /** @var QueryBuilder $lookupBuilder */
    $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
    $lookupBuilder->select();
    $lookupBuilder->where('j.crcHash = :crcHash');
    $lookupBuilder->andWhere('j.status = :status');
    $lookupBuilder->setParameter(':status', BaseJob::STATUS_NEW);
    $lookupBuilder->setParameter(':crcHash', $job->getCrcHash());
    $lookupBuilder->orderBy('j.whenUs', 'ASC');
    $lookupBuilder->setMaxResults(1);

    $matches = $lookupBuilder->getQuery()->execute();
    if (!$matches) {
        return null;
    }

    /** @var Job $existingJob */
    $existingJob = $matches[0];

    $mergedPriority = max($job->getPriority(), $existingJob->getPriority());

    // Keep whichever of the two jobs is scheduled earlier.
    $incomingWhenUs = $job->getWhenUs();
    $existingWhenUs = $existingJob->getWhenUs();
    $mergedWhenUs = bccomp($incomingWhenUs, $existingWhenUs) < 0 ? $incomingWhenUs : $existingWhenUs;

    $this->updateBatchJob($existingJob, $mergedPriority, $mergedWhenUs);

    return $existingJob;
}
||
351 | |||
/**
 * Writes a changed priority and/or whenUs to an existing job.
 *
 * Only fields that differ (strict comparison) from the job's current values
 * are set, both on the in-memory object and through an UPDATE for that id.
 * No query is issued when neither field differs.
 *
 * @param Job    $existingJob
 * @param int    $newPriority
 * @param string $newWhenUs
 *
 * @return Job The same $existingJob instance
 */
protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
{
    $priorityChanged = $newPriority !== $existingJob->getPriority();
    $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

    if (!$priorityChanged && !$whenUsChanged) {
        return $existingJob;
    }

    /** @var EntityRepository $jobRepository */
    $jobRepository = $this->getRepository();
    /** @var QueryBuilder $updateBuilder */
    $updateBuilder = $jobRepository->createQueryBuilder('j');
    $updateBuilder->update();

    if ($priorityChanged) {
        $existingJob->setPriority($newPriority);
        $updateBuilder->set('j.priority', ':priority');
        $updateBuilder->setParameter(':priority', $newPriority);
    }

    if ($whenUsChanged) {
        $existingJob->setWhenUs($newWhenUs);
        $updateBuilder->set('j.whenUs', ':whenUs');
        $updateBuilder->setParameter(':whenUs', $newWhenUs);
    }

    $updateBuilder->where('j.id = :id');
    $updateBuilder->setParameter(':id', $existingJob->getId());
    $updateBuilder->getQuery()->execute();

    return $existingJob;
}
||
384 | |||
/**
 * Lists the distinct worker/method pairs of jobs matching addStandardPredicate().
 *
 * @param string $status Status passed to addStandardPredicate()
 *
 * @return array Method names grouped by worker name; empty when nothing matches
 */
public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
{
    /** @var EntityRepository $jobRepository */
    $jobRepository = $this->getRepository();
    $builder = $jobRepository->createQueryBuilder('j');
    $this->addStandardPredicate($builder, $status);
    $builder->select('DISTINCT j.workerName, j.method');

    $methodsByWorker = [];
    foreach ($builder->getQuery()->getArrayResult() as $row) {
        $methodsByWorker[$row['workerName']][] = $row['method'];
    }

    return $methodsByWorker;
}
||
405 | |||
406 | /** |
||
407 | * @param string $workerName |
||
408 | * @param string $methodName |
||
409 | */ |
||
410 | 2 | public function countLiveJobs($workerName = null, $methodName = null) |
|
421 | |||
422 | /** |
||
423 | * @param string $workerName |
||
424 | * @param string $methodName |
||
425 | * @param callable|null $progressCallback |
||
426 | */ |
||
427 | 1 | public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null) |
|
428 | { |
||
429 | // First mark all Live non-running jobs as Archive |
||
430 | 1 | $repository = $this->getRepository(); |
|
444 | |||
445 | /** |
||
446 | * Move jobs in 'archive' status to the archive table. |
||
447 | * |
||
448 | * This is a bit of a hack to run a lower level query so as to process the INSERT INTO SELECT |
||
449 | * All on the server as "INSERT INTO SELECT" is not supported natively in Doctrine. |
||
450 | * |
||
451 | * @param string|null $workerName |
||
452 | * @param string|null $methodName |
||
453 | * @param callable|null $progressCallback |
||
454 | */ |
||
455 | 1 | protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null) |
|
484 | } |
||
485 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.