DispatchMessageHandler::getObjectManager()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 7
c 1
b 0
f 0
dl 0
loc 14
ccs 0
cts 8
cp 0
rs 10
cc 3
nc 3
nop 1
crap 12
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Setono\MessageSchedulerBundle\Message\Handler;
6
7
use const DATE_ATOM;
8
use Doctrine\Persistence\ManagerRegistry;
9
use Doctrine\Persistence\ObjectManager;
10
use InvalidArgumentException;
11
use RuntimeException;
12
use Safe\DateTime;
13
use function Safe\ini_set;
14
use function Safe\sprintf;
15
use Setono\MessageSchedulerBundle\Entity\ScheduledMessage;
16
use Setono\MessageSchedulerBundle\Message\Command\DispatchScheduledMessage;
17
use Setono\MessageSchedulerBundle\Repository\ScheduledMessageRepositoryInterface;
18
use Setono\MessageSchedulerBundle\Workflow\ScheduledMessageWorkflow;
19
use Symfony\Component\Messenger\Envelope;
20
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
21
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
22
use Symfony\Component\Messenger\RoutableMessageBus;
23
use Symfony\Component\Messenger\Stamp\BusNameStamp;
24
use Symfony\Component\Workflow\Registry;
25
use Throwable;
26
27
final class DispatchMessageHandler implements MessageHandlerInterface
28
{
29
    private ?ObjectManager $objectManager = null;
30
31
    private RoutableMessageBus $routableMessageBus;
32
33
    private ScheduledMessageRepositoryInterface $scheduledMessageRepository;
34
35
    private Registry $workflowRegistry;
36
37
    private ManagerRegistry $managerRegistry;
38
39 1
    public function __construct(
40
        RoutableMessageBus $routableMessageBus,
41
        ScheduledMessageRepositoryInterface $scheduledMessageRepository,
42
        Registry $workflowRegistry,
43
        ManagerRegistry $managerRegistry
44
    ) {
45 1
        $this->routableMessageBus = $routableMessageBus;
46 1
        $this->scheduledMessageRepository = $scheduledMessageRepository;
47 1
        $this->workflowRegistry = $workflowRegistry;
48 1
        $this->managerRegistry = $managerRegistry;
49 1
    }
50
51
    public function __invoke(DispatchScheduledMessage $message): void
52
    {
53
        /** @var ScheduledMessage|null $scheduledMessage */
54
        $scheduledMessage = $this->scheduledMessageRepository->find($message->getScheduledMessageId());
55
        if (null === $scheduledMessage) {
56
            throw new UnrecoverableMessageHandlingException(sprintf(
57
                'The scheduled message with id, "%s" does not exist', $message->getScheduledMessageId()
58
            ));
59
        }
60
61
        $scheduledMessage->resetErrors();
62
63
        $now = new DateTime();
64
        if ($scheduledMessage->getDispatchAt() > $now) {
65
            throw new UnrecoverableMessageHandlingException(sprintf(
66
                'The scheduled message with id, "%s" is not eligible to be dispatched yet. The dispatch timestamp is %s, while the time now is %s',
67
                $message->getScheduledMessageId(), $scheduledMessage->getDispatchAt()->format(DATE_ATOM), $now->format(DATE_ATOM)
68
            ));
69
        }
70
71
        $workflow = $this->workflowRegistry->get($scheduledMessage, ScheduledMessageWorkflow::NAME);
72
        if (!$workflow->can($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_PROCESS)) {
73
            throw new UnrecoverableMessageHandlingException(sprintf(
74
                'The scheduled message with id, "%s" could not enter the transition "%s". The state was: "%s"',
75
                $message->getScheduledMessageId(), ScheduledMessageWorkflow::TRANSITION_PROCESS, $scheduledMessage->getState()
76
            ));
77
        }
78
        $workflow->apply($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_PROCESS);
79
80
        $objectManager = $this->getObjectManager($scheduledMessage);
81
        $objectManager->flush();
82
83
        $messageToBeDispatched = $this->unserialize($scheduledMessage->getSerializedMessage());
84
85
        $stamps = [];
86
87
        $bus = $scheduledMessage->getBus();
88
        if (null !== $bus) {
89
            $stamps[] = new BusNameStamp($bus);
90
        }
91
92
        try {
93
            // notice that this try-catch block won't handle (and therefore log) any errors happening,
94
            // if the $messageToBeDispatched is routed on an async transport
95
            $this->routableMessageBus->dispatch(new Envelope($messageToBeDispatched, $stamps));
96
97
            $workflow->apply($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_SUCCEED);
98
            $objectManager->flush();
99
        } catch (Throwable $e) {
100
            $originalException = $e;
101
102
            $workflow->apply($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_FAIL);
103
104
            do {
105
                $scheduledMessage->addError($e->getMessage());
106
                $e = $e->getPrevious();
107
            } while (null !== $e);
108
109
            $objectManager->flush();
110
111
            throw $originalException;
112
        }
113
    }
114
115
    /**
116
     * This code is taken from \Symfony\Component\Messenger\Transport\Serialization\PhpSerializer
117
     *
118
     * todo extract to a separate library
119
     */
120
    private function unserialize(string $str): object
121
    {
122
        $signalingException = new InvalidArgumentException(sprintf('Could not decode message using PHP serialization: %s.', $str));
123
        $prevUnserializeHandler = ini_set('unserialize_callback_func', self::class . '::handleUnserializeCallback');
124
125
        /**
126
         * @psalm-suppress MissingClosureParamType
127
         * @psalm-suppress MissingClosureReturnType
128
         */
129
        $prevErrorHandler = set_error_handler(function ($type, $msg, $file, $line, $context = []) use (&$prevErrorHandler, $signalingException) {
0 ignored issues
show
Unused Code introduced by
The assignment to $prevErrorHandler is dead and can be removed.
Loading history...
130
            if (__FILE__ === $file) {
131
                throw $signalingException;
132
            }
133
134
            return null !== $prevErrorHandler ? $prevErrorHandler($type, $msg, $file, $line, $context) : false;
135
        });
136
137
        try {
138
            /** @var object $messageToBeDispatched */
139
            $messageToBeDispatched = unserialize($str, [
140
                'allowed_classes' => true,
141
            ]);
142
        } finally {
143
            restore_error_handler();
144
            ini_set('unserialize_callback_func', $prevUnserializeHandler);
145
        }
146
147
        return $messageToBeDispatched;
148
    }
149
150
    /**
151
     * @internal
152
     */
153
    public static function handleUnserializeCallback(string $class): void
154
    {
155
        throw new InvalidArgumentException(sprintf('Message class "%s" not found during unserialization.', $class));
156
    }
157
158
    private function getObjectManager(object $object): ObjectManager
159
    {
160
        if (null === $this->objectManager) {
161
            $class = get_class($object);
162
            $manager = $this->managerRegistry->getManagerForClass($class);
163
164
            if (null === $manager) {
165
                throw new RuntimeException(sprintf('No object manager associated with the class, %s', $class));
166
            }
167
168
            $this->objectManager = $manager;
169
        }
170
171
        return $this->objectManager;
172
    }
173
}
174