Completed
Push — master ( f6132c...345759 )
by
unknown
13:48
created

InitialSyncCommand::getSearchIndexer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
namespace OroCRM\Bundle\MagentoBundle\Command;
4
5
use Doctrine\ORM\EntityManager;
6
7
use Psr\Log\LoggerInterface;
8
9
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
10
use Symfony\Component\Console\Input\InputInterface;
11
use Symfony\Component\Console\Input\InputOption;
12
use Symfony\Component\Console\Output\OutputInterface;
13
14
use Oro\Bundle\IntegrationBundle\Entity\Channel as Integration;
15
use Oro\Bundle\IntegrationBundle\Entity\Repository\ChannelRepository;
16
use Oro\Component\Log\OutputLogger;
17
18
use OroCRM\Bundle\AnalyticsBundle\Model\RFMMetricStateManager;
19
use OroCRM\Bundle\ChannelBundle\Entity\Channel;
20
use OroCRM\Bundle\MagentoBundle\Provider\InitialSyncProcessor;
21
22
/**
 * Console command that runs the initial synchronization of a single Magento integration.
 *
 * Optional listeners are disabled for the duration of the run and, when the sync
 * succeeds, the search index is rebuilt for the Magento entities listed in
 * $indexedEntities.
 */
class InitialSyncCommand extends ContainerAwareCommand
{
    const COMMAND_NAME = 'oro:magento:initial:sync';

    const SYNC_PROCESSOR = 'orocrm_magento.provider.initial_sync_processor';

    const STATUS_SUCCESS = 0;
    const STATUS_FAILED  = 255;

    /**
     * List of listeners that will be disabled during sync
     *
     * @var string[]
     */
    protected $disabledOptionalListeners = [
        'oro_search.index_listener',
        'oro_entity.event_listener.entity_modify_created_updated_properties_listener'
    ];

    /**
     * List of entities we need to reindex after sync
     *
     * @var string[]
     */
    protected $indexedEntities = [
        'OroCRM\Bundle\MagentoBundle\Entity\Order',
        'OroCRM\Bundle\MagentoBundle\Entity\Cart',
        'OroCRM\Bundle\MagentoBundle\Entity\Customer'
    ];

    /**
     * {@inheritdoc}
     */
    public function configure()
    {
        $this
            ->setName(self::COMMAND_NAME)
            ->addOption(
                'integration-id',
                'i',
                InputOption::VALUE_REQUIRED,
                'Sync will be performed for given integration id'
            )
            ->addOption(
                'skip-dictionary',
                null,
                InputOption::VALUE_NONE,
                'Skip dictionaries synchronization'
            )
            ->addOption(
                'connector',
                'con',
                InputOption::VALUE_OPTIONAL,
                'If option exists sync will be performed for given connector name'
            )
            ->setDescription('Run initial synchronization for magento channel.');
    }

    /**
     * {@inheritdoc}
     *
     * Returns self::STATUS_SUCCESS when the sync succeeded or was skipped (another job
     * is running, or the integration is disabled) and self::STATUS_FAILED when the
     * integration ID is missing, the integration is not found, or the sync failed.
     */
    public function execute(InputInterface $input, OutputInterface $output)
    {
        // Disable search listeners to increase the performance
        $this->disableOptionalListeners();

        $skipDictionary = (bool)$input->getOption('skip-dictionary');
        $integrationId = $input->getOption('integration-id');
        $logger = $this->getLogger($output);
        $this->getService('oro_integration.logger.strategy')->setLogger($logger);
        $this->initEntityManager();

        // The option itself is optional on the command line, but nothing below can
        // work without it, so fail early with an explicit message.
        if (null === $integrationId) {
            $logger->critical('Integration ID is not specified, use the "--integration-id" option');

            return self::STATUS_FAILED;
        }

        if ($this->isJobRunning($integrationId)) {
            $logger->warning('Job already running. Terminating....');

            return self::STATUS_SUCCESS;
        }

        $integration = $this->getIntegrationChannelRepository()->getOrLoadById($integrationId);
        if (!$integration) {
            // %s (not %d): the option value is a raw string, so a non-numeric value
            // must be reported as given instead of being coerced to 0.
            $logger->critical(sprintf('Integration with given ID "%s" not found', $integrationId));

            return self::STATUS_FAILED;
        } elseif (!$integration->isEnabled()) {
            $logger->warning('Integration is disabled. Terminating....');

            return self::STATUS_SUCCESS;
        }

        $this->scheduleAnalyticRecalculation($integration);

        $processor = $this->getSyncProcessor($logger);
        try {
            $logger->info(sprintf('Run initial sync for "%s" integration.', $integration->getName()));

            $connector = $input->getOption('connector');
            $result = $processor->process($integration, $connector, ['skip-dictionary' => $skipDictionary]);
            $exitCode = $result ? self::STATUS_SUCCESS : self::STATUS_FAILED;
        } catch (\Exception $e) {
            // Any failure of the processor is logged and reported through the exit code
            $logger->critical($e->getMessage(), ['exception' => $e]);
            $exitCode = self::STATUS_FAILED;
        }

        // Listeners were disabled for the run, so the index is rebuilt explicitly
        if ($exitCode === self::STATUS_SUCCESS) {
            $this->runReindex();
        }

        $logger->notice('Completed');

        return $exitCode;
    }

    /**
     * Creates a logger that writes to the console output.
     *
     * The output verbosity is raised to at least VERBOSITY_VERBOSE before the
     * logger is created.
     *
     * @param OutputInterface $output
     * @return OutputLogger
     */
    protected function getLogger(OutputInterface $output)
    {
        if ($output->getVerbosity() < OutputInterface::VERBOSITY_VERBOSE) {
            $output->setVerbosity(OutputInterface::VERBOSITY_VERBOSE);
        }

        return new OutputLogger($output);
    }

    /**
     * Check is job running (from previous schedule)
     *
     * NOTE(review): the threshold is "more than one" running job; presumably the
     * currently executing job is itself included in the count. Confirm against
     * ChannelRepository::getRunningSyncJobsCount().
     *
     * @param null|int $integrationId
     *
     * @return bool
     */
    protected function isJobRunning($integrationId)
    {
        $running = $this->getIntegrationChannelRepository()
            ->getRunningSyncJobsCount($this->getName(), $integrationId);

        return $running > 1;
    }

    /**
     * Removes the SQL logger from the entity manager's connection configuration.
     */
    protected function initEntityManager()
    {
        $this->getEntityManager()->getConnection()->getConfiguration()->setSQLLogger(null);
    }

    /**
     * @return EntityManager
     */
    protected function getEntityManager()
    {
        return $this->getService('doctrine')->getManager();
    }

    /**
     * Fetches the initial sync processor and points its logger strategy at the given logger.
     *
     * @param LoggerInterface $logger
     * @return InitialSyncProcessor
     */
    protected function getSyncProcessor($logger)
    {
        $processor = $this->getService(self::SYNC_PROCESSOR);
        $processor->getLoggerStrategy()->setLogger($logger);

        return $processor;
    }

    /**
     * @return ChannelRepository
     */
    protected function getIntegrationChannelRepository()
    {
        return $this->getService('doctrine')->getRepository('OroIntegrationBundle:Channel');
    }

    /**
     * Get service from DI container by id
     *
     * @param string $id
     *
     * @return object
     */
    protected function getService($id)
    {
        return $this->getContainer()->get($id);
    }

    /**
     * Asks the RFM state manager to schedule a recalculation for the data channel
     * whose data source is the given integration.
     *
     * The lookup result is passed on as is, including null when no data channel
     * references the integration.
     *
     * @param Integration $integration
     */
    protected function scheduleAnalyticRecalculation(Integration $integration)
    {
        $dataChannel = $this->getDataChannelByChannel($integration);
        /** @var RFMMetricStateManager $rfmStateManager */
        $rfmStateManager = $this->getService('orocrm_analytics.model.rfm_state_manager');
        $rfmStateManager->scheduleRecalculation($dataChannel);
    }

    /**
     * Finds the data channel that uses the given integration as its data source.
     *
     * @param Integration $integration
     * @return Channel|null Null when no data channel references the integration
     */
    protected function getDataChannelByChannel(Integration $integration)
    {
        return $this->getService('doctrine')
            ->getRepository('OroCRMChannelBundle:Channel')
            ->findOneBy(['dataSource' => $integration]);
    }

    /**
     * Turn off listeners to increase the performance
     *
     * Only listeners that the optional listeners manager reports as known are disabled.
     */
    protected function disableOptionalListeners()
    {
        $listenerManager = $this->getService('oro_platform.optional_listeners.manager');
        $knownListeners  = $listenerManager->getListeners();
        foreach ($this->disabledOptionalListeners as $listenerId) {
            if (in_array($listenerId, $knownListeners, true)) {
                $listenerManager->disableListener($listenerId);
            }
        }
    }

    /**
     * Updates the search index for magento entities
     */
    protected function runReindex()
    {
        $this->getSearchIndexer()->reindex($this->indexedEntities);
    }

    /**
     * @return \Oro\Bundle\SearchBundle\Async\Indexer
     */
    protected function getSearchIndexer()
    {
        return $this->getService('oro_search.async.indexer');
    }
}
255