Completed
Push — 2.0 ( ffb373...e1b9bf )
by Paweł
17:32 queued 08:21
created

ContentPushConsumer::doExecute()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 48

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 48
rs 8.8234
c 0
b 0
f 0
cc 5
nc 4
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Superdesk Web Publisher Core Bundle.
7
 *
8
 * Copyright 2017 Sourcefabric z.ú. and contributors.
9
 *
10
 * For the full copyright and license information, please see the
11
 * AUTHORS and LICENSE files distributed with this source code.
12
 *
13
 * @copyright 2017 Sourcefabric z.ú
14
 * @license http://www.superdesk.org/license
15
 */
16
17
namespace SWP\Bundle\CoreBundle\Consumer;
18
19
use Doctrine\DBAL\DBALException;
20
use Doctrine\DBAL\Exception\NotNullConstraintViolationException;
21
use Doctrine\ORM\EntityManagerInterface;
22
use Doctrine\ORM\NonUniqueResultException;
23
use Doctrine\ORM\ORMException;
24
use Exception;
25
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
26
use PhpAmqpLib\Message\AMQPMessage;
27
use Psr\Log\LoggerInterface;
28
use Sentry\Breadcrumb;
29
use Sentry\State\HubInterface;
30
use SWP\Bundle\BridgeBundle\Doctrine\ORM\PackageRepository;
31
use SWP\Bundle\CoreBundle\Model\PackageInterface;
32
use SWP\Bundle\CoreBundle\Model\Tenant;
33
use SWP\Bundle\CoreBundle\Model\TenantInterface;
34
use SWP\Component\Bridge\Model\ItemInterface;
35
use SWP\Component\Bridge\Transformer\DataTransformerInterface;
36
use SWP\Component\MultiTenancy\Context\TenantContextInterface;
37
use Symfony\Component\Cache\ResettableInterface;
38
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
39
use SWP\Component\Bridge\Events;
40
use Symfony\Component\EventDispatcher\GenericEvent;
41
use Symfony\Component\Lock\Factory;
42
use function unserialize;
43
44
class ContentPushConsumer implements ConsumerInterface
45
{
46
    protected $lockFactory;
47
48
    protected $logger;
49
50
    protected $packageRepository;
51
52
    protected $eventDispatcher;
53
54
    protected $jsonToPackageTransformer;
55
56
    protected $packageObjectManager;
57
58
    protected $tenantContext;
59
60
    protected $sentryHub;
61
62
    public function __construct(
63
        Factory $lockFactory,
64
        LoggerInterface $logger,
65
        PackageRepository $packageRepository,
66
        EventDispatcherInterface $eventDispatcher,
67
        DataTransformerInterface $jsonToPackageTransformer,
68
        EntityManagerInterface $packageObjectManager,
69
        TenantContextInterface $tenantContext,
70
        HubInterface $sentryHub
71
    ) {
72
        $this->lockFactory = $lockFactory;
73
        $this->logger = $logger;
74
        $this->packageRepository = $packageRepository;
75
        $this->eventDispatcher = $eventDispatcher;
76
        $this->jsonToPackageTransformer = $jsonToPackageTransformer;
77
        $this->packageObjectManager = $packageObjectManager;
78
        $this->tenantContext = $tenantContext;
79
        $this->sentryHub = $sentryHub;
80
    }
81
82
    public function execute(AMQPMessage $msg): int
83
    {
84
        $decodedMessage = unserialize($msg->body, [true]);
85
        /** @var TenantInterface $tenant */
86
        $tenant = $decodedMessage['tenant'];
87
        /** @var PackageInterface $package */
88
        $package = $decodedMessage['package'];
89
        $lock = $this->lockFactory->createLock(md5(json_encode(['type' => 'package', 'guid' => $package->getGuid()])), 120);
90
91
        try {
92
            if (!$lock->acquire()) {
93
                return ConsumerInterface::MSG_REJECT_REQUEUE;
94
            }
95
96
            return $this->doExecute($tenant, $package);
97
        } catch (NonUniqueResultException | NotNullConstraintViolationException $e) {
98
            $this->logException($e, $package, 'Unhandled NonUnique or NotNullConstraint exception');
99
100
            return ConsumerInterface::MSG_REJECT;
101
        } catch (DBALException | ORMException $e) {
102
            $this->logException($e, $package);
103
104
            throw $e;
105
        } catch (Exception $e) {
106
            $this->logException($e, $package);
107
108
            return ConsumerInterface::MSG_REJECT;
109
        } finally {
110
            $lock->release();
111
        }
112
    }
113
114
    /**
115
     * @throws NonUniqueResultException
116
     * @throws NotNullConstraintViolationException
117
     * @throws DBALException
118
     * @throws ORMException
119
     * @throws Exception
120
     */
121
    public function doExecute(TenantInterface $tenant, PackageInterface $package): int
122
    {
123
        $packageType = $package->getType();
124
        if (ItemInterface::TYPE_TEXT !== $packageType && ItemInterface::TYPE_COMPOSITE !== $packageType) {
125
            return ConsumerInterface::MSG_REJECT;
126
        }
127
128
        $this->tenantContext->setTenant($this->packageObjectManager->find(Tenant::class, $tenant->getId()));
0 ignored issues
show
Documentation introduced by
$this->packageObjectMana...lass, $tenant->getId()) is of type object|null, but the function expects a object<SWP\Component\Mul...\Model\TenantInterface>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
129
130
        /** @var PackageInterface $existingPackage */
131
        $existingPackage = $this->findExistingPackage($package);
132
        if (null !== $existingPackage) {
133
            $package->setId($existingPackage->getId());
134
            $package->setCreatedAt($existingPackage->getCreatedAt());
135
            $package->setUpdatedAt(new \DateTime());
136
            $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_UPDATE, new GenericEvent($package, [
137
                'eventName' => Events::PACKAGE_PRE_UPDATE,
138
                'package' => $existingPackage,
139
            ]));
140
141
            foreach ($existingPackage->getGroups() as $group) {
142
                $this->packageObjectManager->remove($group);
143
            }
144
145
            $package = $this->packageObjectManager->merge($package);
146
            $this->packageObjectManager->flush();
147
148
            $this->eventDispatcher->dispatch(Events::PACKAGE_POST_UPDATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_UPDATE]));
149
            $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
150
            $this->packageObjectManager->flush();
151
152
            $this->reset();
153
            $this->logger->info(sprintf('Package %s was updated', $existingPackage->getGuid()));
154
155
            return ConsumerInterface::MSG_ACK;
156
        }
157
158
        $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_PRE_CREATE]));
159
        $this->packageRepository->add($package);
160
        $this->eventDispatcher->dispatch(Events::PACKAGE_POST_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_CREATE]));
161
        $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
162
        $this->packageObjectManager->flush();
163
164
        $this->logger->info(sprintf('Package %s was created', $package->getGuid()));
165
        $this->reset();
166
167
        return ConsumerInterface::MSG_ACK;
168
    }
169
170
    protected function findExistingPackage(PackageInterface $package)
171
    {
172
        $existingPackage = $this->packageRepository->findOneBy(['guid' => $package->getEvolvedFrom() ?? $package->getGuid()]);
173
        if (null === $existingPackage && null !== $package->getEvolvedFrom()) {
174
            $existingPackage = $this->packageRepository->findOneBy(['guid' => $package->getGuid()]);
175
        }
176
177
        if (null === $existingPackage) {
178
            // check for updated items (with evolved from)
179
            $existingPackage = $this->packageRepository->findOneBy(['evolvedFrom' => $package->getGuid()]);
180
        }
181
182
        return $existingPackage;
183
    }
184
185
    private function reset(): void
186
    {
187
        $this->packageObjectManager->clear();
188
        if ($this->tenantContext instanceof ResettableInterface) {
189
            $this->tenantContext->reset();
190
        }
191
    }
192
193
    private function logException(\Exception $e, PackageInterface $package, string $defaultMessage = 'Unhandled exception'): void
194
    {
195
        $this->logger->error('' !== $e->getMessage() ? $e->getMessage() : $defaultMessage, ['trace' => $e->getTraceAsString()]);
196
        $this->sentryHub->addBreadcrumb(new Breadcrumb(
197
            Breadcrumb::LEVEL_DEBUG,
198
            Breadcrumb::TYPE_DEFAULT,
199
            'publishing',
200
            'Package',
201
            [
202
                'guid' => $package->getGuid(),
203
                'headline' => $package->getHeadline(),
204
            ]
205
        ));
206
        $this->sentryHub->captureException($e);
207
    }
208
}
209