execut /
yii2-import
| 1 | <?php |
||
| 2 | /** |
||
| 3 | * User: execut |
||
| 4 | * Date: 18.07.16 |
||
| 5 | * Time: 10:14 |
||
| 6 | */ |
||
| 7 | |||
| 8 | namespace execut\import\controllers; |
||
| 9 | |||
| 10 | |||
| 11 | use execut\crudFields\fields\Field; |
||
| 12 | use execut\import\components\Importer; |
||
| 13 | use execut\import\components\parser\exception\Exception; |
||
| 14 | use execut\import\components\parser\Stack; |
||
| 15 | use execut\import\models\FilesSource; |
||
| 16 | use execut\import\models\FilesStatuse; |
||
| 17 | use execut\import\models\File; |
||
| 18 | use execut\import\models\Log; |
||
| 19 | use execut\import\models\Setting; |
||
| 20 | |||
| 21 | use yii\console\Controller; |
||
| 22 | use yii\log\Logger; |
||
| 23 | use yii\mutex\Mutex; |
||
| 24 | |||
/**
 * Console commands of the import module.
 *
 * - `index` parses queued import files (or the single file given by `--fileId`)
 *   and hands their rows to the Importer component.
 * - `check-source` / `check-source-daemon` poll the configured file sources and
 *   register every file that is not known yet (matched by md5 of its content).
 * - `release-trigger` force-releases the queue mutex of this controller.
 *
 * Queue bookkeeping is serialized through the application `mutex` component
 * using this class name as the lock key.
 */
class ConsoleController extends Controller
{
    /**
     * @var int upper bound for the number of files in "loading" state on this
     * host; when it is reached `index` exits without picking up new files.
     */
    public $loadsLimit = 20;

    /**
     * @var int value passed to the Importer as its `stackSize` setting.
     */
    public $stackSize = 650;

    /**
     * @var int number of leading rows of the file to skip before importing.
     */
    public $offset = 0;

    /**
     * @var int|string|null id of the single file to import; when null the
     * controller works through the queue of this host instead.
     */
    public $fileId = null;

    /**
     * @var int not used inside this class.
     * NOTE(review): presumably kept for subclasses - confirm before removing.
     */
    protected $lastCheckedRow = 0;

    /**
     * Lists the command line options accepted by the given action.
     *
     * The inherited options stay available for every action; `index`
     * additionally accepts `--fileId`, `--stackSize` and `--offset`.
     *
     * @param string $actionID id of the action being run
     * @return string[] names of the public properties that can be set from the command line
     */
    public function options($actionID)
    {
        $options = parent::options($actionID);
        if ($actionID === 'index') {
            $options = array_merge($options, [
                'fileId',
                'stackSize',
                'offset',
            ]);
        }

        return $options;
    }

    /**
     * Imports files.
     *
     * Without `--fileId` the files queued for this host are processed one by
     * one, oldest first, until the queue is empty. Picking a file and switching
     * it to the loading state happens under the queue mutex so concurrent
     * workers do not take the same file. With `--fileId` exactly that file is
     * imported and neither the loads limit nor the queue mutex is applied when
     * picking it.
     *
     * @return int|null 1 when the file requested by `--fileId` does not exist, null otherwise
     */
    public function actionIndex()
    {
        ini_set('memory_limit', '-1');
        $id = $this->fileId;
        $isQueueMode = $id === null;
        // $this->clearOldFiles();
        $q = File::find();
        if (YII_ENV !== 'dev') {
            $this->markErrorFiles();
        }

        if ($isQueueMode) {
            $hostName = gethostname();
            $q->byHostName($hostName);
            $q
                ->isForImport()
                ->isNotLoadedOrStop()
                ->isOnlyFresh()
                ->orderBy('created ASC');
            $currentFilesCount = File::find()->byHostName($hostName)->isLoading()->count();
            if ($currentFilesCount >= $this->loadsLimit) {
                echo 'Files limit reached ' . $this->loadsLimit . '. Now loaded ' . $currentFilesCount . ' files';

                return;
            }
        } else {
            $q->byId($id);
        }

        while (true) {
            if ($isQueueMode) {
                $this->waitForRelease();
            }

            try {
                /**
                 * @var File|null $file
                 */
                $file = $q->one();
                if ($file) {
                    $file->triggerLoading();
                }
            } finally {
                // The lock must not outlive a failure of the lookup or of the state switch.
                if ($isQueueMode) {
                    $this->release();
                }
            }

            if (!$file) {
                if ($isQueueMode) {
                    // The queue is empty.
                    break;
                }

                // An explicitly requested file is missing: report it instead of calling a method on null.
                $this->stderr('File #' . $id . ' is not found' . "\n");

                return 1;
            }

            $this->deleteOldFilesBySetting($file);

            $this->parseFile($file);
            if (!$isQueueMode) {
                break;
            }
        }
    }

    /**
     * Releases the queue mutex of this controller.
     */
    public function actionReleaseTrigger()
    {
        $this->release();
    }

    /**
     * Deletes every other file that belongs to the same import setting as the given one.
     *
     * Runs under the queue mutex; the mutex is released even when a deletion throws.
     *
     * @param File $file the file that is about to be imported and must be kept
     */
    protected function deleteOldFilesBySetting($file)
    {
        $this->waitForRelease();
        try {
            $oldFiles = File::find()
                ->byImportSettingId($file->import_setting_id)
                ->andWhere([
                    '<>',
                    'id',
                    $file->id,
                ])
                ->select('id')
                ->all();
            foreach ($oldFiles as $oldFile) {
                if ($oldFile->delete()) {
                    echo 'Deleted file #' . $oldFile->id . "\n";
                }
            }
        } finally {
            $this->release();
        }
    }

    /**
     * Marks the files of this host selected by `isWithoutProcess()->isInProgress()` as failed.
     *
     * For each of them an error is logged that names the process id stored on
     * the file, then `triggerException()` is called on it.
     * Runs under the queue mutex; the mutex is released even when marking throws.
     */
    protected function markErrorFiles()
    {
        $this->waitForRelease();
        try {
            $this->stdout('Start check failed files' . "\n");
            /**
             * @var File[] $files
             */
            $files = File::find()->byHostName(gethostname())->isWithoutProcess()->isInProgress()->all();
            foreach ($files as $file) {
                $this->stdout('File ' . $file->id . ' start mark as error' . "\n");
                $attributes = [
                    'level' => Logger::LEVEL_ERROR,
                    'category' => 'import.notFoundProcess',
                    'message' => 'The process ' . $file->process_id . ' to import the file was not found',
                ];
                $file->logError($attributes);
                $file->triggerException();
                $this->stdout('File ' . $file->id . ' is marked as errored' . "\n");
            }
        } finally {
            $this->release();
        }

        $this->stdout('End check failed files' . "\n");
    }

    /**
     * Deletes the files of this host selected by `isForClean()`, one file per mutex acquisition.
     *
     * Not called from within this class at the moment (the call in actionIndex is commented out).
     */
    protected function clearOldFiles()
    {
        while (true) {
            $this->waitForRelease();
            try {
                /**
                 * @var File|null $file
                 */
                $file = File::find()->byHostName(gethostname())->isForClean()->one();
                if ($file) {
                    $file->triggerDeleting();
                    $file->delete();
                }
            } finally {
                $this->release();
            }

            if (!$file) {
                break;
            }
        }
    }

    /**
     * Reads the rows of a file, stores their count on the file and runs the Importer on them.
     *
     * Any failure while reading the rows or saving the count is written to the
     * file log, the file is switched via `triggerException()`, and the original
     * throwable is rethrown.
     *
     * @param File $file the file to import
     * @throws \Throwable whatever reading the rows or saving the file raised
     */
    protected function parseFile(File $file)
    {
        $this->stdout('Start parse file #' . $file->id . ' ' . $file->name . "\n");
        try {
            $file->scenario = 'import';
            $data = $file->getRows();
            $offset = (int) $this->offset;
            if ($offset > 0) {
                // Skip the first $offset rows; numeric keys are re-indexed from zero.
                $data = array_slice($data, $offset);
            }

            // Only fatal errors are reported while the row count is saved.
            error_reporting(E_ERROR);
            try {
                $file->rows_count = count($data);
                $file->save();
            } finally {
                // Reset reporting on failure as well, so the level never stays at E_ERROR.
                error_reporting(E_ALL);
            }
        } catch (\Throwable $e) {
            // \Throwable also covers \Error (e.g. TypeError), which would otherwise
            // leave the file in the loading state without any log record.
            $attributes = [
                'level' => Logger::LEVEL_ERROR,
                'category' => 'import.fatalError',
                'message' => $e->getMessage() . "\n" . $e->getTraceAsString(),
            ];
            $file->logError($attributes);
            $file->triggerException();

            throw $e;
        }

        $stacksSettings = $file->getSettings();
        $importer = new Importer([
            'file' => $file,
            'settings' => $stacksSettings,
            'data' => $data,
            'stackSize' => $this->stackSize,
        ]);
        $importer->run();
    }

    /**
     * Polls file sources and registers the files that are not known yet.
     *
     * For every file returned by a source its md5 is looked up: a known file
     * only gets its `updated` timestamp refreshed, an unknown one is stored as
     * a new File record.
     * Unless `$isIgnoreMutex` is set, runs are serialized per source type
     * through a mutex, which is released even when polling throws.
     *
     * @param string $type key of the files source whose settings are polled; ignored for the query when $id is given
     * @param int|string|null $id id of a single import setting to poll
     * @param bool $isIgnoreMutex when truthy the per-type mutex is neither acquired nor released
     */
    public function actionCheckSource($type = 'email', $id = null, $isIgnoreMutex = false)
    {
        // ext-imap is only needed for mail sources; do not die on hosts without it.
        if (function_exists('imap_timeout')) {
            imap_timeout(IMAP_CLOSETIMEOUT, 60);
        }

        /**
         * @var Mutex $mutex
         */
        $mutex = \Yii::$app->mutex;
        $mutexKey = self::class . '_' . $type;
        $isLocked = false;
        if (!$isIgnoreMutex) {
            while (!$mutex->acquire($mutexKey)) {
                sleep(1);
            }

            $isLocked = true;
        }

        try {
            $this->stdout('Checking source type ' . $type . "\n");
            $q = Setting::find();
            if ($id) {
                $q->andWhere(['id' => $id]);
            } else {
                $q->byImportFilesSource_key($type);
            }

            /**
             * @var Setting[] $importSettings
             */
            $importSettings = $q->all();
            FilesSource::$emailAdapter = null;
            foreach ($importSettings as $setting) {
                $files = $setting->getSource()->getFiles();
                if (empty($files)) {
                    continue;
                }

                foreach ($files as $file) {
                    $md5 = md5_file($file->filePath);
                    /**
                     * @var File|null $importFile
                     */
                    $importFile = File::find()->byMd5($md5)->select(['id', 'updated'])->one();
                    if ($importFile) {
                        $importFile->updated = date('Y-m-d H:i:s');
                        $importFile->save(false, [
                            'updated',
                        ]);
                        echo 'File with md5 ' . $md5 . ' is already exists' . "\n";

                        continue;
                    }

                    $importFile = new File();
                    $importFile->scenario = Field::SCENARIO_FORM;
                    $fileInfo = pathinfo($file->filePath);
                    $importFile->attributes = [
                        'name' => $file->fileName,
                        // pathinfo() omits the key for files that have no extension.
                        'extension' => $fileInfo['extension'] ?? null,
                        'mime_type' => mime_content_type($file->filePath),
                        'import_setting_id' => $setting->id,
                        'import_files_source_id' => $setting->filesSource->id,
                        'content' => $file->content,
                    ];

                    $this->saveModel($importFile);
                }
            }
        } finally {
            // Release only what this call has actually acquired.
            if ($isLocked) {
                $mutex->release($mutexKey);
            }
        }
    }

    /**
     * Runs actionCheckSource() for the given type in an endless loop, pausing 60 seconds between runs.
     *
     * @param string $type key of the files source to poll
     */
    public function actionCheckSourceDaemon($type = 'email')
    {
        while (true) {
            $this->actionCheckSource($type);
            sleep(60);
        }
    }

    /**
     * Saves a model and reports the outcome: success to stdout, validation errors to stderr.
     *
     * The model is concatenated into the message, so it has to be convertible to a string.
     *
     * @param \yii\base\Model $model the model to save
     */
    protected function saveModel($model)
    {
        if ($model->save()) {
            $this->stdout('Model ' . $model . ' is saved' . "\n");
        } else {
            $this->stderr('Model ' . $model . ' is errors: ' . var_export($model->errors, true) . "\n");
        }
    }

    /**
     * Blocks until the queue mutex of this controller is acquired, retrying once per second.
     *
     * @return Mutex the application mutex component that now holds the lock
     */
    protected function waitForRelease(): Mutex
    {
        /**
         * @var Mutex $mutex
         */
        $mutex = \Yii::$app->mutex;
        while (!$mutex->acquire(self::class)) {
            echo 'Wait for release' . "\n";
            sleep(1);
        }

        return $mutex;
    }

    /**
     * Releases the queue mutex of this controller.
     */
    protected function release(): void
    {
        /**
         * @var Mutex $mutex
         */
        $mutex = \Yii::$app->mutex;
        $mutex->release(self::class);
    }
}