Issues (332)

EventListener/PublishMercureUpdatesListener.php (1 issue)

1
<?php
2
3
/*
4
 * This file is part of the API Platform project.
5
 *
6
 * (c) Kévin Dunglas <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace ApiPlatform\Core\Bridge\Doctrine\EventListener;
15
16
use ApiPlatform\Core\Api\IriConverterInterface;
17
use ApiPlatform\Core\Api\ResourceClassResolverInterface;
18
use ApiPlatform\Core\Api\UrlGeneratorInterface;
19
use ApiPlatform\Core\Bridge\Symfony\Messenger\DispatchTrait;
20
use ApiPlatform\Core\Exception\InvalidArgumentException;
21
use ApiPlatform\Core\Exception\RuntimeException;
22
use ApiPlatform\Core\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
23
use ApiPlatform\Core\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
24
use ApiPlatform\Core\Metadata\Resource\Factory\ResourceMetadataFactoryInterface;
25
use ApiPlatform\Core\Util\ResourceClassInfoTrait;
26
use Doctrine\Common\EventArgs;
27
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
28
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
29
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
30
use Symfony\Component\HttpFoundation\JsonResponse;
31
use Symfony\Component\Mercure\Update;
32
use Symfony\Component\Messenger\MessageBusInterface;
33
use Symfony\Component\Serializer\SerializerInterface;
34
35
/**
36
 * Publishes resources updates to the Mercure hub.
37
 *
38
 * @author Kévin Dunglas <[email protected]>
39
 *
40
 * @experimental
41
 */
42
final class PublishMercureUpdatesListener
43
{
44
    use DispatchTrait;
45
    use ResourceClassInfoTrait;
46
47
    private $iriConverter;
48
    private $serializer;
49
    private $publisher;
50
    private $expressionLanguage;
51
    private $createdObjects;
52
    private $updatedObjects;
53
    private $deletedObjects;
54
    private $formats;
55
    private $graphQlSubscriptionManager;
56
    private $graphQlMercureSubscriptionIriGenerator;
57
58
    /**
59
     * @param array<string, string[]|string> $formats
60
     */
61
    public function __construct(ResourceClassResolverInterface $resourceClassResolver, IriConverterInterface $iriConverter, ResourceMetadataFactoryInterface $resourceMetadataFactory, SerializerInterface $serializer, array $formats, MessageBusInterface $messageBus = null, callable $publisher = null, ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null, ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null, ExpressionLanguage $expressionLanguage = null)
62
    {
63
        if (null === $messageBus && null === $publisher) {
64
            throw new InvalidArgumentException('A message bus or a publisher must be provided.');
65
        }
66
67
        $this->resourceClassResolver = $resourceClassResolver;
68
        $this->iriConverter = $iriConverter;
69
        $this->resourceMetadataFactory = $resourceMetadataFactory;
70
        $this->serializer = $serializer;
71
        $this->formats = $formats;
72
        $this->messageBus = $messageBus;
73
        $this->publisher = $publisher;
74
        $this->expressionLanguage = $expressionLanguage ?? class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null;
75
        $this->graphQlSubscriptionManager = $graphQlSubscriptionManager;
76
        $this->graphQlMercureSubscriptionIriGenerator = $graphQlMercureSubscriptionIriGenerator;
77
        $this->reset();
78
    }
79
80
    /**
81
     * Collects created, updated and deleted objects.
82
     */
83
    public function onFlush(EventArgs $eventArgs): void
84
    {
85
        if ($eventArgs instanceof OrmOnFlushEventArgs) {
86
            $uow = $eventArgs->getEntityManager()->getUnitOfWork();
87
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
88
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
89
        } else {
90
            return;
91
        }
92
93
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
94
        foreach ($uow->{$methodName}() as $object) {
95
            $this->storeObjectToPublish($object, 'createdObjects');
96
        }
97
98
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
99
        foreach ($uow->{$methodName}() as $object) {
100
            $this->storeObjectToPublish($object, 'updatedObjects');
101
        }
102
103
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
104
        foreach ($uow->{$methodName}() as $object) {
105
            $this->storeObjectToPublish($object, 'deletedObjects');
106
        }
107
    }
108
109
    /**
110
     * Publishes updates for changes collected on flush, and resets the store.
111
     */
112
    public function postFlush(): void
113
    {
114
        try {
115
            foreach ($this->createdObjects as $object) {
116
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
117
            }
118
119
            foreach ($this->updatedObjects as $object) {
120
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
121
            }
122
123
            foreach ($this->deletedObjects as $object) {
124
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
125
            }
126
        } finally {
127
            $this->reset();
128
        }
129
    }
130
131
    private function reset(): void
132
    {
133
        $this->createdObjects = new \SplObjectStorage();
134
        $this->updatedObjects = new \SplObjectStorage();
135
        $this->deletedObjects = new \SplObjectStorage();
136
    }
137
138
    /**
139
     * @param object $object
140
     */
141
    private function storeObjectToPublish($object, string $property): void
142
    {
143
        if (null === $resourceClass = $this->getResourceClass($object)) {
144
            return;
145
        }
146
147
        $value = $this->resourceMetadataFactory->create($resourceClass)->getAttribute('mercure', false);
0 ignored issues
show
The method create() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

147
        $value = $this->resourceMetadataFactory->/** @scrutinizer ignore-call */ create($resourceClass)->getAttribute('mercure', false);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
148
        if (false === $value) {
149
            return;
150
        }
151
152
        if (\is_string($value)) {
153
            if (null === $this->expressionLanguage) {
154
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
155
            }
156
157
            $value = $this->expressionLanguage->evaluate($value, ['object' => $object]);
158
        }
159
160
        if (true === $value) {
161
            $value = [];
162
        }
163
164
        if (!\is_array($value)) {
165
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of targets or a valid expression, "%s" given.', $resourceClass, \gettype($value)));
166
        }
167
168
        if ('deletedObjects' === $property) {
169
            $this->deletedObjects[(object) [
170
                'id' => $this->iriConverter->getIriFromItem($object),
171
                'iri' => $this->iriConverter->getIriFromItem($object, UrlGeneratorInterface::ABS_URL),
172
            ]] = $value;
173
174
            return;
175
        }
176
177
        $this->{$property}[$object] = $value;
178
    }
179
180
    /**
181
     * @param object $object
182
     */
183
    private function publishUpdate($object, array $targets, string $type): void
184
    {
185
        if ($object instanceof \stdClass) {
186
            // By convention, if the object has been deleted, we send only its IRI.
187
            // This may change in the feature, because it's not JSON Merge Patch compliant,
188
            // and I'm not a fond of this approach.
189
            $iri = $object->iri;
190
            /** @var string $data */
191
            $data = json_encode(['@id' => $object->id]);
192
        } else {
193
            $resourceClass = $this->getObjectClass($object);
194
            $context = $this->resourceMetadataFactory->create($resourceClass)->getAttribute('normalization_context', []);
195
196
            $iri = $this->iriConverter->getIriFromItem($object, UrlGeneratorInterface::ABS_URL);
197
            $data = $this->serializer->serialize($object, key($this->formats), $context);
198
        }
199
200
        $updates = array_merge([new Update($iri, $data, $targets)], $this->getGraphQlSubscriptionUpdates($object, $targets, $type));
201
202
        foreach ($updates as $update) {
203
            $this->messageBus ? $this->dispatch($update) : ($this->publisher)($update);
204
        }
205
    }
206
207
    /**
208
     * @param object $object
209
     *
210
     * @return Update[]
211
     */
212
    private function getGraphQlSubscriptionUpdates($object, array $targets, string $type): array
213
    {
214
        if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
215
            return [];
216
        }
217
218
        $payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);
219
220
        $updates = [];
221
        foreach ($payloads as [$subscriptionId, $data]) {
222
            $updates[] = new Update(
223
                $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
224
                (string) (new JsonResponse($data))->getContent(),
225
                $targets
226
            );
227
        }
228
229
        return $updates;
230
    }
231
}
232