ChainProcessor::processItem()    Rating: B
last analyzed

Complexity

Conditions 10
Paths 13
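The Conditions value can be cross-checked against the method body in the listing below. This assumes the usual McCabe counting: one point for the method entry, plus one for each `if`, `elseif`/`else if`, `while` and `foreach`.

```latex
\begin{aligned}
\text{cc}(\texttt{processItem})
  &= \underbrace{1}_{\text{entry}}
   + \underbrace{1}_{\texttt{if Async}}
   + \underbrace{1}_{\texttt{while}}
   + \underbrace{1}_{\texttt{elseif Mix}}
   + \underbrace{1}_{\texttt{foreach}} \\
  &\quad + \underbrace{2}_{\texttt{if/elseif in loop}}
   + \underbrace{1}_{\texttt{if Stop}}
   + \underbrace{1}_{\texttt{elseif Grouped}}
   + \underbrace{1}_{\texttt{else if ChainBreak}} \\
  &= 1 + 9 = 10
\end{aligned}
```

The `else if ($item instanceof ChainBreakItem)` branch returns `$item`, which is exactly what the final `return $item;` does. Dropping that branch would therefore lower the count to 9 without changing behaviour.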

Size

Total Lines 43
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 10

Importance

Changes 1
Bugs 0
Features 0
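Metric  Value   Meaning
cc      10      cyclomatic complexity (Conditions)
eloc    29      executable lines of code (Code Lines)
c       1       changes
b       0       bugs
f       0       features
nc      13      NPath complexity (Paths)
nop     3       number of parameters (processItem() takes 3)
dl      0       duplicated lines
loc     43      total lines (Total Lines)
rs      7.6666
ccs     9
cts     9
cp      1       code coverage as a fraction (1 = 100 %)
crap    10      CRAP score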
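The CRAP score combines cyclomatic complexity with test coverage. When coverage is written as a fraction, as in the `cp` row, the standard formula reproduces the reported value:

```latex
\begin{aligned}
\text{CRAP}(m) &= \text{cc}(m)^2 \,\bigl(1 - \text{cp}(m)\bigr)^3 + \text{cc}(m) \\
\text{CRAP}(\texttt{processItem}) &= 10^2 \,(1 - 1)^3 + 10 = 0 + 10 = 10
\end{aligned}
```

Coverage is already complete, so more tests cannot lower this score. Only reducing `cc` can. At 0 % coverage, the same method would score 10² · 1³ + 10 = 110.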

How to fix: Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, which moves a coherent part of a method body into a new, well-named method.
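Below is a minimal sketch of Extract Method applied to `ChainProcessor::processItem()` from the listing below. The public method keeps only the type dispatch, and each `instanceof` branch moves into its own private method. Several parts are assumptions rather than php-etl code: the stub types, the class name `ChainProcessorSketch`, and the method names `queueAsyncItem()`, `processMixItem()` and `processGroupedItem()` are all hypothetical. The branch bodies are placeholders that only return the kind of item the original branch returns. The `ExecutionContext` parameter is also omitted to keep the example self-contained.

```php
<?php
declare(strict_types=1);

// Hypothetical, minimal stand-ins for the php-etl item types; the real ones
// live in Oliverde8\Component\PhpEtl\Item and carry more methods.
interface ItemInterface {}
interface AsyncItemInterface extends ItemInterface {}
interface GroupedItemInterface extends ItemInterface {}
final class DataItem implements ItemInterface {}
final class MixItem implements ItemInterface {}
final class ChainBreakItem implements ItemInterface {}
final class StopItem implements ItemInterface {}

// processItem() after Extract Method: the public method only dispatches on the
// item type; each branch of the original if/elseif chain is a named method.
final class ChainProcessorSketch
{
    public function processItem(ItemInterface $item, int $chainNumber): ItemInterface
    {
        $this->processAsyncOperations();

        if ($item instanceof AsyncItemInterface) {
            return $this->queueAsyncItem($item, $chainNumber);
        }
        if ($item instanceof MixItem) {
            return $this->processMixItem($item, $chainNumber);
        }
        if ($item instanceof GroupedItemInterface) {
            return $this->processGroupedItem($item, $chainNumber);
        }

        // ChainBreakItem and all other items are returned unchanged, which is
        // what both trailing branches of the original method do.
        return $item;
    }

    private function processAsyncOperations(): void
    {
        // Placeholder: the original resumes finished asynchronous items here.
    }

    private function queueAsyncItem(AsyncItemInterface $item, int $chainNumber): ItemInterface
    {
        // Placeholder: the original waits for a free slot, queues the item
        // with its chain number and breaks the chain.
        return new ChainBreakItem();
    }

    private function processMixItem(MixItem $item, int $chainNumber): ItemInterface
    {
        // Placeholder: the original sends every sub-item down the chain and
        // returns a StopItem if the last result is one, else a ChainBreakItem.
        return new ChainBreakItem();
    }

    private function processGroupedItem(GroupedItemInterface $item, int $chainNumber): ItemInterface
    {
        // Placeholder: the original runs the grouped items through the rest of
        // the chain and then returns a StopItem.
        return new StopItem();
    }
}

$processor = new ChainProcessorSketch();
$data = new DataItem();
var_dump($processor->processItem($data, 0) === $data); // bool(true)
```

Folding the redundant `ChainBreakItem` branch into the final `return` leaves the dispatcher with three conditions, so its cyclomatic complexity drops to 4. Each extracted method can then be named, read and tested on its own.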

<?php

declare(strict_types=1);

namespace Oliverde8\Component\PhpEtl;

use Oliverde8\Component\PhpEtl\ChainOperation\ChainOperationInterface;
use Oliverde8\Component\PhpEtl\Exception\ChainOperationException;
use Oliverde8\Component\PhpEtl\Item\AsyncItemInterface;
use Oliverde8\Component\PhpEtl\Item\ChainBreakItem;
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\Item\GroupedItemInterface;
use Oliverde8\Component\PhpEtl\Item\ItemInterface;
use Oliverde8\Component\PhpEtl\Item\MixItem;
use Oliverde8\Component\PhpEtl\Item\StopItem;
use Oliverde8\Component\PhpEtl\Model\ExecutionContext;
use Oliverde8\Component\PhpEtl\Model\LoggerContext;

/**
 * Class ChainProcessor
 *
 * @author    de Cramer Oliver<[email protected]>
 * @copyright 2018 Oliverde8
 * @package Oliverde8\Component\PhpEtl
 */
class ChainProcessor extends LoggerContext implements ChainProcessorInterface
{
    const KEY_LOGGER_ETL_IDENTIFIER = 'etl.identifier';

    /** @var ChainOperationInterface[] */
    protected array $chainLinks = [];

    protected ExecutionContextFactoryInterface $contextFactory;

    /** @var string[] */
    protected array $chainLinkNames = [];

    protected array $asyncItems = [];

    protected int $maxAsynchronousItems = 10;

    /**
     * ChainProcessor constructor.
     *
     * @param ChainOperationInterface[] $chainLinks
     */
    public function __construct(
        array $chainLinks,
        ExecutionContextFactoryInterface $contextFactory,
        int $maxAsynchronousItems = 10
    )
    {
        $this->contextFactory = $contextFactory;
        $this->maxAsynchronousItems = $maxAsynchronousItems;
        $this->chainLinkNames = array_keys($chainLinks);
        $this->chainLinks = array_values($chainLinks);
    }

    public function process(\Iterator $items, array $parameters)
    {
        $context = $this->contextFactory->get($parameters);
        $context->replaceLoggerContext($parameters);
        $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, '');

        try {
            $context->getLogger()->info("Starting etl process!");
            $this->processItems($items, 0, $context);
            $context->getLogger()->info("Finished etl process!");
            $context->finalise();
        } catch (\Exception $e) {
            $params['exception'] = $e;
            // Scrutinizer (Comprehensibility Best Practice): $params was never initialized.
            // Although not strictly required by PHP, it is good practice to add
            // $params = array(); before this line regardless.
            $context->getLogger()->info("Failed during etl process!", $params);
            $context->finalise();
            throw $e;
        }
    }

    /**
     * Process list of items with chain starting at $startAt.
     */
    protected function processItems(\Iterator $items, int $startAt, ExecutionContext $context, bool $withStop = true)
    {
        $identifierPrefix = $context->getParameter('etl.identifier');

        $count = 1;
        foreach ($items as $item) {
            $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, $identifierPrefix . $count++);
            // Scrutinizer (Bug): Are you sure $identifierPrefix of type array|mixed|null can be
            // used in concatenation? If this is a false positive, the issue can be ignored in
            // the code with the ignore-type annotation:
            // $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, /** @scrutinizer ignore-type */ $identifierPrefix . $count++);

            $dataItem = new DataItem($item);
            $this->processItemWithChain($dataItem, $startAt, $context);
        }

        $this->endAllAsyncOperations();

        $stopItem = new StopItem();
        if ($withStop) {
            $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, $identifierPrefix . 'STOP');
            while ($this->processItemWithChain($stopItem, $startAt, $context) !== $stopItem) {
                // Executing stop until the system stops.
            }
        }

        return $stopItem;
    }

    public function processItemWithChain(ItemInterface $item, int $startAt, ExecutionContext $context): ItemInterface
    {
        for ($chainNumber = $startAt; $chainNumber < count($this->chainLinks); $chainNumber++) {
            // Scrutinizer (Performance Best Practice): count() is called as part of the test
            // condition. If the size of the collection does not change during the iteration,
            // compute it once beforehand instead of on each iteration:
            //
            //     for ($i = 0; $i < count($array); $i++) { } // calls count() on each iteration
            //
            //     // Better
            //     for ($i = 0, $c = count($array); $i < $c; $i++) { } // calls count() just once
            $item = $this->processItemWithOperation($item, $chainNumber, $context);

            $item = $this->processItem($item, $chainNumber, $context);
        }

        return $item;
    }

    public function processItem(ItemInterface $item, int $chainNumber, ExecutionContext $context): ItemInterface
    {
        $this->processAsyncOperations();

        if ($item instanceof AsyncItemInterface) {
            while (count($this->asyncItems) >= $this->maxAsynchronousItems) {
                usleep(1000);
                $this->processAsyncOperations();
            }
            $this->asyncItems[] = [
                'item' => $item,
                'context' => $context,
                'chain_number' => $chainNumber,
            ];

            return new ChainBreakItem();
        } elseif ($item instanceof MixItem) {
            $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, "chain link:{$this->chainLinkNames[$chainNumber]}-");

            foreach ($item->getItems() as $mixItem) {
                if ($mixItem instanceof AsyncItemInterface) {
                    $item = $this->processItemWithChain($mixItem, $chainNumber, $context);
                } elseif ($mixItem instanceof GroupedItemInterface) {
                    $item = $this->processItems($mixItem->getIterator(), $chainNumber + 1, $context, false);
                } else {
                    $item = $this->processItemWithChain($mixItem, $chainNumber + 1, $context);
                }
            }

            if ($item instanceof StopItem) {
                return $item;
            }
            return new ChainBreakItem();
        } elseif ($item instanceof GroupedItemInterface) {
            $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, "chain link:{$this->chainLinkNames[$chainNumber]}-");
            $this->processItems($item->getIterator(), $chainNumber + 1, $context, false);

            return new StopItem();
        } else if ($item instanceof ChainBreakItem) {
            return $item;
        }

        return $item;
    }

    protected function processAsyncOperations()
    {
        foreach ($this->asyncItems as $id => $item) {
            if (!$item['item']->isRunning()) {
                // Item has finished.
                unset($this->asyncItems[$id]);
                $this->processItemWithChain($item['item']->getItem(), $item['chain_number'] + 1, $item['context']);
            }
        }

    }

    protected function endAllAsyncOperations()
    {
        while (!empty($this->asyncItems)) {
            $this->processAsyncOperations();

            if (!empty($this->asyncItems)) {
                usleep(1000);
            }
        }
    }

    /**
     * Process an item and handle errors during the process.
     *
     * @throws ChainOperationException
     */
    protected function processItemWithOperation(ItemInterface $item, int $chainNumber, ExecutionContext &$context): ItemInterface
    {
        try {
            return $this->chainLinks[$chainNumber]->process($item, $context);
        } catch (\Exception $exception) {
            throw new ChainOperationException(
                "An exception was thrown during the handling of the chain link : "
                    . "{$this->chainLinkNames[$chainNumber]} "
                    . "with the item {$context->getParameter(self::KEY_LOGGER_ETL_IDENTIFIER)}.",
                0,
                $exception,
                $this->chainLinkNames[$chainNumber]
            );
        }
    }
}
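The two type-related reports in the listing, the uninitialized `$params` and the possibly non-string `$identifierPrefix`, can be resolved without ignore annotations. The following is a minimal sketch, assuming plain helper functions are acceptable. `normaliseIdentifierPrefix()` and `failureLogContext()` are hypothetical names and not part of php-etl.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper for the "array|mixed|null used in concatenation" report:
 * turn the etl.identifier parameter into a string once, before it is
 * concatenated with the item counter. Non-scalar values (arrays, objects,
 * null) fall back to an empty prefix.
 *
 * @param mixed $prefix
 */
function normaliseIdentifierPrefix($prefix): string
{
    return is_scalar($prefix) ? (string) $prefix : '';
}

/**
 * Hypothetical helper for the "$params was never initialized" report:
 * build the logger context as a complete array literal instead of
 * assigning a key on an undeclared variable.
 */
function failureLogContext(\Throwable $exception): array
{
    return ['exception' => $exception];
}

$prefix = normaliseIdentifierPrefix(null);
echo $prefix . 'STOP', PHP_EOL; // prints "STOP"
```

With these helpers, the first statement of `processItems()` would read `$identifierPrefix = normaliseIdentifierPrefix($context->getParameter('etl.identifier'));`. The catch block in `process()` would then pass `failureLogContext($e)` to `info()`. Falling back to an empty prefix is an assumption. Throwing an exception for non-scalar values would be a stricter alternative.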