Issues (3627)

app/bundles/LeadBundle/Model/ImportModel.php (1 issue)

1
<?php
2
3
/*
4
 * @copyright   2014 Mautic Contributors. All rights reserved
5
 * @author      Mautic
6
 *
7
 * @link        http://mautic.org
8
 *
9
 * @license     GNU/GPLv3 http://www.gnu.org/licenses/gpl-3.0.html
10
 */
11
12
namespace Mautic\LeadBundle\Model;
13
14
use Doctrine\ORM\ORMException;
15
use Mautic\CoreBundle\Helper\Chart\ChartQuery;
16
use Mautic\CoreBundle\Helper\Chart\LineChart;
17
use Mautic\CoreBundle\Helper\CoreParametersHelper;
18
use Mautic\CoreBundle\Helper\DateTimeHelper;
19
use Mautic\CoreBundle\Helper\InputHelper;
20
use Mautic\CoreBundle\Helper\PathsHelper;
21
use Mautic\CoreBundle\Model\FormModel;
22
use Mautic\CoreBundle\Model\NotificationModel;
23
use Mautic\LeadBundle\Entity\Company;
24
use Mautic\LeadBundle\Entity\Import;
25
use Mautic\LeadBundle\Entity\ImportRepository;
26
use Mautic\LeadBundle\Entity\Lead;
27
use Mautic\LeadBundle\Entity\LeadEventLog;
28
use Mautic\LeadBundle\Entity\LeadEventLogRepository;
29
use Mautic\LeadBundle\Event\ImportEvent;
30
use Mautic\LeadBundle\Exception\ImportDelayedException;
31
use Mautic\LeadBundle\Exception\ImportFailedException;
32
use Mautic\LeadBundle\Helper\Progress;
33
use Mautic\LeadBundle\LeadEvents;
34
use Symfony\Component\EventDispatcher\Event;
35
use Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException;
36
37
/**
38
 * Class ImportModel.
39
 */
40
class ImportModel extends FormModel
41
{
42
    /**
43
     * @var PathsHelper
44
     */
45
    protected $pathsHelper;
46
47
    /**
48
     * @var LeadModel
49
     */
50
    protected $leadModel;
51
52
    /**
53
     * @var CompanyModel
54
     */
55
    protected $companyModel;
56
57
    /**
58
     * @var NotificationModel
59
     */
60
    protected $notificationModel;
61
62
    /**
63
     * @var CoreParametersHelper
64
     */
65
    protected $config;
66
67
    /**
68
     * @var LeadEventLogRepository
69
     */
70
    protected $leadEventLogRepo;
71
72
    /**
73
     * ImportModel constructor.
74
     */
75
    public function __construct(
76
        PathsHelper $pathsHelper,
77
        LeadModel $leadModel,
78
        NotificationModel $notificationModel,
79
        CoreParametersHelper $config,
80
        CompanyModel $companyModel
81
    ) {
82
        $this->pathsHelper       = $pathsHelper;
83
        $this->leadModel         = $leadModel;
84
        $this->notificationModel = $notificationModel;
85
        $this->config            = $config;
86
        $this->leadEventLogRepo  = $leadModel->getEventLogRepository();
87
        $this->companyModel      = $companyModel;
88
    }
89
90
    /**
91
     * Returns the Import entity which should be processed next.
92
     *
93
     * @return Import|null
94
     */
95
    public function getImportToProcess()
96
    {
97
        $result = $this->getRepository()->getImportsWithStatuses([Import::QUEUED, Import::DELAYED], 1);
98
99
        if (isset($result[0]) && $result[0] instanceof Import) {
100
            return $result[0];
101
        }
102
103
        return null;
104
    }
105
106
    /**
107
     * Compares current number of imports in progress with the limit from the configuration.
108
     *
109
     * @return bool
110
     */
111
    public function checkParallelImportLimit()
112
    {
113
        $parallelImportLimit = $this->getParallelImportLimit();
114
        $importsInProgress   = $this->getRepository()->countImportsInProgress();
115
116
        return !($importsInProgress >= $parallelImportLimit);
117
    }
118
119
    /**
120
     * Returns parallel import limit from the configuration.
121
     *
122
     * @param int $default
123
     *
124
     * @return int
125
     */
126
    public function getParallelImportLimit($default = 1)
127
    {
128
        return $this->config->get('parallel_import_limit', $default);
129
    }
130
131
    /**
132
     * Generates a HTML link to the import detail.
133
     *
134
     * @return string
135
     */
136
    public function generateLink(Import $import)
137
    {
138
        return '<a href="'.$this->router->generate(
139
            'mautic_import_action',
140
            ['objectAction' => 'view', 'object' => 'lead', 'objectId' => $import->getId()]
141
        ).'" data-toggle="ajax">'.$import->getOriginalFile().' ('.$import->getId().')</a>';
142
    }
143
144
    /**
145
     * Check if there are some IN_PROGRESS imports which got stuck for a while.
146
     * Set those as failed.
147
     */
148
    public function setGhostImportsAsFailed()
149
    {
150
        $ghostDelay = 2;
151
        $imports    = $this->getRepository()->getGhostImports($ghostDelay, 5);
152
153
        if (empty($imports)) {
154
            return null;
155
        }
156
157
        foreach ($imports as $import) {
158
            $import->setStatus($import::FAILED)
159
                ->setStatusInfo($this->translator->trans('mautic.lead.import.ghost.limit.hit', ['%limit%' => $ghostDelay]))
160
                ->removeFile();
161
162
            if ($import->getCreatedBy()) {
163
                $this->notificationModel->addNotification(
164
                    $this->translator->trans(
165
                        'mautic.lead.import.result.info',
166
                        ['%import%' => $this->generateLink($import)]
167
                    ),
168
                    'info',
169
                    false,
170
                    $this->translator->trans('mautic.lead.import.failed'),
171
                    'fa-download',
172
                    null,
173
                    $this->em->getReference('MauticUserBundle:User', $import->getCreatedBy())
174
                );
175
            }
176
        }
177
178
        $this->saveEntities($imports);
179
    }
180
181
    /**
182
     * Start import. This is meant for the CLI command since it will import
183
     * the whole file at once.
184
     *
185
     * @param int $limit Number of records to import before delaying the import. 0 will import all
186
     *
187
     * @throws ImportFailedException
188
     * @throws ImportDelayedException
189
     */
190
    public function beginImport(Import $import, Progress $progress, $limit = 0)
191
    {
192
        $this->setGhostImportsAsFailed();
193
194
        if (!$import) {
195
            $msg = 'import is empty, closing the import process';
196
            $this->logDebug($msg, $import);
197
            throw new ImportFailedException($msg);
198
        }
199
200
        if (!$import->canProceed()) {
201
            $this->saveEntity($import);
202
            $msg = 'import cannot be processed because '.$import->getStatusInfo();
203
            $this->logDebug($msg, $import);
204
            throw new ImportFailedException($msg);
205
        }
206
207
        if (!$this->checkParallelImportLimit()) {
208
            $info = $this->translator->trans(
209
                'mautic.lead.import.parallel.limit.hit',
210
                ['%limit%' => $this->getParallelImportLimit()]
211
            );
212
            $import->setStatus($import::DELAYED)->setStatusInfo($info);
213
            $this->saveEntity($import);
214
            $msg = 'import is delayed because parrallel limit was hit. '.$import->getStatusInfo();
215
            $this->logDebug($msg, $import);
216
            throw new ImportDelayedException($msg);
217
        }
218
219
        $processed = $import->getProcessedRows();
220
        $total     = $import->getLineCount();
221
        $pending   = $total - $processed;
222
223
        if ($limit && $limit < $pending) {
224
            $processed = 0;
225
            $total     = $limit;
226
        }
227
228
        $progress->setTotal($total);
229
        $progress->setDone($processed);
230
231
        $import->start();
232
233
        // Save the start changes so the user could see it
234
        $this->saveEntity($import);
235
        $this->logDebug('The background import is about to start', $import);
236
237
        try {
238
            if (!$this->process($import, $progress, $limit)) {
239
                throw new ImportFailedException($import->getStatusInfo());
240
            }
241
        } catch (ORMException $e) {
242
            // The EntityManager is probably closed. The entity cannot be saved.
243
            $info = $this->translator->trans(
244
                'mautic.lead.import.database.exception',
245
                ['%message%' => $e->getMessage()]
246
            );
247
248
            $import->setStatus($import::DELAYED)->setStatusInfo($info);
249
250
            throw new ImportFailedException('Database had been overloaded');
251
        }
252
253
        $import->end();
254
        $this->logDebug('The background import has ended', $import);
255
256
        // Save the end changes so the user could see it
257
        $this->saveEntity($import);
258
259
        if ($import->getCreatedBy()) {
260
            $this->notificationModel->addNotification(
261
                $this->translator->trans(
262
                    'mautic.lead.import.result.info',
263
                    ['%import%' => $this->generateLink($import)]
264
                ),
265
                'info',
266
                false,
267
                $this->translator->trans('mautic.lead.import.completed'),
268
                'fa-download',
269
                null,
270
                $this->em->getReference('MauticUserBundle:User', $import->getCreatedBy())
271
            );
272
        }
273
    }
274
275
    /**
276
     * Import the CSV file from configuration in the $import entity.
277
     *
278
     * @param int $limit Number of records to import before delaying the import
279
     *
280
     * @return bool
281
     */
282
    public function process(Import $import, Progress $progress, $limit = 0)
283
    {
284
        //Auto detect line endings for the file to work around MS DOS vs Unix new line characters
285
        ini_set('auto_detect_line_endings', true);
286
287
        try {
288
            $file = new \SplFileObject($import->getFilePath());
289
        } catch (\Exception $e) {
290
            $import->setStatusInfo('SplFileObject cannot read the file. '.$e->getMessage());
291
            $import->setStatus(Import::FAILED);
292
            $this->logDebug('import cannot be processed because '.$import->getStatusInfo(), $import);
293
294
            return false;
295
        }
296
297
        $lastImportedLine = $import->getLastLineImported();
298
        $headers          = $import->getHeaders();
299
        $headerCount      = count($headers);
300
        $config           = $import->getParserConfig();
301
        $counter          = 0;
302
303
        if ($lastImportedLine > 0) {
304
            // Seek is zero-based line numbering and
305
            $file->seek($lastImportedLine - 1);
306
        }
307
308
        $lineNumber = $lastImportedLine + 1;
309
        $this->logDebug('The import is starting on line '.$lineNumber, $import);
310
311
        $batchSize = $config['batchlimit'];
312
313
        // Convert to field names
314
        array_walk($headers, function (&$val) {
315
            $val = strtolower(InputHelper::alphanum($val, false, '_'));
316
        });
317
318
        while ($batchSize && !$file->eof()) {
319
            $data = $file->fgetcsv($config['delimiter'], $config['enclosure'], $config['escape']);
320
            $import->setLastLineImported($lineNumber);
321
322
            // Ignore the header row
323
            if (1 === $lineNumber) {
324
                ++$lineNumber;
325
                continue;
326
            }
327
328
            // Ensure the progress is changing
329
            ++$lineNumber;
330
            --$batchSize;
331
            $progress->increase();
332
333
            $errorMessage = null;
334
            $eventLog     = $this->initEventLog($import, $lineNumber);
335
336
            if ($this->isEmptyCsvRow($data)) {
337
                $errorMessage = 'mautic.lead.import.error.line_empty';
338
            }
339
340
            if ($this->hasMoreValuesThanColumns($data, $headerCount)) {
341
                $errorMessage = 'mautic.lead.import.error.header_mismatch';
342
            }
343
344
            if (!$errorMessage) {
345
                $data = $this->trimArrayValues($data);
346
                if (!array_filter($data)) {
347
                    continue;
348
                }
349
350
                $data = array_combine($headers, $data);
351
352
                try {
353
                    $entityModel = 'company' === $import->getObject() ? $this->companyModel : $this->leadModel;
354
355
                    $merged = $entityModel->import(
356
                        $import->getMatchedFields(),
357
                        $data,
358
                        $import->getDefault('owner'),
359
                        $import->getDefault('list'),
360
                        $import->getDefault('tags'),
361
                        true,
362
                        $eventLog,
363
                        $import->getId()
0 ignored issues
show
The call to Mautic\LeadBundle\Model\CompanyModel::import() has too many arguments starting with $import->getId(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

363
                    /** @scrutinizer ignore-call */ 
364
                    $merged = $entityModel->import(

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
364
                    );
365
366
                    if ($merged) {
367
                        $this->logDebug('Entity on line '.$lineNumber.' has been updated', $import);
368
                        $import->increaseUpdatedCount();
369
                    } else {
370
                        $this->logDebug('Entity on line '.$lineNumber.' has been created', $import);
371
                        $import->increaseInsertedCount();
372
                    }
373
                } catch (\Exception $e) {
374
                    // Email validation likely failed
375
                    $errorMessage = $e->getMessage();
376
                }
377
            }
378
379
            if ($errorMessage) {
380
                $import->increaseIgnoredCount();
381
                $this->logImportRowError($eventLog, $errorMessage);
382
                $this->logDebug('Line '.$lineNumber.' error: '.$errorMessage, $import);
383
            } else {
384
                $this->leadEventLogRepo->saveEntity($eventLog);
385
            }
386
387
            // Release entities in Doctrine's memory to prevent memory leak
388
            $this->em->detach($eventLog);
389
            $eventLog = null;
390
            $data     = null;
391
            $this->em->clear(Lead::class);
392
            $this->em->clear(Company::class);
393
394
            // Save Import entity once per batch so the user could see the progress
395
            if (0 === $batchSize && $import->isBackgroundProcess()) {
396
                $isPublished = $this->getRepository()->getValue($import->getId(), 'is_published');
397
398
                if (!$isPublished) {
399
                    $import->setStatus($import::STOPPED);
400
                }
401
402
                $this->saveEntity($import);
403
                $this->dispatchEvent('batch_processed', $import);
404
405
                // Stop the import loop if the import got unpublished
406
                if (!$isPublished) {
407
                    $this->logDebug('The import has been unpublished. Stopping the import now.', $import);
408
                    break;
409
                }
410
411
                $batchSize = $config['batchlimit'];
412
            }
413
414
            ++$counter;
415
            if ($limit && $counter >= $limit) {
416
                $import->setStatus($import::DELAYED);
417
                $this->saveEntity($import);
418
                break;
419
            }
420
        }
421
422
        // Close the file
423
        $file = null;
424
425
        return true;
426
    }
427
428
    /**
429
     * Check if the CSV row has more values than the CSV header has columns.
430
     * If it is less, generate empty values for the rest of the missing values.
431
     * If it is more, return true.
432
     *
433
     * @param array &$data
434
     * @param int   $headerCount
435
     *
436
     * @return bool
437
     */
438
    public function hasMoreValuesThanColumns(array &$data, $headerCount)
439
    {
440
        $dataCount = count($data);
441
442
        if ($headerCount !== $dataCount) {
443
            $diffCount = ($headerCount - $dataCount);
444
445
            if ($diffCount > 0) {
446
                // Fill in the data with empty string
447
                $fill = array_fill($dataCount, $diffCount, '');
448
                $data = $data + $fill;
449
            } else {
450
                return true;
451
            }
452
        }
453
454
        return false;
455
    }
456
457
    /**
458
     * Trim all values in a one dymensional array.
459
     *
460
     * @return array
461
     */
462
    public function trimArrayValues(array $data)
463
    {
464
        return array_map('trim', $data);
465
    }
466
467
    /**
468
     * Decide whether the CSV row is empty.
469
     *
470
     * @param mixed $row
471
     *
472
     * @return bool
473
     */
474
    public function isEmptyCsvRow($row)
475
    {
476
        if (!is_array($row) || empty($row)) {
477
            return true;
478
        }
479
480
        if (1 === count($row) && ('' === $row[0] || null === $row[0])) {
481
            return true;
482
        }
483
484
        return !array_filter($row);
485
    }
486
487
    /**
488
     * Save log about errored line.
489
     *
490
     * @param string $errorMessage
491
     */
492
    public function logImportRowError(LeadEventLog $eventLog, $errorMessage)
493
    {
494
        $eventLog->addProperty('error', $this->translator->trans($errorMessage))
495
            ->setAction('failed');
496
497
        $this->leadEventLogRepo->saveEntity($eventLog);
498
    }
499
500
    /**
501
     * Initialize LeadEventLog object and configure it as the import event.
502
     *
503
     * @param int $lineNumber
504
     *
505
     * @return LeadEventLog
506
     */
507
    public function initEventLog(Import $import, $lineNumber)
508
    {
509
        $eventLog = new LeadEventLog();
510
        $eventLog->setUserId($import->getCreatedBy())
511
            ->setUserName($import->getCreatedByUser())
512
            ->setBundle('lead')
513
            ->setObject('import')
514
            ->setObjectId($import->getId())
515
            ->setProperties(
516
                [
517
                    'line' => $lineNumber,
518
                    'file' => $import->getOriginalFile(),
519
                ]
520
            );
521
522
        return $eventLog;
523
    }
524
525
    /**
526
     * Get line chart data of imported rows.
527
     *
528
     * @param string $unit       {@link php.net/manual/en/function.date.php#refsect1-function.date-parameters}
529
     * @param string $dateFormat
530
     * @param array  $filter
531
     *
532
     * @return array
533
     */
534
    public function getImportedRowsLineChartData($unit, \DateTime $dateFrom, \DateTime $dateTo, $dateFormat = null, $filter = [])
535
    {
536
        $filter['object'] = 'import';
537
        $filter['bundle'] = 'lead';
538
539
        // Clear the times for display by minutes
540
        $dateFrom->modify('-1 minute');
541
        $dateFrom->setTime($dateFrom->format('H'), $dateFrom->format('i'), 0);
542
        $dateTo->modify('+1 minute');
543
        $dateTo->setTime($dateTo->format('H'), $dateTo->format('i'), 0);
544
545
        $query = new ChartQuery($this->em->getConnection(), $dateFrom, $dateTo, $unit);
546
        $chart = new LineChart($unit, $dateFrom, $dateTo, $dateFormat);
547
        $data  = $query->fetchTimeData('lead_event_log', 'date_added', $filter);
548
549
        $chart->setDataset($this->translator->trans('mautic.lead.import.processed.rows'), $data);
550
551
        return $chart->render();
552
    }
553
554
    /**
555
     * Returns a list of failed rows for the import.
556
     *
557
     * @param int $importId
558
     *
559
     * @return array|null
560
     */
561
    public function getFailedRows($importId = null)
562
    {
563
        if (!$importId) {
564
            return null;
565
        }
566
567
        return $this->getEventLogRepository()->getFailedRows($importId, ['select' => 'properties,id']);
568
    }
569
570
    /**
571
     * @return ImportRepository
572
     */
573
    public function getRepository()
574
    {
575
        return $this->em->getRepository('MauticLeadBundle:Import');
576
    }
577
578
    /**
579
     * @return LeadEventLogRepository
580
     */
581
    public function getEventLogRepository()
582
    {
583
        return $this->em->getRepository('MauticLeadBundle:LeadEventLog');
584
    }
585
586
    /**
587
     * {@inheritdoc}
588
     *
589
     * @return string
590
     */
591
    public function getPermissionBase()
592
    {
593
        return 'lead:imports';
594
    }
595
596
    /**
597
     * Returns a unique name of a CSV file based on time.
598
     *
599
     * @return string
600
     */
601
    public function getUniqueFileName()
602
    {
603
        return (new DateTimeHelper())->toUtcString('YmdHis').'.csv';
604
    }
605
606
    /**
607
     * Returns a full path to the import dir.
608
     *
609
     * @return string
610
     */
611
    public function getImportDir()
612
    {
613
        $tmpDir = $this->pathsHelper->getSystemPath('tmp', true);
614
615
        return $tmpDir.'/imports';
616
    }
617
618
    /**
619
     * Get a specific entity or generate a new one if id is empty.
620
     *
621
     * @param $id
622
     *
623
     * @return object|null
624
     */
625
    public function getEntity($id = null)
626
    {
627
        if (null === $id) {
628
            return new Import();
629
        }
630
631
        return parent::getEntity($id);
632
    }
633
634
    /**
635
     * {@inheritdoc}
636
     *
637
     * @param $action
638
     * @param $event
639
     * @param $entity
640
     * @param $isNew
641
     *
642
     * @throws \Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException
643
     */
644
    protected function dispatchEvent($action, &$entity, $isNew = false, Event $event = null)
645
    {
646
        if (!$entity instanceof Import) {
647
            throw new MethodNotAllowedHttpException(['Import']);
648
        }
649
650
        switch ($action) {
651
            case 'pre_save':
652
                $name = LeadEvents::IMPORT_PRE_SAVE;
653
                break;
654
            case 'post_save':
655
                $name = LeadEvents::IMPORT_POST_SAVE;
656
                break;
657
            case 'pre_delete':
658
                $name = LeadEvents::IMPORT_PRE_DELETE;
659
                break;
660
            case 'post_delete':
661
                $name = LeadEvents::IMPORT_POST_DELETE;
662
                break;
663
            case 'batch_processed':
664
                $name = LeadEvents::IMPORT_BATCH_PROCESSED;
665
                break;
666
            default:
667
                return null;
668
        }
669
670
        if ($this->dispatcher->hasListeners($name)) {
671
            if (empty($event)) {
672
                $event = new ImportEvent($entity, $isNew);
673
                $event->setEntityManager($this->em);
674
            }
675
676
            $this->dispatcher->dispatch($name, $event);
677
678
            return $event;
679
        } else {
680
            return null;
681
        }
682
    }
683
684
    /**
685
     * Logs a debug message if in dev environment.
686
     *
687
     * @param string $msg
688
     * @param Import $import
689
     */
690
    protected function logDebug($msg, Import $import = null)
691
    {
692
        if (MAUTIC_ENV === 'dev') {
693
            $importId = $import ? '('.$import->getId().')' : '';
694
            $this->logger->debug(sprintf('IMPORT%s: %s', $importId, $msg));
695
        }
696
    }
697
}
698