| 1 | <?php |
||
| 2 | |||
| 3 | namespace Bdf\QueueMessengerBundle\Transport\Handler; |
||
| 4 | |||
| 5 | use Bdf\Queue\Message\EnvelopeInterface as QueuedEnvelope; |
||
| 6 | use Bdf\Queue\Message\InteractEnvelopeInterface; |
||
| 7 | use Bdf\QueueMessengerBundle\Transport\Stamp\NullStampsSerializer; |
||
| 8 | use Bdf\QueueMessengerBundle\Transport\Stamp\StampsSerializerInterface; |
||
| 9 | use Symfony\Component\Messenger\Envelope; |
||
| 10 | use Symfony\Component\Messenger\Exception\LogicException; |
||
| 11 | use Symfony\Component\Messenger\MessageBusInterface; |
||
| 12 | use Symfony\Component\Messenger\Stamp\HandledStamp; |
||
| 13 | use Symfony\Component\Messenger\Stamp\ReceivedStamp; |
||
| 14 | |||
/**
 * Job dispatching a queued command to the message bus.
 *
 * The queued payload is wrapped into a Messenger envelope (unless it already is one),
 * marked with a ReceivedStamp carrying the connection name, and dispatched.
 *
 * If a reply is requested on the queued message, a synchronised dispatch is expected:
 * the result of the first HandledStamp of the dispatched envelope is sent back as reply.
 *
 * Note: payloads that are not objects are not converted yet (see the TODO in toEnvelope()).
 */
class MessageBusHandler
{
    /**
     * The command dispatcher.
     *
     * @var MessageBusInterface
     */
    private $dispatcher;

    /**
     * Deserializer for the stamps stored in the "stamps" header of the queued message.
     *
     * @var StampsSerializerInterface
     */
    private $stampsSerializer;

    /**
     * Constructor.
     *
     * @param MessageBusInterface $dispatcher Bus receiving the envelopes
     * @param StampsSerializerInterface|null $stampsSerializer Falls back to a NullStampsSerializer when omitted
     */
    public function __construct(MessageBusInterface $dispatcher, ?StampsSerializerInterface $stampsSerializer = null)
    {
        $this->dispatcher = $dispatcher;
        $this->stampsSerializer = $stampsSerializer ?? new NullStampsSerializer();
    }

    /**
     * Handle a queued command.
     *
     * @param mixed $message The payload of the queued message
     * @param QueuedEnvelope $queuedEnvelope The queue envelope the payload comes from
     */
    public function __invoke($message, QueuedEnvelope $queuedEnvelope)
    {
        // Since Symfony 4.3, ReceivedStamp takes the transport name as constructor argument
        $receivedStamp = new ReceivedStamp($queuedEnvelope->connection()->getName());
        $envelope = $this->toEnvelope($message, $queuedEnvelope)->with($receivedStamp);

        $this->dispatch($envelope, $queuedEnvelope);
    }

    /**
     * Dispatch the envelope to the bus.
     *
     * If the queued envelope is interactive and a reply is requested,
     * the result of the first HandledStamp is sent back as reply.
     *
     * @throws LogicException When a reply is requested but no handler has handled the message
     */
    private function dispatch(Envelope $envelope, QueuedEnvelope $queuedEnvelope): void
    {
        $envelope = $this->dispatcher->dispatch($envelope);

        // Nothing more to do when no reply is expected
        if (!$queuedEnvelope instanceof InteractEnvelopeInterface || !$queuedEnvelope->message()->needsReply()) {
            return;
        }

        /** @var HandledStamp[] $handledStamps */
        $handledStamps = $envelope->all(HandledStamp::class);

        if (empty($handledStamps)) {
            throw new LogicException(sprintf('Message of type "%s" was handled zero times. Exactly one handler is expected when using "%s::%s()".', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__));
        }

        $queuedEnvelope->reply($handledStamps[0]->getResult());
    }

    /**
     * Get the envelope from the message payload.
     *
     * A payload which is already an Envelope is returned untouched.
     * Otherwise, a new Envelope is created, with the stamps deserialized
     * from the "stamps" header of the queued message when this header is set.
     *
     * @param mixed $message The payload of the queued message
     */
    private function toEnvelope($message, QueuedEnvelope $queuedEnvelope): Envelope
    {
        if ($message instanceof Envelope) {
            return $message;
        }

        // TODO How to handle non object message
        // if (!is_object($message)) {
        //     $message = new InternalMessage(
        //         $queuedEnvelope->message()->name() ?: $queuedEnvelope->message()->queue(),
        //         $message
        //     );
        // }

        $stamps = $queuedEnvelope->message()->header('stamps');

        if ($stamps) {
            return new Envelope($message, $this->stampsSerializer->deserialize($stamps));
        }

        return new Envelope($message);
    }
}
||
| 109 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.