Completed
Pull Request — 1.4 (#711)
by Paweł
10:54
created

ContentPushConsumer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 15
rs 9.7666
c 0
b 0
f 0
cc 1
nc 1
nop 6
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Superdesk Web Publisher Core Bundle.
7
 *
8
 * Copyright 2017 Sourcefabric z.ú. and contributors.
9
 *
10
 * For the full copyright and license information, please see the
11
 * AUTHORS and LICENSE files distributed with this source code.
12
 *
13
 * @copyright 2017 Sourcefabric z.ú
14
 * @license http://www.superdesk.org/license
15
 */
16
17
namespace SWP\Bundle\CoreBundle\Consumer;
18
19
use Doctrine\ORM\EntityManagerInterface;
20
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
21
use PhpAmqpLib\Message\AMQPMessage;
22
use Psr\Log\LoggerInterface;
23
use SWP\Bundle\BridgeBundle\Doctrine\ORM\PackageRepository;
24
use SWP\Bundle\CoreBundle\Model\PackageInterface;
25
use SWP\Bundle\CoreBundle\Model\Tenant;
26
use SWP\Component\Bridge\Transformer\DataTransformerInterface;
27
use SWP\Component\MultiTenancy\Context\TenantContextInterface;
28
use Symfony\Component\Cache\ResettableInterface;
29
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
30
use SWP\Component\Bridge\Events;
31
use Symfony\Component\EventDispatcher\GenericEvent;
32
33
class ContentPushConsumer implements ConsumerInterface
34
{
35
    /**
36
     * @var LoggerInterface
37
     */
38
    protected $logger;
39
40
    /**
41
     * @var PackageRepository
42
     */
43
    protected $packageRepository;
44
45
    /**
46
     * @var EventDispatcherInterface
47
     */
48
    protected $eventDispatcher;
49
50
    /**
51
     * @var DataTransformerInterface
52
     */
53
    protected $jsonToPackageTransformer;
54
55
    /**
56
     * @var EntityManagerInterface
57
     */
58
    protected $packageObjectManager;
59
60
    /**
61
     * @var TenantContextInterface
62
     */
63
    protected $tenantContext;
64
65
    public function __construct(
66
        LoggerInterface $logger,
67
        PackageRepository $packageRepository,
68
        EventDispatcherInterface $eventDispatcher,
69
        DataTransformerInterface $jsonToPackageTransformer,
70
        EntityManagerInterface $packageObjectManager,
71
        TenantContextInterface $tenantContext
72
    ) {
73
        $this->logger = $logger;
74
        $this->packageRepository = $packageRepository;
75
        $this->eventDispatcher = $eventDispatcher;
76
        $this->jsonToPackageTransformer = $jsonToPackageTransformer;
77
        $this->packageObjectManager = $packageObjectManager;
78
        $this->tenantContext = $tenantContext;
79
    }
80
81
    public function execute(AMQPMessage $msg): int
82
    {
83
        try {
84
            return $this->doExecute($msg);
85
        } catch (\Exception $e) {
86
            $this->logger->error($e->getMessage(), ['exception' => $e]);
87
88
            return ConsumerInterface::MSG_REJECT;
89
        }
90
    }
91
92
    public function doExecute(AMQPMessage $message): int
93
    {
94
        $decodedMessage = \unserialize($message->body);
95
        $this->tenantContext->setTenant($this->packageObjectManager->find(Tenant::class, $decodedMessage['tenant']->getId()));
96
97
        /** @var PackageInterface $package */
98
        $package = $decodedMessage['package'];
99
100
        /** @var PackageInterface $existingPackage */
101
        $existingPackage = $this->findExistingPackage($package);
102
        if (null !== $existingPackage) {
103
            $package->setId($existingPackage->getId());
104
            $package->setCreatedAt($existingPackage->getCreatedAt());
105
            $package->setUpdatedAt(new \DateTime());
106
            $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_UPDATE, new GenericEvent($package, [
107
                'eventName' => Events::PACKAGE_PRE_UPDATE,
108
                'package' => $existingPackage,
109
            ]));
110
111
            $package = $this->packageObjectManager->merge($package);
112
            $this->packageObjectManager->flush();
113
114
            $this->eventDispatcher->dispatch(Events::PACKAGE_POST_UPDATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_UPDATE]));
115
            $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
116
117
            $this->reset();
118
            $this->logger->info(sprintf('Package %s was updated', $existingPackage->getGuid()));
119
120
            return ConsumerInterface::MSG_ACK;
121
        }
122
123
        $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_PRE_CREATE]));
124
        $this->packageRepository->add($package);
125
        $this->eventDispatcher->dispatch(Events::PACKAGE_POST_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_CREATE]));
126
        $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
127
128
        $this->reset();
129
        $this->logger->info(sprintf('Package %s was created', $package->getGuid()));
130
131
        return ConsumerInterface::MSG_ACK;
132
    }
133
134
    protected function findExistingPackage(PackageInterface $package)
135
    {
136
        $existingPackage = $this->packageRepository->findOneBy(['guid' => $package->getGuid()]);
137
138
        if (null === $existingPackage && null !== $package->getEvolvedFrom()) {
139
            $existingPackage = $this->packageRepository->findOneBy([
140
                'guid' => $package->getEvolvedFrom(),
141
            ]);
142
        }
143
144
        return $existingPackage;
145
    }
146
147
    private function reset(): void
148
    {
149
        $this->packageObjectManager->clear();
150
        if ($this->tenantContext instanceof ResettableInterface) {
151
            $this->tenantContext->reset();
152
        }
153
    }
154
}
155