Completed
Push — 1.9 ( 5c3e2e...1529fd )
by
unknown
60:20
created

InitialSyncCommand::execute()   C

Complexity

Conditions 7
Paths 13

Size

Total Lines 50
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 1
Metric Value
c 3
b 0
f 1
dl 0
loc 50
rs 6.7272
cc 7
eloc 31
nc 13
nop 2
1
<?php
2
3
namespace OroCRM\Bundle\MagentoBundle\Command;
4
5
use Doctrine\ORM\EntityManager;
6
7
use Psr\Log\LoggerInterface;
8
9
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
10
use Symfony\Component\Console\Input\InputInterface;
11
use Symfony\Component\Console\Input\InputOption;
12
use Symfony\Component\Console\Output\OutputInterface;
13
14
use JMS\JobQueueBundle\Entity\Job;
15
16
use Oro\Bundle\IntegrationBundle\Entity\Channel as Integration;
17
use Oro\Bundle\IntegrationBundle\Entity\Repository\ChannelRepository;
18
use Oro\Bundle\SearchBundle\Command\ReindexCommand;
19
use Oro\Component\Log\OutputLogger;
20
21
use OroCRM\Bundle\AnalyticsBundle\Model\RFMMetricStateManager;
22
use OroCRM\Bundle\ChannelBundle\Entity\Channel;
23
use OroCRM\Bundle\MagentoBundle\Provider\InitialSyncProcessor;
24
25
class InitialSyncCommand extends ContainerAwareCommand
26
{
27
    const COMMAND_NAME = 'oro:magento:initial:sync';
28
29
    const SYNC_PROCESSOR = 'orocrm_magento.provider.initial_sync_processor';
30
31
    const STATUS_SUCCESS = 0;
32
    const STATUS_FAILED  = 255;
33
34
    /**
35
     * List of listeners what will be disabled during sync
36
     */
37
    protected $disabledOptionalListeners = [
38
        'oro_search.index_listener',
39
        'oro_entity.event_listener.entity_modify_created_updated_properties_listener'
40
    ];
41
42
    /**
43
     * List of entities we need to reindex after sync
44
     */
45
    protected $indexedEntities = [
46
        'OroCRM\Bundle\MagentoBundle\Entity\Order',
47
        'OroCRM\Bundle\MagentoBundle\Entity\Cart',
48
        'OroCRM\Bundle\MagentoBundle\Entity\Customer'
49
    ];
50
51
    /**
52
     * {@inheritdoc}
53
     */
54
    public function configure()
55
    {
56
        $this
57
            ->setName(self::COMMAND_NAME)
58
            ->addOption(
59
                'integration-id',
60
                'i',
61
                InputOption::VALUE_REQUIRED,
62
                'Sync will be performed for given integration id'
63
            )
64
            ->addOption(
65
                'skip-dictionary',
66
                null,
67
                InputOption::VALUE_NONE,
68
                'Skip dictionaries synchronization'
69
            )
70
            ->addOption(
71
                'connector',
72
                'con',
73
                InputOption::VALUE_OPTIONAL,
74
                'If option exists sync will be performed for given connector name'
75
            )
76
            ->setDescription('Run initial synchronization for magento channel.');
77
    }
78
79
    /**
80
     * {@inheritdoc}
81
     */
82
    public function execute(InputInterface $input, OutputInterface $output)
83
    {
84
        // Disable search listeners to increase the performance
85
        $this->disableOptionalListeners();
86
87
        $skipDictionary = (bool)$input->getOption('skip-dictionary');
88
        $integrationId = $input->getOption('integration-id');
89
        $logger = $this->getLogger($output);
90
        $this->getContainer()->get('oro_integration.logger.strategy')->setLogger($logger);
91
        $this->initEntityManager();
92
93
        if ($this->isJobRunning($integrationId)) {
94
            $logger->warning('Job already running. Terminating....');
95
96
            return self::STATUS_SUCCESS;
97
        }
98
99
        $integration = $this->getIntegrationChannelRepository()->getOrLoadById($integrationId);
100
        if (!$integration) {
101
            $logger->critical(sprintf('Integration with given ID "%d" not found', $integrationId));
102
103
            return self::STATUS_FAILED;
104
        } elseif (!$integration->isEnabled()) {
105
            $logger->warning('Integration is disabled. Terminating....');
106
107
            return self::STATUS_SUCCESS;
108
        }
109
110
        $this->scheduleAnalyticRecalculation($integration);
111
112
        $processor = $this->getSyncProcessor($logger);
113
        try {
114
            $logger->info(sprintf('Run initial sync for "%s" integration.', $integration->getName()));
115
116
            $connector = $input->getOption('connector');
117
            $result = $processor->process($integration, $connector, ['skip-dictionary' => $skipDictionary]);
118
            $exitCode = $result ? self::STATUS_SUCCESS : self::STATUS_FAILED;
119
        } catch (\Exception $e) {
120
            $logger->critical($e->getMessage(), ['exception' => $e]);
121
            $exitCode = self::STATUS_FAILED;
122
        }
123
124
        if ($exitCode === self::STATUS_SUCCESS) {
125
            $this->runReindex();
126
        }
127
128
        $logger->notice('Completed');
129
130
        return $exitCode;
131
    }
132
133
    /**
134
     * @param OutputInterface $output
135
     * @return OutputLogger
136
     */
137
    protected function getLogger(OutputInterface $output)
138
    {
139
        if ($output->getVerbosity() < OutputInterface::VERBOSITY_VERBOSE) {
140
            $output->setVerbosity(OutputInterface::VERBOSITY_VERBOSE);
141
        }
142
143
        return new OutputLogger($output);
144
    }
145
146
    /**
147
     * Check is job running (from previous schedule)
148
     *
149
     * @param null|int $integrationId
150
     *
151
     * @return bool
152
     */
153
    protected function isJobRunning($integrationId)
154
    {
155
        $running = $this->getIntegrationChannelRepository()
156
            ->getRunningSyncJobsCount($this->getName(), $integrationId);
157
158
        return $running > 1;
159
    }
160
161
    protected function initEntityManager()
162
    {
163
        $this->getEntityManager()->getConnection()->getConfiguration()->setSQLLogger(null);
164
    }
165
166
    /**
167
     * @return EntityManager
168
     */
169
    protected function getEntityManager()
170
    {
171
        return $this->getService('doctrine')->getManager();
172
    }
173
174
    /**
175
     * @param LoggerInterface $logger
176
     * @return InitialSyncProcessor
177
     */
178
    protected function getSyncProcessor($logger)
179
    {
180
        $processor = $this->getService(self::SYNC_PROCESSOR);
181
        $processor->getLoggerStrategy()->setLogger($logger);
182
183
        return $processor;
184
    }
185
186
    /**
187
     * @return ChannelRepository
188
     */
189
    protected function getIntegrationChannelRepository()
190
    {
191
        return $this->getContainer()->get('doctrine')->getRepository('OroIntegrationBundle:Channel');
192
    }
193
194
    /**
195
     * Get service from DI container by id
196
     *
197
     * @param string $id
198
     *
199
     * @return object
200
     */
201
    protected function getService($id)
202
    {
203
        return $this->getContainer()->get($id);
204
    }
205
206
    /**
207
     * @param Integration $integration
208
     */
209
    protected function scheduleAnalyticRecalculation(Integration $integration)
210
    {
211
        $dataChannel = $this->getDataChannelByChannel($integration);
212
        /** @var RFMMetricStateManager $rfmStateManager */
213
        $rfmStateManager = $this->getService('orocrm_analytics.model.rfm_state_manager');
214
        $rfmStateManager->scheduleRecalculation($dataChannel);
215
    }
216
217
    /**
218
     * @param Integration $integration
219
     * @return Channel
220
     */
221
    protected function getDataChannelByChannel(Integration $integration)
222
    {
223
        return $this->getContainer()->get('doctrine')
224
            ->getRepository('OroCRMChannelBundle:Channel')
225
            ->findOneBy(['dataSource' => $integration]);
226
    }
227
228
    /**
229
     * Turn off listeners to increase the performance
230
     */
231
    protected function disableOptionalListeners()
232
    {
233
        $listenerManager = $this->getContainer()->get('oro_platform.optional_listeners.manager');
234
        $knownListeners  = $listenerManager->getListeners();
235
        foreach ($this->disabledOptionalListeners as $listenerId) {
236
            if (in_array($listenerId, $knownListeners, true)) {
237
                $listenerManager->disableListener($listenerId);
238
            }
239
        }
240
    }
241
242
    /**
243
     * Add jobs to reindex magento entities
244
     */
245
    protected function runReindex()
246
    {
247
        /** @var EntityManager $em */
248
        $em  = $this->getContainer()->get('doctrine')->getManagerForClass('JMSJobQueueBundle:Job');
249
        $jobs = [];
250
        foreach ($this->indexedEntities as $entityClass) {
251
            $job = new Job(ReindexCommand::COMMAND_NAME, ['class' => $entityClass]);
252
            $em->persist($job);
253
            $jobs[] = $job;
254
        }
255
        $em->flush($jobs);
256
    }
257
}
258