AssembleInput::processAssemble()   B
last analyzed

Complexity

Conditions 6
Paths 6

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 27
c 0
b 0
f 0
ccs 19
cts 19
cp 1
rs 8.8657
cc 6
nc 6
nop 0
crap 6
1
<?php
2
3
namespace Maketok\DataMigration\Action\Type;
4
5
use Maketok\DataMigration\Action\ActionInterface;
6
use Maketok\DataMigration\Action\ConfigInterface;
7
use Maketok\DataMigration\Action\Exception\ConflictException;
8
use Maketok\DataMigration\Action\Exception\FlowRegulationException;
9
use Maketok\DataMigration\Action\Exception\WrongContextException;
10
use Maketok\DataMigration\ArrayUtilsTrait;
11
use Maketok\DataMigration\Expression\LanguageInterface;
12
use Maketok\DataMigration\Input\InputResourceInterface;
13
use Maketok\DataMigration\MapInterface;
14
use Maketok\DataMigration\Unit\ExportFileUnitInterface;
15
use Maketok\DataMigration\Unit\ImportFileUnitInterface;
16
use Maketok\DataMigration\Unit\UnitBagInterface;
17
use Maketok\DataMigration\Unit\UnitInterface;
18
use Maketok\DataMigration\Workflow\ResultInterface;
19
20
/**
21
 * Create Base Input stream from separate units
22
 */
23
class AssembleInput extends AbstractAction implements ActionInterface
0 ignored issues
show
Complexity introduced by
This class has a complexity of 97 which exceeds the configured maximum of 50.

The class complexity is the sum of the complexity of all methods. A very high value is usually an indication that your class does not follow the single responsibility principle and does more than one job.

Some resources for further reading:

You can also find more detailed suggestions for refactoring in the “Code” section of your repository.

Loading history...
Complexity introduced by
The class AssembleInput has a coupling between objects value of 21. Consider reducing the number of dependencies to fewer than 13.
Loading history...
24
{
25
    use ArrayUtilsTrait;
26
27
    const FLOW_CONTINUE = 100;
28
    const FLOW_ABORT = 200;
29
30
    /**
31
     * @var UnitBagInterface|ExportFileUnitInterface[]|ImportFileUnitInterface[]
32
     */
33
    protected $bag;
34
    /**
35
     * @var InputResourceInterface
36
     */
37
    private $input;
38
    /**
39
     * @var MapInterface
40
     */
41
    private $map;
42
    /**
43
     * Current row being processed
44
     * @var array
45
     */
46
    private $processed = [];
47
    /**
48
     * Last processed data for unit
49
     * @var array
50
     */
51
    private $lastProcessed = [];
52
    /**
53
     * read buffer
54
     * @var array
55
     */
56
    private $buffer = [];
57
    /**
58
     * persistent buffer serves as a deeper storage than orig buffer
59
     * it will serve value into orig buffer once it's missing there
60
     * @var array
61
     */
62
    private $persistentBuffer = [];
63
    /**
64
     * Storage that holds keys from pB that should be unset by the end of flow
65
     * @var array
66
     */
67
    private $toUnset = [];
68
    /**
69
     * Temporary buffer that's cleared right after serving data
70
     * @var array
71
     */
72
    private $tmpBuffer = [];
73
    /**
74
     * write buffer
75
     * @var array
76
     */
77
    private $writeBuffer = [];
78
    /**
79
     * small storage to hold info about the lasting buffer records
80
     * (if current processed was served from buffer)
81
     * @var array
82
     */
83
    private $lastingBuffer = [];
84
    /**
85
     * Entries that specify connection between units
86
     * @var array
87
     */
88
    private $connectBuffer = [];
89
    /**
90
     * Units that are empty
91
     * @var string[]
92
     */
93
    private $finished = [];
94
    /**
95
     * @var LanguageInterface
96
     */
97
    protected $language;
98
    /**
99
     * @var ResultInterface
100
     */
101
    protected $result;
102
103
    /**
     * Wire the action up with its collaborators.
     *
     * The unit bag and the config are handed over to the parent constructor,
     * the remaining dependencies are stored on this instance.
     *
     * @param UnitBagInterface $bag
     * @param ConfigInterface $config
     * @param LanguageInterface $language
     * @param InputResourceInterface $input
     * @param MapInterface $map
     */
    public function __construct(
        UnitBagInterface $bag,
        ConfigInterface $config,
        LanguageInterface $language,
        InputResourceInterface $input,
        MapInterface $map
    ) {
        parent::__construct($bag, $config);
        $this->language = $language;
        $this->map = $map;
        $this->input = $input;
    }
122
123
    /**
     * {@inheritdoc}
     * Reversed process to create input resource: unit rows are read, assembled
     * and handed to the input resource until the flow signals an abort.
     * Order of the units matters!
     * @throws \LogicException
     * @throws WrongContextException
     */
    public function process(ResultInterface $result)
    {
        $this->result = $result;
        $this->start();
        do {
            $aborted = false;
            try {
                $this->processUnitRowData();
                if ($this->bag->getLowestLevel() == 1) {
                    $this->dumpWriteBuffer();
                }
                $this->addData();
            } catch (FlowRegulationException $signal) {
                // flow exceptions only steer the loop; FLOW_ABORT is the exit point,
                // any other code just moves on to the next iteration
                $aborted = $signal->getCode() === self::FLOW_ABORT;
            } catch (\Exception $failure) {
                // release the handlers before passing the failure on
                $this->close();
                throw $failure;
            }
            if (!$aborted) {
                $this->clear();
            }
        } while (!$aborted);
        $this->dumpWriteBuffer();
        $this->close();
    }
154
155
    /**
     * Reset the per-iteration state (connect buffer, processed rows,
     * unset marks and lasting flags) to empty arrays.
     */
    private function clear()
    {
        $this->lastingBuffer = $this->toUnset = $this->processed = $this->connectBuffer = [];
    }
165
166
    /**
     * Read one row per unit into the current processing context.
     *
     * For every unit the row is taken from the first source that has one, in this order:
     * the temporary buffer (refilled from the persistent buffer when needed), the read
     * buffer, the unit's file. Units listed as finished are skipped unless the temporary
     * buffer still holds a row for them. Rows of units for which the bag reports
     * isLowest() === false are also stored in the read buffer.
     * Every row that was read produces a connect buffer entry holding the row values of
     * the unit's reversed connection keys (an empty array when the row carries no data).
     *
     * @throws \LogicException when a reversed connection key is not set in the row
     */
    private function processRead()
    {
        foreach ($this->bag as $unit) {
            $code = $unit->getCode();
            $this->prepareRead($code);
            if (isset($this->tmpBuffer[$code])) {
                // the temporary buffer is served exactly once
                $tmpRow = $this->tmpBuffer[$code];
                $this->processed[$code] = $tmpRow;
                unset($this->tmpBuffer[$code]);
                $this->lastingBuffer[$code] = false;
                if (!$this->bag->isLowest($code)) {
                    $this->buffer[$code] = $tmpRow;
                }
            } elseif (in_array($code, $this->finished, true)) {
                // strict comparison: loosely equal codes (e.g. "1" and "01") are different units
                continue;
            } elseif (isset($this->buffer[$code])) {
                // row is served from the buffer again, remember that it is a lasting one
                $tmpRow = $this->buffer[$code];
                $this->processed[$code] = $tmpRow;
                $this->lastingBuffer[$code] = true;
            } elseif (($tmpRow = $this->readRow($unit)) !== false) {
                $this->processed[$code] = $tmpRow;
                $this->lastingBuffer[$code] = false;
                if (!$this->bag->isLowest($code)) {
                    $this->buffer[$code] = $tmpRow;
                }
            } else {
                // nothing left to read for this unit
                continue;
            }
            if (isset($this->processed[$code])) {
                $this->lastProcessed[$code] = $this->processed[$code];
            }
            // do not add all nulls to connectBuffer
            if ($this->isEmptyData($tmpRow)) {
                $this->connectBuffer[$code] = [];
                continue;
            }
            $this->connectBuffer[$code] = array_map(function ($var) use ($tmpRow) {
                if (!isset($tmpRow[$var])) {
                    throw new \LogicException("Wrong reversed connection key given.");
                }
                return $tmpRow[$var];
            }, $unit->getReversedConnection());
        }
    }
212
213
    /**
     * Serve the persistent buffer entry of a unit into the temporary buffer when the
     * latter has nothing for that unit, and mark the persistent entry for removal.
     *
     * @param string $code
     */
    private function prepareRead($code)
    {
        if (isset($this->tmpBuffer[$code])) {
            return;
        }
        if (!isset($this->persistentBuffer[$code])) {
            return;
        }
        $this->tmpBuffer[$code] = $this->persistentBuffer[$code];
        $this->toUnset[$code] = true;
    }
223
224
    /**
     * Try to assemble the connect buffer entries of every relation reported by the bag.
     *
     * Only 'pc' (parent-child) and 's' (siblings) relations are handled; a conflict is
     * passed to handleConflict() for the former and to handleConflictedSibling() for
     * the latter.
     *
     * @throws ConflictException
     * @throws FlowRegulationException
     */
    private function processAssemble()
    {
        foreach ($this->bag->getRelations() as $relation) {
            $type = key($relation);
            $relatedUnits = current($relation);
            $relatedCodes = array_map(function (UnitInterface $unit) {
                return $unit->getCode();
            }, $relatedUnits);
            $connections = array_intersect_key($this->connectBuffer, array_flip($relatedCodes));
            // loose comparison on purpose, it mirrors the semantics of a switch statement
            $isParentChild = $type == 'pc';
            if (!$isParentChild && $type != 's') {
                continue;
            }
            try {
                $this->assemble($connections);
            } catch (ConflictException $conflict) {
                $conflicted = array_keys($connections);
                if ($isParentChild) {
                    $this->handleConflict($conflicted);
                } else {
                    $this->handleConflictedSibling($conflicted);
                }
            }
        }
    }
256
257
    /**
     * Resolve a conflict between sibling units.
     *
     * Optional units with a processed row get a nulls row in the temporary buffer, their
     * real row is parked in the persistent buffer and their read buffer entry is dropped.
     * Other units with a processed row are put into the read buffer. In both cases the
     * units on the levels below are buffered as well.
     *
     * @param array $codes
     * @throws FlowRegulationException with FLOW_CONTINUE when at least one unit is optional
     * @throws \LogicException when none of the units is optional
     */
    private function handleConflictedSibling(array $codes)
    {
        $hasOptional = false;
        foreach ($codes as $code) {
            /** @var ExportFileUnitInterface $unit */
            $unit = $this->bag->getUnitByCode($code);
            $isOptional = $unit->isOptional();
            $hasRow = isset($this->processed[$code]);
            if ($isOptional) {
                $hasOptional = true;
                if ($hasRow) {
                    $this->tmpBuffer[$code] = $this->getNullsData($this->processed[$code]);
                    $this->persistentBuffer[$code] = $this->processed[$code];
                    $this->unsetBuffer($code);
                }
            } elseif ($hasRow) {
                $this->buffer[$code] = $this->processed[$code];
            }
            $this->bufferChildren($code);
        }
        if ($hasOptional) {
            throw new FlowRegulationException("", self::FLOW_CONTINUE);
        }
        throw new \LogicException(sprintf("Conflict for units: %s", json_encode($codes)));
    }
286
287
    /**
     * Buffer the processed rows of all units on the levels following the given unit.
     *
     * Walks from the unit's level + 1 up to the bag's lowest level and copies every
     * processed row into the read buffer, unless the unit is buffered already.
     *
     * @param string $code
     */
    private function bufferChildren($code)
    {
        $firstLevel = $this->bag->getUnitLevel($code) + 1;
        $lastLevel = $this->bag->getLowestLevel();
        for ($level = $firstLevel; $level <= $lastLevel; $level++) {
            foreach ($this->bag->getUnitsFromLevel($level) as $childCode) {
                if (isset($this->buffer[$childCode]) || !isset($this->processed[$childCode])) {
                    continue;
                }
                $this->buffer[$childCode] = $this->processed[$childCode];
            }
        }
    }
306
307
    /**
     * Run one iteration of the row pipeline: read the unit rows, analyze them
     * and try to assemble them. Nothing is returned; the outcome is kept in the
     * instance buffers or signalled through exceptions of the called steps.
     *
     * @throws FlowRegulationException
     * @throws ConflictException
     * @throws \LogicException
     */
310 16
    private function processUnitRowData()
311
    {
312 16
        $this->processRead();
313 15
        $this->analyzeRow();
314 14
        $this->processAssemble();
315 13
    }
316
317
    /**
     * Analyze the rows that were just read before trying to assemble them.
     *
     * Aborts the flow when nothing was processed at all. When some units did not deliver
     * a connect buffer entry, those units are checked for optional handling and marked
     * as finished, the read buffers of the units that did deliver are purged, and the
     * flow continues with the next iteration.
     *
     * @throws FlowRegulationException FLOW_ABORT on empty processed data, FLOW_CONTINUE
     *                                 after handling missing units
     * @throws \LogicException when rows are left that have no counterpart
     */
    private function analyzeRow()
    {
        if ($this->isEmptyData($this->processed)) {
            throw new FlowRegulationException("", self::FLOW_ABORT);
        }
        // every unit delivered an entry - nothing is missing
        if (count($this->connectBuffer) >= $this->bag->count()) {
            return;
        }
        foreach ($this->bag as $unit) {
            $code = $unit->getCode();
            if (array_key_exists($code, $this->connectBuffer)) {
                continue;
            }
            $this->analyzeOptionalItems($code);
            // a finished unit stays missing on every following iteration,
            // so make sure it is listed only once
            if (!in_array($code, $this->finished, true)) {
                $this->finished[] = $code;
            }
        }
        // checking if input has the correct # of mapped entries
        $intersect = array_intersect_key($this->connectBuffer, $this->buffer);
        foreach (array_keys($intersect) as $purgeKey) {
            $this->unsetBuffer($purgeKey);
            unset($this->connectBuffer[$purgeKey]);
        }
        if (!$this->isEmptyData($this->connectBuffer)) {
            throw new \LogicException(
                sprintf(
                    "Orphaned rows in some of the units %s",
                    json_encode(array_keys($this->connectBuffer))
                )
            );
        }
        throw new FlowRegulationException("", self::FLOW_CONTINUE);
    }
353
354
    /**
     * Handle a missing unit that is optional.
     *
     * Does nothing for units that are not optional. For an optional unit the read buffers
     * of all lasting units on the preceding levels are cleared, the write buffer is dumped
     * when every one of those units was lasting, and a nulls row based on the last
     * processed row of the unit is put into the read buffer before continuing the flow.
     *
     * @param string $code
     * @throws FlowRegulationException with FLOW_CONTINUE once the nulls row is buffered
     * @throws \LogicException when the unit never had a processed row
     */
    private function analyzeOptionalItems($code)
    {
        /** @var ExportFileUnitInterface $unit */
        $unit = $this->bag->getUnitByCode($code);
        if (!$unit->isOptional()) {
            return;
        }
        $allLasting = true;
        // clear buffers of lasting ancestors
        for ($lvl = $this->bag->getUnitLevel($code) - 1; $lvl >= 1; $lvl--) {
            foreach ($this->bag->getUnitsFromLevel($lvl) as $parentCode) {
                $lasting = !empty($this->lastingBuffer[$parentCode]);
                $allLasting = $allLasting && $lasting;
                if ($lasting) {
                    $this->unsetBuffer($parentCode);
                }
            }
        }
        if ($allLasting) {
            $this->dumpWriteBuffer();
        }
        // add buffer for current optional unit; empty() also covers the case where the
        // unit never produced a row, which would otherwise be an undefined index access
        if (empty($this->lastProcessed[$code])) {
            throw new \LogicException("Can't add optional item to buffer, since none existed.");
        }
        $this->buffer[$code] = $this->getNullsData($this->lastProcessed[$code]);
        throw new FlowRegulationException("", self::FLOW_CONTINUE);
    }
391
392
    /**
     * Resolve a conflict between parent and child units.
     *
     * Buffered optional units get their buffered row parked in the persistent buffer and a
     * nulls row put into the temporary buffer. When no such unit is found, the buffers of
     * the given units are cleaned, or filled when there was nothing to clean. Finally the
     * write buffer is dumped and the marked persistent buffer entries are removed.
     *
     * @param string[] $codes
     * @throws FlowRegulationException always, with FLOW_CONTINUE
     * @throws \LogicException
     */
    private function handleConflict(array $codes)
    {
        $hasOptional = false;
        foreach ($codes as $code) {
            /** @var ExportFileUnitInterface $unit */
            $unit = $this->bag->getUnitByCode($code);
            if (!isset($this->buffer[$code]) || !$unit->isOptional()) {
                continue;
            }
            $this->persistentBuffer[$code] = $this->buffer[$code];
            $this->tmpBuffer[$code] = $this->getNullsData($this->processed[$code]);
            $this->unsetBuffer($code);
            $hasOptional = true;
        }
        // normal flow
        if (!$hasOptional) {
            foreach ($codes as $code) {
                if (!$this->cleanBuffer($code)) {
                    $this->fillBuffer($code);
                }
            }
        }
        $this->dumpWriteBuffer();
        foreach ($this->bag as $unit) {
            $this->cleanPersistentBuffer($unit->getCode());
        }
        throw new FlowRegulationException("", self::FLOW_CONTINUE);
    }
426
427
    /**
     * Dump the existing write buffer.
     *
     * A non-empty buffer is added to the input resource and counted as processed for
     * this action. The write buffer is reset afterwards in either case.
     */
    private function dumpWriteBuffer()
    {
        $pending = $this->writeBuffer;
        if ($pending) {
            $this->input->add($pending);
            $this->result->incrementActionProcessed($this->getCode());
        }
        $this->writeBuffer = [];
    }
438
439
    /**
     * Clean the read buffer of the unit and of all its siblings.
     *
     * @param string $code
     * @return bool true when at least one buffer entry was removed
     */
    protected function cleanBuffer($code)
    {
        $unit = $this->bag->getUnitByCode($code);
        $cleaned = false;
        /** @var UnitInterface $singleUnit */
        foreach (array_merge($unit->getSiblings(), [$unit]) as $singleUnit) {
            $unitCode = $singleUnit->getCode();
            // keep a real boolean here: the bitwise |= used before turned the result into 0/1
            if ($this->unsetBuffer($unitCode)) {
                $cleaned = true;
            }
            // the row may have been served from the persistent buffer; since its buffer
            // was cleared because of a conflict, drop the removal mark so that the
            // persistent entry is kept and can be served again
            unset($this->toUnset[$unitCode]);
        }
        return $cleaned;
    }
462
463
    /**
     * Delete the current read buffer entry of the given code.
     *
     * @param string $code
     * @return bool whether an entry existed and was removed
     */
    private function unsetBuffer($code)
    {
        $present = isset($this->buffer[$code]);
        if ($present) {
            unset($this->buffer[$code]);
        }
        return $present;
    }
476
477
    /**
     * Fill the read buffer for the given code.
     *
     * The processed row of the unit is copied into the read buffer; the same is then
     * done for its siblings, one after another, for as long as the current code has a
     * processed row and no buffer entry yet.
     *
     * @param string $code
     * @return bool whether at least one buffer entry was written
     */
    public function fillBuffer($code)
    {
        $unit = $this->bag->getUnitByCode($code);
        $pendingSiblings = $unit->getSiblings();
        $filled = false;
        $current = $code;
        for (;;) {
            if (isset($this->buffer[$current]) || !isset($this->processed[$current])) {
                break;
            }
            $this->buffer[$current] = $this->processed[$current];
            $filled = true;
            $next = array_shift($pendingSiblings);
            if ($next) {
                $current = $next->getCode();
            }
        }
        return $filled;
    }
497
498
    /**
     * Procedure of adding data to the input resource.
     *
     * The processed rows are resolved into one row that is fed to the map; every
     * reversed mapping expression of each processed unit is then evaluated against that
     * map. Afterwards the read buffers of the lowest units and the marked persistent
     * buffer entries are released, and the evaluated data becomes the new write buffer.
     */
    private function addData()
    {
        $resolvedRow = $this->assembleResolve($this->processed);
        $this->map->clear();
        $this->map->feed($resolvedRow);
        $toAdd = [];
        foreach (array_keys($this->processed) as $unitCode) {
            // NOTE(review): getHashmaps() is declared on ImportFileUnitInterface only and
            // getReversedMapping() on ExportFileUnitInterface only, so the units used
            // here have to implement both interfaces.
            /** @var ExportFileUnitInterface|ImportFileUnitInterface $unit */
            $unit = $this->bag->getUnitByCode($unitCode);
            $evaluate = function ($expression) use ($unit) {
                $context = [
                    'map' => $this->map,
                    'hashmaps' => $unit->getHashmaps(),
                ];
                return $this->language->evaluate($expression, $context);
            };
            $toAdd[$unitCode] = array_map($evaluate, $unit->getReversedMapping());
        }
        foreach ($this->bag as $unit) {
            $code = $unit->getCode();
            if ($this->bag->isLowest($code)) {
                $this->unsetBuffer($code);
            }
            $this->cleanPersistentBuffer($code);
        }
        $this->writeBuffer = $this->assembleHierarchy($toAdd);
    }
526
527
    /**
     * Remove the persistent buffer entry of the code when it is marked for removal.
     *
     * @param string $code
     */
    private function cleanPersistentBuffer($code)
    {
        if (!isset($this->toUnset[$code], $this->persistentBuffer[$code])) {
            return;
        }
        unset($this->persistentBuffer[$code]);
    }
536
537
    /**
     * Assemble the evaluated unit data of one level and nest the data of the child
     * units' levels below it.
     *
     * On level 1 the result is merged over the current write buffer. For every child of
     * a unit on this level, the assembled data of the child's level is appended to the
     * list stored under the child's code.
     *
     * @param array $toAdd evaluated data keyed by unit code
     * @param int $level
     * @return array
     */
    private function assembleHierarchy(array $toAdd, $level = 1)
    {
        $levelCodes = $this->bag->getUnitsFromLevel($level);
        $assembled = $this->assemble(array_intersect_key($toAdd, array_flip($levelCodes)));
        if ($level == 1) {
            $assembled = array_replace_recursive($this->writeBuffer, $assembled);
        }
        foreach ($levelCodes as $code) {
            foreach ($this->bag->getChildren($code) as $child) {
                $childCode = $child->getCode();
                $nested = $this->assembleHierarchy($toAdd, $this->bag->getUnitLevel($childCode));
                // start a fresh list unless there already is one to append to
                if (!isset($assembled[$childCode]) || !is_array($assembled[$childCode])) {
                    $assembled[$childCode] = [];
                }
                $assembled[$childCode][] = $nested;
            }
        }
        return $assembled;
    }
569
570
    /**
     * Read the next row of the unit's file.
     *
     * @param ImportFileUnitInterface $unit
     * @return array|bool the row keyed by the keys of the unit's mapping, or false when
     *                    the filesystem did not return an array
     * @throws WrongContextException
     */
    private function readRow(ImportFileUnitInterface $unit)
    {
        $rawRow = $unit->getFilesystem()->readRow();
        return is_array($rawRow)
            ? array_combine(array_keys($unit->getMapping()), $rawRow)
            : false;
    }
583
584
    /**
     * Open handlers: report the start time of the action, compile the unit tree and
     * open the tmp file of every unit for reading.
     *
     * @throws WrongContextException when a unit has no tmp file name
     */
    private function start()
    {
        $this->result->setActionStartTime($this->getCode(), new \DateTime());
        $this->bag->compileTree();
        foreach ($this->bag as $unit) {
            if ($unit->getTmpFileName() !== null) {
                $unit->getFilesystem()->open($unit->getTmpFileName(), 'r');
                continue;
            }
            throw new WrongContextException(sprintf(
                "Action can not be used for current unit %s. Tmp file is missing.",
                $unit->getCode()
            ));
        }
    }
602
603
    /**
     * Close all handlers.
     *
     * Every unit's filesystem is closed first; unless the 'file_debug' config entry is
     * truthy the tmp files are cleaned up afterwards. The end time of the action is
     * reported last.
     */
    private function close()
    {
        foreach ($this->bag as $unit) {
            $unit->getFilesystem()->close();
        }
        $keepTmpFiles = $this->config['file_debug'];
        if (!$keepTmpFiles) {
            foreach ($this->bag as $unit) {
                $filesystem = $unit->getFilesystem();
                $filesystem->cleanUp($unit->getTmpFileName());
            }
        }
        $this->result->setActionEndTime($this->getCode(), new \DateTime());
    }
618
619
    /**
     * {@inheritdoc}
     * Returns the fixed code of this action. Within this class it is the key handed to
     * the result object when reporting start time, end time and processed records.
     */
622 18
    public function getCode()
623
    {
624 18
        return 'assemble_input';
625
    }
626
}
627