RabbitMqMessageDispatcher::dispatch()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 12
cts 12
cp 1
rs 9.7
c 0
b 0
f 0
cc 4
nc 4
nop 1
crap 4
1
<?php
2
namespace PSB\Core\Transport\RabbitMq;
3
4
5
use PSB\Core\Exception\InvalidArgumentException;
6
use PSB\Core\Routing\MulticastAddressTag;
7
use PSB\Core\Routing\UnicastAddressTag;
8
use PSB\Core\Transport\MessageDispatcherInterface;
9
use PSB\Core\Transport\OutgoingPhysicalMessage;
10
use PSB\Core\Transport\TransportOperations;
11
12
/**
 * Dispatches outgoing transport operations through the routing topology.
 *
 * Each transport operation carries a message and an address tag. Operations tagged with a
 * UnicastAddressTag are handed to RoutingTopology::send(), operations tagged with a
 * MulticastAddressTag are handed to RoutingTopology::publish(). Any other tag is rejected.
 */
class RabbitMqMessageDispatcher implements MessageDispatcherInterface
{
    /**
     * @var RoutingTopology
     */
    private $routingTopology;

    /**
     * @var BrokerModel
     */
    private $brokerModel;

    /**
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * @param RoutingTopology  $routingTopology
     * @param BrokerModel      $brokerModel
     * @param MessageConverter $messageConverter
     */
    public function __construct(
        RoutingTopology $routingTopology,
        BrokerModel $brokerModel,
        MessageConverter $messageConverter
    ) {
        $this->routingTopology = $routingTopology;
        $this->brokerModel = $brokerModel;
        $this->messageConverter = $messageConverter;
    }

    /**
     * Sends or publishes every operation in the given collection, in iteration order.
     *
     * Operations are processed one by one, so when an unsupported address tag is encountered
     * the operations that preceded it have already been handed to the routing topology.
     *
     * @param TransportOperations $transportOperations
     *
     * @throws InvalidArgumentException When an operation's address tag is neither a
     *                                  UnicastAddressTag nor a MulticastAddressTag.
     */
    public function dispatch(TransportOperations $transportOperations)
    {
        foreach ($transportOperations->getTransportOperations() as $transportOperation) {
            $addressTag = $transportOperation->getAddressTag();
            $message = $transportOperation->getMessage();

            if ($addressTag instanceof UnicastAddressTag) {
                $this->sendMessage($message, $addressTag);
            } elseif ($addressTag instanceof MulticastAddressTag) {
                $this->publishMessage($message, $addressTag);
            } else {
                // get_class() only accepts objects: with null or a scalar it emits a warning
                // (PHP 7.2+) or throws a TypeError (PHP 8) before the intended exception can
                // be raised. Fall back to gettype() so the caller always gets the
                // InvalidArgumentException with a meaningful type name.
                $tagType = is_object($addressTag) ? get_class($addressTag) : gettype($addressTag);
                throw new InvalidArgumentException(
                    "Transport operations contain an unsupported type of '$tagType'. Supported types are 'PSB\Core\\Routing\\UnicastAddressTag' and 'PSB\Core\\Routing\\MulticastAddressTag'."
                );
            }
        }
    }

    /**
     * Hands a message to the routing topology's send() using the tag's destination.
     *
     * @param OutgoingPhysicalMessage $message
     * @param UnicastAddressTag       $addressTag
     */
    private function sendMessage(OutgoingPhysicalMessage $message, UnicastAddressTag $addressTag)
    {
        $attributes = $this->messageConverter->composeRabbitMqAttributes($message);
        $this->routingTopology->send(
            $this->brokerModel,
            $addressTag->getDestination(),
            $message->getBody(),
            $attributes
        );
    }

    /**
     * Hands a message to the routing topology's publish() using the tag's message type.
     *
     * @param OutgoingPhysicalMessage $message
     * @param MulticastAddressTag     $addressTag
     */
    private function publishMessage(OutgoingPhysicalMessage $message, MulticastAddressTag $addressTag)
    {
        $attributes = $this->messageConverter->composeRabbitMqAttributes($message);
        $this->routingTopology->publish(
            $this->brokerModel,
            $addressTag->getMessageType(),
            $message->getBody(),
            $attributes
        );
    }
}
95