1 | <?php |
||
2 | |||
3 | namespace Bdf\QueueMessengerBundle\Transport\Handler; |
||
4 | |||
5 | use Bdf\Queue\Message\EnvelopeInterface as QueuedEnvelope; |
||
6 | use Bdf\Queue\Message\InteractEnvelopeInterface; |
||
7 | use Bdf\QueueMessengerBundle\Transport\Stamp\NullStampsSerializer; |
||
8 | use Bdf\QueueMessengerBundle\Transport\Stamp\StampsSerializerInterface; |
||
9 | use Symfony\Component\Messenger\Envelope; |
||
10 | use Symfony\Component\Messenger\Exception\LogicException; |
||
11 | use Symfony\Component\Messenger\MessageBusInterface; |
||
12 | use Symfony\Component\Messenger\Stamp\HandledStamp; |
||
13 | use Symfony\Component\Messenger\Stamp\ReceivedStamp; |
||
14 | |||
15 | /** |
||
16 | * Job for dispatch a queued command to the bus |
||
17 | * If the message do not contains a valid object as data, an InternalMessage is dispatched. |
||
18 | * |
||
19 | * If the a reply is requested on the message, |
||
20 | * a synchronised dispatch is performed, |
||
21 | * and the result is returned as reply |
||
22 | */ |
||
23 | class MessageBusHandler |
||
24 | { |
||
25 | /** |
||
26 | * The command dispatcher. |
||
27 | * |
||
28 | * @var MessageBusInterface |
||
29 | */ |
||
30 | private $dispatcher; |
||
31 | |||
32 | /** |
||
33 | * @var StampsSerializerInterface |
||
34 | */ |
||
35 | private $stampsSerializer; |
||
36 | |||
37 | /** |
||
38 | * Constructor. |
||
39 | */ |
||
40 | public function __construct(MessageBusInterface $dispatcher, StampsSerializerInterface $stampsSerializer = null) |
||
41 | { |
||
42 | $this->dispatcher = $dispatcher; |
||
43 | $this->stampsSerializer = $stampsSerializer ?: new NullStampsSerializer(); |
||
44 | } |
||
45 | |||
46 | /** |
||
47 | * Handle a queued command. |
||
48 | * |
||
49 | * @param mixed $message |
||
50 | */ |
||
51 | public function __invoke($message, QueuedEnvelope $queuedEnvelope) |
||
52 | { |
||
53 | $envelope = $this |
||
54 | ->toEnvelope($message, $queuedEnvelope) |
||
55 | // Symfony 4.3 add transport name in constructor |
||
56 | ->with(new ReceivedStamp($queuedEnvelope->connection()->getName())) |
||
57 | ; |
||
58 | |||
59 | $this->dispatch($envelope, $queuedEnvelope); |
||
60 | } |
||
61 | |||
62 | /** |
||
63 | * Dispatch the envelope to the bus. |
||
64 | * |
||
65 | * If the message is replyable and a reply is requested, a synchronized call is performed and the result is returned |
||
66 | */ |
||
67 | private function dispatch(Envelope $envelope, QueuedEnvelope $queuedEnvelope): void |
||
68 | { |
||
69 | $envelope = $this->dispatcher->dispatch($envelope); |
||
70 | |||
71 | if ($queuedEnvelope instanceof InteractEnvelopeInterface && $queuedEnvelope->message()->needsReply()) { |
||
72 | /** @var HandledStamp[] $handledStamps */ |
||
73 | $handledStamps = $envelope->all(HandledStamp::class); |
||
74 | |||
75 | if (!$handledStamps) { |
||
0 ignored issues
–
show
|
|||
76 | throw new LogicException(sprintf('Message of type "%s" was handled zero times. Exactly one handler is expected when using "%s::%s()".', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__)); |
||
77 | } |
||
78 | |||
79 | $queuedEnvelope->reply($handledStamps[0]->getResult()); |
||
80 | } |
||
81 | } |
||
82 | |||
83 | /** |
||
84 | * Get the envelope from the message payload. |
||
85 | * |
||
86 | * @param mixed $message |
||
87 | */ |
||
88 | private function toEnvelope($message, QueuedEnvelope $queuedEnvelope): Envelope |
||
89 | { |
||
90 | if ($message instanceof Envelope) { |
||
91 | return $message; |
||
92 | } |
||
93 | |||
94 | // TODO How to handle non object message |
||
95 | // if (!is_object($message)) { |
||
96 | // $message = new InternalMessage( |
||
97 | // $queuedEnvelope->message()->name() ?: $queuedEnvelope->message()->queue(), |
||
98 | // $message |
||
99 | // ); |
||
100 | // } |
||
101 | |||
102 | if ($stamps = $queuedEnvelope->message()->header('stamps')) { |
||
103 | return new Envelope($message, $this->stampsSerializer->deserialize($stamps)); |
||
104 | } |
||
105 | |||
106 | return new Envelope($message); |
||
107 | } |
||
108 | } |
||
109 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.