1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Oro\Bundle\MagentoBundle\Provider; |
4
|
|
|
|
5
|
|
|
use Doctrine\Common\Persistence\ManagerRegistry; |
6
|
|
|
|
7
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
8
|
|
|
|
9
|
|
|
use Oro\Bundle\ImportExportBundle\Processor\ProcessorRegistry; |
10
|
|
|
use Oro\Bundle\IntegrationBundle\Entity\Channel as Integration; |
11
|
|
|
use Oro\Bundle\IntegrationBundle\Entity\Status; |
12
|
|
|
use Oro\Bundle\IntegrationBundle\ImportExport\Job\Executor; |
13
|
|
|
use Oro\Bundle\IntegrationBundle\Logger\LoggerStrategy; |
14
|
|
|
use Oro\Bundle\IntegrationBundle\Manager\TypesRegistry; |
15
|
|
|
use Oro\Bundle\IntegrationBundle\Provider\SyncProcessor; |
16
|
|
|
|
17
|
|
|
class InitialSyncProcessor extends AbstractInitialProcessor |
18
|
|
|
{ |
19
|
|
|
const INITIAL_CONNECTOR_SUFFIX = '_initial'; |
20
|
|
|
|
21
|
|
|
/** @var array|null */ |
22
|
|
|
protected $bundleConfiguration; |
23
|
|
|
|
24
|
|
|
/** @var SyncProcessor[] */ |
25
|
|
|
protected $postProcessors = []; |
26
|
|
|
|
27
|
|
|
/** @var bool */ |
28
|
|
|
protected $dictionaryDataLoaded = false; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @param ManagerRegistry $doctrineRegistry |
32
|
|
|
* @param ProcessorRegistry $processorRegistry |
33
|
|
|
* @param Executor $jobExecutor |
34
|
|
|
* @param TypesRegistry $registry |
35
|
|
|
* @param EventDispatcherInterface $eventDispatcher |
36
|
|
|
* @param LoggerStrategy $logger |
37
|
|
|
* @param array $bundleConfiguration |
38
|
|
|
*/ |
39
|
|
|
public function __construct( |
40
|
|
|
ManagerRegistry $doctrineRegistry, |
41
|
|
|
ProcessorRegistry $processorRegistry, |
42
|
|
|
Executor $jobExecutor, |
43
|
|
|
TypesRegistry $registry, |
44
|
|
|
EventDispatcherInterface $eventDispatcher, |
45
|
|
|
LoggerStrategy $logger = null, |
46
|
|
|
array $bundleConfiguration = null |
47
|
|
|
) { |
48
|
|
|
parent::__construct( |
49
|
|
|
$doctrineRegistry, |
50
|
|
|
$processorRegistry, |
51
|
|
|
$jobExecutor, |
52
|
|
|
$registry, |
53
|
|
|
$eventDispatcher, |
54
|
|
|
$logger |
55
|
|
|
); |
56
|
|
|
|
57
|
|
|
$this->bundleConfiguration = $bundleConfiguration; |
58
|
|
|
} |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @param string $connectorType |
62
|
|
|
* @param SyncProcessor $processor |
63
|
|
|
* @return InitialSyncProcessor |
64
|
|
|
*/ |
65
|
|
|
public function addPostProcessor($connectorType, SyncProcessor $processor) |
66
|
|
|
{ |
67
|
|
|
$this->postProcessors[$connectorType] = $processor; |
68
|
|
|
|
69
|
|
|
return $this; |
70
|
|
|
} |
71
|
|
|
|
72
|
|
|
/** |
73
|
|
|
* {@inheritdoc} |
74
|
|
|
*/ |
75
|
|
|
protected function processDictionaryConnectors(Integration $integration) |
76
|
|
|
{ |
77
|
|
|
if (!$this->dictionaryDataLoaded) { |
78
|
|
|
parent::processDictionaryConnectors($integration); |
79
|
|
|
|
80
|
|
|
$this->dictionaryDataLoaded = true; |
81
|
|
|
} |
82
|
|
|
} |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* {@inheritdoc} |
86
|
|
|
*/ |
87
|
|
|
protected function processConnectors(Integration $integration, array $parameters = [], callable $callback = null) |
88
|
|
|
{ |
89
|
|
|
if (empty($parameters['skip-dictionary'])) { |
90
|
|
|
$this->processDictionaryConnectors($integration); |
91
|
|
|
} |
92
|
|
|
|
93
|
|
|
// Set start date for initial connectors |
94
|
|
|
$startSyncDate = $integration->getTransport()->getSettingsBag()->get('start_sync_date'); |
95
|
|
|
$parameters[self::START_SYNC_DATE] = $startSyncDate; |
96
|
|
|
|
97
|
|
|
// Pass interval to connectors for further filters creation |
98
|
|
|
$interval = $this->getSyncInterval(); |
99
|
|
|
$parameters[self::INTERVAL] = $interval; |
100
|
|
|
|
101
|
|
|
// Collect initial connectors |
102
|
|
|
$postProcessConnectorTypes = array_keys($this->postProcessors); |
103
|
|
|
$connectors = $this->getTypesOfConnectorsToProcess($integration, $this->getConnectorsFilterFunction($callback)); |
104
|
|
|
$postProcessConnectors = array_intersect($connectors, $postProcessConnectorTypes); |
105
|
|
|
$connectors = array_diff($connectors, $postProcessConnectorTypes); |
106
|
|
|
|
107
|
|
|
/** @var \DateTime[] $connectorsSyncedTo */ |
108
|
|
|
$connectorsSyncedTo = []; |
109
|
|
|
foreach ($connectors as $connector) { |
110
|
|
|
$connectorsSyncedTo[$connector] = $this->getInitialConnectorSyncedTo($integration, $connector); |
111
|
|
|
} |
112
|
|
|
|
113
|
|
|
// Process all initial connectors by date interval while there are connectors to process |
114
|
|
|
$isSuccess = true; |
115
|
|
|
do { |
116
|
|
|
$syncedConnectors = 0; |
117
|
|
|
foreach ($connectors as $connector) { |
118
|
|
|
if ($connectorsSyncedTo[$connector] > $startSyncDate) { |
119
|
|
|
$syncedConnectors++; |
120
|
|
|
|
121
|
|
|
$this->logger->info( |
122
|
|
|
sprintf( |
123
|
|
|
'Syncing connector %s starting %s interval %s', |
124
|
|
|
$connector, |
125
|
|
|
$connectorsSyncedTo[$connector]->format('Y-m-d H:i:s'), |
126
|
|
|
$interval->format('%d days') |
127
|
|
|
) |
128
|
|
|
); |
129
|
|
|
|
130
|
|
|
try { |
131
|
|
|
// Pass synced to for further filters creation |
132
|
|
|
$parameters = array_merge( |
133
|
|
|
$parameters, |
134
|
|
|
[self::INITIAL_SYNCED_TO => clone $connectorsSyncedTo[$connector]] |
135
|
|
|
); |
136
|
|
|
|
137
|
|
|
$realConnector = $this->getRealConnector($integration, $connector); |
138
|
|
|
$status = $this->processIntegrationConnector( |
139
|
|
|
$integration, |
140
|
|
|
$realConnector, |
141
|
|
|
$parameters |
142
|
|
|
); |
143
|
|
|
// Move sync date into past by interval value |
144
|
|
|
$connectorsSyncedTo[$connector]->sub($interval); |
145
|
|
|
|
146
|
|
|
$isSuccess = $isSuccess && $this->isIntegrationConnectorProcessSuccess($status); |
147
|
|
|
|
148
|
|
|
if ($isSuccess) { |
149
|
|
|
// Save synced to date for connector |
150
|
|
|
$syncedTo = $connectorsSyncedTo[$connector]; |
151
|
|
|
if ($syncedTo < $startSyncDate) { |
152
|
|
|
$syncedTo = $startSyncDate; |
153
|
|
|
} |
154
|
|
|
$this->updateSyncedTo($integration, $connector, $syncedTo); |
155
|
|
|
} else { |
156
|
|
|
break 2; |
157
|
|
|
} |
158
|
|
|
} catch (\Exception $e) { |
159
|
|
|
$isSuccess = false; |
160
|
|
|
|
161
|
|
|
$this->logger->critical($e->getMessage()); |
162
|
|
|
break 2; |
163
|
|
|
} |
164
|
|
|
} |
165
|
|
|
} |
166
|
|
|
} while ($syncedConnectors > 0); |
167
|
|
|
|
168
|
|
|
if ($isSuccess && $postProcessConnectors) { |
|
|
|
|
169
|
|
|
$isSuccess = $this->executePostProcessConnectors( |
170
|
|
|
$integration, |
171
|
|
|
$parameters, |
172
|
|
|
$postProcessConnectors, |
173
|
|
|
$startSyncDate |
174
|
|
|
); |
175
|
|
|
} |
176
|
|
|
|
177
|
|
|
return $isSuccess; |
178
|
|
|
} |
179
|
|
|
|
180
|
|
|
/** |
181
|
|
|
* @param callable|null $callback |
182
|
|
|
* @return \Closure |
183
|
|
|
*/ |
184
|
|
|
protected function getConnectorsFilterFunction(callable $callback = null) |
185
|
|
|
{ |
186
|
|
|
return function ($connector) use ($callback) { |
187
|
|
|
if (is_callable($callback) && !call_user_func($callback, $connector)) { |
188
|
|
|
return false; |
189
|
|
|
} |
190
|
|
|
|
191
|
|
|
return strpos($connector, self::INITIAL_CONNECTOR_SUFFIX) !== false; |
192
|
|
|
}; |
193
|
|
|
} |
194
|
|
|
|
195
|
|
|
/** |
196
|
|
|
* @param Integration $integration |
197
|
|
|
* @param string $connector |
198
|
|
|
* @param \DateTime $syncedTo |
199
|
|
|
*/ |
200
|
|
|
protected function updateSyncedTo(Integration $integration, $connector, \DateTime $syncedTo) |
201
|
|
|
{ |
202
|
|
|
$formattedSyncedTo = $syncedTo->format(\DateTime::ISO8601); |
203
|
|
|
|
204
|
|
|
$lastStatus = $this->getLastStatusForConnector($integration, $connector, Status::STATUS_COMPLETED); |
205
|
|
|
$statusData = $lastStatus->getData(); |
206
|
|
|
$statusData[self::INITIAL_SYNCED_TO] = $formattedSyncedTo; |
207
|
|
|
$lastStatus->setData($statusData); |
208
|
|
|
|
209
|
|
|
$this->addConnectorStatusAndFlush($integration, $lastStatus); |
|
|
|
|
210
|
|
|
} |
211
|
|
|
|
212
|
|
|
/** |
213
|
|
|
* @param Integration $integration |
214
|
|
|
* @param string $connector |
215
|
|
|
* @return \DateTime |
216
|
|
|
*/ |
217
|
|
|
protected function getInitialConnectorSyncedTo(Integration $integration, $connector) |
218
|
|
|
{ |
219
|
|
|
$latestSyncedTo = $this->getSyncedTo($integration, $connector); |
220
|
|
|
if ($latestSyncedTo === false) { |
221
|
|
|
return clone $this->getInitialSyncStartDate($integration); |
222
|
|
|
} |
223
|
|
|
|
224
|
|
|
return clone $latestSyncedTo; |
225
|
|
|
} |
226
|
|
|
|
227
|
|
|
/** |
228
|
|
|
* @return \DateInterval |
229
|
|
|
*/ |
230
|
|
|
protected function getSyncInterval() |
231
|
|
|
{ |
232
|
|
|
if (empty($this->bundleConfiguration['sync_settings']['initial_import_step_interval'])) { |
233
|
|
|
throw new \InvalidArgumentException('Option "initial_import_step_interval" is missing'); |
234
|
|
|
} |
235
|
|
|
|
236
|
|
|
$syncInterval = $this->bundleConfiguration['sync_settings']['initial_import_step_interval']; |
237
|
|
|
$interval = \DateInterval::createFromDateString($syncInterval); |
238
|
|
|
|
239
|
|
|
return $interval; |
240
|
|
|
} |
241
|
|
|
|
242
|
|
|
/** |
243
|
|
|
* @param Integration $integration |
244
|
|
|
* @param array $parameters |
245
|
|
|
* @param array $postProcessConnectors |
246
|
|
|
* @param \DateTime $startSyncDate |
247
|
|
|
* @return bool |
248
|
|
|
*/ |
249
|
|
|
protected function executePostProcessConnectors( |
250
|
|
|
Integration $integration, |
251
|
|
|
array $parameters, |
252
|
|
|
array $postProcessConnectors, |
253
|
|
|
\DateTime $startSyncDate |
254
|
|
|
) { |
255
|
|
|
$isSuccess = true; |
256
|
|
|
foreach ($postProcessConnectors as $connectorType) { |
257
|
|
|
// Do not sync already synced connectors |
258
|
|
|
if ($this->getLastStatusForConnector($integration, $connectorType, Status::STATUS_COMPLETED)) { |
259
|
|
|
continue; |
260
|
|
|
} |
261
|
|
|
|
262
|
|
|
$processor = $this->postProcessors[$connectorType]; |
263
|
|
|
$isSuccess = $isSuccess && $processor->process($integration, $connectorType, $parameters); |
264
|
|
|
if ($isSuccess) { |
265
|
|
|
$this->updateSyncedTo($integration, $connectorType, $startSyncDate); |
266
|
|
|
} |
267
|
|
|
} |
268
|
|
|
|
269
|
|
|
return $isSuccess; |
270
|
|
|
} |
271
|
|
|
} |
272
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.