Passed
Push — master ( 662d52...2a8232 )
by eXeCUT
03:58
created

controllers/ConsoleController.php (4 issues)

1
<?php
2
/**
3
 * User: execut
4
 * Date: 18.07.16
5
 * Time: 10:14
6
 */
7
8
namespace execut\import\controllers;
9
10
11
use execut\crudFields\fields\Field;
12
use execut\import\components\Importer;
13
use execut\import\components\parser\exception\Exception;
14
use execut\import\components\parser\Stack;
15
use execut\import\models\FilesSource;
16
use execut\import\models\FilesStatuse;
17
use execut\import\models\File;
18
use execut\import\models\Log;
19
use execut\import\models\Setting;
20
21
use yii\console\Controller;
22
use yii\log\Logger;
23
use yii\mutex\Mutex;
24
25
class ConsoleController extends Controller
26
{
27
    public $loadsLimit = 20;
28
    public $stackSize = 650;
29
    public $fileId = null;
30
    protected $lastCheckedRow = 0;
31
32
    public function options($actionID)
33
    {
34
        if ($actionID === 'index') {
35
            return [
36
                'fileId',
37
                'stackSize',
38
            ];
39
        }
40
        // $actionId might be used in subclasses to provide options specific to action id
41
        return ['color', 'interactive', 'help'];
42
    }
43
44
    public function actionIndex() {
45
        ini_set('memory_limit', -1);
46
        $id = $this->fileId;
47
        $this->clearOldFiles();
48
        $q = File::find();
49
        if (YII_ENV !== 'dev') {
0 ignored issues
show
The condition execut\import\controllers\YII_ENV !== 'dev' is always true.
Loading history...
50
            $this->markErrorFiles();
51
        }
52
53
        if ($id === null) {
54
            $q->byHostName(gethostname());
55
            $q->isForImport()->isOnlyFresh()->orderBy('created ASC');
56
            $currentFilesCount = File::find()->byHostName(gethostname())->isLoading()->count();
57
            if ($currentFilesCount >= $this->loadsLimit) {
58
                echo 'Files limit reached ' . $this->loadsLimit . '. Now loaded ' . $currentFilesCount . ' files';
59
                return;
60
            }
61
        } else {
62
            $q->byId($id);
63
        }
64
65
        while (true) {
66
            if ($id === null) {
67
                $this->waitForRelease();
68
            }
69
70
            /**
71
             * @var File $file
72
             */
73
            $file = $q->one();
74
            if ($id === null && !$file) {
75
                $this->release();
76
                break;
77
            }
78
79
            $file->triggerLoading();
80
            if ($id === null) {
81
                $this->release();
82
            }
83
84
            $this->deleteOldFilesBySetting($file);
85
86
            $this->parseFile($file);
87
            if ($id !== null) {
88
                break;
89
            }
90
        }
91
    }
92
93
    public function actionReleaseTrigger() {
94
        $this->release();
95
    }
96
97
    protected function deleteOldFilesBySetting($file) {
98
        $q = File::find()->byImportSettingId($file->import_setting_id)->andWhere([
99
            '<>',
100
            'id',
101
            $file->id,
102
        ])->select('id');
103
        $this->waitForRelease();
104
        File::updateAll([
105
            'import_files_statuse_id' => FilesStatuse::find()->byKey(FilesStatuse::DELETE)->one()->id,
106
        ], ['id' => $q->column()]);
107
108
        $c = $q->count();
109
110
        $this->release();
111
112
        while ($c) {
113
            $this->waitForRelease();
114
            $c = $q->count();
115
            $this->release();
116
            echo 'Waiting while deleted file' . "\n";
117
            sleep(1);
118
        }
119
    }
120
121
    protected function markErrorFiles()
122
    {
123
        $this->waitForRelease();
124
        $this->stdout('Start check failed files' . "\n");
125
        /**
126
         * @var File $file
127
         */
128
        $files = File::find()->byHostName(gethostname())->isWithoutProcess()->isInProgress()->all();
129
        foreach ($files as $file) {
130
            $attributes = [
131
                'level' => Logger::LEVEL_ERROR,
132
                'category' => 'import.notFoundProcess',
133
                'message' => 'The process ' . $file->process_id . ' to import the file was not found',
0 ignored issues
show
Accessing process_id on the interface yii\db\ActiveRecordInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
134
            ];
135
            $file->logError($attributes);
136
            $file->triggerException();
137
            $this->stdout('File ' . $file->id . ' is marked as errored' . "\n");
0 ignored issues
show
Accessing id on the interface yii\db\ActiveRecordInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
138
        }
139
140
        $this->release();
141
        $this->stdout('End check failed files' . "\n");
142
    }
143
144
    protected function clearOldFiles()
145
    {
146
        while (true) {
147
            $this->waitForRelease();
148
            /**
149
             * @var File $file
150
             */
151
            $file = File::find()->byHostName(gethostname())->isForClean()->one();
152
            if (!$file) {
153
                $this->release();
154
                break;
155
            }
156
157
            $file->triggerDeleting();
158
            $file->delete();
159
            $this->release();
160
        }
161
    }
162
163
    protected function parseFile(File $file) {
164
        $this->stdout('Start parse file #' . $file->id . ' ' . $file->name . "\n");
165
        $data = $file->getRows();
166
//        $data = array_splice($data, 23400, 1000000);
167
        $file->scenario = 'import';
168
        try {
169
            ini_set('error_reporting', E_ERROR);
170
            $file->rows_count = count($data);
171
            $file->save();
172
            ini_set('error_reporting', E_ALL);
173
        } catch (\Exception $e) {
174
            $attributes = [
175
                'level' => Logger::LEVEL_ERROR,
176
                'category' => 'import.fatalError',
177
                'message' => $e->getMessage() . "\n" . $e->getTraceAsString(),
178
            ];
179
            $file->logError($attributes);
180
            $file->triggerException();
181
            throw $e;
182
        }
183
184
        $stacksSettings = $file->getSettings();
185
        //        \yii::$app->db->close();
186
        $importer = new Importer([
187
            'file' => $file,
188
            'settings' => $stacksSettings,
189
            'data' => $data,
190
            'stackSize' => $this->stackSize,
191
        ]);
192
        $importer->run();
193
    }
194
195
    public function actionCheckSource($type = 'email', $id = null) {
196
        /**
197
         * @var Mutex $mutex
198
         */
199
        $mutex = \yii::$app->mutex;
200
        $mutexKey = self::class . '_' . $type;
201
        while (!$mutex->acquire($mutexKey)) {
202
            sleep(1);
203
        }
204
205
        $this->stdout('Checking source type ' . $type . "\n");
206
        $q = Setting::find();
207
        if ($id !== null) {
208
            $q->andWhere(['id' => $id]);
209
        } else {
210
            $q->byImportFilesSource_key($type);
211
        }
212
213
        /**
214
         * @var Setting[] $importSettings
215
         */
216
        $importSettings = $q->all();
217
        FilesSource::$emailAdapter = null;
218
        foreach ($importSettings as $setting) {
219
            $source = $setting->getSource();
220
            $files = $source->getFiles();
221
            if (!empty($files)) {
222
                foreach ($files as $file) {
223
                    $md5 = md5_file($file->filePath);
224
                    if ($importFile = File::find()->byMd5($md5)->select(['id', 'updated'])->one()) {
225
                        $importFile->updated = date('Y-m-d H:i:s');
226
                        /**
227
                         * @var File $file
228
                         */
229
                        $importFile->save(false, [
230
                            'updated'
231
                        ]);
232
                        echo 'File with md5 ' . $md5 . ' is already exists' . "\n";
233
                    } else {
234
                        $importFile = new File();
235
                        $importFile->scenario = Field::SCENARIO_FORM;
236
                        $fileInfo = pathinfo($file->filePath);
237
                        $importFile->attributes = [
238
                            'name' => $file->fileName,
239
                            'extension' => $fileInfo['extension'],
240
                            'mime_type' => mime_content_type($file->filePath),
241
                            'import_setting_id' => $setting->id,
242
                            'import_files_source_id' => $setting->filesSource->id,
0 ignored issues
show
Bug Best Practice introduced by eXeCUT
The property filesSource does not exist on execut\import\models\Setting. Since you implemented __get, consider adding a @property annotation.
Loading history...
243
                            'content' => $file->content,
244
                        ];
245
246
                        $this->saveModel($importFile);
247
                    }
248
                }
249
            }
250
        }
251
252
        $mutex->release($mutexKey);
253
    }
254
255
    public function actionCheckSourceDaemon($type = 'email') {
256
        while (true) {
257
            $this->actionCheckSource($type);
258
            sleep(60);
259
        }
260
    }
261
262
    protected function saveModel($model) {
263
        if ($model->save()) {
264
            $this->stdout('Model ' . $model . ' is saved' . "\n");
265
        } else {
266
            $this->stderr('Model ' . $model . ' is errors: ' . var_export($model->errors, true) . "\n");
267
        }
268
    }
269
270
    /**
271
     * @return Mutex
272
     */
273
    protected function waitForRelease(): Mutex
274
    {
275
        /**
276
         * @var Mutex $mutex
277
         */
278
        $mutex = \yii::$app->mutex;
279
        while (!$mutex->acquire(__CLASS__)) {
280
            echo 'Wait for release' . "\n";
281
            sleep(1);
282
        }
283
284
        return $mutex;
285
    }
286
287
    protected function release(): void
288
    {
289
        /**
290
         * @var Mutex $mutex
291
         */
292
        $mutex = \yii::$app->mutex;
293
        $mutex->release(__CLASS__);
294
    }
295
}