EndpointConfigurator::useContainer()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
namespace PSB\Core;
3
4
5
use Acclimate\Container\ContainerAcclimator;
6
use PSB\Core\Correlation\MessageCorrelationFeature;
7
use PSB\Core\ErrorHandling\ErrorLastResort\ErrorLastResortFeature;
8
use PSB\Core\ErrorHandling\FirstLevelRetry\FirstLevelRetryFeature;
9
use PSB\Core\Exception\UnexpectedValueException;
10
use PSB\Core\Feature\FeatureSettingsExtensions;
11
use PSB\Core\Feature\RootFeature;
12
use PSB\Core\MessageMutation\IncomingMessageMutationFeature;
13
use PSB\Core\MessageMutation\OutgoingMessageMutationFeature;
14
use PSB\Core\ObjectBuilder\Builder;
15
use PSB\Core\ObjectBuilder\BuilderInterface;
16
use PSB\Core\ObjectBuilder\Container;
17
use PSB\Core\Outbox\OutboxFeature;
18
use PSB\Core\Persistence\EnabledPersistence;
19
use PSB\Core\Persistence\InMemory\InMemoryPersistenceDefinition;
20
use PSB\Core\Persistence\InMemory\Outbox\InMemoryOutboxPersistenceFeature;
21
use PSB\Core\Persistence\PersistenceDefinition;
22
use PSB\Core\Persistence\PersistenceDefinitionApplier;
23
use PSB\Core\Persistence\StorageType;
24
use PSB\Core\Pipeline\BusOperations;
25
use PSB\Core\Pipeline\BusOperationsContextFactory;
26
use PSB\Core\Pipeline\Incoming\IncomingContextFactory;
27
use PSB\Core\Pipeline\Incoming\StageContext\TransportReceiveContext;
28
use PSB\Core\Pipeline\IncomingPipelineFeature;
29
use PSB\Core\Pipeline\Outgoing\OutgoingContextFactory;
30
use PSB\Core\Pipeline\OutgoingPipelineFeature;
31
use PSB\Core\Pipeline\PipelineFactory;
32
use PSB\Core\Pipeline\PipelineModifications;
33
use PSB\Core\Pipeline\PipelineRootStageContext;
34
use PSB\Core\Pipeline\StepChainBuilderFactory;
35
use PSB\Core\Routing\AutoSubscription\AutoSubscribeFeature;
36
use PSB\Core\Routing\RoutingFeature;
37
use PSB\Core\Routing\UnicastRoutingTable;
38
use PSB\Core\Serialization\Json\JsonSerializationDefinition;
39
use PSB\Core\Serialization\SerializationConfigurator;
40
use PSB\Core\Serialization\SerializationDefinition;
41
use PSB\Core\Serialization\SerializationFeature;
42
use PSB\Core\Transport\Config\TransportDefinition;
43
use PSB\Core\Transport\InboundTransport;
44
use PSB\Core\Transport\MessagePusherInterface;
45
use PSB\Core\Transport\OutboundTransport;
46
use PSB\Core\Transport\PushPipe;
47
use PSB\Core\Transport\PushSettings;
48
use PSB\Core\Transport\QueueBindings;
49
use PSB\Core\Transport\RabbitMq\Config\RabbitMqTransportDefinition;
50
use PSB\Core\Transport\ReceivingFeature;
51
use PSB\Core\Transport\SendingFeature;
52
use PSB\Core\Transport\TransportFeature;
53
use PSB\Core\Transport\TransportReceiveContextFactory;
54
use PSB\Core\Transport\TransportReceiver;
55
use PSB\Core\Util\Clock\ClockInterface;
56
use PSB\Core\Util\Clock\SystemClock;
57
use PSB\Core\Util\Guard;
58
use PSB\Core\Util\Settings;
59
use PSB\Core\UuidGeneration\Comb\CombUuidGenerationDefinition;
60
use PSB\Core\UuidGeneration\UuidGenerationConfigurator;
61
use PSB\Core\UuidGeneration\UuidGenerationDefinition;
62
use PSB\Core\UuidGeneration\UuidGeneratorInterface;
63
64
/**
 * Collects everything needed to assemble an endpoint: settings, message handler and mutator
 * registrations, command routing rules and pipeline modifications.
 *
 * Callers configure the endpoint through the public methods and finish with build(), which fills in
 * defaults for anything left unconfigured and returns a StartableEndpoint.
 */
class EndpointConfigurator
{
    /**
     * @var Settings
     */
    private $settings;

    /**
     * @var string
     */
    private $endpointName;

    /**
     * @var MessageHandlerRegistry
     */
    private $messageHandlerRegistry;

    /**
     * @var UnicastRoutingTable
     */
    private $unicastRoutingTable;

    /**
     * @var MessageMutatorRegistry
     */
    private $messageMutatorRegistry;

    /**
     * @var PipelineModifications
     */
    private $pipelineModifications;

    /**
     * @param string                 $endpointName Must be neither null nor empty (enforced through Guard).
     * @param Settings               $settings
     * @param MessageHandlerRegistry $messageHandlerRegistry
     * @param UnicastRoutingTable    $unicastRoutingTable
     * @param MessageMutatorRegistry $messageMutatorRegistry
     * @param PipelineModifications  $pipelineModifications
     */
    public function __construct(
        $endpointName,
        Settings $settings,
        MessageHandlerRegistry $messageHandlerRegistry,
        UnicastRoutingTable $unicastRoutingTable,
        MessageMutatorRegistry $messageMutatorRegistry,
        PipelineModifications $pipelineModifications
    ) {
        Guard::againstNullAndEmpty('name', $endpointName);
        $this->endpointName = $endpointName;
        $this->settings = $settings;
        $this->messageHandlerRegistry = $messageHandlerRegistry;
        $this->unicastRoutingTable = $unicastRoutingTable;
        $this->messageMutatorRegistry = $messageMutatorRegistry;
        $this->pipelineModifications = $pipelineModifications;
    }

    /**
     * Convenience factory that wires the configurator with fresh, empty collaborators.
     *
     * @param string $endpointName
     *
     * @return EndpointConfigurator
     */
    public static function create($endpointName)
    {
        return new self(
            $endpointName,
            new Settings(),
            new MessageHandlerRegistry(),
            new UnicastRoutingTable(),
            new MessageMutatorRegistry(),
            new PipelineModifications()
        );
    }

    /**
     * Replaces the endpoint name given at construction time.
     *
     * The same guard as in the constructor is applied so that an empty name cannot sneak in
     * after construction and end up as the endpoint name default during build().
     *
     * @param string $endpointName
     */
    public function setEndpointName($endpointName)
    {
        Guard::againstNullAndEmpty('name', $endpointName);
        $this->endpointName = $endpointName;
    }

    /**
     * Appends a persistence to the list of enabled persistences kept in the settings.
     *
     * @param PersistenceDefinition $definition
     * @param StorageType|null      $storageType
     *
     * @return Persistence\PersistenceConfigurator
     */
    public function usePersistence(PersistenceDefinition $definition, StorageType $storageType = null)
    {
        $enabledPersistences = $this->settings->tryGet(KnownSettingsEnum::ENABLED_PERSISTENCES) ?: [];

        $enabledPersistences[] = new EnabledPersistence($definition, $storageType);

        $this->settings->set(KnownSettingsEnum::ENABLED_PERSISTENCES, $enabledPersistences);

        return $definition->createConfigurator($this->settings);
    }

    /**
     * Resets the list of enabled persistences to an empty list.
     */
    public function clearPersistences()
    {
        $this->settings->set(KnownSettingsEnum::ENABLED_PERSISTENCES, []);
    }

    /**
     * Stores the transport definition together with fresh inbound/outbound transport objects
     * in the settings.
     *
     * @param TransportDefinition $definition
     *
     * @return Transport\Config\TransportConfigurator
     */
    public function useTransport(TransportDefinition $definition)
    {
        $this->settings->set(TransportDefinition::class, $definition);
        $this->settings->set(InboundTransport::class, new InboundTransport());
        $this->settings->set(OutboundTransport::class, new OutboundTransport());

        return $definition->createConfigurator($this->settings);
    }

    /**
     * Stores the serialization definition in the settings.
     *
     * @param SerializationDefinition $definition
     *
     * @return SerializationConfigurator
     */
    public function useSerialization(SerializationDefinition $definition)
    {
        $this->settings->set(SerializationDefinition::class, $definition);

        return $definition->createConfigurator($this->settings);
    }

    /**
     * Stores the UUID generation definition in the settings.
     *
     * @param UuidGenerationDefinition $definition
     *
     * @return UuidGenerationConfigurator
     */
    public function useUuidGeneration(UuidGenerationDefinition $definition)
    {
        $this->settings->set(UuidGenerationDefinition::class, $definition);

        return $definition->createConfigurator($this->settings);
    }

    /**
     * Using your own container allows you to register handlers as services.
     * Supported containers are the same as those supported by jeremeamia/acclimate-container.
     *
     * @param mixed $container
     */
    public function useContainer($container)
    {
        $this->settings->set(KnownSettingsEnum::CONTAINER, $container);
    }

    /**
     * @param string $eventFqcn
     * @param string $handlerContainerId
     */
    public function registerEventHandler($eventFqcn, $handlerContainerId)
    {
        $this->messageHandlerRegistry->registerEventHandler($eventFqcn, $handlerContainerId);
    }

    /**
     * @param string $commandFqcn
     * @param string $handlerContainerId
     */
    public function registerCommandHandler($commandFqcn, $handlerContainerId)
    {
        $this->messageHandlerRegistry->registerCommandHandler($commandFqcn, $handlerContainerId);
    }

    /**
     * @param string $mutatorContainerId
     */
    public function registerIncomingLogicalMessageMutator($mutatorContainerId)
    {
        $this->messageMutatorRegistry->registerIncomingLogicalMessageMutator($mutatorContainerId);
    }

    /**
     * @param string $mutatorContainerId
     */
    public function registerIncomingPhysicalMessageMutator($mutatorContainerId)
    {
        $this->messageMutatorRegistry->registerIncomingPhysicalMessageMutator($mutatorContainerId);
    }

    /**
     * @param string $mutatorContainerId
     */
    public function registerOutgoingLogicalMessageMutator($mutatorContainerId)
    {
        $this->messageMutatorRegistry->registerOutgoingLogicalMessageMutator($mutatorContainerId);
    }

    /**
     * @param string $mutatorContainerId
     */
    public function registerOutgoingPhysicalMessageMutator($mutatorContainerId)
    {
        $this->messageMutatorRegistry->registerOutgoingPhysicalMessageMutator($mutatorContainerId);
    }

    /**
     * It allows you to configure which endpoints should receive a command message.
     * Command messages are those being sent via ->send and not via ->publish.
     *
     * @param string $messageFqcn
     * @param string $endpointName
     */
    public function registerCommandRoutingRule($messageFqcn, $endpointName)
    {
        $this->unicastRoutingTable->routeToEndpoint($messageFqcn, $endpointName);
    }

    /**
     * @param string $stepId
     */
    public function removePipelineStep($stepId)
    {
        $this->pipelineModifications->removeStep($stepId);
    }

    /**
     * @param string        $stepId
     * @param string        $stepFqcn
     * @param callable|null $factory
     * @param string|null   $description
     */
    public function replacePipelineStep($stepId, $stepFqcn, callable $factory = null, $description = null)
    {
        $this->pipelineModifications->replaceStep($stepId, $stepFqcn, $factory, $description);
    }

    /**
     * @param string        $stepId
     * @param string        $stepFqcn
     * @param callable|null $factory
     * @param string|null   $description
     *
     * @return Pipeline\StepRegistration
     */
    public function registerPipelineStep($stepId, $stepFqcn, callable $factory = null, $description = null)
    {
        return $this->pipelineModifications->registerStep($stepId, $stepFqcn, $factory, $description);
    }

    /**
     * Installers are supposed to be tasks that only need to be ran once per deployment, like creating endpoint queues,
     * persistence related database tables, etc. They are disabled by default and they need to be explicitly enabled
     * if needed.
     */
    public function enableInstallers()
    {
        $this->settings->set(KnownSettingsEnum::INSTALLERS_ENABLED, true);
    }

    /**
     * Explicitly turns installers off again (see enableInstallers()).
     */
    public function disableInstallers()
    {
        $this->settings->set(KnownSettingsEnum::INSTALLERS_ENABLED, false);
    }

    /**
     * Reports the installers flag; a falsy or missing setting is reported as false.
     *
     * @return bool
     */
    public function areInstallersEnabled()
    {
        // Cast so the documented bool contract holds even if the stored value is not a strict boolean.
        return (bool)$this->settings->tryGet(KnownSettingsEnum::INSTALLERS_ENABLED);
    }

    /**
     * Putting the endpoint in send only mode disables any message receiving capabilities.
     * This also means that the endpoint will no longer block waiting for messages when started.
     */
    public function enableSendOnly()
    {
        $this->settings->set(KnownSettingsEnum::SEND_ONLY, true);
    }

    /**
     * Reports the send only flag; a falsy or missing setting is reported as false.
     *
     * @return bool
     */
    public function isSendOnly()
    {
        // Cast so the documented bool contract holds even if the stored value is not a strict boolean.
        return (bool)$this->settings->tryGet(KnownSettingsEnum::SEND_ONLY);
    }

    /**
     * Sets the error queue. It is required by the push settings used for receiving messages;
     * resolving them without an error queue raises an UnexpectedValueException.
     *
     * @param string $errorQueue
     */
    public function sendFailedMessagesTo($errorQueue)
    {
        $this->settings->set(KnownSettingsEnum::ERROR_QUEUE, $errorQueue);
    }

    /**
     * Sets the durable messaging flag to true in the settings.
     */
    public function enableDurableMessaging()
    {
        $this->settings->set(KnownSettingsEnum::DURABLE_MESSAGING_ENABLED, true);
    }

    /**
     * Sets the durable messaging flag to false in the settings.
     */
    public function disableDurableMessaging()
    {
        $this->settings->set(KnownSettingsEnum::DURABLE_MESSAGING_ENABLED, false);
    }

    /**
     * @param int $days
     */
    public function setDaysToKeepOutboxDeduplicationData($days)
    {
        $this->settings->set(KnownSettingsEnum::DAYS_TO_KEEP_DEDUPLICATION_DATA, $days);
    }

    /**
     * @param string $featureFqcn
     */
    public function enableFeature($featureFqcn)
    {
        FeatureSettingsExtensions::enableFeature($featureFqcn, $this->settings);
    }

    /**
     * @param string $featureFqcn
     */
    public function disableFeature($featureFqcn)
    {
        FeatureSettingsExtensions::disableFeature($featureFqcn, $this->settings);
    }

    /**
     * Stores the maximum number of first level retries, validated through Guard and cast to int.
     *
     * @param int $maxRetries
     */
    public function setMaxFirstLevelRetries($maxRetries)
    {
        Guard::againstNullAndNonInt('maxRetries', $maxRetries);
        $this->settings->set(KnownSettingsEnum::MAX_FLR_RETRIES, (int)$maxRetries);
    }

    /**
     * Assembles the endpoint from the accumulated configuration.
     *
     * Anything the caller did not configure explicitly (transport, outbox persistence, serialization,
     * UUID generation) gets a default before the known features and the base container services
     * are registered.
     *
     * @return StartableEndpoint
     */
    public function build()
    {
        $container = new Container();

        // The acclimator is only needed when the caller supplied a container of their own.
        $externalContainer = $this->settings->tryGet(KnownSettingsEnum::CONTAINER);
        $adaptedExternalContainer = null;
        if ($externalContainer) {
            $acclimator = new ContainerAcclimator();
            $adaptedExternalContainer = $acclimator->acclimate($externalContainer);
        }

        $builder = new Builder($container, $adaptedExternalContainer);
        $container[BuilderInterface::class] = $builder;

        $this->ensureTransportConfigured();
        $this->ensureOutboxPersistenceConfigured();
        $this->ensureSerializationConfigured();
        $this->ensureUuidGenerationConfigured();
        $this->registerKnownFeatures();

        $this->settings->setDefault(KnownSettingsEnum::ENDPOINT_NAME, $this->endpointName);
        $this->settings->setDefault(KnownSettingsEnum::SEND_ONLY, false);
        $this->settings->set(QueueBindings::class, new QueueBindings());

        $persistenceDefinitionApplier = new PersistenceDefinitionApplier();

        $this->registerBaseContainerServices($container);

        return new StartableEndpoint(
            $this->settings,
            $persistenceDefinitionApplier,
            $builder,
            $builder->build(PipelineModifications::class),
            $builder->build(BusContext::class)
        );
    }

    /**
     * Falls back to the RabbitMQ transport when no transport definition is present in the settings.
     */
    private function ensureTransportConfigured()
    {
        if (!$this->settings->tryGet(TransportDefinition::class)) {
            $this->useTransport(new RabbitMqTransportDefinition());
        }
    }

    /**
     * Falls back to the in-memory persistence for the outbox when no enabled persistence covers it.
     *
     * An enabled persistence is treated as covering the outbox when it either has no selected
     * storage type or its selected storage type equals StorageType::OUTBOX().
     */
    private function ensureOutboxPersistenceConfigured()
    {
        /** @var EnabledPersistence[] $enabledPersistences */
        $enabledPersistences = $this->settings->tryGet(KnownSettingsEnum::ENABLED_PERSISTENCES) ?: [];

        foreach ($enabledPersistences as $enabledPersistence) {
            $selectedStorageType = $enabledPersistence->getSelectedStorageType();
            if (!$selectedStorageType || $selectedStorageType->equals(StorageType::OUTBOX())) {
                // One match is enough; nothing left to configure.
                return;
            }
        }

        $this->usePersistence(new InMemoryPersistenceDefinition(), StorageType::OUTBOX());
    }

    /**
     * Falls back to JSON serialization when no serialization definition is present in the settings.
     */
    private function ensureSerializationConfigured()
    {
        if (!$this->settings->tryGet(SerializationDefinition::class)) {
            $this->useSerialization(new JsonSerializationDefinition());
        }
    }

    /**
     * Falls back to COMB UUID generation when no UUID generation definition is present in the settings.
     */
    private function ensureUuidGenerationConfigured()
    {
        if (!$this->settings->tryGet(UuidGenerationDefinition::class)) {
            $this->useUuidGeneration(new CombUuidGenerationDefinition());
        }
    }

    /**
     * Registers the services every endpoint needs in the internal container.
     *
     * Entries assigned as objects are created right away; entries assigned as closures are factories
     * (presumably resolved lazily by the container - confirm against Container).
     * Expects BuilderInterface::class to be already present in the container, as done by build().
     *
     * @param Container $c
     */
    private function registerBaseContainerServices(Container $c)
    {
        $c[Settings::class] = $this->settings;
        $c[MessageHandlerRegistry::class] = $this->messageHandlerRegistry;
        $c[MessageMutatorRegistry::class] = $this->messageMutatorRegistry;
        $c[UnicastRoutingTable::class] = $this->unicastRoutingTable;
        $c[PipelineModifications::class] = $this->pipelineModifications;

        /** @var UuidGenerationDefinition $uuidDefinition */
        $uuidDefinition = $this->settings->get(UuidGenerationDefinition::class);
        $c[UuidGeneratorInterface::class] = $uuidDefinition->formalize($this->settings);

        $c[PipelineFactory::class] = new PipelineFactory($c[BuilderInterface::class], new StepChainBuilderFactory());
        $c[BusOperationsContextFactory::class] = new BusOperationsContextFactory();
        $c[BusOperations::class] = new BusOperations(
            $c[PipelineFactory::class],
            $c[BusOperationsContextFactory::class],
            $c[PipelineModifications::class],
            $c[UuidGeneratorInterface::class]
        );
        $c[OutgoingOptionsFactory::class] = new OutgoingOptionsFactory();
        $c[BusContext::class] = new BusContext(
            new PipelineRootStageContext($c[BuilderInterface::class]),
            $c[BusOperations::class],
            $c[OutgoingOptionsFactory::class]
        );

        $c[IncomingContextFactory::class] = function ($c) {
            return new IncomingContextFactory($c[BusOperations::class], $c[OutgoingOptionsFactory::class]);
        };
        $c[OutgoingContextFactory::class] = function () {
            return new OutgoingContextFactory();
        };

        $c[ClockInterface::class] = function () {
            return new SystemClock();
        };
        $c[DateTimeConverter::class] = function () {
            return new DateTimeConverter();
        };

        $c[PushPipe::class] = function ($c) {
            /** @var PipelineFactory $pipelineFactory */
            $pipelineFactory = $c[PipelineFactory::class];

            return new PushPipe(
                new TransportReceiveContextFactory($c[BuilderInterface::class]),
                $pipelineFactory->createStartingWith(
                    TransportReceiveContext::class,
                    $c[PipelineModifications::class]
                )
            );
        };
        $c[PushSettings::class] = function () {
            $errorQueue = $this->settings->tryGet(KnownSettingsEnum::ERROR_QUEUE);
            if (!$errorQueue) {
                throw new UnexpectedValueException(
                    "The error queue needs to be set. You can do it using endpointConfigurator.sendFailedMessagesTo."
                );
            }

            // The error queue was already fetched and validated above, so it is reused instead of read again.
            return new PushSettings(
                $this->settings->get(KnownSettingsEnum::LOCAL_ADDRESS),
                $errorQueue,
                $this->settings->tryGet(KnownSettingsEnum::PURGE_ON_STARTUP) ?: false
            );
        };
        $c[TransportReceiver::class] = function ($c) {
            return new TransportReceiver(
                $c[MessagePusherInterface::class],
                $c[PushSettings::class],
                $c[PushPipe::class]
            );
        };
    }

    /**
     * Registers every feature shipped with the core against the settings, in the order listed.
     */
    private function registerKnownFeatures()
    {
        $featureFqcnList = [
            RootFeature::class,
            TransportFeature::class,
            SendingFeature::class,
            ReceivingFeature::class,
            SerializationFeature::class,
            RoutingFeature::class,
            AutoSubscribeFeature::class,
            OutgoingPipelineFeature::class,
            IncomingPipelineFeature::class,
            OutboxFeature::class,
            InMemoryOutboxPersistenceFeature::class,
            OutgoingMessageMutationFeature::class,
            IncomingMessageMutationFeature::class,
            ErrorLastResortFeature::class,
            FirstLevelRetryFeature::class,
            MessageCorrelationFeature::class,
        ];

        foreach ($featureFqcnList as $featureFqcn) {
            FeatureSettingsExtensions::registerFeature($featureFqcn, $this->settings);
        }
    }
}
577