1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Maketok\DataMigration\Action\Type; |
4
|
|
|
|
5
|
|
|
use Maketok\DataMigration\Action\ActionInterface; |
6
|
|
|
use Maketok\DataMigration\Action\ConfigInterface; |
7
|
|
|
use Maketok\DataMigration\Action\Exception\NormalizationException; |
8
|
|
|
use Maketok\DataMigration\ArrayUtilsTrait; |
9
|
|
|
use Maketok\DataMigration\Expression\LanguageInterface; |
10
|
|
|
use Maketok\DataMigration\Input\InputResourceInterface; |
11
|
|
|
use Maketok\DataMigration\MapInterface; |
12
|
|
|
use Maketok\DataMigration\Storage\Db\ResourceHelperInterface; |
13
|
|
|
use Maketok\DataMigration\Storage\Exception\ParsingException; |
14
|
|
|
use Maketok\DataMigration\Unit\ImportFileUnitInterface; |
15
|
|
|
use Maketok\DataMigration\Unit\UnitBagInterface; |
16
|
|
|
use Maketok\DataMigration\Workflow\ResultInterface; |
17
|
|
|
|
18
|
|
|
/**
 * Disperse base input stream into separate units (tmp csv files) for further processing.
 *
 * Each entity read from the input is processed against the units of the bag:
 * unit contributions are evaluated, validation rules are checked, write conditions
 * are applied and the resulting rows are buffered per unit code. After every
 * top-level entity the buffer is flushed to the units' temporary files, or
 * discarded when the entity failed validation and validation is not ignored.
 */
class CreateTmpFiles extends AbstractAction implements ActionInterface
{
    use ArrayUtilsTrait;

    /**
     * @var UnitBagInterface|ImportFileUnitInterface[]
     */
    protected $bag;
    /**
     * Source the entities are read from
     * @var InputResourceInterface
     */
    private $input;
    /**
     * Map that is fed with every fresh entity and exposed to expressions as "map"
     * @var MapInterface
     */
    private $map;
    /**
     * Helper exposed to expressions as "resource"
     * @var ResourceHelperInterface
     */
    private $helperResource;
    /**
     * Rows waiting to be written, grouped by unit code
     * @var array
     */
    private $buffer = [];
    /**
     * Codes of the units whose contributions were already evaluated,
     * stored as [level => [index => [unit codes]]]
     * @var array
     */
    private $contributionBuffer = [];
    /**
     * whether every unit (children included) of the top-level entity
     * currently being processed passed validation
     * @var bool
     */
    private $isValid = true;
    /**
     * @var LanguageInterface
     */
    protected $language;
    /**
     * @var ResultInterface
     */
    protected $result;

    /**
     * @param UnitBagInterface $bag
     * @param ConfigInterface $config
     * @param LanguageInterface $language
     * @param InputResourceInterface $input
     * @param MapInterface $map
     * @param ResourceHelperInterface $helperResource
     */
    public function __construct(
        UnitBagInterface $bag,
        ConfigInterface $config,
        LanguageInterface $language,
        InputResourceInterface $input,
        MapInterface $map,
        ResourceHelperInterface $helperResource
    ) {
        parent::__construct($bag, $config);
        $this->input = $input;
        $this->map = $map;
        $this->helperResource = $helperResource;
        $this->language = $language;
    }

    /**
     * {@inheritdoc}
     *
     * Reads entities until the input returns false. A ParsingException is recorded
     * in the result and the entity skipped when the "continue_on_error" config
     * option is set; any other failure closes the handlers and is rethrown.
     *
     * @throws \LogicException
     * @throws \Exception
     */
    public function process(ResultInterface $result)
    {
        $this->result = $result;
        $this->start();
        try {
            while (true) {
                try {
                    $entity = $this->input->get();
                    if ($entity === false) {
                        break 1;
                    }
                    // per-entity state is reset here (and not inside the recursive
                    // processWrite) so that a failure of the previous entity or
                    // a nested child can not leak into / mask the current one
                    $this->contributionBuffer = [];
                    $this->isValid = true;
                    $this->processWrite($entity);
                    $this->dumpBuffer();
                } catch (ParsingException $e) {
                    if (!$this->config['continue_on_error']) {
                        // handled by the outer catch: close handlers and rethrow
                        throw $e;
                    }
                    $this->result->addActionException($this->getCode(), $e);
                    // drop whatever the broken entity managed to buffer,
                    // otherwise it would be written together with the next entity
                    $this->buffer = [];
                }
            }
            $this->dumpBuffer();
        } catch (\Exception $e) {
            // never leave tmp file handlers open, whatever went wrong
            $this->close();
            throw $e;
        }
        $this->close();
    }

    /**
     * write row to buffer
     *
     * Handles all units of the given level for the entity and recurses
     * into the child entities found under the child units' codes.
     * The validity flag is only ever lowered here; it is reset by process()
     * once per top-level entity.
     *
     * @param array $entity
     * @param int $level level of the unit tree the entity belongs to
     * @param int $idx position of the entity among the entities of its parent
     */
    private function processWrite(array $entity, $level = 1, $idx = 0)
    {
        // parsing entity according to the relation tree
        $levelUnits = $this->bag->getUnitsFromLevel($level);
        if (!isset($this->contributionBuffer[$level][$idx])) {
            // note: this replaces the records of the whole level, not only of $idx
            $this->contributionBuffer[$level] = [$idx => []];
        }
        foreach ($levelUnits as $code) {
            if ($this->map->isFresh($entity)) {
                $this->map->feed($entity);
            }
            /** @var ImportFileUnitInterface $unit */
            $unit = $this->bag->getUnitByCode($code);
            $this->processAdditions($unit, $idx);
            if (!$this->validate($unit)) {
                $this->isValid = false;
            }
            $this->writeRowBuffered($unit);
            foreach ($this->bag->getChildren($code) as $child) {
                $childCode = $child->getCode();
                if (!isset($entity[$childCode])) {
                    continue;
                }
                $position = 0;
                foreach ($entity[$childCode] as $childEntity) {
                    $this->processWrite($childEntity, $this->bag->getUnitLevel($childCode), $position);
                    $position++;
                }
            }
        }
    }

    /**
     * open handlers
     *
     * Registers the action start time, compiles the unit tree and opens
     * a tmp file for writing for every unit of the bag.
     */
    private function start()
    {
        $this->result->setActionStartTime($this->getCode(), new \DateTime());
        $this->bag->compileTree();
        foreach ($this->bag as $unit) {
            $unit->setTmpFileName($this->getTmpFileName($unit));
            $unit->getFilesystem()->open($unit->getTmpFileName(), 'w');
        }
    }

    /**
     * close all handlers
     *
     * Also resets the input and registers the action end time.
     */
    private function close()
    {
        foreach ($this->bag as $unit) {
            $unit->getFilesystem()->close();
        }
        $this->input->reset();
        $this->result->setActionEndTime($this->getCode(), new \DateTime());
    }

    /**
     * Evaluate the contributions of the unit and of its siblings,
     * each of them at most once per level and index.
     *
     * @param ImportFileUnitInterface $unit
     * @param int $idx
     */
    private function processAdditions(ImportFileUnitInterface $unit, $idx = 0)
    {
        $level = $this->bag->getUnitLevel($unit->getCode());
        if (isset($this->contributionBuffer[$level][$idx])) {
            $alreadyContributed = $this->contributionBuffer[$level][$idx];
        } else {
            $this->contributionBuffer[$level] = [$idx => []];
            $alreadyContributed = [];
        }
        // the unit and all of its siblings are checked against the same snapshot
        // of codes that was taken before any of them contributed
        $this->contributeOnce($unit, $alreadyContributed, $level, $idx);
        /** @var ImportFileUnitInterface $sibling */
        foreach ($unit->getSiblings() as $sibling) {
            $this->contributeOnce($sibling, $alreadyContributed, $level, $idx);
        }
    }

    /**
     * Evaluate unit contributions unless the unit code is among the already
     * contributed ones, then record the code in the contribution buffer.
     *
     * @param ImportFileUnitInterface $unit
     * @param array $alreadyContributed unit codes to skip
     * @param int $level
     * @param int $idx
     */
    private function contributeOnce(ImportFileUnitInterface $unit, array $alreadyContributed, $level, $idx)
    {
        if (in_array($unit->getCode(), $alreadyContributed, true)) {
            return;
        }
        foreach ($unit->getContributions() as $contribution) {
            $this->evaluateForUnit($contribution, $unit);
        }
        $this->contributionBuffer[$level][$idx][] = $unit->getCode();
    }

    /**
     * Evaluate an expression with the context every unit expression receives:
     * the map, the resource helper and the hashmaps of the unit.
     *
     * @param mixed $expression
     * @param ImportFileUnitInterface $unit
     * @return mixed
     */
    private function evaluateForUnit($expression, ImportFileUnitInterface $unit)
    {
        return $this->language->evaluate($expression, [
            'map' => $this->map,
            'resource' => $this->helperResource,
            'hashmaps' => $unit->getHashmaps(),
        ]);
    }

    /**
     * Build the row(s) of the unit from its mapping and put them into the buffer,
     * then refresh the buffered rows of the parent unit and of the parent's siblings.
     * Nothing happens at all when one of the unit's write conditions is not met.
     *
     * @param ImportFileUnitInterface $unit
     * @param bool $replace drop the last buffered row of the unit before adding the new one(s)
     */
    private function writeRowBuffered(ImportFileUnitInterface $unit, $replace = false)
    {
        foreach ($unit->getWriteConditions() as $condition) {
            if (!$this->evaluateForUnit($condition, $unit)) {
                // first unmet condition vetoes the row and the parent refresh
                return;
            }
        }
        $row = array_map(function ($expression) use ($unit) {
            return $this->evaluateForUnit($expression, $unit);
        }, $unit->getMapping());
        $code = $unit->getCode();
        /**
         * Each unit can return rows multiple times in case it needs to
         * but each mapped part should be returned equal times
         */
        try {
            if (isset($this->buffer[$code]) && is_array($this->buffer[$code])) {
                if ($replace) {
                    // replace last row in the buffer
                    array_pop($this->buffer[$code]);
                }
                $this->buffer[$code] = array_merge($this->buffer[$code], $this->normalize($row));
            } else {
                $this->buffer[$code] = $this->normalize($row);
            }
        } catch (NormalizationException $e) {
            // the row is skipped, the problem is reported through the result
            $this->result->addActionException($this->getCode(), $e);
        }
        /** @var ImportFileUnitInterface $parent */
        if ($parent = $unit->getParent()) {
            $this->writeRowBuffered($parent, true);
            /** @var ImportFileUnitInterface $sibling */
            foreach ($parent->getSiblings() as $sibling) {
                $this->writeRowBuffered($sibling, true);
            }
        }
    }

    /**
     * Check the validation rules of the unit against the current state.
     * The first failed rule adds an action error to the result and stops the check.
     *
     * @param ImportFileUnitInterface $unit
     * @return bool true when the unit has no rules or all of them passed
     */
    private function validate(ImportFileUnitInterface $unit)
    {
        foreach ($unit->getValidationRules() as $validationRule) {
            if ($this->evaluateForUnit($validationRule, $unit)) {
                continue;
            }
            $this->result->addActionError(
                $this->getCode(),
                sprintf(
                    "Invalid row %s for unit %s.",
                    json_encode($this->map->dumpState()),
                    $unit->getCode()
                )
            );
            return false;
        }
        return true;
    }

    /**
     * Write buffered rows into the tmp files and remove them from the buffer.
     *
     * When the current entity is invalid and the "ignore_validation" config option
     * is not set, the affected rows are discarded instead of being written, so they
     * can not end up in the files together with the rows of the next entity.
     *
     * @param ImportFileUnitInterface $unit when given, only the rows of this unit are handled
     */
    private function dumpBuffer(ImportFileUnitInterface $unit = null)
    {
        if (!$this->isValid && !$this->config['ignore_validation']) {
            if ($unit) {
                unset($this->buffer[$unit->getCode()]);
            } else {
                $this->buffer = [];
            }
            return;
        }
        $countedCodes = [];
        foreach ($this->buffer as $key => $rows) {
            // array keys that look like integers are stored as int by PHP,
            // hence both sides are compared as strings
            if ($unit && (string) $key !== (string) $unit->getCode()) {
                continue;
            }
            $handler = false;
            $bufferedUnit = false;
            if ($unit) {
                $handler = $unit->getFilesystem();
            } elseif (($bufferedUnit = $this->bag->getUnitByCode($key)) !== false) {
                /** @var ImportFileUnitInterface $bufferedUnit */
                $handler = $bufferedUnit->getFilesystem();
            }
            if (is_object($handler)) {
                foreach ($rows as $row) {
                    if (false === $handler->writeRow(array_values($row))) {
                        $this->result->addActionError(
                            $this->getCode(),
                            sprintf("Could not write row %s to file.", json_encode($row))
                        );
                    } elseif ($bufferedUnit
                        && !$bufferedUnit->getParent()
                        && !in_array($bufferedUnit->getCode(), $countedCodes, true)
                    ) {
                        // count once per parentless unit and its siblings
                        $this->result->incrementActionProcessed($this->getCode());
                        $countedCodes[] = $bufferedUnit->getCode();
                        foreach ($bufferedUnit->getSiblings() as $sibling) {
                            $countedCodes[] = $sibling->getCode();
                        }
                    }
                }
            }
            unset($this->buffer[$key]);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function getCode()
    {
        return 'create_tmp_files';
    }
}
357
|
|
|
|
The class complexity is the sum of the complexity of all methods. A very high value is usually an indication that your class does not follow the single responsibility principle and does more than one job.
Some resources for further reading:
You can also find more detailed suggestions for refactoring in the “Code” section of your repository.