bingo-soft /
jabe
| 1 | <?php |
||||||
| 2 | |||||||
| 3 | namespace Jabe\Impl\Batch; |
||||||
| 4 | |||||||
| 5 | use Jabe\Impl\Context\Context; |
||||||
| 6 | use Jabe\Impl\JobExecutor\{ |
||||||
| 7 | JobDeclaration, |
||||||
| 8 | JobHandlerConfigurationInterface |
||||||
| 9 | }; |
||||||
| 10 | use Jabe\Impl\Json\JsonObjectConverter; |
||||||
| 11 | use Jabe\Impl\Persistence\Entity\{ |
||||||
| 12 | ByteArrayEntity, |
||||||
| 13 | ByteArrayManager, |
||||||
| 14 | JobEntity, |
||||||
| 15 | JobManager, |
||||||
| 16 | MessageEntity |
||||||
| 17 | }; |
||||||
| 18 | use Jabe\Impl\Util\JsonUtil; |
||||||
| 19 | |||||||
/**
 * Common base for batch job handlers.
 *
 * Reads the serialized batch configuration, cuts the configured ids into
 * chunks, creates one job per chunk and writes the updated configuration
 * back to the batch. Subclasses supply the job declaration, the per-job
 * configuration and the JSON converter.
 */
abstract class AbstractBatchJobHandler implements BatchJobHandlerInterface
{
    /**
     * Returns the declaration used to instantiate the jobs of this batch type.
     */
    abstract public function getJobDeclaration(): JobDeclaration;

    /**
     * Creates the next slice of jobs for the given batch.
     *
     * At most `invocationsPerBatchJob * batchJobsPerSeed` ids are handled per
     * call. When deployment mappings are present, only ids belonging to the
     * first mapping are handled so that every job is bound to one deployment.
     *
     * @param BatchEntity $batch batch whose configuration is read and updated
     *
     * @return bool true when no ids (or no deployment mappings) are left to process
     */
    public function createJobs(BatchEntity $batch): bool
    {
        $configuration = $this->readConfiguration($batch->getConfigurationBytes());
        $deploymentId = null;

        $idMappings = $configuration->getIdMappings();
        // empty() is always false for an object, so the collection itself must be asked
        $deploymentAware = $idMappings !== null && !$idMappings->isEmpty();

        $allIds = $configuration->getIds();
        $ids = $allIds;

        if ($deploymentAware) {
            $this->sanitizeMappings($idMappings, $allIds);
            $mappingToProcess = $idMappings->get(0);
            // NOTE(review): presumably returns the leading ids that belong to this mapping - confirm
            $ids = $mappingToProcess->getIds($allIds);
            $deploymentId = $mappingToProcess->getDeploymentId();
        }

        $batchJobsPerSeed = $batch->getBatchJobsPerSeed();
        $invocationsPerBatchJob = $batch->getInvocationsPerBatchJob();

        $numberOfItemsToProcess = min($invocationsPerBatchJob * $batchJobsPerSeed, count($ids));

        // ids handled by this call
        $processIds = array_slice($ids, 0, $numberOfItemsToProcess);
        $this->createJobEntities($batch, $configuration, $deploymentId, $processIds, $invocationsPerBatchJob);

        // PHP arrays are copied on slice/pass, so the handled ids have to be
        // dropped explicitly; otherwise the same ids are offered again on the
        // next call and the batch never reports completion.
        $remainingIdsOfCurrentSet = array_slice($ids, $numberOfItemsToProcess);
        $remainingIds = array_slice($allIds, $numberOfItemsToProcess);
        // NOTE(review): assumes BatchConfiguration exposes setIds() as the counterpart of getIds() - confirm
        $configuration->setIds($remainingIds);

        if ($deploymentAware) {
            if (empty($remainingIdsOfCurrentSet)) {
                // all ids of the deployment are handled
                $idMappings->remove(0);
            } else {
                $idMappings->get(0)->removeIds($numberOfItemsToProcess);
            }
        }

        // update batch configuration
        $batch->setConfigurationBytes($this->writeConfiguration($configuration));

        return $deploymentAware ? $idMappings->isEmpty() : empty($remainingIds);
    }

    /**
     * Brings the deployment mappings back in line with the remaining ids.
     *
     * For mixed version SeedJob execution, there might be ids that have been
     * processed without updating the mappings; the surplus is removed from
     * the front of the mappings here,
     * see https://jira.camunda.com/browse/CAM-11188
     *
     * @param DeploymentMappings $idMappings mappings to correct in place
     * @param array              $ids        ids still stored in the configuration
     */
    protected function sanitizeMappings(DeploymentMappings $idMappings, array $ids): void
    {
        $elementsToRemove = $idMappings->getOverallIdCount() - count($ids);
        if ($elementsToRemove <= 0) {
            return;
        }

        // NOTE(review): elements are removed while iterating - confirm that
        // DeploymentMappings keeps its keys stable in that case
        foreach ($idMappings as $key => $deploymentMapping) {
            if ($deploymentMapping->getCount() > $elementsToRemove) {
                // the surplus ends inside this mapping: shrink it and stop
                $deploymentMapping->removeIds($elementsToRemove);
                break;
            }

            $idMappings->remove($key);
            $elementsToRemove -= $deploymentMapping->getCount();
            if ($elementsToRemove == 0) {
                break;
            }
        }
    }

    /**
     * Creates and inserts one job per chunk of ids and records the number of
     * created jobs on the batch.
     *
     * @param BatchEntity        $batch                  batch the jobs belong to
     * @param BatchConfiguration $configuration          configuration of the whole batch
     * @param string|null        $deploymentId           deployment to bind the jobs to; null when the
     *                                                   batch is not deployment-aware
     * @param array              $processIds             ids to distribute over the jobs
     * @param int                $invocationsPerBatchJob maximum number of ids per job; array_chunk()
     *                                                   rejects a chunk size below 1
     */
    protected function createJobEntities(BatchEntity $batch, BatchConfiguration $configuration, ?string $deploymentId, array $processIds, int $invocationsPerBatchJob): void
    {
        if (empty($processIds)) {
            return;
        }

        $commandContext = Context::getCommandContext();
        $byteArrayManager = $commandContext->getByteArrayManager();
        $jobManager = $commandContext->getJobManager();

        $idChunks = array_chunk($processIds, $invocationsPerBatchJob);
        foreach ($idChunks as $idsForJob) {
            $jobConfiguration = $this->createJobConfiguration($configuration, $idsForJob);
            $jobConfiguration->setBatchId($batch->getId());

            $configurationEntity = $this->saveConfiguration($byteArrayManager, $jobConfiguration);

            $job = $this->createBatchJob($batch, $configurationEntity);
            $job->setDeploymentId($deploymentId);
            $this->postProcessJob($configuration, $job, $jobConfiguration);
            $jobManager->insertAndHintJobExecutor($job);
        }

        // update created jobs for batch
        $batch->setJobsCreated($batch->getJobsCreated() + count($idChunks));
    }

    /**
     * Builds the configuration of a single job from the batch configuration
     * and the ids assigned to that job.
     */
    abstract protected function createJobConfiguration(BatchConfiguration $configuration, array $processIdsForJob): BatchConfiguration;

    /**
     * Extension point called for every job right before it is inserted.
     *
     * The default implementation does nothing; the parameters exist for
     * overriding handlers.
     */
    protected function postProcessJob(BatchConfiguration $configuration, JobEntity $job, BatchConfiguration $jobConfiguration): void
    {
        // do nothing as default
    }

    /**
     * Instantiates a job through the handler's job declaration.
     *
     * @param BatchEntity     $batch         batch the job belongs to
     * @param ByteArrayEntity $configuration persisted configuration of the job
     */
    protected function createBatchJob(BatchEntity $batch, ByteArrayEntity $configuration): JobEntity
    {
        $creationContext = new BatchJobContext($batch, $configuration);
        return $this->getJobDeclaration()->createJobInstance($creationContext);
    }

    /**
     * Deletes every job found for the batch's batch-job definition.
     */
    public function deleteJobs(BatchEntity $batch): void
    {
        $jobs = Context::getCommandContext()
            ->getJobManager()
            ->findJobsByJobDefinitionId($batch->getBatchJobDefinitionId());

        foreach ($jobs as $job) {
            $job->delete();
        }
    }

    /**
     * Wraps the canonical string in a BatchJobConfiguration.
     */
    public function newConfiguration(string $canonicalString): JobHandlerConfigurationInterface
    {
        return new BatchJobConfiguration($canonicalString);
    }

    /**
     * Deletes the byte array referenced by the job configuration, if there is one.
     *
     * NOTE(review): getConfigurationByteArrayId() is not declared on
     * JobHandlerConfigurationInterface; presumably the configuration is always
     * the BatchJobConfiguration created by newConfiguration() - confirm.
     */
    public function onDelete(JobHandlerConfigurationInterface $configuration, JobEntity $jobEntity): void
    {
        $byteArrayId = $configuration->getConfigurationByteArrayId();
        if ($byteArrayId !== null) {
            Context::getCommandContext()->getByteArrayManager()
                ->deleteByteArrayById($byteArrayId);
        }
    }

    /**
     * Serializes the job configuration into a new byte array entity and inserts it.
     */
    protected function saveConfiguration(ByteArrayManager $byteArrayManager, BatchConfiguration $jobConfiguration): ByteArrayEntity
    {
        $configurationEntity = new ByteArrayEntity();
        $configurationEntity->setBytes($this->writeConfiguration($jobConfiguration));
        $byteArrayManager->insert($configurationEntity);
        return $configurationEntity;
    }

    /**
     * Serializes a configuration with the handler's JSON converter.
     *
     * NOTE(review): static analysis reports that toJsonObject() may return
     * null while JsonUtil::asBytes() expects an object - confirm.
     */
    public function writeConfiguration(BatchConfiguration $configuration): string
    {
        $jsonObject = $this->getJsonConverterInstance()->toJsonObject($configuration);
        return JsonUtil::asBytes($jsonObject);
    }

    /**
     * Restores a configuration from its serialized form with the handler's JSON converter.
     */
    public function readConfiguration(string $serializedConfiguration): BatchConfiguration
    {
        return $this->getJsonConverterInstance()->toObject(JsonUtil::asObject($serializedConfiguration));
    }

    /**
     * Returns the converter used to (de)serialize this handler's configuration.
     */
    abstract protected function getJsonConverterInstance(): JsonObjectConverter;
}
||||||
| 180 |