Total Complexity | 64 |
Total Lines | 499 |
Duplicated Lines | 0 % |
Changes | 6 | ||
Bugs | 0 | Features | 0 |
Complex classes like WorkerModelsEvents often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use WorkerModelsEvents, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
86 | class WorkerModelsEvents extends WorkerBase |
||
87 | { |
||
/** Execution time limit, in seconds, applied before each reload action is run. */
private const int ACTION_TIMEOUT = 30;

/** True while startReload() is running; used to prevent re-entrant reloads. */
private bool $isProcessing = false;

/** Unix timestamp from which the quiet period before a reload is measured. */
private int $last_change;

/** Reload actions waiting to be started, keyed by action class name. */
private array $plannedReloadActions = [];

/** Quiet period, in seconds, that must elapse after $last_change before a reload starts. */
private int $timeout = 5;

/** Core conf objects obtained from the DI container. */
private array $arrAsteriskConfObjects;

/** Every known reload action class name, listed in execution order. */
private array $reloadActions = [];

/** Dependency rules used to plan reload actions for generic model changes. */
private array $otherModelsDependencyTable = [];

/** Dependency rules used to plan reload actions for PbxSettings changes. */
private array $pbxSettingsDependencyTable = [];

/** Dependency rules used to plan reload actions for CustomFiles changes. */
private array $customFilesDependencyTable = [];

/** Queue client this worker subscribes with and waits on. */
private BeanstalkClient $beanstalkClient;
||
108 | |||
/**
 * Invokes an action by publishing a job to the Beanstalk queue.
 *
 * The job is published to the tube named after this class, which is the tube
 * the worker subscribes to in subscribeToEvents(). Nothing is published when
 * no default DI container exists or when the job cannot be encoded as JSON.
 *
 * @param string $action     The action to invoke.
 * @param array  $parameters The parameters for the action.
 * @param int    $priority   The priority of the job.
 * @return void
 */
public static function invokeAction(string $action, array $parameters = [], int $priority = 0): void
{
    $di = Di::getDefault();
    if (!$di) {
        // Without a DI container there is no queue connection to publish to.
        return;
    }
    /** @var BeanstalkClient $queue */
    $queue = $di->getShared(BeanstalkConnectionModelsProvider::SERVICE_NAME);

    // Prepare the job data. An unencodable payload must not reach the queue:
    // json_encode() would return false, and the consumer decodes job bodies with
    // JSON_THROW_ON_ERROR, so such a job would make the worker request a restart.
    try {
        $jobData = json_encode(
            [
                'source' => BeanstalkConnectionModelsProvider::SOURCE_INVOKE_ACTION,
                'action' => $action,
                'parameters' => $parameters,
                'model' => '',
            ],
            JSON_THROW_ON_ERROR
        );
    } catch (Throwable $exception) {
        CriticalErrorsHandler::handleExceptionWithSyslog($exception);
        return;
    }

    // Publish the job to the Beanstalk queue.
    // NOTE(review): 3600 is presumably the job TTR in seconds - confirm against BeanstalkClient::publish().
    $queue->publish($jobData, self::class, $priority, PheanstalkInterface::DEFAULT_DELAY, 3600);
}
||
131 | |||
/**
 * Entry point of the worker lifecycle.
 *
 * Runs three phases in order: prepares the worker state, registers the queue
 * callbacks, then blocks in the event loop until the loop decides to stop.
 *
 * @param array $argv The command-line arguments passed to the worker (not used by this method).
 * @return void
 */
public function start(array $argv): void
{
    // Phase 1: connection, dependency tables and the ordered action list.
    $this->initializeWorker();

    // Phase 2: queue subscriptions and the timeout handler.
    $this->subscribeToEvents();

    // Phase 3: event loop; returns only once the loop ends.
    $this->waitForEvents();
}
||
147 | |||
/**
 * Prepares everything the worker needs before it starts listening for jobs.
 *
 * Obtains the queue client, rewinds the last-change timestamp by one timeout
 * interval, loads the core conf objects from the DI container, fetches the three
 * dependency tables and the ordered list of reload actions, and clears the plan.
 *
 * @return void
 */
private function initializeWorker(): void
{
    // Queue connection used for subscribing and waiting.
    $this->beanstalkClient = $this->getBeanstalkClient();

    // Start with the quiet period already elapsed.
    $this->last_change = time() - $this->timeout;

    // Core conf objects.
    $this->arrAsteriskConfObjects = $this->di->getShared(AsteriskConfModulesProvider::SERVICE_NAME);

    // Dependency tables: PBX settings, custom files, then every other model.
    $this->pbxSettingsDependencyTable = ProcessPBXSettings::getDependencyTable();
    $this->customFilesDependencyTable = ProcessCustomFiles::getDependencyTable();
    $this->otherModelsDependencyTable = ProcessOtherModels::getDependencyTable();

    // Known reload actions in the order they are executed.
    $this->reloadActions = $this->getReloadActionsWithPriority();

    // Nothing is planned yet.
    $this->plannedReloadActions = [];
}
||
180 | |||
/**
 * Fetches the shared Beanstalk connection from the default DI container.
 *
 * @return BeanstalkClient
 * @throws RuntimeException When no default DI container is available.
 */
private function getBeanstalkClient(): BeanstalkClient
{
    // A missing container is treated as a fatal configuration problem.
    $container = Di::getDefault() ?: throw new RuntimeException("Dependency Injection container is not set.");

    return $container->getShared(BeanstalkConnectionModelsProvider::SERVICE_NAME);
}
||
193 | |||
/**
 * Returns the class names of all supported reload actions.
 *
 * The position in the list is significant: startReload() walks this list from
 * top to bottom and runs the planned actions in that order.
 *
 * @return array List of reload action class names.
 */
private function getReloadActionsWithPriority(): array
{
    $orderedActions = [
        ReloadModuleStateAction::class,
        ReloadTimezoneAction::class,
        ReloadSyslogDAction::class,
        ReloadRestAPIWorkerAction::class,
        ReloadNetworkAction::class,
        ReloadFirewallAction::class,
        ReloadFail2BanConfAction::class,
        ReloadSSHAction::class,
        ReloadLicenseAction::class,
        ReloadSentryAction::class,
        ReloadNatsAction::class,
        ReloadNTPAction::class,
        ReloadPHPFPMAction::class,
        ReloadNginxAction::class,
        ReloadNginxConfAction::class,
        ReloadCrondAction::class,
        RestartPBXCoreAction::class,
        ReloadPBXCoreAction::class,
        ReloadModulesConfAction::class,
        ReloadFeaturesAction::class,
        ReloadPJSIPAction::class,
        ReloadRTPAction::class,
        ReloadIAXAction::class,
        ReloadH323Action::class,
        ReloadHepAction::class,
        ReloadDialplanAction::class,
        ReloadParkingAction::class,
        ReloadQueuesAction::class,
        ReloadConferenceAction::class,
        ReloadManagerAction::class,
        ReloadVoicemailAction::class,
        ReloadMOHAction::class,
        ReloadWorkerCallEventsAction::class,
        ReloadRecordingSettingsAction::class,
        ReloadRecordSavePeriodAction::class,
        ReloadCloudDescriptionAction::class,
        ReloadCloudParametersAction::class,
    ];

    return $orderedActions;
}
||
240 | |||
/**
 * Registers the worker's callbacks on the Beanstalk client.
 *
 * Two tubes are subscribed: the tube named after this class, handled by
 * processModelChanges(), and the ping tube derived from the class name, handled
 * by pingCallBack(). timeoutHandler() is installed as the client's timeout handler.
 *
 * @return void
 */
private function subscribeToEvents(): void
{
    $client = $this->beanstalkClient;

    // Jobs describing model changes and directly invoked actions.
    $client->subscribe(self::class, [$this, 'processModelChanges']);

    // Ping requests for this worker.
    $pingTube = $this->makePingTubeName(self::class);
    $client->subscribe($pingTube, [$this, 'pingCallBack']);

    // Called by the client when it times out.
    $client->setTimeoutHandler([$this, 'timeoutHandler']);
}
||
255 | |||
/**
 * Runs the event loop until a restart has been requested.
 *
 * Each iteration hands control to the Beanstalk client, which dispatches to the
 * subscribed callbacks. Once the restart flag is no longer false, the loop ends
 * and pending reload actions are flushed through timeoutHandler().
 *
 * @return void
 */
private function waitForEvents(): void
{
    while (true) {
        // NOTE(review): $needRestart is not declared in the visible part of this class;
        // presumably it is inherited from WorkerBase - confirm.
        if ($this->needRestart !== false) {
            break;
        }
        $this->beanstalkClient->wait();
    }

    // Execute all collected changes before exit
    $this->timeoutHandler();
}
||
271 | |||
/**
 * Timeout handler: forces the quiet period to count as elapsed and tries to reload.
 *
 * The last-change timestamp is moved back by exactly one timeout interval, so the
 * "wait more time" check in startReload() no longer blocks the planned actions.
 *
 * @return void
 */
public function timeoutHandler(): void
{
    $now = time();
    $this->last_change = $now - $this->timeout;

    $this->startReload();
}
||
280 | |||
/**
 * Starts the reload process if there are planned reload actions.
 *
 * The method is a no-op while another reload is in progress, when nothing is
 * planned, or when the quiet period since the last change has not elapsed yet.
 * Otherwise every planned action is executed in the order defined by
 * $reloadActions, modules are notified through the MODELS_EVENT_NEED_RELOAD hook
 * and the plan is cleared. A failing action is logged and does not stop the
 * remaining actions.
 *
 * Each action runs under a fresh ACTION_TIMEOUT execution limit. The limit that
 * was in force before the reload is restored afterwards, so the long-running
 * worker does not stay under the per-action limit once the reload has finished.
 *
 * @return void
 */
private function startReload(): void
{
    if ($this->isProcessing) {
        return;
    }

    // Remember the current limit so it can be put back in the finally block.
    $previousTimeLimit = (int)ini_get('max_execution_time');
    $timeLimitChanged = false;

    try {
        $this->isProcessing = true;
        // Check if there aren't any planned reload actions
        if (count($this->plannedReloadActions) === 0) {
            SystemMessages::sysLogMsg(__METHOD__, "No planed actions for reload", LOG_DEBUG);
            return;
        }

        // Check if enough time has passed since the last change
        if ((time() - $this->last_change) < $this->timeout) {
            SystemMessages::sysLogMsg(__METHOD__, "Wait more time before starting the reload.", LOG_DEBUG);
            return;
        }

        $executedActions = [];
        // Process the planned actions in priority order
        foreach ($this->reloadActions as $actionClassName) {
            // Skip actions that were not planned
            if (!array_key_exists($actionClassName, $this->plannedReloadActions)) {
                continue;
            }
            try {
                // Set timeout for action execution; the call restarts the timeout counter.
                if (set_time_limit(self::ACTION_TIMEOUT)) {
                    $timeLimitChanged = true;
                }

                $parameters = $this->plannedReloadActions[$actionClassName]['parameters'];
                $hashes = array_keys($parameters);
                SystemMessages::sysLogMsg($actionClassName, "Start action for the next parameters hashes: " . PHP_EOL . json_encode($hashes, JSON_PRETTY_PRINT), LOG_DEBUG);

                $actionObject = new $actionClassName();
                $actionObject->execute($parameters);
                $executedActions[] = $actionClassName;
            } catch (Throwable $exception) {
                // Best effort: log the failure and continue with the next action.
                CriticalErrorsHandler::handleExceptionWithSyslog($exception);
            }
        }
        if (count($executedActions) > 0) {
            SystemMessages::sysLogMsg(__METHOD__, "Reload actions were executed in the next order: " . PHP_EOL . json_encode($executedActions, JSON_PRETTY_PRINT), LOG_DEBUG);
        }

        // Send information about models changes to additional modules bulky without any details
        PBXConfModulesProvider::hookModulesMethod(SystemConfigInterface::MODELS_EVENT_NEED_RELOAD, [$this->plannedReloadActions]);

        // Reset the planned actions array
        $this->plannedReloadActions = [];
    } finally {
        if ($timeLimitChanged) {
            // Put back the limit that was active before the actions ran.
            set_time_limit($previousTimeLimit);
        }
        $this->isProcessing = false;
    }
}
||
341 | |||
/**
 * Processes a job received from the Beanstalk queue.
 *
 * Two job sources are handled:
 *  - SOURCE_INVOKE_ACTION: the named action is planned when it is one of the known
 *    reload actions and its parameters are an array (a missing key means no parameters);
 *  - SOURCE_MODELS_CHANGED: reload actions are planned from the dependency tables
 *    and WorkerPrepareAdvice is notified.
 *
 * After that a reload is attempted, and the decoded job is forwarded to modules
 * through the MODELS_EVENT_CHANGE_DATA hook. A payload that does not decode to a
 * non-empty array is never inspected or forwarded, but a reload is still attempted.
 * Any exception is logged and makes the worker request a restart.
 *
 * @param BeanstalkClient $message The message received from the Beanstalk queue.
 * @return void
 */
public function processModelChanges(BeanstalkClient $message): void
{
    try {
        // Decode the received message
        $receivedMessage = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);

        // Validate the payload shape before any key is read from it.
        $isUsableMessage = is_array($receivedMessage) && $receivedMessage !== [];

        if ($isUsableMessage) {
            $source = $receivedMessage['source'] ?? null;

            if ($source === BeanstalkConnectionModelsProvider::SOURCE_INVOKE_ACTION) {
                $action = $receivedMessage['action'] ?? null;
                $parameters = $receivedMessage['parameters'] ?? [];

                // Plan only known actions with well-formed parameters; a malformed
                // job is ignored instead of raising a TypeError in planReloadAction().
                if (
                    is_string($action)
                    && is_array($parameters)
                    && in_array($action, $this->reloadActions, true)
                ) {
                    $this->planReloadAction($action, $parameters);
                }
            } elseif ($source === BeanstalkConnectionModelsProvider::SOURCE_MODELS_CHANGED) {
                // Plan reload actions from the changes described by the message
                $this->fillModifiedTables($receivedMessage);

                // Check the model events to renew advice cache
                WorkerPrepareAdvice::afterModelEvents($receivedMessage);
            }
        }

        // Start the reload process if there are planned actions
        $this->startReload();

        if (!$isUsableMessage) {
            return;
        }

        // Send information about model changes to additional modules with changed data details
        PBXConfModulesProvider::hookModulesMethod(SystemConfigInterface::MODELS_EVENT_CHANGE_DATA, [$receivedMessage]);
    } catch (Throwable $exception) {
        $this->needRestart = true;
        CriticalErrorsHandler::handleExceptionWithSyslog($exception);
    }
}
||
383 | |||
/**
 * Adds a reload action with its parameters to the planned reload actions array.
 *
 * Parameter sets are stored per action under a hash of their content, so planning
 * the same action twice with identical parameters keeps a single entry. A new
 * parameter set is stored and logged exactly once.
 *
 * @param string $action     The name of the action to be executed.
 * @param array  $parameters The parameters to be passed to the action.
 * @return void
 */
private function planReloadAction(string $action, array $parameters = []): void
{
    $newHash = $this->createUniqueKeyFromArray($parameters);
    $actionAlreadyPlanned = array_key_exists($action, $this->plannedReloadActions);

    // Identical parameters are already planned for this action: nothing to add.
    if (
        $actionAlreadyPlanned
        && array_key_exists($newHash, $this->plannedReloadActions[$action]['parameters'] ?? [])
    ) {
        return;
    }

    $this->plannedReloadActions[$action]['parameters'][$newHash] = $parameters;

    if ($actionAlreadyPlanned) {
        SystemMessages::sysLogMsg(__METHOD__, "Existing reload task $action received a new parameters (hash=$newHash)" . PHP_EOL . json_encode($parameters, JSON_PRETTY_PRINT), LOG_DEBUG);
    } else {
        SystemMessages::sysLogMsg(__METHOD__, "New reload task $action planned with parameters (hash=$newHash):" . PHP_EOL . json_encode($parameters, JSON_PRETTY_PRINT), LOG_DEBUG);
    }
}
||
407 | |||
/**
 * Derives a key from the content of an array.
 *
 * The array is serialized to JSON and the MD5 digest of that string is returned,
 * so arrays producing the same JSON text produce the same key.
 *
 * @param array $array The array to derive the key from.
 * @return string MD5 digest as a 32-character hexadecimal string.
 */
private function createUniqueKeyFromArray(array $array): string
{
    return md5(json_encode($array));
}
||
416 | |||
/**
 * Plans reload actions for a model change described by the received data.
 *
 * Does nothing when the data names no model. Otherwise the model cache is
 * cleared, dependent core conf objects are asked for fresh settings and the three
 * planners (custom files, PBX settings, other models) are run. When the plan goes
 * from empty to non-empty, the quiet-period timer is started from the current time.
 *
 * @param array $data The data containing the changes.
 * @return void
 */
private function fillModifiedTables(array $data): void
{
    $modelClass = $data['model'] ?? '';
    if (empty($modelClass)) {
        return;
    }

    $plannedBefore = count($this->plannedReloadActions);

    SystemMessages::sysLogMsg(__METHOD__, "New changes received:" . PHP_EOL . json_encode($data, JSON_PRETTY_PRINT), LOG_DEBUG);

    // Drop cached data of the changed model.
    ModelsBase::clearCache($modelClass);

    // Refresh settings of the conf objects that depend on this model.
    $this->getNewSettingsForDependentModules($modelClass);

    // Run the planners in a fixed order.
    $this->planReloadActionsForCustomFiles($modelClass, $data);
    $this->planReloadActionsForPbxSettings($modelClass, $data);
    $this->planReloadActionsForOtherModels($modelClass, $data);

    // The timer starts only with the first planned action of a batch.
    if ($plannedBefore !== 0) {
        return;
    }
    if (count($this->plannedReloadActions) > 0) {
        $this->last_change = time();
    }
}
||
449 | |||
/**
 * Asks every core conf object that depends on the changed model to reload its settings.
 *
 * For each conf object the list of models it depends on is requested. If the
 * changed model is in that list and the object has the GET_SETTINGS method, that
 * method is called. A failure in one object is logged and does not affect the others.
 *
 * @param string $modifiedModel The class name of the changed model.
 * @return void
 */
private function getNewSettingsForDependentModules(string $modifiedModel): void
{
    foreach ($this->arrAsteriskConfObjects as $confObject) {
        try {
            $dependsOn = call_user_func([$confObject, AsteriskConfigInterface::GET_DEPENDENCE_MODELS]);

            // Not a dependency of this conf object: nothing to refresh.
            if (!in_array($modifiedModel, $dependsOn, true)) {
                continue;
            }
            // The conf object offers no way to refresh its settings.
            if (!method_exists($confObject, AsteriskConfigInterface::GET_SETTINGS)) {
                continue;
            }

            call_user_func([$confObject, AsteriskConfigInterface::GET_SETTINGS]);
        } catch (Throwable $error) {
            CriticalErrorsHandler::handleExceptionWithSyslog($error);
        }
    }
}
||
472 | |||
/**
 * Plans reload actions for a change of a CustomFiles record.
 *
 * Only applies when the changed model is CustomFiles, a record id was supplied
 * and the stored record has its "changed" attribute set to '1'. Every dependency
 * rule whose file path is '*' or equals the record's file path (case-insensitive)
 * contributes its actions. Afterwards the record's "changed" attribute is reset
 * to '0' and the record is saved.
 *
 * @param string $modifiedModel The modified model class (Must be CustomFiles)
 * @param array $modelData Data received during model change.
 * @return void
 */
private function planReloadActionsForCustomFiles(string $modifiedModel, array $modelData): void
{
    if ($modifiedModel !== CustomFiles::class) {
        return;
    }
    if (empty($modelData['recordId'])) {
        return;
    }

    $customFile = CustomFiles::findFirstById($modelData['recordId']);
    if ($customFile === null) {
        return;
    }
    if ($customFile->changed !== '1') {
        return;
    }

    foreach ($this->customFilesDependencyTable as $rule) {
        // A rule applies either to all files ('*') or to one specific file path.
        $appliesToEveryFile = $rule['filePath'] === '*';
        if (!$appliesToEveryFile && strcasecmp($customFile->filepath, $rule['filePath']) !== 0) {
            continue;
        }
        foreach ($rule['actions'] as $reloadAction) {
            $this->planReloadAction($reloadAction, $modelData);
        }
    }

    // The change has been handled: clear the flag and persist the record.
    $customFile->writeAttribute("changed", '0');
    $customFile->save();
}
||
505 | |||
/**
 * Plans reload actions for a change of a PbxSettings record.
 *
 * Only applies when the changed model is PbxSettings and a record id was
 * supplied. The record is looked up by key after the PbxSettings cache has been
 * cleared. A dependency rule matches when the setting key contains the rule's
 * optional 'strPosKey' substring or is listed in the rule's 'keys'; all actions
 * of a matching rule are planned.
 *
 * @param string $modifiedModel The modified model class (Must be PbxSettings)
 * @param array $modelData Data received during model change.
 * @return void
 */
private function planReloadActionsForPbxSettings(string $modifiedModel, array $modelData): void
{
    if ($modifiedModel !== PbxSettings::class) {
        return;
    }
    if (empty($modelData['recordId'])) {
        return;
    }

    // Make sure the record is read fresh.
    PbxSettings::clearCache(PbxSettings::class);

    /** @var PbxSettings $setting */
    $setting = PbxSettings::findFirstByKey($modelData['recordId']);
    if ($setting === null) {
        return;
    }
    $settingKey = $setting->key;

    foreach ($this->pbxSettingsDependencyTable as $rule) {
        // Substring match is optional; the exact-name list is consulted only when it fails.
        $matchesBySubstring = isset($rule['strPosKey']) && str_contains($settingKey, $rule['strPosKey']);
        $ruleMatches = $matchesBySubstring || in_array($settingKey, $rule['keys'], true);
        if (!$ruleMatches) {
            continue;
        }

        foreach ($rule['actions'] as $reloadAction) {
            $this->planReloadAction($reloadAction, $modelData);
        }
    }
}
||
546 | |||
/**
 * Plans reload actions for a changed model using the generic models dependency table.
 *
 * Every rule that lists the changed model among its 'modelClasses' contributes
 * all of its 'actions' to the plan, each with the received model data as parameters.
 *
 * @param string $modifiedModel The changed model class.
 * @param array $modelData Data received during model change.
 * @return void
 */
private function planReloadActionsForOtherModels(string $modifiedModel, array $modelData): void
{
    foreach ($this->otherModelsDependencyTable as $rule) {
        $ruleCoversModel = in_array($modifiedModel, $rule['modelClasses'], true);
        if ($ruleCoversModel) {
            foreach ($rule['actions'] as $reloadAction) {
                $this->planReloadAction($reloadAction, $modelData);
            }
        }
    }
}
||
565 | |||
566 | /** |
||
567 | * Callback for the ping to keep the connection alive. |
||
568 | * |
||
569 | * @param BeanstalkClient $message The received message. |
||
570 | * |
||
571 | * @return void |
||
572 | */ |
||
573 | public function pingCallBack(BeanstalkClient $message): void |
||
574 | { |
||
575 | try { |
||
576 | // Reply immediately to keep connection alive |
||
577 | $message->reply(json_encode($message->getBody() . ':pong')); |
||
578 | |||
579 | // Then process reload if needed |
||
580 | if (!$this->isProcessing) { |
||
581 | $this->startReload(); |
||
582 | } |
||
583 | } catch (Throwable $e) { |
||
584 | CriticalErrorsHandler::handleExceptionWithSyslog($e); |
||
585 | } |
||
593 |