Completed
Push — master ( 02065a...03f59a )
by Markus
02:11
created

StepAggregator::buildPipeline()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 14
rs 9.4285
c 1
b 0
f 0
ccs 8
cts 8
cp 1
cc 2
eloc 6
nc 2
nop 0
crap 2
1
<?php
2
3
namespace Port\Steps;
4
5
use Port\Exception;
6
use Port\Reader;
7
use Port\Result;
8
use Port\Steps\Step\PriorityStep;
9
use Port\Workflow;
10
use Port\Writer;
11
use Psr\Log\LoggerAwareInterface;
12
use Psr\Log\LoggerAwareTrait;
13
use Psr\Log\NullLogger;
14
use Seld\Signal\SignalHandler;
15
16
/**
17
 * A mediator between a reader and one or more writers and converters
18
 *
19
 * @author David de Boer <[email protected]>
20
 */
21
class StepAggregator implements Workflow, LoggerAwareInterface
22
{
23
    use LoggerAwareTrait;
24
25
    /**
26
     * @var Reader
27
     */
28
    private $reader;
29
30
    /**
31
     * Identifier for the Import/Export
32
     *
33
     * @var string|null
34
     */
35
    private $name = null;
36
37
    /**
38
     * @var boolean
39
     */
40
    private $skipItemOnFailure = false;
41
42
    /**
43
     * @var array
44
     */
45
    private $steps = [];
46
47
    /**
48
     * @var Writer[]
49
     */
50
    private $writers = [];
51
52
    /**
53
     * @param Reader $reader
54
     * @param string $name
55
     */
56 7
    public function __construct(Reader $reader, $name = null)
57
    {
58 7
        $this->name = $name;
59 7
        $this->reader = $reader;
60
61
        // Defaults
62 7
        $this->logger = new NullLogger();
63 7
    }
64
65
    /**
66
     * Add a step to the current workflow
67
     *
68
     * @param Step         $step
69
     * @param integer|null $priority
70
     *
71
     * @return $this
72
     */
73 1
    public function addStep(Step $step, $priority = null)
74
    {
75 1
        $priority = null === $priority && $step instanceof PriorityStep ? $step->getPriority() : $priority;
76 1
        $priority = null === $priority ? 0 : $priority;
77
78 1
        $this->steps[$priority][] = $step;
79
80 1
        return $this;
81
    }
82
83
    /**
84
     * Add a new writer to the current workflow
85
     *
86
     * @param Writer $writer
87
     *
88
     * @return $this
89
     */
90 5
    public function addWriter(Writer $writer)
91
    {
92 5
        array_push($this->writers, $writer);
93
94 5
        return $this;
95
    }
96
97
    /**
98
     * {@inheritdoc}
99
     */
100 4
    public function process()
101
    {
102 4
        $count      = 0;
103 4
        $exceptions = new \SplObjectStorage();
104 4
        $startTime  = new \DateTime;
105
106 4
        $signal = SignalHandler::create(['SIGTERM', 'SIGINT'], $this->logger);
0 ignored issues
show
Documentation introduced by
$this->logger is of type object<Psr\Log\LoggerInterface>, but the function expects a callable|null.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
107
108 4
        foreach ($this->writers as $writer) {
109 4
            $writer->prepare();
110 4
        }
111
112 4
        $pipeline = $this->buildPipeline();
113
114
        // Read all items
115 4
        foreach ($this->reader as $index => $item) {
116
            try {
117 4
                if ($signal->isTriggered()) {
118
                    break;
119
                }
120
121 4
                if (false === $pipeline($item)) {
122
                    continue;
123
                }
124 4
            } catch(Exception $e) {
125 2
                if (!$this->skipItemOnFailure) {
126 1
                    throw $e;
127
                }
128
129 1
                $exceptions->attach($e, $index);
130 1
                $this->logger->error($e->getMessage());
131
            }
132
133 3
            $count++;
134 3
        }
135
136 3
        foreach ($this->writers as $writer) {
137 3
            $writer->finish();
138 3
        }
139
140 3
        return new Result($this->name, $startTime, new \DateTime, $count, $exceptions);
141
    }
142
143
    /**
144
     * Sets the value which determines whether the item should be skipped when error occures
145
     *
146
     * @param boolean $skipItemOnFailure When true skip current item on process exception and log the error
147
     *
148
     * @return $this
149
     */
150 1
    public function setSkipItemOnFailure($skipItemOnFailure)
151
    {
152 1
        $this->skipItemOnFailure = $skipItemOnFailure;
153
154 1
        return $this;
155
    }
156
157
    /**
158
     * @return string
159
     */
160
    public function getName()
161
    {
162
        return $this->name;
163
    }
164
165
    /**
166
     * Builds the pipeline
167
     *
168
     * @return callable
169
     */
170 4
    private function buildPipeline()
171
    {
172
        $nextCallable = function ($item) {
0 ignored issues
show
Unused Code introduced by
The parameter $item is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
173
            // the final callable is a no-op
174 4
        };
175
176 4
        foreach ($this->getStepsSortedDescByPriority() as $step) {
177 4
            $nextCallable = function ($item) use ($step, $nextCallable) {
178 4
                return $step->process($item, $nextCallable);
179 4
            };
180 4
        }
181
182 4
        return $nextCallable;
183
    }
184
185
    /**
186
     * Sorts the internal list of steps and writers by priority in reverse order.
187
     *
188
     * @return Step[]
189
     */
190 4
    private function getStepsSortedDescByPriority()
191
    {
192 4
        $steps = $this->steps;
193
        // Use illogically large and small priorities
194 4
        $steps[-255][] = new Step\ArrayCheckStep;
195 4
        foreach ($this->writers as $writer) {
196 4
            $steps[-256][] = new Step\WriterStep($writer);
197 4
        }
198
199 4
        ksort($steps);
200
201 4
        $sortedStep = [];
202
        /** @var Step[] $stepsAtSamePriority */
203 4
        foreach ($steps as $stepsAtSamePriority) {
204 4
            foreach ($stepsAtSamePriority as $step) {
205 4
                $sortedStep[] = $step;
206 4
            }
207 4
        }
208
209 4
        return array_reverse($sortedStep);
210
    }
211
}
212