Completed
Push — 2.0 ( e1b9bf...789055 )
by Rafał
10:13
created

ContentPushConsumer::doExecute()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 37
rs 9.328
c 0
b 0
f 0
cc 4
nc 3
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Superdesk Web Publisher Core Bundle.
7
 *
8
 * Copyright 2017 Sourcefabric z.ú. and contributors.
9
 *
10
 * For the full copyright and license information, please see the
11
 * AUTHORS and LICENSE files distributed with this source code.
12
 *
13
 * @copyright 2017 Sourcefabric z.ú
14
 * @license http://www.superdesk.org/license
15
 */
16
17
namespace SWP\Bundle\CoreBundle\Consumer;
18
19
use Doctrine\DBAL\DBALException;
20
use Doctrine\DBAL\Exception\NotNullConstraintViolationException;
21
use Doctrine\ORM\EntityManagerInterface;
22
use Doctrine\ORM\NonUniqueResultException;
23
use Doctrine\ORM\ORMException;
24
use Exception;
25
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
26
use PhpAmqpLib\Message\AMQPMessage;
27
use Psr\Log\LoggerInterface;
28
use Sentry\Breadcrumb;
29
use Sentry\State\HubInterface;
30
use SWP\Bundle\BridgeBundle\Doctrine\ORM\PackageRepository;
31
use SWP\Bundle\CoreBundle\Hydrator\PackageHydratorInterface;
32
use SWP\Bundle\CoreBundle\Model\PackageInterface;
33
use SWP\Bundle\CoreBundle\Model\Tenant;
34
use SWP\Bundle\CoreBundle\Model\TenantInterface;
35
use SWP\Component\Bridge\Model\ItemInterface;
36
use SWP\Component\Bridge\Transformer\DataTransformerInterface;
37
use SWP\Component\MultiTenancy\Context\TenantContextInterface;
38
use Symfony\Component\Cache\ResettableInterface;
39
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
40
use SWP\Component\Bridge\Events;
41
use Symfony\Component\EventDispatcher\GenericEvent;
42
use Symfony\Component\Lock\Factory;
43
use function unserialize;
44
45
class ContentPushConsumer implements ConsumerInterface
46
{
47
    protected $lockFactory;
48
49
    protected $logger;
50
51
    protected $packageRepository;
52
53
    protected $eventDispatcher;
54
55
    protected $jsonToPackageTransformer;
56
57
    protected $packageObjectManager;
58
59
    protected $tenantContext;
60
61
    protected $sentryHub;
62
63
    protected $packageHydrator;
64
65
    /**
     * Stores the collaborators used while consuming content push messages.
     *
     * @param Factory                  $lockFactory              creates the per-package lock taken in execute()
     * @param LoggerInterface          $logger                   receives info and error log entries
     * @param PackageRepository        $packageRepository        used to look up and add packages
     * @param EventDispatcherInterface $eventDispatcher          dispatches the package lifecycle events
     * @param DataTransformerInterface $jsonToPackageTransformer stored on the instance; not used by the methods visible in this class
     * @param EntityManagerInterface   $packageObjectManager     used to find the tenant, flush and clear
     * @param TenantContextInterface   $tenantContext            receives the tenant of the processed message
     * @param HubInterface             $sentryHub                receives breadcrumbs and captured exceptions
     * @param PackageHydratorInterface $packageHydrator          merges a pushed package into an existing one
     */
    public function __construct(
        Factory $lockFactory,
        LoggerInterface $logger,
        PackageRepository $packageRepository,
        EventDispatcherInterface $eventDispatcher,
        DataTransformerInterface $jsonToPackageTransformer,
        EntityManagerInterface $packageObjectManager,
        TenantContextInterface $tenantContext,
        HubInterface $sentryHub,
        PackageHydratorInterface $packageHydrator
    ) {
        // Persistence and package handling.
        $this->packageObjectManager = $packageObjectManager;
        $this->packageRepository = $packageRepository;
        $this->packageHydrator = $packageHydrator;
        $this->jsonToPackageTransformer = $jsonToPackageTransformer;

        // Tenancy, locking and events.
        $this->tenantContext = $tenantContext;
        $this->lockFactory = $lockFactory;
        $this->eventDispatcher = $eventDispatcher;

        // Diagnostics.
        $this->logger = $logger;
        $this->sentryHub = $sentryHub;
    }
86
87
    /**
     * Consumes one content push message: decodes it, takes a per-package lock and delegates to doExecute().
     *
     * The message body is expected to be a serialized array holding a tenant under the "tenant" key
     * and a package under the "package" key. Messages that do not match this shape are logged and rejected.
     *
     * @param AMQPMessage $msg message whose body carries the serialized payload
     *
     * @return int ConsumerInterface::MSG_REJECT_REQUEUE when the package lock is held elsewhere,
     *             ConsumerInterface::MSG_REJECT for malformed messages and handled failures,
     *             otherwise the value returned by doExecute()
     *
     * @throws DBALException when doExecute() raises it; logged first, then re-thrown
     * @throws ORMException  when doExecute() raises it; logged first, then re-thrown
     */
    public function execute(AMQPMessage $msg): int
    {
        // NOTE(review): unserialize() may instantiate any class named in the payload; this presumes the
        // queue only carries messages written by trusted producers - confirm.
        $decodedMessage = unserialize($msg->body, ['allowed_classes' => true]);

        $tenant = is_array($decodedMessage) ? ($decodedMessage['tenant'] ?? null) : null;
        $package = is_array($decodedMessage) ? ($decodedMessage['package'] ?? null) : null;
        if (!$tenant instanceof TenantInterface || !$package instanceof PackageInterface) {
            // Without this guard a broken body ends in a method call on a non-object outside the try block.
            $this->logger->error('Content push message is malformed: expected a serialized array with "tenant" and "package" objects');

            return ConsumerInterface::MSG_REJECT;
        }

        $lock = $this->lockFactory->createLock(md5(json_encode(['type' => 'package', 'guid' => $package->getGuid()])), 120);
        $lockAcquired = false;

        try {
            $lockAcquired = $lock->acquire();
            if (!$lockAcquired) {
                return ConsumerInterface::MSG_REJECT_REQUEUE;
            }

            return $this->doExecute($tenant, $package);
        } catch (NonUniqueResultException | NotNullConstraintViolationException $e) {
            $this->logException($e, $package, 'Unhandled NonUnique or NotNullConstraint exception');

            return ConsumerInterface::MSG_REJECT;
        } catch (DBALException | ORMException $e) {
            $this->logException($e, $package);

            throw $e;
        } catch (Exception $e) {
            $this->logException($e, $package);

            return ConsumerInterface::MSG_REJECT;
        } finally {
            // Only release a lock this call actually obtained.
            if ($lockAcquired) {
                $lock->release();
            }
        }
    }
118
119
    /**
     * Stores a pushed package for the given tenant, updating an already stored package or creating a new one.
     *
     * Only packages of type text or composite are processed; every other type is rejected.
     * The tenant is re-loaded through the object manager before it is set on the tenant context.
     *
     * @param TenantInterface  $tenant  tenant taken from the message; only its id is used for the lookup
     * @param PackageInterface $package package received in the message
     *
     * @return int ConsumerInterface::MSG_ACK once the package is stored,
     *             ConsumerInterface::MSG_REJECT for unsupported package types
     *
     * @throws NonUniqueResultException
     * @throws NotNullConstraintViolationException
     * @throws DBALException
     * @throws ORMException
     * @throws Exception a \RuntimeException is thrown when the tenant cannot be found
     */
    public function doExecute(TenantInterface $tenant, PackageInterface $package): int
    {
        $packageType = $package->getType();
        if (ItemInterface::TYPE_TEXT !== $packageType && ItemInterface::TYPE_COMPOSITE !== $packageType) {
            return ConsumerInterface::MSG_REJECT;
        }

        // find() yields object|null while setTenant() expects a tenant instance, so a missing tenant
        // is reported as a regular exception instead of surfacing as a TypeError.
        $managedTenant = $this->packageObjectManager->find(Tenant::class, $tenant->getId());
        if (null === $managedTenant) {
            throw new \RuntimeException(sprintf('Tenant "%s" was not found, package %s can not be processed', $tenant->getId(), $package->getGuid()));
        }
        $this->tenantContext->setTenant($managedTenant);

        /** @var PackageInterface|null $existingPackage */
        $existingPackage = $this->findExistingPackage($package);
        if (null !== $existingPackage) {
            $existingPackage = $this->packageHydrator->hydrate($package, $existingPackage);

            $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_UPDATE, new GenericEvent($existingPackage, ['eventName' => Events::PACKAGE_PRE_UPDATE]));
            $this->packageObjectManager->flush();
            $this->eventDispatcher->dispatch(Events::PACKAGE_POST_UPDATE, new GenericEvent($existingPackage, ['eventName' => Events::PACKAGE_POST_UPDATE]));
            $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($existingPackage, ['eventName' => Events::PACKAGE_PROCESSED]));
            // Second flush persists whatever the post-update / processed listeners changed.
            $this->packageObjectManager->flush();

            $this->reset();
            $this->logger->info(sprintf('Package %s was updated', $existingPackage->getGuid()));

            return ConsumerInterface::MSG_ACK;
        }

        $this->eventDispatcher->dispatch(Events::PACKAGE_PRE_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_PRE_CREATE]));
        $this->packageRepository->add($package);
        $this->eventDispatcher->dispatch(Events::PACKAGE_POST_CREATE, new GenericEvent($package, ['eventName' => Events::PACKAGE_POST_CREATE]));
        $this->eventDispatcher->dispatch(Events::PACKAGE_PROCESSED, new GenericEvent($package, ['eventName' => Events::PACKAGE_PROCESSED]));
        $this->packageObjectManager->flush();

        $this->logger->info(sprintf('Package %s was created', $package->getGuid()));
        $this->reset();

        return ConsumerInterface::MSG_ACK;
    }
163
164
    /**
     * Looks up a stored package that corresponds to the pushed one.
     *
     * Lookup order:
     *  1. by guid, using the pushed package's "evolved from" value when it is set, otherwise its guid;
     *  2. by the pushed package's own guid (only when "evolved from" is set and step 1 found nothing);
     *  3. by a stored "evolvedFrom" value equal to the pushed package's guid.
     *
     * @return PackageInterface|null whatever the repository returned for the first successful lookup,
     *                               or the result of the last lookup when none matched earlier
     */
    protected function findExistingPackage(PackageInterface $package)
    {
        $evolvedFrom = $package->getEvolvedFrom();

        $match = $this->packageRepository->findOneBy(['guid' => $evolvedFrom ?? $package->getGuid()]);
        if (null !== $match) {
            return $match;
        }

        if (null !== $evolvedFrom) {
            $match = $this->packageRepository->findOneBy(['guid' => $package->getGuid()]);
            if (null !== $match) {
                return $match;
            }
        }

        // check for updated items (with evolved from)
        return $this->packageRepository->findOneBy(['evolvedFrom' => $package->getGuid()]);
    }
178
179
    /**
     * Clears the object manager and, when the tenant context is resettable, resets it as well.
     */
    private function reset(): void
    {
        $this->packageObjectManager->clear();

        $context = $this->tenantContext;
        if (!$context instanceof ResettableInterface) {
            return;
        }

        $context->reset();
    }
186
187
    /**
     * Writes the exception to the logger and reports it to Sentry with a breadcrumb describing the package.
     *
     * @param \Exception       $e              exception to report
     * @param PackageInterface $package        package being processed; its guid and headline go into the breadcrumb
     * @param string           $defaultMessage log message used when the exception message is an empty string
     */
    private function logException(\Exception $e, PackageInterface $package, string $defaultMessage = 'Unhandled exception'): void
    {
        $logMessage = $e->getMessage();
        if ('' === $logMessage) {
            $logMessage = $defaultMessage;
        }
        $this->logger->error($logMessage, ['trace' => $e->getTraceAsString()]);

        $packageData = [
            'guid' => $package->getGuid(),
            'headline' => $package->getHeadline(),
        ];
        $breadcrumb = new Breadcrumb(
            Breadcrumb::LEVEL_DEBUG,
            Breadcrumb::TYPE_DEFAULT,
            'publishing',
            'Package',
            $packageData
        );

        $this->sentryHub->addBreadcrumb($breadcrumb);
        $this->sentryHub->captureException($e);
    }
202
}
203