Completed
Pull Request — master (#296)
by
unknown
03:13
created

StepAggregator::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 9
ccs 6
cts 6
cp 1
rs 9.6666
cc 1
eloc 5
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Ddeboer\DataImport\Workflow;
4
5
use Ddeboer\DataImport\Exception;
6
use Ddeboer\DataImport\Exception\UnexpectedTypeException;
7
use Ddeboer\DataImport\Reader;
8
use Ddeboer\DataImport\Result;
9
use Ddeboer\DataImport\Step;
10
use Ddeboer\DataImport\Step\PriorityStep;
11
use Ddeboer\DataImport\Workflow;
12
use Ddeboer\DataImport\Writer;
13
use Psr\Log\LoggerInterface;
14
use Psr\Log\LoggerAwareInterface;
15
use Psr\Log\LoggerAwareTrait;
16
use Psr\Log\NullLogger;
17
18
/**
19
 * A mediator between a reader and one or more writers and converters
20
 *
21
 * @author David de Boer <[email protected]>
22
 */
23
class StepAggregator implements Workflow, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * Source of the items that are processed
     *
     * @var Reader
     */
    private $reader;

    /**
     * Identifier for the Import/Export
     *
     * @var string|null
     */
    private $name = null;

    /**
     * When true, an item that raises an Exception is logged and skipped
     * instead of aborting the whole run
     *
     * @var boolean
     */
    private $skipItemOnFailure = false;

    /**
     * Steps applied to every item, ordered by priority
     *
     * @var \SplPriorityQueue
     */
    private $steps;

    /**
     * Writers that receive every item that passed all steps
     *
     * @var Writer[]
     */
    private $writers = [];

    /**
     * Set by stop(); checked before each item in process()
     *
     * @var boolean
     */
    protected $shouldStop = false;

    /**
     * @param Reader      $reader Source of the items to process
     * @param string|null $name   Identifier handed to the Result
     */
    public function __construct(Reader $reader, $name = null)
    {
        $this->name = $name;
        $this->reader = $reader;

        // Defaults
        $this->logger = new NullLogger();
        $this->steps = new \SplPriorityQueue();
    }

    /**
     * Add a step to the current workflow
     *
     * An explicitly given priority always wins. Without one, a PriorityStep
     * supplies its own priority; any other step is queued with priority 0.
     *
     * @param Step         $step
     * @param integer|null $priority
     *
     * @return $this
     */
    public function addStep(Step $step, $priority = null)
    {
        // Only consult the step when the caller did not pass a priority
        // (the previous ternary discarded an explicit priority and used 0)
        if (null === $priority && $step instanceof PriorityStep) {
            $priority = $step->getPriority();
        }

        // Covers both "no priority anywhere" and a step reporting null
        if (null === $priority) {
            $priority = 0;
        }

        $this->steps->insert($step, $priority);

        return $this;
    }

    /**
     * Add a new writer to the current workflow
     *
     * @param Writer $writer
     *
     * @return $this
     */
    public function addWriter(Writer $writer)
    {
        $this->writers[] = $writer;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Prepares all writers, runs every item of the reader through the steps,
     * hands the surviving items to all writers and finally finishes the
     * writers. An item for which a step returns false is skipped: it is
     * neither written nor counted.
     *
     * @return Result
     *
     * @throws Exception When an item fails and skipItemOnFailure is false
     */
    public function process()
    {
        $count      = 0;
        $exceptions = new \SplObjectStorage();
        $startTime  = new \DateTime;

        foreach ($this->writers as $writer) {
            $writer->prepare();
        }

        // Let SIGTERM/SIGINT end the run gracefully when pcntl is available
        if (is_callable('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'stop']);
            pcntl_signal(SIGINT, [$this, 'stop']);
        }

        // Availability cannot change while looping, so check it only once
        $dispatchSignals = is_callable('pcntl_signal_dispatch');

        // Read all items
        foreach ($this->reader as $index => $item) {
            if ($dispatchSignals) {
                pcntl_signal_dispatch();
            }

            if ($this->shouldStop) {
                break;
            }

            try {
                // Iterating an SplPriorityQueue removes its elements, so walk
                // a clone to keep the steps available for the next item
                foreach (clone $this->steps as $step) {
                    if (false === $step->process($item)) {
                        // Step rejected the item: move on to the next item
                        continue 2;
                    }
                }

                if (!is_array($item) && !($item instanceof \ArrayAccess && $item instanceof \Traversable)) {
                    throw new UnexpectedTypeException($item, 'array');
                }

                foreach ($this->writers as $writer) {
                    $writer->writeItem($item);
                }
            } catch (Exception $e) {
                if (!$this->skipItemOnFailure) {
                    throw $e;
                }

                // Remember the failure together with the index of the item
                $exceptions->attach($e, $index);
                $this->logger->error($e->getMessage());
            }

            // Failed-but-skipped items are counted as well
            $count++;
        }

        foreach ($this->writers as $writer) {
            $writer->finish();
        }

        return new Result($this->name, $startTime, new \DateTime, $count, $exceptions);
    }

    /**
     * Stops processing and force return Result from process() function
     *
     * The flag is checked before each item, so the loop ends before the
     * next item is processed.
     */
    public function stop()
    {
        $this->shouldStop = true;
    }

    /**
     * Sets the value which determines whether the item should be skipped when an error occurs
     *
     * @param boolean $skipItemOnFailure When true skip current item on process exception and log the error
     *
     * @return $this
     */
    public function setSkipItemOnFailure($skipItemOnFailure)
    {
        $this->skipItemOnFailure = $skipItemOnFailure;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getName()
    {
        return $this->name;
    }
}
197