| Total Complexity | 28 |
| Total Lines | 244 |
| Duplicated Lines | 0 % |
| Changes | 1 | ||
| Bugs | 0 | Features | 0 |
| 1 | <?php |
||
| 35 | class BackgroundTasksHandler { |
||
| 36 | private $settings; |
||
| 37 | private $logger; |
||
| 38 | private $maxParallelTasks; |
||
| 39 | private $maxExecutionTime; |
||
| 40 | private $batchSize; |
||
| 41 | private $maxTimeBeforeRemoval; |
||
| 42 | |||
/**
 * Build the handler from the application settings.
 *
 * Keeps the raw settings array, creates the task logger and caches the
 * tuning values used by the other methods. Each tuning value falls back
 * to a built-in default when its key is absent (or null) in $settings.
 *
 * @param array $settings Application settings; the keys read here are
 *                        max_parallel_tasks, task_maximum_run_time,
 *                        task_batch_size and history_duration.
 */
public function __construct(array $settings) {
    $this->settings = $settings;
    $this->logger = new TaskLogger($settings, LOG_TASKS_FILE);

    // Limits applied when selecting and launching tasks.
    $this->maxParallelTasks = $settings['max_parallel_tasks'] ?? 2;
    $this->maxExecutionTime = $settings['task_maximum_run_time'] ?? 300;
    $this->batchSize = $settings['task_batch_size'] ?? 50;

    // history_duration is multiplied by 24 * 3600 (i.e. treated as days) to get seconds; default is 15.
    $this->maxTimeBeforeRemoval = ($settings['history_duration'] ?? 15) * 24 * 3600;
}
||
| 51 | |||
/**
 * Entry point: run one pass of background-task processing.
 *
 * Takes the process lock, then runs stale-task cleanup, task launching and
 * maintenance in that order. Any Exception raised by those steps is logged
 * (when LOG_TASKS is true) and not rethrown. The lock is released in every
 * case once it has been acquired.
 *
 * @return false|null false when the lock could not be acquired; no value
 *                    is returned otherwise.
 */
public function processBackgroundTasks() {
    // Bail out early when another run already holds the lock.
    $lockAcquired = $this->acquireProcessLock();
    if ($lockAcquired === false) {
        if (LOG_TASKS === true) {
            $this->logger->log('Process already running', 'INFO');
        }

        return false;
    }

    try {
        $this->cleanupStaleTasks();
        $this->processTaskBatches();
        $this->performMaintenanceTasks();
    } catch (Exception $failure) {
        if (LOG_TASKS === true) {
            $this->logger->log('Task processing error: ' . $failure->getMessage(), 'ERROR');
        }
    } finally {
        // Always give the lock back, whether the pass succeeded or failed.
        $this->releaseProcessLock();
    }
}
||
| 72 | |||
| 73 | /** |
||
| 74 | * Acquire a lock to prevent multiple instances of this script from running simultaneously. |
||
| 75 | * @return bool |
||
| 76 | */ |
||
| 77 | private function acquireProcessLock(): bool { |
||
| 88 | } |
||
| 89 | |||
/**
 * Release the lock file.
 *
 * Uses TASKS_LOCK_FILE when it is non-empty, otherwise the default path
 * under ../files. A missing lock file is treated as already released, so
 * no warning is raised by unlink(). A removal that fails is logged when
 * LOG_TASKS is true instead of being silently ignored.
 */
private function releaseProcessLock() {
    $lockFile = empty(TASKS_LOCK_FILE) ? __DIR__.'/../files/teampass_background_tasks.lock' : TASKS_LOCK_FILE;

    // Nothing to remove: avoid the warning unlink() emits for a missing file.
    if (!is_file($lockFile)) {
        return;
    }

    // NOTE(review): a lock file that cannot be removed presumably blocks later runs — confirm against acquireProcessLock().
    if (!unlink($lockFile) && LOG_TASKS === true) {
        $this->logger->log('Unable to remove lock file: ' . $lockFile, 'ERROR');
    }
}
||
| 97 | |||
/**
 * Clean up stale tasks.
 *
 * 1. Marks every task still flagged in progress whose started_at is older
 *    than the maximum execution time as failed (is_in_progress = -1,
 *    finished_at = now, status = "failed").
 * 2. Deletes failed tasks, together with their subtasks, whose finished_at
 *    is older than the retention period.
 */
private function cleanupStaleTasks() {
    // Single timestamp so both queries work from the same "now".
    $now = time();

    // Mark tasks as failed if they've been running too long
    DB::query(
        'UPDATE ' . prefixTable('background_tasks') . '
        SET is_in_progress = -1,
            finished_at = %i,
            status = %s
        WHERE is_in_progress = 1
        AND started_at < %i',
        $now,
        'failed',
        $now - $this->maxExecutionTime
    );

    // Remove very old failed tasks.
    // The cutoff comparison must be "<": with ">" the query removed recent
    // failures (including the ones marked just above) and kept the old ones.
    // NOTE(review): the INNER JOIN skips failed tasks that have no subtask rows — confirm whether that is intended.
    DB::query(
        'DELETE t, st FROM ' . prefixTable('background_tasks') . ' t
        INNER JOIN ' . prefixTable('background_subtasks') . ' st ON (t.increment_id = st.task_id)
        WHERE t.finished_at < %i
        AND t.status = %s',
        $now - $this->maxTimeBeforeRemoval,
        'failed'
    );
}
||
| 124 | |||
/**
 * Select and launch the next pending tasks.
 *
 * Does nothing (apart from an optional log line) when the number of tasks
 * flagged as running has reached the configured maximum. Otherwise fetches
 * pending tasks in increment_id order, limited to the smaller of the batch
 * size and the number of free slots, and hands each one to
 * processIndividualTask().
 */
private function processTaskBatches() {
    $activeCount = $this->countRunningTasks();

    // No free slot: report and leave.
    if ($activeCount >= $this->maxParallelTasks) {
        if (LOG_TASKS === true) {
            $this->logger->log("Wait ... {$activeCount} out of {$this->maxParallelTasks} are already running ", 'INFO');
        }

        return;
    }

    $freeSlots = $this->maxParallelTasks - $activeCount;
    $fetchLimit = min($this->batchSize, $freeSlots);

    // Pending = not in progress and without a finish timestamp.
    $sql = 'SELECT increment_id, process_type, arguments
        FROM ' . prefixTable('background_tasks') . '
        WHERE is_in_progress = 0
        AND (finished_at IS NULL OR finished_at = "")
        ORDER BY increment_id ASC
        LIMIT %i';
    $pendingTasks = DB::query($sql, $fetchLimit);

    foreach ($pendingTasks as $pending) {
        if (LOG_TASKS === true) {
            $this->logger->log('Launching ' . $pending['increment_id'], 'INFO');
        }

        $this->processIndividualTask($pending);
    }
}
||
| 156 | |||
/**
 * Launch a single task.
 *
 * Flags the task row as in progress (is_in_progress = 1, started_at = now,
 * status = "in_progress") and then runs background_tasks___worker.php with
 * the PHP binary, passing the task id, process type and arguments.
 *
 * NOTE(review): the whole task row, including its arguments, is written to
 * the log when LOG_TASKS is true — confirm the arguments never hold secrets.
 * NOTE(review): the outcome of the worker process is not inspected here.
 *
 * @param array $task Task row with the keys increment_id, process_type and arguments.
 */
private function processIndividualTask(array $task) {
    if (LOG_TASKS === true) {
        $this->logger->log('Processing task: ' . print_r($task, true), 'INFO');
    }

    $taskId = $task['increment_id'];

    // Record that the task has started before launching the worker.
    $progressFields = [
        'is_in_progress' => 1,
        'started_at' => time(),
        'status' => 'in_progress'
    ];
    DB::update(prefixTable('background_tasks'), $progressFields, 'increment_id = %i', $taskId);

    // Command line for the worker script.
    $command = [
        PHP_BINARY,
        __DIR__ . '/background_tasks___worker.php',
        $taskId,
        $task['process_type'],
        $task['arguments']
    ];

    // NOTE(review): if Process is Symfony's component, run() waits for the worker to exit — confirm.
    $worker = new Process($command);
    $worker->run();
}
||
| 189 | |||
/**
 * Count the tasks currently flagged as in progress (is_in_progress = 1).
 *
 * The value returned by the database layer is cast explicitly so that the
 * declared int return type holds whatever scalar type (or null) the driver
 * hands back, including when strict typing is enabled.
 *
 * @return int The number of running tasks.
 */
private function countRunningTasks(): int {
    $count = DB::queryFirstField(
        'SELECT COUNT(*)
        FROM ' . prefixTable('background_tasks') . '
        WHERE is_in_progress = 1'
    );

    return (int) $count;
}
||
| 201 | |||
| 202 | /** |
||
| 203 | * Perform maintenance tasks. |
||
| 204 | * This method cleans up old items, expired tokens, and finished tasks. |
||
| 205 | */ |
||
| 206 | private function performMaintenanceTasks() { |
||
| 210 | } |
||
| 211 | |||
/**
 * Remove duplicate rows from the items_edition table.
 *
 * For every (user_id, item_id) pair only the row(s) carrying the oldest
 * timestamp are kept; every row with a later timestamp is deleted.
 */
private function cleanMultipleItemsEdition() {
    $editionTable = prefixTable('items_edition');

    // The derived table i2 holds the oldest timestamp per (user_id, item_id).
    $sql = 'DELETE i1 FROM ' . $editionTable . ' i1
        JOIN (
            SELECT user_id, item_id, MIN(timestamp) AS oldest_timestamp
            FROM ' . $editionTable . '
            GROUP BY user_id, item_id
        ) i2 ON i1.user_id = i2.user_id AND i1.item_id = i2.item_id
        WHERE i1.timestamp > i2.oldest_timestamp';

    DB::query($sql);
}
||
| 227 | |||
/**
 * Remove expired rows from the items_edition table.
 *
 * A row is expired when its timestamp is older than the edition lock
 * period. The period is the delay_item_edition setting multiplied by 60
 * (i.e. treated as minutes) when that setting is a positive number;
 * otherwise EDITION_LOCK_PERIOD is used.
 */
private function handleItemTokensExpiration() {
    // A missing, empty or non-numeric setting becomes 0 instead of raising a
    // warning or (on PHP 8) a TypeError in the multiplication below.
    $delayMinutes = (int) ($this->settings['delay_item_edition'] ?? 0);

    // Zero keeps the original fallback; a negative value also falls back so
    // the cutoff can never move into the future and wipe every row.
    $lockPeriod = $delayMinutes > 0 ? $delayMinutes * 60 : EDITION_LOCK_PERIOD;

    DB::query(
        'DELETE FROM ' . prefixTable('items_edition') . '
        WHERE timestamp < %i',
        time() - $lockPeriod
    );
}
||
| 239 | |||
| 240 | /** |
||
| 241 | * Clean up old finished tasks. |
||
| 242 | * This method removes tasks that have been completed for too long. |
||
| 243 | */ |
||
| 244 | private function cleanOldFinishedTasks() { |
||
| 279 | } |
||
| 280 | } |
||
| 281 | |||
| 282 | |||
| 283 | |||
| 284 | // Main execution |
||
| 285 | try { |
||
| 286 | $configManager = new ConfigManager(); |
||
| 293 | } |