1 | <?php |
||
23 | class StepAggregator implements Workflow, LoggerAwareInterface |
||
24 | { |
||
25 | use LoggerAwareTrait; |
||
26 | |||
27 | /** |
||
28 | * @var Reader |
||
29 | */ |
||
30 | private $reader; |
||
31 | |||
32 | /** |
||
33 | * Identifier for the Import/Export |
||
34 | * |
||
35 | * @var string|null |
||
36 | */ |
||
37 | private $name = null; |
||
38 | |||
39 | /** |
||
40 | * @var boolean |
||
41 | */ |
||
42 | private $skipItemOnFailure = false; |
||
43 | |||
44 | /** |
||
45 | * @var \SplPriorityQueue |
||
46 | */ |
||
47 | private $steps; |
||
48 | |||
49 | /** |
||
50 | * @var Writer[] |
||
51 | */ |
||
52 | private $writers = []; |
||
53 | |||
54 | /** |
||
55 | * @var boolean |
||
56 | */ |
||
57 | protected $shouldStop = false; |
||
58 | |||
59 | /** |
||
60 | * @param Reader $reader |
||
61 | * @param string $name |
||
62 | */ |
||
63 | 13 | public function __construct(Reader $reader, $name = null) |
|
64 | { |
||
65 | 13 | $this->name = $name; |
|
66 | 13 | $this->reader = $reader; |
|
67 | |||
68 | // Defaults |
||
69 | 13 | $this->logger = new NullLogger(); |
|
70 | 13 | $this->steps = new \SplPriorityQueue(); |
|
71 | 13 | } |
|
72 | |||
73 | /** |
||
74 | * Add a step to the current workflow |
||
75 | * |
||
76 | * @param Step $step |
||
77 | * @param integer|null $priority |
||
78 | * |
||
79 | * @return $this |
||
80 | */ |
||
81 | 6 | public function addStep(Step $step, $priority = null) |
|
90 | |||
91 | /** |
||
92 | * Add a new writer to the current workflow |
||
93 | * |
||
94 | * @param Writer $writer |
||
95 | * |
||
96 | * @return $this |
||
97 | */ |
||
98 | 9 | public function addWriter(Writer $writer) |
|
99 | { |
||
100 | 9 | array_push($this->writers, $writer); |
|
101 | |||
102 | 9 | return $this; |
|
103 | } |
||
104 | |||
105 | /** |
||
106 | * {@inheritdoc} |
||
107 | */ |
||
108 | 11 | public function process() |
|
109 | { |
||
110 | 11 | $count = 0; |
|
111 | 11 | $exceptions = new \SplObjectStorage(); |
|
112 | 11 | $startTime = new \DateTime; |
|
113 | |||
114 | 11 | foreach ($this->writers as $writer) { |
|
115 | 8 | $writer->prepare(); |
|
116 | 11 | } |
|
117 | |||
118 | 11 | if (is_callable('pcntl_signal')) { |
|
119 | 11 | pcntl_signal(SIGTERM, array($this, 'stop')); |
|
120 | 11 | pcntl_signal(SIGINT, array($this, 'stop')); |
|
121 | 11 | } |
|
122 | |||
123 | // Read all items |
||
124 | 11 | foreach ($this->reader as $index => $item) { |
|
125 | |||
126 | 11 | if (is_callable('pcntl_signal_dispatch')) { |
|
127 | 11 | pcntl_signal_dispatch(); |
|
128 | 11 | } |
|
129 | |||
130 | 11 | if ($this->shouldStop) { |
|
131 | break; |
||
132 | } |
||
133 | |||
134 | try { |
||
135 | 11 | foreach (clone $this->steps as $step) { |
|
136 | 5 | if (false === $step->process($item)) { |
|
137 | 2 | continue 2; |
|
138 | } |
||
139 | 9 | } |
|
140 | |||
141 | 9 | if (!is_array($item) && !($item instanceof \ArrayAccess && $item instanceof \Traversable)) { |
|
142 | 2 | throw new UnexpectedTypeException($item, 'array'); |
|
143 | } |
||
144 | |||
145 | 7 | foreach ($this->writers as $writer) { |
|
146 | 6 | $writer->writeItem($item); |
|
147 | 6 | } |
|
148 | 9 | } catch(Exception $e) { |
|
149 | 3 | if (!$this->skipItemOnFailure) { |
|
150 | 2 | throw $e; |
|
151 | } |
||
152 | |||
153 | 1 | $exceptions->attach($e, $index); |
|
154 | 1 | $this->logger->error($e->getMessage()); |
|
155 | } |
||
156 | |||
157 | 7 | $count++; |
|
158 | 9 | } |
|
159 | |||
160 | 9 | foreach ($this->writers as $writer) { |
|
161 | 6 | $writer->finish(); |
|
162 | 9 | } |
|
163 | |||
164 | 9 | return new Result($this->name, $startTime, new \DateTime, $count, $exceptions); |
|
165 | } |
||
166 | |||
167 | /** |
||
168 | * Stops processing and force return Result from process() function |
||
169 | */ |
||
170 | public function stop() |
||
174 | |||
175 | /** |
||
176 | * Sets the value which determines whether the item should be skipped when error occures |
||
177 | * |
||
178 | * @param boolean $skipItemOnFailure When true skip current item on process exception and log the error |
||
179 | * |
||
180 | * @return $this |
||
181 | */ |
||
182 | 1 | public function setSkipItemOnFailure($skipItemOnFailure) |
|
183 | { |
||
184 | 1 | $this->skipItemOnFailure = $skipItemOnFailure; |
|
185 | |||
186 | 1 | return $this; |
|
187 | } |
||
188 | |||
189 | /** |
||
190 | * @return string |
||
191 | */ |
||
192 | public function getName() |
||
196 | } |
||
197 |