Completed
Push — master ( 664fb1...60ffba )
by
unknown
26:32
created

processDictionaryConnectors()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 1
1
<?php
2
3
namespace Oro\Bundle\MagentoBundle\Provider;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
7
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
8
9
use Oro\Bundle\ImportExportBundle\Processor\ProcessorRegistry;
10
use Oro\Bundle\IntegrationBundle\Entity\Channel as Integration;
11
use Oro\Bundle\IntegrationBundle\Entity\Status;
12
use Oro\Bundle\IntegrationBundle\ImportExport\Job\Executor;
13
use Oro\Bundle\IntegrationBundle\Logger\LoggerStrategy;
14
use Oro\Bundle\IntegrationBundle\Manager\TypesRegistry;
15
use Oro\Bundle\IntegrationBundle\Provider\SyncProcessor;
16
17
class InitialSyncProcessor extends AbstractInitialProcessor
18
{
19
    const INITIAL_CONNECTOR_SUFFIX = '_initial';
20
21
    /** @var array|null */
22
    protected $bundleConfiguration;
23
24
    /** @var SyncProcessor[] */
25
    protected $postProcessors = [];
26
27
    /** @var bool */
28
    protected $dictionaryDataLoaded = false;
29
30
    /**
31
     * @param ManagerRegistry $doctrineRegistry
32
     * @param ProcessorRegistry $processorRegistry
33
     * @param Executor $jobExecutor
34
     * @param TypesRegistry $registry
35
     * @param EventDispatcherInterface $eventDispatcher
36
     * @param LoggerStrategy $logger
37
     * @param array $bundleConfiguration
38
     */
39
    public function __construct(
40
        ManagerRegistry $doctrineRegistry,
41
        ProcessorRegistry $processorRegistry,
42
        Executor $jobExecutor,
43
        TypesRegistry $registry,
44
        EventDispatcherInterface $eventDispatcher,
45
        LoggerStrategy $logger = null,
46
        array $bundleConfiguration = null
47
    ) {
48
        parent::__construct(
49
            $doctrineRegistry,
50
            $processorRegistry,
51
            $jobExecutor,
52
            $registry,
53
            $eventDispatcher,
54
            $logger
55
        );
56
57
        $this->bundleConfiguration = $bundleConfiguration;
58
    }
59
60
    /**
61
     * @param string $connectorType
62
     * @param SyncProcessor $processor
63
     * @return InitialSyncProcessor
64
     */
65
    public function addPostProcessor($connectorType, SyncProcessor $processor)
66
    {
67
        $this->postProcessors[$connectorType] = $processor;
68
69
        return $this;
70
    }
71
72
    /**
73
     * {@inheritdoc}
74
     */
75
    protected function processDictionaryConnectors(Integration $integration)
76
    {
77
        if (!$this->dictionaryDataLoaded) {
78
            parent::processDictionaryConnectors($integration);
79
80
            $this->dictionaryDataLoaded = true;
81
        }
82
    }
83
84
    /**
85
     * {@inheritdoc}
86
     */
87
    protected function processConnectors(Integration $integration, array $parameters = [], callable $callback = null)
88
    {
89
        if (empty($parameters['skip-dictionary'])) {
90
            $this->processDictionaryConnectors($integration);
91
        }
92
93
        // Set start date for initial connectors
94
        $startSyncDate = $integration->getTransport()->getSettingsBag()->get('start_sync_date');
95
        $parameters[self::START_SYNC_DATE] = $startSyncDate;
96
97
        // Pass interval to connectors for further filters creation
98
        $interval = $this->getSyncInterval();
99
        $parameters[self::INTERVAL] = $interval;
100
101
        // Collect initial connectors
102
        $postProcessConnectorTypes = array_keys($this->postProcessors);
103
        $connectors = $this->getTypesOfConnectorsToProcess($integration, $this->getConnectorsFilterFunction($callback));
104
        $postProcessConnectors = array_intersect($connectors, $postProcessConnectorTypes);
105
        $connectors = array_diff($connectors, $postProcessConnectorTypes);
106
107
        /** @var \DateTime[] $connectorsSyncedTo */
108
        $connectorsSyncedTo = [];
109
        foreach ($connectors as $connector) {
110
            $connectorsSyncedTo[$connector] = $this->getInitialConnectorSyncedTo($integration, $connector);
111
        }
112
113
        // Process all initial connectors by date interval while there are connectors to process
114
        $isSuccess = true;
115
        do {
116
            $syncedConnectors = 0;
117
            foreach ($connectors as $connector) {
118
                if ($connectorsSyncedTo[$connector] > $startSyncDate) {
119
                    $syncedConnectors++;
120
121
                    $this->logger->info(
122
                        sprintf(
123
                            'Syncing connector %s starting %s interval %s',
124
                            $connector,
125
                            $connectorsSyncedTo[$connector]->format('Y-m-d H:i:s'),
126
                            $interval->format('%d days')
127
                        )
128
                    );
129
130
                    try {
131
                        // Pass synced to for further filters creation
132
                        $parameters = array_merge(
133
                            $parameters,
134
                            [self::INITIAL_SYNCED_TO => clone $connectorsSyncedTo[$connector]]
135
                        );
136
137
                        $realConnector = $this->getRealConnector($integration, $connector);
138
                        $status = $this->processIntegrationConnector(
139
                            $integration,
140
                            $realConnector,
141
                            $parameters
142
                        );
143
                        // Move sync date into past by interval value
144
                        $connectorsSyncedTo[$connector]->sub($interval);
145
146
                        $isSuccess = $isSuccess && $this->isIntegrationConnectorProcessSuccess($status);
147
148
                        if ($isSuccess) {
149
                            // Save synced to date for connector
150
                            $syncedTo = $connectorsSyncedTo[$connector];
151
                            if ($syncedTo < $startSyncDate) {
152
                                $syncedTo = $startSyncDate;
153
                            }
154
                            $this->updateSyncedTo($integration, $connector, $syncedTo);
155
                        } else {
156
                            break 2;
157
                        }
158
                    } catch (\Exception $e) {
159
                        $isSuccess = false;
160
161
                        $this->logger->critical($e->getMessage());
162
                        break 2;
163
                    }
164
                }
165
            }
166
        } while ($syncedConnectors > 0);
167
168
        if ($isSuccess && $postProcessConnectors) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $postProcessConnectors of type string[] is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
169
            $isSuccess = $this->executePostProcessConnectors(
170
                $integration,
171
                $parameters,
172
                $postProcessConnectors,
173
                $startSyncDate
174
            );
175
        }
176
177
        return $isSuccess;
178
    }
179
180
    /**
181
     * @param callable|null $callback
182
     * @return \Closure
183
     */
184
    protected function getConnectorsFilterFunction(callable $callback = null)
185
    {
186
        return function ($connector) use ($callback) {
187
            if (is_callable($callback) && !call_user_func($callback, $connector)) {
188
                return false;
189
            }
190
191
            return strpos($connector, self::INITIAL_CONNECTOR_SUFFIX) !== false;
192
        };
193
    }
194
195
    /**
196
     * @param Integration $integration
197
     * @param string $connector
198
     * @param \DateTime $syncedTo
199
     */
200
    protected function updateSyncedTo(Integration $integration, $connector, \DateTime $syncedTo)
201
    {
202
        $formattedSyncedTo = $syncedTo->format(\DateTime::ISO8601);
203
204
        $lastStatus = $this->getLastStatusForConnector($integration, $connector, Status::STATUS_COMPLETED);
205
        $statusData = $lastStatus->getData();
206
        $statusData[self::INITIAL_SYNCED_TO] = $formattedSyncedTo;
207
        $lastStatus->setData($statusData);
208
209
        $this->addConnectorStatusAndFlush($integration, $lastStatus);
0 ignored issues
show
Bug introduced by
It seems like $lastStatus defined by $this->getLastStatusForC...atus::STATUS_COMPLETED) on line 204 can be null; however, Oro\Bundle\IntegrationBu...nnectorStatusAndFlush() does not accept null, maybe add an additional type check?

Unless you are absolutely sure that the expression can never be null because of other conditions, we strongly recommend to add an additional type check to your code:

/** @return stdClass|null */
function mayReturnNull() { }

function doesNotAcceptNull(stdClass $x) { }

// With potential error.
function withoutCheck() {
    $x = mayReturnNull();
    doesNotAcceptNull($x); // Potential error here.
}

// Safe - Alternative 1
function withCheck1() {
    $x = mayReturnNull();
    if ( ! $x instanceof stdClass) {
        throw new \LogicException('$x must be defined.');
    }
    doesNotAcceptNull($x);
}

// Safe - Alternative 2
function withCheck2() {
    $x = mayReturnNull();
    if ($x instanceof stdClass) {
        doesNotAcceptNull($x);
    }
}
Loading history...
210
    }
211
212
    /**
213
     * @param Integration $integration
214
     * @param string $connector
215
     * @return \DateTime
216
     */
217
    protected function getInitialConnectorSyncedTo(Integration $integration, $connector)
218
    {
219
        $latestSyncedTo = $this->getSyncedTo($integration, $connector);
220
        if ($latestSyncedTo === false) {
221
            return clone $this->getInitialSyncStartDate($integration);
222
        }
223
224
        return clone $latestSyncedTo;
225
    }
226
227
    /**
228
     * @return \DateInterval
229
     */
230
    protected function getSyncInterval()
231
    {
232
        if (empty($this->bundleConfiguration['sync_settings']['initial_import_step_interval'])) {
233
            throw new \InvalidArgumentException('Option "initial_import_step_interval" is missing');
234
        }
235
236
        $syncInterval = $this->bundleConfiguration['sync_settings']['initial_import_step_interval'];
237
        $interval = \DateInterval::createFromDateString($syncInterval);
238
239
        return $interval;
240
    }
241
242
    /**
243
     * @param Integration $integration
244
     * @param array $parameters
245
     * @param array $postProcessConnectors
246
     * @param \DateTime $startSyncDate
247
     * @return bool
248
     */
249
    protected function executePostProcessConnectors(
250
        Integration $integration,
251
        array $parameters,
252
        array $postProcessConnectors,
253
        \DateTime $startSyncDate
254
    ) {
255
        $isSuccess = true;
256
        foreach ($postProcessConnectors as $connectorType) {
257
            // Do not sync already synced connectors
258
            if ($this->getLastStatusForConnector($integration, $connectorType, Status::STATUS_COMPLETED)) {
259
                continue;
260
            }
261
262
            $processor = $this->postProcessors[$connectorType];
263
            $isSuccess = $isSuccess && $processor->process($integration, $connectorType, $parameters);
264
            if ($isSuccess) {
265
                $this->updateSyncedTo($integration, $connectorType, $startSyncDate);
266
            }
267
        }
268
269
        return $isSuccess;
270
    }
271
}
272