Passed
Push — main ( 987cb0...4f2490 )
by De Cramer
02:59
created

EtlExecutionEventSubscriber::getSubscribedEvents()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 3
c 1
b 0
f 0
dl 0
loc 5
rs 10
cc 1
nc 1
nop 0
1
<?php
2
3
namespace Oliverde8\PhpEtlBundle\EventSubscriber;
4
5
use Doctrine\ORM\EntityManagerInterface;
6
use EasyCorp\Bundle\EasyAdminBundle\Event\AfterEntityPersistedEvent;
7
use EasyCorp\Bundle\EasyAdminBundle\Event\BeforeEntityPersistedEvent;
8
use Oliverde8\PhpEtlBundle\Entity\EtlExecution;
9
use Oliverde8\PhpEtlBundle\Message\EtlExecutionMessage;
10
use Oliverde8\PhpEtlBundle\Services\ChainProcessorsManager;
11
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
12
use Symfony\Component\Messenger\MessageBusInterface;
13
14
class EtlExecutionEventSubscriber implements EventSubscriberInterface
15
{
16
    /** @var EntityManagerInterface */
17
    protected $em;
18
19
    /** @var ChainProcessorsManager */
20
    protected $chainProcessorManager;
21
22
    /** @var MessageBusInterface */
23
    protected $messageBus;
24
25
    /**
26
     * EtlExecutionEventSubscriber constructor.
27
     * @param EntityManagerInterface $em
28
     * @param ChainProcessorsManager $chainProcessorManager
29
     * @param MessageBusInterface $messageBus
30
     */
31
    public function __construct(EntityManagerInterface $em, ChainProcessorsManager $chainProcessorManager, MessageBusInterface $messageBus)
32
    {
33
        $this->em = $em;
34
        $this->chainProcessorManager = $chainProcessorManager;
35
        $this->messageBus = $messageBus;
36
    }
37
38
    public static function getSubscribedEvents()
39
    {
40
        return [
41
            BeforeEntityPersistedEvent::class => ['setChainDetails'],
42
            AfterEntityPersistedEvent::class => ['queueChainExecution'],
43
        ];
44
    }
45
46
    public function setChainDetails(BeforeEntityPersistedEvent $event)
47
    {
48
        $entity = $event->getEntityInstance();
49
        if (!$entity instanceof EtlExecution) {
50
            return;
51
        }
52
53
        $definition = $this->chainProcessorManager->getDefinition($entity->getName());
54
        $entity->setDefinition($definition);
55
        $entity->setStatus(EtlExecution::STATUS_WAITING);
56
    }
57
58
    public function queueChainExecution(AfterEntityPersistedEvent $event)
59
    {
60
        $entity = $event->getEntityInstance();
61
        if (!$entity instanceof EtlExecution) {
62
            return;
63
        }
64
65
        try {
66
            $this->messageBus->dispatch(new EtlExecutionMessage($entity->getId()));
67
        } catch (\Exception $e) {
68
            $this->em->remove($entity);
69
            $this->em->flush();
70
71
            throw $e;
72
        }
73
    }
74
}
75