Issues (22)

MessageBus/TransactionalMessageBus.php (6 issues)

Labels
Severity
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
7
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
8
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
9
 *
10
 * Copyright (c) 2024 Mykhailo Shtanko [email protected]
11
 *
12
 * For the full copyright and license information, please view the LICENSE.MD
13
 * file that was distributed with this source code.
14
 */
15
16
namespace FRZB\Component\TransactionalMessenger\MessageBus;
17
18
use Fp\Collections\ArrayList;
19
use FRZB\Component\TransactionalMessenger\Enum\CommitType;
20
use FRZB\Component\TransactionalMessenger\Event\DispatchFailedEvent;
21
use FRZB\Component\TransactionalMessenger\Event\DispatchSucceedEvent;
22
use FRZB\Component\TransactionalMessenger\Exception\MessageBusException;
23
use FRZB\Component\TransactionalMessenger\Helper\EnvelopeHelper;
24
use FRZB\Component\TransactionalMessenger\Helper\TransactionHelper;
25
use FRZB\Component\TransactionalMessenger\Storage\Storage;
26
use FRZB\Component\TransactionalMessenger\Storage\StorageInterface;
27
use FRZB\Component\TransactionalMessenger\ValueObject\FailedEnvelope;
0 ignored issues
show
The type FRZB\Component\Transacti...ueObject\FailedEnvelope was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
28
use FRZB\Component\TransactionalMessenger\ValueObject\PendingEnvelope;
0 ignored issues
show
The type FRZB\Component\Transacti...eObject\PendingEnvelope was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
29
use FRZB\Component\TransactionalMessenger\ValueObject\SucceedEnvelope;
0 ignored issues
show
The type FRZB\Component\Transacti...eObject\SucceedEnvelope was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
30
use Psr\EventDispatcher\EventDispatcherInterface;
31
use Symfony\Component\DependencyInjection\Attribute\AsDecorator;
32
use Symfony\Component\DependencyInjection\Attribute\Autoconfigure;
33
use Symfony\Component\Messenger\Envelope;
34
use Symfony\Component\Messenger\MessageBusInterface;
35
use Symfony\Contracts\EventDispatcher\Event;
36
37
/**
 * Message bus decorator that defers "transactional" messages until commit.
 *
 * Messages for which TransactionHelper::isTransactional() returns true are
 * parked in the pending storage instead of being sent. Every other message is
 * forwarded to the decorated bus immediately. Each forwarded envelope is
 * recorded as succeeded or failed, and commit()/rollback() publish those
 * records as DispatchSucceedEvent / DispatchFailedEvent.
 *
 * NOTE(review): the commit/rollback logic appears to assume that
 * StorageInterface::iterate() drains the storage while iterating (otherwise
 * re-prepending pending envelopes would duplicate them) — confirm against the
 * Storage implementation.
 */
#[Autoconfigure]
#[AsDecorator(MessageBusInterface::class)]
final class TransactionalMessageBus implements TransactionalMessageBusInterface
{
    /** @var StorageInterface<PendingEnvelope> */
    private readonly StorageInterface $pendingStorage;

    /** @var StorageInterface<SucceedEnvelope> */
    private readonly StorageInterface $succeedStorage;

    /** @var StorageInterface<FailedEnvelope> */
    private readonly StorageInterface $failedStorage;

    /**
     * @param MessageBusInterface      $decoratedBus    bus that actually delivers the envelopes
     * @param EventDispatcherInterface $eventDispatcher receives the succeed/failed dispatch events
     * @param null|Storage             $pendingStorage  storage for deferred envelopes; a fresh Storage when null
     * @param null|Storage             $succeedStorage  storage for delivered envelopes; a fresh Storage when null
     * @param null|Storage             $failedStorage   storage for failed envelopes; a fresh Storage when null
     */
    public function __construct(
        private readonly MessageBusInterface $decoratedBus,
        private readonly EventDispatcherInterface $eventDispatcher,
        ?Storage $pendingStorage = null,
        ?Storage $succeedStorage = null,
        ?Storage $failedStorage = null,
    ) {
        // Readonly properties are initialised exactly once, here.
        $this->pendingStorage = $pendingStorage ?? new Storage();
        $this->succeedStorage = $succeedStorage ?? new Storage();
        $this->failedStorage = $failedStorage ?? new Storage();
    }

    /**
     * Wraps the message and either defers it or forwards it right away.
     *
     * Exceptions raised by the decorated bus are not rethrown: the envelope is
     * recorded in the failed storage instead.
     *
     * @param object                                                 $message message or envelope to dispatch
     * @param \Symfony\Component\Messenger\Stamp\StampInterface[]    $stamps  stamps to attach to the envelope
     *
     * @return Envelope the pending envelope for deferred messages; otherwise the
     *                  envelope returned by the decorated bus, or the undelivered
     *                  envelope when delivery failed
     */
    public function dispatch(object $message, array $stamps = []): Envelope
    {
        $envelope = EnvelopeHelper::wrap($message);

        if ([] !== $stamps) {
            // Honour caller-supplied stamps; array_values() keeps string keys
            // from being interpreted as named arguments by the spread.
            $envelope = $envelope->with(...array_values($stamps));
        }

        if (TransactionHelper::isTransactional($message)) {
            $this->pendingStorage->append(new PendingEnvelope($envelope));

            return $envelope;
        }

        return $this->dispatchEnvelope($envelope);
    }

    /**
     * Sends the pending envelopes selected by the given commit types, then
     * publishes the succeed and failed events.
     *
     * @param CommitType ...$commitTypes forwarded to PendingEnvelope::isTransactional()
     *
     * @throws MessageBusException wrapping any throwable raised while committing
     */
    public function commit(CommitType ...$commitTypes): void
    {
        try {
            $this->dispatchPendingEnvelopes(...$commitTypes);
            $this->dispatchSucceedEnvelopes();
            $this->dispatchFailedEnvelopes();
        } catch (\Throwable $e) {
            throw MessageBusException::fromThrowable($e);
        }
    }

    /**
     * Marks every pending envelope as failed with the given exception,
     * publishes the failed events and finally clears all three storages.
     *
     * @param \Throwable $exception reason attached to each FailedEnvelope
     *
     * @throws MessageBusException wrapping any throwable raised while publishing the failed events
     */
    public function rollback(\Throwable $exception): void
    {
        ArrayList::collect($this->pendingStorage->iterate())
            ->map(static fn (PendingEnvelope $pending): FailedEnvelope => new FailedEnvelope($pending->envelope, $exception))
            ->tap(function (FailedEnvelope $failed): void {
                $this->failedStorage->append($failed);
            })
        ;

        try {
            $this->dispatchFailedEnvelopes();
        } catch (\Throwable $e) {
            throw MessageBusException::fromThrowable($e);
        } finally {
            // Leave no state behind, whether or not the events could be published.
            $this->pendingStorage->clear();
            $this->succeedStorage->clear();
            $this->failedStorage->clear();
        }
    }

    /**
     * Forwards pending envelopes accepted by isTransactional(...$commitTypes)
     * and puts the remaining ones back into the pending storage.
     */
    private function dispatchPendingEnvelopes(CommitType ...$commitTypes): void
    {
        // collect() materialises the pending envelopes up front, so the
        // prepend() below cannot feed the loop it is running in.
        ArrayList::collect($this->pendingStorage->iterate())->tap(
            function (PendingEnvelope $pending) use ($commitTypes): void {
                if ($pending->isTransactional(...$commitTypes)) {
                    $this->dispatchEnvelope($pending->envelope);

                    return;
                }

                $this->pendingStorage->prepend($pending);
            },
        );
    }

    /**
     * Publishes a DispatchSucceedEvent for every recorded successful envelope.
     */
    private function dispatchSucceedEnvelopes(): void
    {
        ArrayList::collect($this->succeedStorage->iterate())
            ->tap(function (SucceedEnvelope $succeed): void {
                $this->dispatchEvent(new DispatchSucceedEvent($succeed));
            })
        ;
    }

    /**
     * Publishes a DispatchFailedEvent for every recorded failed envelope.
     */
    private function dispatchFailedEnvelopes(): void
    {
        ArrayList::collect($this->failedStorage->iterate())
            ->tap(function (FailedEnvelope $failed): void {
                $this->dispatchEvent(new DispatchFailedEvent($failed));
            })
        ;
    }

    /**
     * Sends one envelope through the decorated bus and records the outcome.
     *
     * Any throwable raised while delivering or while recording the success is
     * captured as a FailedEnvelope rather than rethrown. A throwable raised
     * while recording the failure itself is allowed to propagate, so a broken
     * failed storage is not silently ignored.
     *
     * @return Envelope the envelope returned by the decorated bus, or the
     *                  original envelope when delivery failed
     */
    private function dispatchEnvelope(Envelope $envelope): Envelope
    {
        try {
            $dispatched = $this->decoratedBus->dispatch($envelope);
            $this->succeedStorage->append(new SucceedEnvelope($dispatched));

            return $dispatched;
        } catch (\Throwable $e) {
            $this->failedStorage->append(new FailedEnvelope($envelope, $e));

            return $envelope;
        }
    }

    /**
     * Hands the event to the event dispatcher and returns the same instance.
     */
    private function dispatchEvent(Event $event): Event
    {
        $this->eventDispatcher->dispatch($event);

        return $event;
    }
}
143