1 | <?php |
||||||
2 | |||||||
3 | namespace Jabe\Impl\Batch; |
||||||
4 | |||||||
5 | use Jabe\Impl\Context\Context; |
||||||
6 | use Jabe\Impl\JobExecutor\{ |
||||||
7 | JobDeclaration, |
||||||
8 | JobHandlerConfigurationInterface |
||||||
9 | }; |
||||||
10 | use Jabe\Impl\Json\JsonObjectConverter; |
||||||
11 | use Jabe\Impl\Persistence\Entity\{ |
||||||
12 | ByteArrayEntity, |
||||||
13 | ByteArrayManager, |
||||||
14 | JobEntity, |
||||||
15 | JobManager, |
||||||
16 | MessageEntity |
||||||
17 | }; |
||||||
18 | use Jabe\Impl\Util\JsonUtil; |
||||||
19 | |||||||
/**
 * Base implementation of a batch job handler.
 *
 * Splits the ids stored in a batch configuration into batch jobs, persists one
 * configuration byte array per job and converts configurations to and from
 * their serialized form through the JSON converter supplied by the subclass.
 */
abstract class AbstractBatchJobHandler implements BatchJobHandlerInterface
{
    /**
     * @return JobDeclaration declaration used by createBatchJob() to instantiate batch jobs
     */
    abstract public function getJobDeclaration(): JobDeclaration;

    /**
     * Creates the next portion of batch jobs for the given batch and stores the
     * updated configuration back on the batch.
     *
     * At most `invocationsPerBatchJob * batchJobsPerSeed` ids are scheduled per
     * call. If the configuration carries deployment mappings, only ids of the
     * first mapping are scheduled and the created jobs get its deployment id.
     *
     * @param BatchEntity $batch batch whose configuration is read and rewritten
     *
     * @return bool true when nothing is left to schedule (no mappings left for a
     *              deployment-aware batch, no ids left otherwise)
     */
    public function createJobs(BatchEntity $batch): bool
    {
        $configuration = $this->readConfiguration($batch->getConfigurationBytes());
        $deploymentId = null;

        $idMappings = $configuration->getIdMappings();
        // empty() is always false for an object, so the collection itself has to be asked
        $deploymentAware = $idMappings !== null && !$idMappings->isEmpty();

        $allIds = $configuration->getIds();
        $ids = $allIds;

        if ($deploymentAware) {
            $this->sanitizeMappings($idMappings, $allIds);
            if ($idMappings->isEmpty()) {
                // sanitizing consumed every mapping, there is no mapping left to read from
                $batch->setConfigurationBytes($this->writeConfiguration($configuration));
                return true;
            }
            $mappingToProcess = $idMappings->get(0);
            $ids = $mappingToProcess->getIds($allIds);
            $deploymentId = $mappingToProcess->getDeploymentId();
        }

        $batchJobsPerSeed = $batch->getBatchJobsPerSeed();
        $invocationsPerBatchJob = $batch->getInvocationsPerBatchJob();

        $numberOfItemsToProcess = min($invocationsPerBatchJob * $batchJobsPerSeed, count($ids));

        // ids to turn into jobs during this call
        $processIds = array_slice($ids, 0, $numberOfItemsToProcess);
        $this->createJobEntities($batch, $configuration, $deploymentId, $processIds, $invocationsPerBatchJob);

        // array_slice() returns copies, so the scheduled ids have to be dropped explicitly;
        // otherwise every following call would schedule the same ids again.
        // NOTE(review): assumes DeploymentMapping::getIds() returns the leading ids of the
        // list it is given and that BatchConfiguration offers setIds() - confirm both.
        $remainingIds = array_slice($ids, $numberOfItemsToProcess);
        $configuration->setIds(array_slice($allIds, $numberOfItemsToProcess));

        if ($deploymentAware) {
            if (empty($remainingIds)) {
                // all ids of the deployment are handled
                $idMappings->remove(0);
            } else {
                $idMappings->get(0)->removeIds($numberOfItemsToProcess);
            }
        }

        // update batch configuration
        $batch->setConfigurationBytes($this->writeConfiguration($configuration));

        return $deploymentAware ? $idMappings->isEmpty() : empty($remainingIds);
    }

    /**
     * Shrinks the deployment mappings so that their overall id count does not
     * exceed the number of ids that are still configured.
     *
     * For mixed version SeedJob execution, there might be ids that have been
     * processed without updating the mappings; this is corrected here,
     * see https://jira.camunda.com/browse/CAM-11188
     *
     * @param DeploymentMappings $idMappings mappings to correct in place
     * @param array              $ids        ids still present in the configuration
     */
    protected function sanitizeMappings(DeploymentMappings $idMappings, array $ids): void
    {
        $elementsToRemove = $idMappings->getOverallIdCount() - count($ids);

        // Always work on the head element: removing by the key of a running foreach
        // addresses the wrong mapping as soon as one element has been removed.
        while ($elementsToRemove > 0 && !$idMappings->isEmpty()) {
            $deploymentMapping = $idMappings->get(0);
            $mappingCount = $deploymentMapping->getCount();
            if ($mappingCount <= $elementsToRemove) {
                // the whole mapping has already been processed
                $idMappings->remove(0);
                $elementsToRemove -= $mappingCount;
            } else {
                // only a leading part of this mapping has been processed
                $deploymentMapping->removeIds($elementsToRemove);
                break;
            }
        }
    }

    /**
     * Creates one batch job per chunk of at most $invocationsPerBatchJob ids,
     * stores a configuration byte array for each job and increases the
     * created-jobs counter of the batch by the number of jobs created.
     *
     * @param BatchEntity        $batch                  batch the jobs belong to
     * @param BatchConfiguration $configuration          configuration of the whole batch
     * @param string|null        $deploymentId           deployment id set on every created job;
     *                                                   null when the batch has no deployment mappings
     * @param array              $processIds             ids to distribute over the jobs
     * @param int                $invocationsPerBatchJob maximum number of ids per job
     */
    protected function createJobEntities(
        BatchEntity $batch,
        BatchConfiguration $configuration,
        ?string $deploymentId,
        array $processIds,
        int $invocationsPerBatchJob
    ): void {
        if (empty($processIds)) {
            return;
        }

        $commandContext = Context::getCommandContext();
        $byteArrayManager = $commandContext->getByteArrayManager();
        $jobManager = $commandContext->getJobManager();

        // array_chunk() rejects a chunk size below 1 instead of looping forever
        $idChunks = array_chunk($processIds, $invocationsPerBatchJob);
        foreach ($idChunks as $idsForJob) {
            $jobConfiguration = $this->createJobConfiguration($configuration, $idsForJob);
            $jobConfiguration->setBatchId($batch->getId());

            $configurationEntity = $this->saveConfiguration($byteArrayManager, $jobConfiguration);

            $job = $this->createBatchJob($batch, $configurationEntity);
            $job->setDeploymentId($deploymentId);
            $this->postProcessJob($configuration, $job, $jobConfiguration);
            $jobManager->insertAndHintJobExecutor($job);
        }

        // update created jobs for batch
        $batch->setJobsCreated($batch->getJobsCreated() + count($idChunks));
    }

    /**
     * Builds the configuration of a single batch job.
     *
     * @param BatchConfiguration $configuration    configuration of the whole batch
     * @param array              $processIdsForJob ids the job has to handle
     *
     * @return BatchConfiguration configuration that is persisted for the job
     */
    abstract protected function createJobConfiguration(BatchConfiguration $configuration, array $processIdsForJob): BatchConfiguration;

    /**
     * Hook called for every created job right before it is inserted.
     * The default implementation does nothing.
     *
     * @param BatchConfiguration $configuration    configuration of the whole batch
     * @param JobEntity          $job              job that was just created
     * @param BatchConfiguration $jobConfiguration configuration created for that job
     */
    protected function postProcessJob(BatchConfiguration $configuration, JobEntity $job, BatchConfiguration $jobConfiguration): void
    {
        // do nothing as default
    }

    /**
     * Instantiates a batch job through the job declaration of this handler.
     *
     * @param BatchEntity     $batch         batch the job belongs to
     * @param ByteArrayEntity $configuration persisted configuration of the job
     *
     * @return JobEntity the new job instance
     */
    protected function createBatchJob(BatchEntity $batch, ByteArrayEntity $configuration): JobEntity
    {
        $creationContext = new BatchJobContext($batch, $configuration);
        return $this->getJobDeclaration()->createJobInstance($creationContext);
    }

    /**
     * Deletes all jobs found for the batch job definition of the given batch.
     *
     * @param BatchEntity $batch batch whose jobs are deleted
     */
    public function deleteJobs(BatchEntity $batch): void
    {
        $jobs = Context::getCommandContext()
            ->getJobManager()
            ->findJobsByJobDefinitionId($batch->getBatchJobDefinitionId());

        foreach ($jobs as $job) {
            $job->delete();
        }
    }

    /**
     * @param string $canonicalString value handed to the BatchJobConfiguration constructor
     *
     * @return JobHandlerConfigurationInterface a new BatchJobConfiguration
     */
    public function newConfiguration(string $canonicalString): JobHandlerConfigurationInterface
    {
        return new BatchJobConfiguration($canonicalString);
    }

    /**
     * Deletes the configuration byte array referenced by the job configuration, if any.
     *
     * NOTE(review): getConfigurationByteArrayId() is not declared on
     * JobHandlerConfigurationInterface; this presumably always receives the
     * BatchJobConfiguration produced by newConfiguration() - confirm.
     *
     * @param JobHandlerConfigurationInterface $configuration configuration of the deleted job
     * @param JobEntity                        $jobEntity     job being deleted (not used here)
     */
    public function onDelete(JobHandlerConfigurationInterface $configuration, JobEntity $jobEntity): void
    {
        $byteArrayId = $configuration->getConfigurationByteArrayId();
        if ($byteArrayId !== null) {
            Context::getCommandContext()->getByteArrayManager()
                ->deleteByteArrayById($byteArrayId);
        }
    }

    /**
     * Serializes the job configuration into a new byte array entity and inserts it.
     *
     * @param ByteArrayManager   $byteArrayManager manager used for the insert
     * @param BatchConfiguration $jobConfiguration configuration to persist
     *
     * @return ByteArrayEntity the inserted entity
     */
    protected function saveConfiguration(ByteArrayManager $byteArrayManager, BatchConfiguration $jobConfiguration): ByteArrayEntity
    {
        $configurationEntity = new ByteArrayEntity();
        $configurationEntity->setBytes($this->writeConfiguration($jobConfiguration));
        $byteArrayManager->insert($configurationEntity);
        return $configurationEntity;
    }

    /**
     * Serializes a configuration with the JSON converter of this handler.
     *
     * @param BatchConfiguration $configuration configuration to serialize
     *
     * @return string serialized configuration as produced by JsonUtil::asBytes()
     */
    public function writeConfiguration(BatchConfiguration $configuration): string
    {
        $jsonObject = $this->getJsonConverterInstance()->toJsonObject($configuration);
        return JsonUtil::asBytes($jsonObject);
    }

    /**
     * Restores a configuration from its serialized form.
     *
     * @param string $serializedConfiguration value previously produced by writeConfiguration()
     *
     * @return BatchConfiguration the deserialized configuration
     */
    public function readConfiguration(string $serializedConfiguration): BatchConfiguration
    {
        return $this->getJsonConverterInstance()->toObject(JsonUtil::asObject($serializedConfiguration));
    }

    /**
     * @return JsonObjectConverter converter used to (de)serialize the handler's configuration
     */
    abstract protected function getJsonConverterInstance(): JsonObjectConverter;
}
||||||
180 |