Passed
Push — master ( df72f2...90116d )
by eXeCUT
04:31
created

ConsoleController   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 264
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 35
eloc 149
c 0
b 0
f 0
dl 0
loc 264
rs 9.6

12 Methods

Rating   Name   Duplication   Size   Complexity  
B actionCheckSource() 0 56 7
A markErrorFiles() 0 21 2
A options() 0 10 2
A actionReleaseTrigger() 0 2 1
A actionCheckSourceDaemon() 0 4 2
B actionIndex() 0 42 9
A clearOldFiles() 0 16 3
A deleteOldFilesBySetting() 0 21 2
A saveModel() 0 5 2
A release() 0 7 1
A waitForRelease() 0 12 2
A parseFile() 0 30 2
1
<?php
2
/**
3
 * User: execut
4
 * Date: 18.07.16
5
 * Time: 10:14
6
 */
7
8
namespace execut\import\controllers;
9
10
11
use execut\import\components\Importer;
12
use execut\import\components\parser\exception\Exception;
13
use execut\import\components\parser\Stack;
14
use execut\import\models\FilesSource;
15
use execut\import\models\FilesStatuse;
16
use execut\import\models\File;
17
use execut\import\models\Log;
18
use execut\import\models\Setting;
19
20
use yii\console\Controller;
21
use yii\log\Logger;
22
use yii\mutex\Mutex;
23
24
class ConsoleController extends Controller
25
{
26
    /**
     * @var int Upper bound for files of this host that may be in the loading state;
     *          actionIndex() exits early once that many are already loading.
     */
    public $loadsLimit = 20;

    /**
     * @var int Value passed to the Importer as its `stackSize` setting
     *          (command-line option of the `index` action).
     */
    public $stackSize = 650;

    /**
     * @var int|string|null Id of a single file to import (command-line option of the
     *                      `index` action); when null, actionIndex() processes the
     *                      files selected for this host one after another.
     */
    public $fileId;

    /**
     * @var int Not referenced by the methods of this class visible in this file —
     *          presumably kept for subclasses; TODO confirm before removing.
     */
    protected $lastCheckedRow = 0;
30
31
    /**
     * Returns the names of the command-line options accepted by the given action.
     *
     * The list is built on top of the options declared by the parent controller
     * instead of a hard-coded copy of them, so the standard flags stay available
     * for every action and follow the installed framework version. The `index`
     * action additionally accepts `--fileId` and `--stackSize`.
     *
     * @param string $actionID Id of the action being run.
     * @return string[] Names of the public properties that may be set from the command line.
     */
    public function options($actionID)
    {
        $options = parent::options($actionID);
        if ($actionID === 'index') {
            $options = array_merge($options, [
                'fileId',
                'stackSize',
            ]);
        }

        return $options;
    }
42
43
    /**
     * Imports files assigned to this host.
     *
     * Without `--fileId` the files of the current host selected by
     * isForImport()/isOnlyFresh() are imported one after another, oldest first
     * (`created ASC`), until the query returns nothing. The run is skipped when the
     * host already has `loadsLimit` files in the loading state. With `--fileId`
     * exactly that file is imported once.
     *
     * Picking a file and switching it to the loading state happens under the
     * class-wide mutex when no id is given, so that parallel workers do not take
     * the same file.
     *
     * @return int|null 1 when the requested file id does not exist, nothing otherwise.
     */
    public function actionIndex()
    {
        ini_set('memory_limit', '-1');
        $id = $this->fileId;
        $isQueueMode = $id === null;

        $this->clearOldFiles();
        $q = File::find();
        $this->markErrorFiles();

        if ($isQueueMode) {
            $hostName = gethostname();
            $q->byHostName($hostName);
            $q->isForImport()->isOnlyFresh()->orderBy('created ASC');
            $currentFilesCount = File::find()->byHostName($hostName)->isLoading()->count();
            if ($currentFilesCount >= $this->loadsLimit) {
                echo 'Files limit reached ' . $this->loadsLimit . '. Now loaded ' . $currentFilesCount . ' files';
                return;
            }
        } else {
            $q->byId($id);
        }

        do {
            if ($isQueueMode) {
                $this->waitForRelease();
            }

            try {
                /** @var File|null $file */
                $file = $q->one();
                if ($file) {
                    $file->triggerLoading();
                }
            } finally {
                // Never keep the lock when the lookup or the status switch fails.
                if ($isQueueMode) {
                    $this->release();
                }
            }

            if (!$file) {
                if ($isQueueMode) {
                    // Nothing left to import for this host.
                    break;
                }

                // An explicit id that matches no record used to end in a call on null.
                $this->stderr('File #' . $id . ' is not found' . "\n");

                return 1;
            }

            $this->deleteOldFilesBySetting($file);
            $this->parseFile($file);
        } while ($isQueueMode);
    }
88
89
    /**
     * Console action that releases the mutex named after this class by delegating
     * to release(). Presumably intended for clearing a lock by hand — confirm with
     * the operators' runbook.
     */
    public function actionReleaseTrigger()
    {
        $this->release();
    }
92
93
    /**
     * Marks every other file of the same import setting with the "delete" status
     * and then blocks until none of those files is left in the table.
     *
     * The status update and each recount run under the class-wide mutex. The
     * records themselves are not removed here; presumably the clean-up step of a
     * worker deletes them (see clearOldFiles()) — TODO confirm.
     *
     * @param File $file File that is about to be imported; it is excluded from the update.
     * @throws \RuntimeException When the "delete" status record does not exist.
     */
    protected function deleteOldFilesBySetting($file)
    {
        $q = File::find()->byImportSettingId($file->import_setting_id)->andWhere([
            '<>',
            'id',
            $file->id,
        ])->select('id');

        $this->waitForRelease();
        try {
            $deleteStatus = FilesStatuse::find()->byKey(FilesStatuse::DELETE)->one();
            if (!$deleteStatus) {
                throw new \RuntimeException('Import files status for deletion is not found');
            }

            File::updateAll([
                'import_files_statuse_id' => $deleteStatus->id,
            ], ['id' => $q->column()]);

            $c = $q->count();
        } finally {
            $this->release();
        }

        // Re-check once per second; stop as soon as the count reaches zero without
        // an extra message and sleep.
        while ($c > 0) {
            echo 'Waiting while deleted file' . "\n";
            sleep(1);

            $this->waitForRelease();
            try {
                $c = $q->count();
            } finally {
                $this->release();
            }
        }
    }
116
117
    /**
     * Flags files of this host that are selected by isWithoutProcess()->isInProgress().
     *
     * For every such file an error entry is logged ("process not found") and
     * triggerException() is called on it. The whole check runs under the
     * class-wide mutex, which is released even when logging or the status switch
     * throws.
     */
    protected function markErrorFiles()
    {
        $this->waitForRelease();
        try {
            $this->stdout('Start check failed files' . "\n");

            /** @var File[] $files */
            $files = File::find()->byHostName(gethostname())->isWithoutProcess()->isInProgress()->all();
            foreach ($files as $file) {
                $file->logError([
                    'level' => Logger::LEVEL_ERROR,
                    'category' => 'import.notFoundProcess',
                    'message' => 'The process ' . $file->process_id . ' to import the file was not found',
                ]);
                $file->triggerException();
                $this->stdout('File ' . $file->id . ' is marked as errored' . "\n");
            }
        } finally {
            $this->release();
        }

        $this->stdout('End check failed files' . "\n");
    }
139
140
    /**
     * Deletes, one at a time, every file of this host selected by isForClean().
     *
     * Each iteration takes the class-wide mutex, fetches a single file, calls
     * triggerDeleting() and delete() on it and releases the mutex again — also
     * when one of those calls throws. The loop ends when no file is found.
     */
    protected function clearOldFiles()
    {
        do {
            $this->waitForRelease();
            try {
                /** @var File|null $file */
                $file = File::find()->byHostName(gethostname())->isForClean()->one();
                if ($file) {
                    $file->triggerDeleting();
                    $file->delete();
                }
            } finally {
                $this->release();
            }
        } while ($file);
    }
158
159
    /**
     * Stores the row count of the file and runs the importer over its rows.
     *
     * While the row count is saved, error reporting is lowered to E_ERROR; the
     * level that was active before is restored afterwards, on success and on
     * failure alike. Any throwable raised while saving is written to the file's
     * error log, the file is switched via triggerException(), and the throwable
     * is rethrown.
     *
     * @param File $file File to import.
     * @throws \Throwable Whatever counting the rows or saving the file raised.
     */
    protected function parseFile(File $file)
    {
        $this->stdout('Start parse file #' . $file->id . ' ' . $file->name . "\n");
        $data = $file->getRows();
        $file->scenario = 'import';

        $previousLevel = error_reporting(E_ERROR);
        try {
            $file->rows_count = count($data);
            $file->save();
        } catch (\Throwable $e) {
            $file->logError([
                'level' => Logger::LEVEL_ERROR,
                'category' => 'import.fatalError',
                'message' => $e->getMessage() . "\n" . $e->getTraceAsString(),
            ]);
            $file->triggerException();

            throw $e;
        } finally {
            // Runs after the catch block, so error logging above still happens
            // under the lowered level, exactly as before.
            error_reporting($previousLevel);
        }

        $importer = new Importer([
            'file' => $file,
            'settings' => $file->getSettings(),
            'data' => $data,
            'stackSize' => $this->stackSize,
        ]);
        $importer->run();
    }
190
191
    /**
     * Pulls files from the sources of the matching import settings and registers
     * the new ones.
     *
     * Settings are selected by `$id` when it is given, otherwise by the source key
     * `$type`. Every file returned by a setting's source is identified by the MD5
     * of its content: a known file only gets its `updated` timestamp refreshed, an
     * unknown one is stored as a new File record. The run is serialised per source
     * type through a mutex that is released even when a source or a save throws.
     *
     * @param string $type Source key, also part of the mutex name.
     * @param int|string|null $id Optional id of a single setting to check.
     */
    public function actionCheckSource($type = 'email', $id = null)
    {
        /** @var Mutex $mutex */
        $mutex = \yii::$app->mutex;
        $mutexKey = self::class . '_' . $type;
        while (!$mutex->acquire($mutexKey)) {
            sleep(1);
        }

        try {
            $this->stdout('Checking source type ' . $type . "\n");
            $q = Setting::find();
            if ($id !== null) {
                $q->andWhere(['id' => $id]);
            } else {
                $q->byImportFilesSource_key($type);
            }

            /** @var Setting[] $importSettings */
            $importSettings = $q->all();
            foreach ($importSettings as $setting) {
                $files = $setting->getSource()->getFiles();
                if (empty($files)) {
                    continue;
                }

                foreach ($files as $file) {
                    $md5 = md5_file($file->filePath);
                    if ($md5 === false) {
                        // Without a checksum the duplicate lookup below is meaningless.
                        $this->stderr('File ' . $file->filePath . ' is not readable, skipped' . "\n");
                        continue;
                    }

                    /** @var File|null $importFile */
                    $importFile = File::find()->byMd5($md5)->select(['id', 'updated'])->one();
                    if ($importFile) {
                        $importFile->updated = date('Y-m-d H:i:s');
                        $importFile->save(false, [
                            'updated',
                        ]);
                        echo 'File with md5 ' . $md5 . ' is already exists' . "\n";
                        continue;
                    }

                    $fileInfo = pathinfo($file->filePath);
                    $importFile = new File();
                    $importFile->attributes = [
                        'name' => $file->fileName,
                        // pathinfo() omits the key for names without an extension.
                        'extension' => $fileInfo['extension'] ?? null,
                        'mime_type' => mime_content_type($file->filePath),
                        'import_setting_id' => $setting->id,
                        // NOTE(review): filesSource is not a declared property of Setting;
                        // presumably resolved through the model's magic getter — confirm.
                        'import_files_source_id' => $setting->filesSource->id,
                        'content' => $file->content,
                    ];

                    $this->saveModel($importFile);
                }
            }
        } finally {
            $mutex->release($mutexKey);
        }
    }
248
249
    /**
     * Runs actionCheckSource() for the given source type in an endless loop,
     * pausing five minutes after every pass. The method never returns on its own.
     *
     * @param string $type Source key forwarded to actionCheckSource().
     */
    public function actionCheckSourceDaemon($type = 'email')
    {
        $pauseSeconds = 5 * 60;
        for (;;) {
            $this->actionCheckSource($type);
            sleep($pauseSeconds);
        }
    }
255
256
    /**
     * Saves the model and reports the outcome on the console: a confirmation on
     * stdout, or the model's validation errors on stderr when save() fails.
     *
     * The model is concatenated into the message, so it has to be convertible to
     * a string.
     *
     * @param object $model Model exposing save() and an `errors` property.
     */
    protected function saveModel($model)
    {
        if (!$model->save()) {
            $this->stderr('Model ' . $model . ' is errors: ' . var_export($model->errors, true) . "\n");

            return;
        }

        $this->stdout('Model ' . $model . ' is saved' . "\n");
    }
263
264
    /**
     * Blocks until the mutex named after this class has been acquired.
     *
     * Every failed attempt prints a notice and is followed by a one-second pause
     * before the next try.
     *
     * @return Mutex The application's mutex component, now holding the lock.
     */
    protected function waitForRelease(): Mutex
    {
        /** @var Mutex $lock */
        $lock = \yii::$app->mutex;

        while (true) {
            if ($lock->acquire(__CLASS__)) {
                return $lock;
            }

            echo 'Wait for release' . "\n";
            sleep(1);
        }
    }
280
281
    /**
     * Releases the mutex named after this class on the application's mutex component.
     */
    protected function release(): void
    {
        \yii::$app->mutex->release(__CLASS__);
    }
289
}