1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | namespace Setono\MessageSchedulerBundle\Message\Handler; |
||
6 | |||
7 | use const DATE_ATOM; |
||
8 | use Doctrine\Persistence\ManagerRegistry; |
||
9 | use Doctrine\Persistence\ObjectManager; |
||
10 | use InvalidArgumentException; |
||
11 | use RuntimeException; |
||
12 | use Safe\DateTime; |
||
13 | use function Safe\ini_set; |
||
14 | use function Safe\sprintf; |
||
15 | use Setono\MessageSchedulerBundle\Entity\ScheduledMessage; |
||
16 | use Setono\MessageSchedulerBundle\Message\Command\DispatchScheduledMessage; |
||
17 | use Setono\MessageSchedulerBundle\Repository\ScheduledMessageRepositoryInterface; |
||
18 | use Setono\MessageSchedulerBundle\Workflow\ScheduledMessageWorkflow; |
||
19 | use Symfony\Component\Messenger\Envelope; |
||
20 | use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; |
||
21 | use Symfony\Component\Messenger\Handler\MessageHandlerInterface; |
||
22 | use Symfony\Component\Messenger\RoutableMessageBus; |
||
23 | use Symfony\Component\Messenger\Stamp\BusNameStamp; |
||
24 | use Symfony\Component\Workflow\Registry; |
||
25 | use Throwable; |
||
26 | |||
27 | final class DispatchMessageHandler implements MessageHandlerInterface |
||
28 | { |
||
29 | private ?ObjectManager $objectManager = null; |
||
30 | |||
31 | private RoutableMessageBus $routableMessageBus; |
||
32 | |||
33 | private ScheduledMessageRepositoryInterface $scheduledMessageRepository; |
||
34 | |||
35 | private Registry $workflowRegistry; |
||
36 | |||
37 | private ManagerRegistry $managerRegistry; |
||
38 | |||
39 | 1 | public function __construct( |
|
40 | RoutableMessageBus $routableMessageBus, |
||
41 | ScheduledMessageRepositoryInterface $scheduledMessageRepository, |
||
42 | Registry $workflowRegistry, |
||
43 | ManagerRegistry $managerRegistry |
||
44 | ) { |
||
45 | 1 | $this->routableMessageBus = $routableMessageBus; |
|
46 | 1 | $this->scheduledMessageRepository = $scheduledMessageRepository; |
|
47 | 1 | $this->workflowRegistry = $workflowRegistry; |
|
48 | 1 | $this->managerRegistry = $managerRegistry; |
|
49 | 1 | } |
|
50 | |||
51 | public function __invoke(DispatchScheduledMessage $message): void |
||
52 | { |
||
53 | /** @var ScheduledMessage|null $scheduledMessage */ |
||
54 | $scheduledMessage = $this->scheduledMessageRepository->find($message->getScheduledMessageId()); |
||
55 | if (null === $scheduledMessage) { |
||
56 | throw new UnrecoverableMessageHandlingException(sprintf( |
||
57 | 'The scheduled message with id, "%s" does not exist', $message->getScheduledMessageId() |
||
58 | )); |
||
59 | } |
||
60 | |||
61 | $scheduledMessage->resetErrors(); |
||
62 | |||
63 | $now = new DateTime(); |
||
64 | if ($scheduledMessage->getDispatchAt() > $now) { |
||
65 | throw new UnrecoverableMessageHandlingException(sprintf( |
||
66 | 'The scheduled message with id, "%s" is not eligible to be dispatched yet. The dispatch timestamp is %s, while the time now is %s', |
||
67 | $message->getScheduledMessageId(), $scheduledMessage->getDispatchAt()->format(DATE_ATOM), $now->format(DATE_ATOM) |
||
68 | )); |
||
69 | } |
||
70 | |||
71 | $workflow = $this->workflowRegistry->get($scheduledMessage, ScheduledMessageWorkflow::NAME); |
||
72 | if (!$workflow->can($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_PROCESS)) { |
||
73 | throw new UnrecoverableMessageHandlingException(sprintf( |
||
74 | 'The scheduled message with id, "%s" could not enter the transition "%s". The state was: "%s"', |
||
75 | $message->getScheduledMessageId(), ScheduledMessageWorkflow::TRANSITION_PROCESS, $scheduledMessage->getState() |
||
76 | )); |
||
77 | } |
||
78 | $workflow->apply($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_PROCESS); |
||
79 | |||
80 | $objectManager = $this->getObjectManager($scheduledMessage); |
||
81 | $objectManager->flush(); |
||
82 | |||
83 | $messageToBeDispatched = $this->unserialize($scheduledMessage->getSerializedMessage()); |
||
84 | |||
85 | $stamps = []; |
||
86 | |||
87 | $bus = $scheduledMessage->getBus(); |
||
88 | if (null !== $bus) { |
||
89 | $stamps[] = new BusNameStamp($bus); |
||
90 | } |
||
91 | |||
92 | try { |
||
93 | // notice that this try-catch block won't handle (and therefore log) any errors happening, |
||
94 | // if the $messageToBeDispatched is routed on an async transport |
||
95 | $this->routableMessageBus->dispatch(new Envelope($messageToBeDispatched, $stamps)); |
||
96 | |||
97 | $workflow->apply($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_SUCCEED); |
||
98 | $objectManager->flush(); |
||
99 | } catch (Throwable $e) { |
||
100 | $originalException = $e; |
||
101 | |||
102 | $workflow->apply($scheduledMessage, ScheduledMessageWorkflow::TRANSITION_FAIL); |
||
103 | |||
104 | do { |
||
105 | $scheduledMessage->addError($e->getMessage()); |
||
106 | $e = $e->getPrevious(); |
||
107 | } while (null !== $e); |
||
108 | |||
109 | $objectManager->flush(); |
||
110 | |||
111 | throw $originalException; |
||
112 | } |
||
113 | } |
||
114 | |||
115 | /** |
||
116 | * This code is taken from \Symfony\Component\Messenger\Transport\Serialization\PhpSerializer |
||
117 | * |
||
118 | * todo extract to a separate library |
||
119 | */ |
||
120 | private function unserialize(string $str): object |
||
121 | { |
||
122 | $signalingException = new InvalidArgumentException(sprintf('Could not decode message using PHP serialization: %s.', $str)); |
||
123 | $prevUnserializeHandler = ini_set('unserialize_callback_func', self::class . '::handleUnserializeCallback'); |
||
124 | |||
125 | /** |
||
126 | * @psalm-suppress MissingClosureParamType |
||
127 | * @psalm-suppress MissingClosureReturnType |
||
128 | */ |
||
129 | $prevErrorHandler = set_error_handler(function ($type, $msg, $file, $line, $context = []) use (&$prevErrorHandler, $signalingException) { |
||
0 ignored issues
–
show
Unused Code
introduced
by
![]() |
|||
130 | if (__FILE__ === $file) { |
||
131 | throw $signalingException; |
||
132 | } |
||
133 | |||
134 | return null !== $prevErrorHandler ? $prevErrorHandler($type, $msg, $file, $line, $context) : false; |
||
135 | }); |
||
136 | |||
137 | try { |
||
138 | /** @var object $messageToBeDispatched */ |
||
139 | $messageToBeDispatched = unserialize($str, [ |
||
140 | 'allowed_classes' => true, |
||
141 | ]); |
||
142 | } finally { |
||
143 | restore_error_handler(); |
||
144 | ini_set('unserialize_callback_func', $prevUnserializeHandler); |
||
145 | } |
||
146 | |||
147 | return $messageToBeDispatched; |
||
148 | } |
||
149 | |||
150 | /** |
||
151 | * @internal |
||
152 | */ |
||
153 | public static function handleUnserializeCallback(string $class): void |
||
154 | { |
||
155 | throw new InvalidArgumentException(sprintf('Message class "%s" not found during unserialization.', $class)); |
||
156 | } |
||
157 | |||
158 | private function getObjectManager(object $object): ObjectManager |
||
159 | { |
||
160 | if (null === $this->objectManager) { |
||
161 | $class = get_class($object); |
||
162 | $manager = $this->managerRegistry->getManagerForClass($class); |
||
163 | |||
164 | if (null === $manager) { |
||
165 | throw new RuntimeException(sprintf('No object manager associated with the class, %s', $class)); |
||
166 | } |
||
167 | |||
168 | $this->objectManager = $manager; |
||
169 | } |
||
170 | |||
171 | return $this->objectManager; |
||
172 | } |
||
173 | } |
||
174 |