Completed
Push — 2.0 ( c10f05...3dc2d4 )
by Paweł
20s queued 12s
created

ContentPushConsumer::execute()   B

Complexity

Conditions 7
Paths 26

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 35
rs 8.4266
c 0
b 0
f 0
cc 7
nc 26
nop 1
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Superdesk Web Publisher Core Bundle.
7
 *
8
 * Copyright 2017 Sourcefabric z.ú. and contributors.
9
 *
10
 * For the full copyright and license information, please see the
11
 * AUTHORS and LICENSE files distributed with this source code.
12
 *
13
 * @copyright 2017 Sourcefabric z.ú
14
 * @license http://www.superdesk.org/license
15
 */
16
17
namespace SWP\Bundle\CoreBundle\Consumer;
18
19
use Doctrine\DBAL\DBALException;
20
use Doctrine\DBAL\Exception\NotNullConstraintViolationException;
21
use Doctrine\ORM\EntityManagerInterface;
22
use Doctrine\ORM\NonUniqueResultException;
23
use Doctrine\ORM\ORMException;
24
use Exception;
25
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
26
use PhpAmqpLib\Message\AMQPMessage;
27
use Psr\Log\LoggerInterface;
28
use SWP\Bundle\BridgeBundle\Doctrine\ORM\PackageRepository;
29
use SWP\Bundle\CoreBundle\Model\PackageInterface;
30
use SWP\Bundle\CoreBundle\Model\Tenant;
31
use SWP\Bundle\CoreBundle\Model\TenantInterface;
32
use SWP\Component\Bridge\Model\ItemInterface;
33
use SWP\Component\Bridge\Transformer\DataTransformerInterface;
34
use SWP\Component\MultiTenancy\Context\TenantContextInterface;
35
use Symfony\Component\Cache\ResettableInterface;
36
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
37
use SWP\Component\Bridge\Events;
38
use Symfony\Component\EventDispatcher\GenericEvent;
39
use Symfony\Component\Lock\Factory;
40
use function unserialize;
41
42
/**
 * RabbitMQ consumer that stores packages delivered through content push.
 *
 * Each message body is a serialized array with a "tenant" and a "package"
 * entry. Processing of a single package is serialized with a lock keyed by
 * the package GUID, after which the package is either created or merged
 * into the already stored one.
 */
class ContentPushConsumer implements ConsumerInterface
{
    /**
     * Time to live, in seconds, passed to the lock factory for the per-package lock.
     */
    private const LOCK_TTL = 120;

    /**
     * @var Factory
     */
    protected $lockFactory;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * @var PackageRepository
     */
    protected $packageRepository;

    /**
     * @var EventDispatcherInterface
     */
    protected $eventDispatcher;

    /**
     * @var DataTransformerInterface
     */
    protected $jsonToPackageTransformer;

    /**
     * @var EntityManagerInterface
     */
    protected $packageObjectManager;

    /**
     * @var TenantContextInterface
     */
    protected $tenantContext;

    public function __construct(
        Factory $lockFactory,
        LoggerInterface $logger,
        PackageRepository $packageRepository,
        EventDispatcherInterface $eventDispatcher,
        DataTransformerInterface $jsonToPackageTransformer,
        EntityManagerInterface $packageObjectManager,
        TenantContextInterface $tenantContext
    ) {
        $this->lockFactory = $lockFactory;
        $this->logger = $logger;
        $this->packageRepository = $packageRepository;
        $this->eventDispatcher = $eventDispatcher;
        $this->jsonToPackageTransformer = $jsonToPackageTransformer;
        $this->packageObjectManager = $packageObjectManager;
        $this->tenantContext = $tenantContext;
    }

    /**
     * Decodes the message, takes the per-package lock and delegates to doExecute().
     *
     * @return int one of the ConsumerInterface::MSG_* flags: MSG_REJECT_REQUEUE when
     *             the lock could not be acquired, MSG_REJECT when the payload is
     *             malformed or processing failed with a handled exception, otherwise
     *             the result of doExecute()
     *
     * @throws DBALException when a database error other than a not-null violation occurs
     * @throws ORMException
     */
    public function execute(AMQPMessage $msg): int
    {
        $decodedMessage = unserialize($msg->body, ['allowed_classes' => true]);
        $tenant = is_array($decodedMessage) ? ($decodedMessage['tenant'] ?? null) : null;
        $package = is_array($decodedMessage) ? ($decodedMessage['package'] ?? null) : null;

        // A payload that cannot be decoded will never succeed, so it is rejected
        // without requeueing instead of failing on a method call on a non-object.
        if (!$tenant instanceof TenantInterface || !$package instanceof PackageInterface) {
            $this->logger->error('Malformed content push message: expected a serialized array with "tenant" and "package" entries');

            return ConsumerInterface::MSG_REJECT;
        }

        $lock = $this->lockFactory->createLock(
            md5(json_encode(['type' => 'package', 'guid' => $package->getGuid()])),
            self::LOCK_TTL
        );
        $lockAcquired = false;

        try {
            $lockAcquired = $lock->acquire();
            if (!$lockAcquired) {
                return ConsumerInterface::MSG_REJECT_REQUEUE;
            }

            return $this->doExecute($tenant, $package);
        } catch (NonUniqueResultException | NotNullConstraintViolationException $e) {
            $this->logger->error('' !== $e->getMessage() ? $e->getMessage() : 'Unhandled NonUnique or NotNullConstraint exception', ['trace' => $e->getTraceAsString()]);

            return ConsumerInterface::MSG_REJECT;
        } catch (DBALException | ORMException $e) {
            // Remaining database/ORM failures are propagated to the caller
            // rather than being converted into a reject by the generic handler below.
            throw $e;
        } catch (Exception $e) {
            $this->logger->error('' !== $e->getMessage() ? $e->getMessage() : 'Unhandled exception', ['trace' => $e->getTraceAsString()]);

            return ConsumerInterface::MSG_REJECT;
        } finally {
            // Single release point: runs on every exit path, and only for a lock
            // this consumer actually holds.
            if ($lockAcquired) {
                $lock->release();
            }
        }
    }

    /**
     * Creates the given package or merges it into the already stored one.
     *
     * Packages whose type is neither text nor composite are rejected, as are
     * packages whose tenant can no longer be found by id.
     *
     * @return int ConsumerInterface::MSG_ACK when the package was created or updated,
     *             ConsumerInterface::MSG_REJECT otherwise
     *
     * @throws NonUniqueResultException
     * @throws NotNullConstraintViolationException
     * @throws DBALException
     * @throws ORMException
     * @throws Exception
     */
    public function doExecute(TenantInterface $tenant, PackageInterface $package): int
    {
        $packageType = $package->getType();
        if (ItemInterface::TYPE_TEXT !== $packageType && ItemInterface::TYPE_COMPOSITE !== $packageType) {
            return ConsumerInterface::MSG_REJECT;
        }

        // find() may return null; the tenant context must not be given a missing tenant.
        $managedTenant = $this->packageObjectManager->find(Tenant::class, $tenant->getId());
        if (null === $managedTenant) {
            $this->logger->error(sprintf('Tenant with id "%s" was not found, package %s was rejected', $tenant->getId(), $package->getGuid()));

            return ConsumerInterface::MSG_REJECT;
        }

        $this->tenantContext->setTenant($managedTenant);

        /** @var PackageInterface|null $existingPackage */
        $existingPackage = $this->findExistingPackage($package);
        if (null !== $existingPackage) {
            // Carry over the identity of the stored package so merge() updates it.
            $package->setId($existingPackage->getId());
            $package->setCreatedAt($existingPackage->getCreatedAt());
            $package->setUpdatedAt(new \DateTime());
            $this->dispatchPackageEvent(Events::PACKAGE_PRE_UPDATE, $package, ['package' => $existingPackage]);

            $package = $this->packageObjectManager->merge($package);
            $this->packageObjectManager->flush();

            $this->dispatchPackageEvent(Events::PACKAGE_POST_UPDATE, $package);
            $this->dispatchPackageEvent(Events::PACKAGE_PROCESSED, $package);

            $this->reset();
            $this->logger->info(sprintf('Package %s was updated', $existingPackage->getGuid()));

            return ConsumerInterface::MSG_ACK;
        }

        $this->dispatchPackageEvent(Events::PACKAGE_PRE_CREATE, $package);
        $this->packageRepository->add($package);
        $this->dispatchPackageEvent(Events::PACKAGE_POST_CREATE, $package);
        $this->dispatchPackageEvent(Events::PACKAGE_PROCESSED, $package);
        $this->reset();
        $this->logger->info(sprintf('Package %s was created', $package->getGuid()));

        return ConsumerInterface::MSG_ACK;
    }

    /**
     * Looks up a stored package matching the incoming one.
     *
     * Lookup order: by GUID equal to the incoming "evolved from" value (or its own
     * GUID when that is null), then by its own GUID, and finally by a stored
     * package whose "evolved from" equals the incoming GUID.
     *
     * @return PackageInterface|null
     */
    protected function findExistingPackage(PackageInterface $package)
    {
        $existingPackage = $this->packageRepository->findOneBy(['guid' => $package->getEvolvedFrom() ?? $package->getGuid()]);
        if (null === $existingPackage && null !== $package->getEvolvedFrom()) {
            $existingPackage = $this->packageRepository->findOneBy(['guid' => $package->getGuid()]);
        }

        if (null === $existingPackage) {
            // check for updated items (with evolved from)
            $existingPackage = $this->packageRepository->findOneBy(['evolvedFrom' => $package->getGuid()]);
        }

        return $existingPackage;
    }

    /**
     * Dispatches a package event as a GenericEvent whose subject is the package.
     *
     * The event arguments always start with "eventName"; $arguments are appended.
     *
     * @param array $arguments extra event arguments
     */
    private function dispatchPackageEvent(string $eventName, PackageInterface $package, array $arguments = []): void
    {
        $this->eventDispatcher->dispatch(
            $eventName,
            new GenericEvent($package, ['eventName' => $eventName] + $arguments)
        );
    }

    /**
     * Clears the entity manager and, when supported, resets the tenant context.
     */
    private function reset(): void
    {
        $this->packageObjectManager->clear();
        if ($this->tenantContext instanceof ResettableInterface) {
            $this->tenantContext->reset();
        }
    }
}
202