Completed
Push — master ( 05bb5a...5289b8 )
by Rafał
34:32 queued 04:15
created

ContentPushConsumer   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 124
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 0
Metric Value
wmc 11
lcom 1
cbo 9
dl 0
loc 124
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 15 1
A execute() 0 12 3
A doExecute() 0 41 2
A findExistingPackage() 0 12 3
A reset() 0 7 2
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Superdesk Web Publisher Core Bundle.
7
 *
8
 * Copyright 2017 Sourcefabric z.ú. and contributors.
9
 *
10
 * For the full copyright and license information, please see the
11
 * AUTHORS and LICENSE files distributed with this source code.
12
 *
13
 * @copyright 2017 Sourcefabric z.ú
14
 * @license http://www.superdesk.org/license
15
 */
16
17
namespace SWP\Bundle\CoreBundle\Consumer;
18
19
use Doctrine\DBAL\ConnectionException;
20
use Doctrine\ORM\EntityManagerInterface;
21
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
22
use PhpAmqpLib\Message\AMQPMessage;
23
use Psr\Log\LoggerInterface;
24
use SWP\Bundle\BridgeBundle\Doctrine\ORM\PackageRepository;
25
use SWP\Bundle\CoreBundle\Model\PackageInterface;
26
use SWP\Bundle\CoreBundle\Model\Tenant;
27
use SWP\Component\Bridge\Transformer\DataTransformerInterface;
28
use SWP\Component\MultiTenancy\Context\TenantContextInterface;
29
use Symfony\Component\Cache\ResettableInterface;
30
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
31
use SWP\Component\Bridge\Events;
32
use Symfony\Component\EventDispatcher\GenericEvent;
33
34
class ContentPushConsumer implements ConsumerInterface
35
{
36
    /**
37
     * @var LoggerInterface
38
     */
39
    protected $logger;
40
41
    /**
42
     * @var PackageRepository
43
     */
44
    protected $packageRepository;
45
46
    /**
47
     * @var EventDispatcherInterface
48
     */
49
    protected $eventDispatcher;
50
51
    /**
52
     * @var DataTransformerInterface
53
     */
54
    protected $jsonToPackageTransformer;
55
56
    /**
57
     * @var EntityManagerInterface
58
     */
59
    protected $packageObjectManager;
60
61
    /**
62
     * @var TenantContextInterface
63
     */
64
    protected $tenantContext;
65
66
    public function __construct(
67
        LoggerInterface $logger,
68
        PackageRepository $packageRepository,
69
        EventDispatcherInterface $eventDispatcher,
70
        DataTransformerInterface $jsonToPackageTransformer,
71
        EntityManagerInterface $packageObjectManager,
72
        TenantContextInterface $tenantContext
73
    ) {
74
        $this->logger = $logger;
75
        $this->packageRepository = $packageRepository;
76
        $this->eventDispatcher = $eventDispatcher;
77
        $this->jsonToPackageTransformer = $jsonToPackageTransformer;
78
        $this->packageObjectManager = $packageObjectManager;
79
        $this->tenantContext = $tenantContext;
80
    }
81
82
    public function execute(AMQPMessage $msg): int
83
    {
84
        try {
85
            return $this->doExecute($msg);
86
        } catch (ConnectionException $e) {
87
            throw $e;
88
        } catch (\Exception $e) {
89
            $this->logger->error($e->getMessage(), ['trace' => $e->getTraceAsString()]);
90
91
            return ConsumerInterface::MSG_REJECT_REQUEUE;
92
        }
93
    }
94
95
    public function doExecute(AMQPMessage $message): int
96
    {
97
        $decodedMessage = \unserialize($message->body);
98
        $this->tenantContext->setTenant($this->packageObjectManager->find(Tenant::class, $decodedMessage['tenant']->getId()));
99
100
        /** @var PackageInterface $package */
101
        $package = $decodedMessage['package'];
102
103
        /** @var PackageInterface $existingPackage */
104
        $existingPackage = $this->findExistingPackage($package);
105
        if (null !== $existingPackage) {
106
            $package->setId($existingPackage->getId());
107
            $package->setCreatedAt($existingPackage->getCreatedAt());
108
            $package->setUpdatedAt(new \DateTime());
109
            $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_UPDATE, new GenericEvent($package, [
110
                'eventName' => Events::PACKAGE_PRE_UPDATE,
111
                'package' => $existingPackage,
112
            ]));
113
114
            $package = $this->packageObjectManager->merge($package);
115
            $this->packageObjectManager->flush();
116
117
            $this->eventDispatcher->dispatch(Events::PACKAGE_POST_UPDATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_UPDATE]));
118
            $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
119
120
            $this->reset();
121
            $this->logger->info(sprintf('Package %s was updated', $existingPackage->getGuid()));
122
123
            return ConsumerInterface::MSG_ACK;
124
        }
125
126
        $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_PRE_CREATE]));
127
        $this->packageRepository->add($package);
128
        $this->eventDispatcher->dispatch(Events::PACKAGE_POST_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_CREATE]));
129
        $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
130
131
        $this->reset();
132
        $this->logger->info(sprintf('Package %s was created', $package->getGuid()));
133
134
        return ConsumerInterface::MSG_ACK;
135
    }
136
137
    protected function findExistingPackage(PackageInterface $package)
138
    {
139
        $existingPackage = $this->packageRepository->findOneBy(['guid' => $package->getGuid()]);
140
141
        if (null === $existingPackage && null !== $package->getEvolvedFrom()) {
142
            $existingPackage = $this->packageRepository->findOneBy([
143
                'guid' => $package->getEvolvedFrom(),
144
            ]);
145
        }
146
147
        return $existingPackage;
148
    }
149
150
    private function reset(): void
151
    {
152
        $this->packageObjectManager->clear();
153
        if ($this->tenantContext instanceof ResettableInterface) {
154
            $this->tenantContext->reset();
155
        }
156
    }
157
}
158