Completed
Push — master ( 0a650b...9b576e )
by Jeroen
9s
created

ImportFactory::getImportHandler()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace TreeHouse\IoBundle\Import;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
use Symfony\Component\EventDispatcher\Event;
7
use Symfony\Component\EventDispatcher\EventDispatcher;
8
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
9
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
10
use TreeHouse\Feeder\Exception\ReadException;
11
use TreeHouse\Feeder\Feed;
12
use TreeHouse\Feeder\FeedEvents;
13
use TreeHouse\Feeder\Reader\ReaderInterface;
14
use TreeHouse\IoBundle\Entity\Feed as FeedEntity;
15
use TreeHouse\IoBundle\Entity\Import;
16
use TreeHouse\IoBundle\Entity\ImportPart;
17
use TreeHouse\IoBundle\Entity\ImportRepository;
18
use TreeHouse\IoBundle\Exception\UnfinishedImportException;
19
use TreeHouse\IoBundle\Import\Event\ExceptionEvent;
20
use TreeHouse\IoBundle\Import\Event\PartEvent;
21
use TreeHouse\IoBundle\Import\Feed\FeedBuilder;
22
use TreeHouse\IoBundle\Import\Feed\TransportFactory;
23
use TreeHouse\IoBundle\Import\Handler\HandlerInterface;
24
use TreeHouse\IoBundle\Import\Importer\Importer;
25
use TreeHouse\IoBundle\Import\Importer\ImporterBuilderFactory;
26
use TreeHouse\IoBundle\Import\Log\ItemLoggerInterface;
27
use TreeHouse\IoBundle\Import\Processor\ProcessorInterface;
28
use TreeHouse\IoBundle\Import\Reader\ReaderBuilderFactory;
29
use TreeHouse\IoBundle\Import\Reader\ReaderBuilderInterface;
30
31
class ImportFactory implements EventSubscriberInterface
32
{
33
    /**
34
     * @var ManagerRegistry
35
     */
36
    protected $doctrine;
37
38
    /**
39
     * @var ImportRegistry
40
     */
41
    protected $importRegistry;
42
43
    /**
44
     * @var ImporterBuilderFactory
45
     */
46
    protected $importerBuilderFactory;
47
48
    /**
49
     * @var ReaderBuilderFactory
50
     */
51
    protected $readerBuilderFactory;
52
53
    /**
54
     * @var ImportStorage
55
     */
56
    protected $importStorage;
57
58
    /**
59
     * @var EventDispatcherInterface
60
     */
61
    protected $eventDispatcher;
62
63
    /**
     * Stores the collaborators this factory uses to build imports, import
     * parts and import jobs.
     *
     * @param ManagerRegistry          $doctrine               Used to obtain the import repository
     * @param ImportRegistry           $importRegistry         Resolves importer, reader and feed types, handlers and processors
     * @param ImporterBuilderFactory   $importerBuilderFactory Creates the builders for importers
     * @param ReaderBuilderFactory     $readerBuilderFactory   Creates the builders for readers
     * @param ImportStorage            $importStorage          Provides the directory belonging to an import
     * @param EventDispatcherInterface $dispatcher             Main dispatcher that events are relayed to
     */
    public function __construct(
        ManagerRegistry $doctrine,
        ImportRegistry $importRegistry,
        ImporterBuilderFactory $importerBuilderFactory,
        ReaderBuilderFactory $readerBuilderFactory,
        ImportStorage $importStorage,
        EventDispatcherInterface $dispatcher
    ) {
        $this->eventDispatcher = $dispatcher;
        $this->importStorage = $importStorage;
        $this->readerBuilderFactory = $readerBuilderFactory;
        $this->importerBuilderFactory = $importerBuilderFactory;
        $this->importRegistry = $importRegistry;
        $this->doctrine = $doctrine;
    }
86
87
    /**
     * Returns the main event dispatcher that was injected into this factory.
     *
     * @return EventDispatcherInterface
     */
    public function getEventDispatcher()
    {
        return $this->eventDispatcher;
    }
94
95
    /**
     * Subscribes the factory to every event named by a constant of FeedEvents
     * and ImportEvents, so they can be relayed, and additionally registers the
     * exception handler for the import exception event.
     *
     * Listeners are declared in the nested `[[method], ...]` form on purpose.
     * A flat list such as `['relayEvent', 'onException']` is interpreted by the
     * Symfony dispatcher as `[method, priority]`, which registers relayEvent()
     * with a bogus priority and never registers onException() at all.
     *
     * @inheritdoc
     */
    public static function getSubscribedEvents()
    {
        $events = [];

        foreach ([FeedEvents::class, ImportEvents::class] as $class) {
            $refl = new \ReflectionClass($class);
            foreach ($refl->getConstants() as $constant) {
                // assign rather than append: an event name occurring in both
                // classes must still be relayed only once
                $events[$constant] = [['relayEvent']];
            }
        }

        // appended after the relay listener, both at the default priority
        $events[ImportEvents::EXCEPTION][] = ['onException'];

        return $events;
    }
113
114
    /**
     * Forwards an event, under its original name, to the main dispatcher of
     * this factory.
     *
     * This is done so listeners can subscribe to this class, while each
     * importer starts with a new dispatcher.
     *
     * @param Event  $event The event that was dispatched
     * @param string $name  The name the event was dispatched under
     */
    public function relayEvent(Event $event, $name)
    {
        $this->eventDispatcher->dispatch($name, $event);
    }
126
127
    /**
     * Turns an exception event into a thrown \RuntimeException that wraps the
     * original exception. Importer types can listen to the same event and
     * stop propagation if they want to change this behaviour.
     *
     * Read errors get a short message; any other exception is reported with
     * its class, message and stack trace.
     *
     * @param ExceptionEvent $event
     *
     * @throws \RuntimeException Always, with the original exception as previous
     */
    public function onException(ExceptionEvent $event)
    {
        $cause = $event->getException();

        $message = $cause instanceof ReadException
            ? sprintf('Error reading feed: %s', $cause->getMessage())
            : sprintf(
                'Import aborted with %s: "%s" Stack trace: %s',
                get_class($cause),
                $cause->getMessage(),
                $cause->getTraceAsString()
            );

        throw new \RuntimeException($message, 0, $cause);
    }
151
152
    /**
     * Registers the given item logger as a subscriber on the main dispatcher.
     *
     * @param ItemLoggerInterface $logger
     */
    public function setItemLogger(ItemLoggerInterface $logger)
    {
        $this->eventDispatcher->addSubscriber($logger);
    }
159
160
    /**
     * Creates an import for a feed. If an import for this feed was created
     * before, but has not started yet, that import is returned as is. All
     * other open imports are closed first.
     *
     * A newly created import is saved before its parts are added.
     *
     * @param FeedEntity $feed         The feed to create the import for
     * @param \DateTime  $scheduleDate The date this import should start, defaults to now
     * @param bool       $forced       Whether to handle items that would normally be skipped
     * @param bool       $partial      If left out, it will be determined based on feed
     *
     * @throws UnfinishedImportException When an existing (and running) import is found
     *
     * @return Import
     */
    public function createImport(FeedEntity $feed, \DateTime $scheduleDate = null, $forced = false, $partial = null)
    {
        if (null === $scheduleDate) {
            $scheduleDate = new \DateTime();
        }

        if (null === $partial) {
            $partial = $feed->isPartial();
        }

        // see if any imports are still unfinished
        $import = $this->findOrCreateImport($feed);

        // an import that already has an id is an existing one: hand it back untouched
        if ($import->getId()) {
            return $import;
        }

        $import->setForced($forced);
        $import->setPartial($partial);
        $import->setDatetimeScheduled($scheduleDate);

        // save now: we want the import on record before starting it
        $this->getRepository()->save($import);

        $this->addImportParts($import);

        return $import;
    }
201
202
    /**
     * Assembles the job that imports a single part: an importer, a feed fed
     * by a reader for the part, and a processor, all bound to one dispatcher.
     *
     * @param ImportPart               $part       The part to create the job for
     * @param EventDispatcherInterface $dispatcher Dispatcher to use; a new one is created when omitted
     *
     * @return ImportJob
     */
    public function createImportJob(ImportPart $part, EventDispatcherInterface $dispatcher = null)
    {
        $import = $part->getImport();
        $feedEntity = $import->getFeed();

        if (!$dispatcher) {
            $dispatcher = $this->createEventDispatcher();
        }

        $importerOptions = $this->getDefaultImporterOptions($import);
        $importer = $this->createImporter($import, $dispatcher, $importerOptions);

        $readerOptions = $this->getDefaultReaderOptions($import);
        $reader = $this->createImportPartReader($part, $dispatcher, $readerOptions);

        // NOTE(review): static analysis reported that $reader may be null here,
        // while createFeed() requires a ReaderInterface - confirm what the
        // reader builder can return.
        $feedOptions = $this->getDefaultFeedOptions($import);
        $feed = $this->createFeed($feedEntity, $reader, $dispatcher, $feedOptions);

        $processor = $this->getImportProcessor($import);

        return new ImportJob($part, $feed, $processor, $importer, $this->getRepository());
    }
221
222
    /**
     * Builds the importer for an import, using the importer type configured
     * on the import's feed.
     *
     * The feed's own importer options are merged over the given options, so
     * for equal string keys the feed's value wins.
     *
     * @param Import                   $import
     * @param EventDispatcherInterface $dispatcher
     * @param array                    $options    Default options
     *
     * @return Importer
     */
    protected function createImporter(Import $import, EventDispatcherInterface $dispatcher, array $options = [])
    {
        $feedEntity = $import->getFeed();

        $type = $this->importRegistry->getImporterType($feedEntity->getImporterType());
        $handler = $this->getImportHandler($import);
        $mergedOptions = array_merge($options, $feedEntity->getImporterOptions());

        return $this->importerBuilderFactory
            ->create($dispatcher)
            ->build($type, $import, $handler, $mergedOptions);
    }
239
240
    /**
     * Creates a part for an import, links the two together and saves the part.
     *
     * @param Import    $import
     * @param array     $transport    Transport configuration for the part
     * @param \DateTime $scheduleDate The date the part is scheduled for, defaults to now
     * @param int       $position     Position of the part; when omitted, one more than
     *                                the highest position among the import's parts
     *
     * @return ImportPart
     */
    protected function createImportPart(Import $import, array $transport, \DateTime $scheduleDate = null, $position = null)
    {
        if (null === $scheduleDate) {
            $scheduleDate = new \DateTime();
        }

        if (null === $position) {
            $extractPosition = function (ImportPart $part) {
                return $part->getPosition();
            };
            $taken = $import->getParts()->map($extractPosition)->toArray();

            // add this to ensure we have at least 1 position
            $taken[] = 0;

            $position = max($taken) + 1;
        }

        $newPart = new ImportPart();
        $newPart->setPosition($position);
        $newPart->setTransportConfig($transport);
        $newPart->setDatetimeScheduled($scheduleDate);
        $newPart->setImport($import);
        $import->addPart($newPart);

        $this->getRepository()->savePart($newPart);

        return $newPart;
    }
280
281
    /**
     * Builds a reader for the given transport, using the reader type
     * configured on the import's feed and the import's storage directory.
     *
     * The feed's own reader options are merged over the given options, so for
     * equal string keys the feed's value wins.
     *
     * @param Import                   $import
     * @param array                    $transport    Transport configuration to read from
     * @param string                   $resourceType One of the ReaderBuilderInterface::RESOURCE_TYPE_* values
     * @param EventDispatcherInterface $dispatcher
     * @param array                    $options      Default options
     *
     * @return ReaderInterface
     */
    protected function createReader(Import $import, array $transport, $resourceType, EventDispatcherInterface $dispatcher, array $options = [])
    {
        $importDir = $this->importStorage->getImportDir($import);

        $feedEntity = $import->getFeed();
        $readerType = $this->importRegistry->getReaderType($feedEntity->getReaderType());
        $readerBuilder = $this->readerBuilderFactory->create($dispatcher, $importDir);
        $mergedOptions = array_merge($options, $feedEntity->getReaderOptions());

        return $readerBuilder->build($readerType, $transport, $resourceType, $mergedOptions);
    }
301
302
    /**
     * Creates the reader for an import as a whole: it reads from the transport
     * configured on the import's feed, as a "main" resource.
     *
     * @param Import                   $import
     * @param EventDispatcherInterface $dispatcher
     * @param array                    $options
     *
     * @return ReaderInterface
     */
    protected function createImportReader(Import $import, EventDispatcherInterface $dispatcher, array $options = [])
    {
        return $this->createReader(
            $import,
            $import->getFeed()->getTransportConfig(),
            ReaderBuilderInterface::RESOURCE_TYPE_MAIN,
            $dispatcher,
            $options
        );
    }
317
318
    /**
     * Creates the reader for a single import part: it reads from the part's
     * own transport configuration, as a "part" resource.
     *
     * @param ImportPart               $importPart
     * @param EventDispatcherInterface $dispatcher
     * @param array                    $options
     *
     * @return ReaderInterface
     */
    protected function createImportPartReader(ImportPart $importPart, EventDispatcherInterface $dispatcher, array $options = [])
    {
        return $this->createReader(
            $importPart->getImport(),
            $importPart->getTransportConfig(),
            ReaderBuilderInterface::RESOURCE_TYPE_PART,
            $dispatcher,
            $options
        );
    }
333
334
    /**
     * Builds the feed object for a feed entity on top of the given reader,
     * using the feed type configured on the entity.
     *
     * The entity's own options are merged over the given options, so for
     * equal string keys the entity's value wins.
     *
     * @param FeedEntity               $feed
     * @param ReaderInterface          $reader
     * @param EventDispatcherInterface $dispatcher
     * @param array                    $options    Default options
     *
     * @return Feed
     */
    protected function createFeed(FeedEntity $feed, ReaderInterface $reader, EventDispatcherInterface $dispatcher, array $options = [])
    {
        $feedBuilder = new FeedBuilder($dispatcher);
        $feedType = $this->importRegistry->getFeedType($feed->getType());
        $mergedOptions = array_merge($options, $feed->getOptions());

        return $feedBuilder->build($feedType, $reader, $mergedOptions);
    }
350
351
    /**
     * Creates a fresh event dispatcher that has this factory registered as a
     * subscriber.
     *
     * @return EventDispatcherInterface
     */
    protected function createEventDispatcher()
    {
        $fresh = new EventDispatcher();
        $fresh->addSubscriber($this);

        return $fresh;
    }
361
362
    /**
     * Returns the import to use for a feed.
     *
     * Walks the feed's existing imports: finished ones are ignored, the first
     * one that has not started yet is returned, and started-but-unfinished
     * ones are finished through the repository. When none can be reused, a
     * new (not yet saved) import for the feed is returned.
     *
     * @param FeedEntity $feed
     *
     * @throws UnfinishedImportException When a started import cannot be finished
     *
     * @return Import
     */
    protected function findOrCreateImport(FeedEntity $feed)
    {
        /** @var $existing Import[] */
        $existing = $this->getRepository()->findBy(['feed' => $feed]);

        foreach ($existing as $candidate) {
            // skip finished imports
            if ($candidate->isFinished()) {
                continue;
            }

            // if it hasn't started yet, use this one
            if (!$candidate->isStarted()) {
                return $candidate;
            }

            try {
                $this->getRepository()->finishImport($candidate);
            } catch (UnfinishedImportException $e) {
                $message = sprintf(
                    'Import %d has unfinished parts, close those first before creating a new import',
                    $candidate->getId()
                );

                throw new UnfinishedImportException($candidate, $message);
            }
        }

        // all previous imports are checked and finished if necessary, we can create a new one now
        $created = new Import();
        $created->setFeed($feed);

        return $created;
    }
404
405 4
    /**
     * Creates one import part for every resource the import's reader exposes,
     * and announces each created part on the main dispatcher.
     *
     * @param Import $import
     */
    protected function addImportParts(Import $import)
    {
        $reader = $this->createImportReader(
            $import,
            $this->createEventDispatcher(),
            $this->getDefaultReaderOptions($import)
        );

        foreach ($reader->getResources() as $resource) {
            $config = TransportFactory::createConfigFromTransport($resource->getTransport());
            $createdPart = $this->createImportPart($import, $config);

            // dispatched on the main dispatcher, not on the reader's own one
            $this->eventDispatcher->dispatch(ImportEvents::PART_CREATED, new PartEvent($createdPart));
        }
    }
419
420
    /**
     * Fetches the repository for import entities from Doctrine.
     *
     * @return ImportRepository
     */
    protected function getRepository()
    {
        $entityName = 'TreeHouseIoBundle:Import';

        return $this->doctrine->getRepository($entityName);
    }
427
428
    /**
     * Returns the handler to use for an import: always the one registered
     * under the name "doctrine".
     *
     * @param Import $import Not used here; presumably kept so subclasses can
     *                       pick a handler per import - confirm before removing
     *
     * @return HandlerInterface
     */
    protected function getImportHandler(Import $import)
    {
        $handlerName = 'doctrine';

        return $this->importRegistry->getHandler($handlerName);
    }
437
438
    /**
     * Returns the processor to use for an import: always the one registered
     * under the name "posix".
     *
     * @param Import $import Not used here; presumably kept so subclasses can
     *                       pick a processor per import - confirm before removing
     *
     * @return ProcessorInterface
     */
    protected function getImportProcessor(Import $import)
    {
        $processorName = 'posix';

        return $this->importRegistry->getProcessor($processorName);
    }
447
448
    /**
     * Returns default options to pass to the feed builder: the import's
     * forced flag, its feed entity and that feed's default values.
     *
     * @param Import $import
     *
     * @return array
     */
    protected function getDefaultFeedOptions(Import $import)
    {
        $defaults = [];
        $defaults['forced'] = $import->isForced();
        $defaults['feed'] = $import->getFeed();
        $defaults['default_values'] = $import->getFeed()->getDefaultValues();

        return $defaults;
    }
463
464
    /**
     * Returns default options to pass to the reader builder: the import's
     * partial and forced flags.
     *
     * @param Import $import
     *
     * @return array
     */
    protected function getDefaultReaderOptions(Import $import)
    {
        $defaults = [];
        $defaults['partial'] = $import->isPartial();
        $defaults['forced'] = $import->isForced();

        return $defaults;
    }
478
479
    /**
     * Returns default options to pass to the importer builder. There are
     * none by default, so this is an empty array.
     *
     * @param Import $import Not used here; presumably kept so subclasses can
     *                       derive options from the import - confirm before removing
     *
     * @return array
     */
    protected function getDefaultImporterOptions(Import $import)
    {
        $defaults = [];

        return $defaults;
    }
490
}
491