RoutingTopology   A
last analyzed

Complexity

Total Complexity 20

Size/Duplication

Total Lines 211
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Test Coverage

Coverage 98.59%

Importance

Changes 0
Metric Value
wmc 20
lcom 1
cbo 1
dl 0
loc 211
ccs 70
cts 71
cp 0.9859
rs 10
c 0
b 0
f 0

14 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A setupForEndpointUse() 0 7 2
A setupSubscription() 0 5 1
A tearDownSubscription() 0 4 1
A publish() 0 17 1
A send() 0 14 1
A sendToQueue() 0 14 1
A setupClassSubscriptions() 0 14 3
A markMessageAsConfigured() 0 4 1
A isMessageConfigured() 0 4 1
A getEventExchangeName() 0 9 2
A getSafeName() 0 4 1
A enhanceDeliveryMode() 0 10 2
A declareExchange() 0 4 2
1
<?php
2
namespace PSB\Core\Transport\RabbitMq;
3
4
5
/**
 * Declares and uses the exchange/queue layout this transport relies on.
 *
 * Every exchange is declared as a fanout exchange. Each endpoint gets a queue and an
 * exchange sharing the same (sanitized) name, and each message class gets an exchange
 * that is bound to the exchanges of all interfaces the class implements.
 */
class RoutingTopology
{
    /**
     * Whether queues/exchanges are declared durable and messages are published with
     * delivery mode 2 instead of 1.
     *
     * @var bool
     */
    private $useDurableMessaging;

    /**
     * Message FQCNs whose exchanges have already been declared by this instance,
     * stored as FQCN => true.
     *
     * @var bool[]
     */
    private $messageTopologyConfigured = [];

    /**
     * @param bool $useDurableMesaging Enables durable entities and delivery mode 2.
     */
    public function __construct($useDurableMesaging)
    {
        $this->useDurableMessaging = $useDurableMesaging;
    }

    /**
     * Declares the queue and the same-named exchange of an endpoint and binds them together.
     *
     * @param BrokerModel $broker
     * @param string      $queueName Raw endpoint name; it is sanitized with getSafeName() first.
     */
    public function setupForEndpointUse(BrokerModel $broker, $queueName)
    {
        $safeQueueName = static::getSafeName($queueName);
        $broker->declareQueue($safeQueueName, $this->useDurableMessaging ? AMQP_DURABLE : null);
        $this->declareExchange($broker, $safeQueueName);
        // Queue and exchange intentionally share one name; the routing key is empty.
        $broker->bindQueue($safeQueueName, $safeQueueName, '');
    }

    /**
     * Makes sure the exchanges for a message class exist, then binds the subscriber to them.
     *
     * @param BrokerModel $broker
     * @param string      $messageFqcn
     * @param string      $subscriberName Passed to the broker as is (not sanitized here).
     */
    public function setupSubscription(BrokerModel $broker, $messageFqcn, $subscriberName)
    {
        $this->setupClassSubscriptions($broker, $messageFqcn);
        $broker->bindExchange($subscriberName, $this->getEventExchangeName($messageFqcn));
    }

    /**
     * Removes the binding between the subscriber and the exchange of a message class.
     *
     * @param BrokerModel $broker
     * @param string      $messageFqcn
     * @param string      $subscriberName Passed to the broker as is (not sanitized here).
     */
    public function tearDownSubscription(BrokerModel $broker, $messageFqcn, $subscriberName)
    {
        $broker->unbindExchange($subscriberName, $this->getEventExchangeName($messageFqcn));
    }

    /**
     * Publishes a message body to the exchange derived from the message class name.
     *
     * @param BrokerModel $broker
     * @param string      $messageFqcn
     * @param string      $messageBody
     * @param array       $attributes  Message attributes; 'delivery_mode' is always overwritten.
     */
    public function publish(
        BrokerModel $broker,
        $messageFqcn,
        $messageBody,
        array $attributes = []
    ) {
        // The semantics of publish implies that one can publish without caring who listens,
        // which means that it should not throw due to lack of exchange(s) and thus we make
        // sure the exchange(s) exist.
        $this->setupClassSubscriptions($broker, $messageFqcn);
        $broker->publish(
            $this->getEventExchangeName($messageFqcn),
            $messageBody,
            '',
            AMQP_NOPARAM,
            $this->enhanceDeliveryMode($attributes)
        );
    }

    /**
     * Publishes a message body to the exchange named after the (sanitized) address.
     *
     * @param BrokerModel $broker
     * @param string      $address
     * @param string      $messageBody
     * @param array       $attributes  Message attributes; 'delivery_mode' is always overwritten.
     */
    public function send(
        BrokerModel $broker,
        $address,
        $messageBody,
        array $attributes = []
    ) {
        $broker->publish(
            static::getSafeName($address),
            $messageBody,
            '',
            AMQP_NOPARAM,
            $this->enhanceDeliveryMode($attributes)
        );
    }

    /**
     * Publishes a message body with an empty exchange name, using the (sanitized) queue
     * name as the routing key.
     *
     * @param BrokerModel $broker
     * @param string      $queueName
     * @param string      $messageBody
     * @param array       $attributes  Message attributes; 'delivery_mode' is always overwritten.
     */
    public function sendToQueue(
        BrokerModel $broker,
        $queueName,
        $messageBody,
        array $attributes = []
    ) {
        $broker->publish(
            '',
            $messageBody,
            static::getSafeName($queueName),
            AMQP_NOPARAM,
            $this->enhanceDeliveryMode($attributes)
        );
    }

    /**
     * Declares the exchange of a message class plus one exchange per implemented
     * interface, and binds each interface exchange with the class exchange.
     *
     * The work is done at most once per message FQCN for the lifetime of this instance.
     *
     * @param BrokerModel $broker
     * @param string      $messageFqcn
     */
    private function setupClassSubscriptions(BrokerModel $broker, $messageFqcn)
    {
        if ($this->isMessageConfigured($messageFqcn)) {
            return;
        }

        $messageExchange = $this->getEventExchangeName($messageFqcn);
        $this->declareExchange($broker, $messageExchange);

        // class_implements() returns false when the class cannot be resolved; fall back
        // to an empty list so the loop does not raise a second warning on a non-array.
        $interfaces = class_implements($messageFqcn, true) ?: [];
        foreach ($interfaces as $fqin) {
            $interfaceExchange = $this->getEventExchangeName($fqin);
            $this->declareExchange($broker, $interfaceExchange);
            // NOTE(review): presumably (destination, source), so that messages published
            // to the class exchange reach interface subscribers - confirm in BrokerModel.
            $broker->bindExchange($interfaceExchange, $messageExchange);
        }

        $this->markMessageAsConfigured($messageFqcn);
    }

    /**
     * Remembers that the exchanges for the given message class have been set up.
     *
     * @param string $messageFqcn
     */
    private function markMessageAsConfigured($messageFqcn)
    {
        // The value must not be null: isMessageConfigured() relies on isset(), which
        // reports false for keys that hold null.
        $this->messageTopologyConfigured[$messageFqcn] = true;
    }

    /**
     * Tells whether markMessageAsConfigured() was already called for the message class.
     *
     * @param string $messageFqcn
     *
     * @return bool
     */
    private function isMessageConfigured($messageFqcn)
    {
        return isset($this->messageTopologyConfigured[$messageFqcn]);
    }

    /**
     * Converts a FQCN to an exchange name.
     * Eg. \Some\Namespaced\Class -> Some.Namespaced:Class
     *
     * @param string $messageFqcn
     *
     * @return string
     */
    private function getEventExchangeName($messageFqcn)
    {
        $messageFqcn = trim($messageFqcn, '\\');

        // The last namespace separator becomes ':'; all remaining ones become '.'.
        $pos = strrpos($messageFqcn, '\\');
        if ($pos !== false) {
            $messageFqcn = substr_replace($messageFqcn, ':', $pos, 1);
        }

        return str_replace('\\', '.', $messageFqcn);
    }

    /**
     * Only letters, digits, hyphen, underscore, period or colon are allowed for queue and
     * exchange names in RMQ; every other character is replaced with a period.
     *
     * @param string $address
     *
     * @return string
     */
    public static function getSafeName($address)
    {
        // The hyphen is placed last so it is unambiguously a literal, not a range operator.
        return preg_replace('/[^a-zA-Z0-9_.:-]/', '.', $address);
    }

    /**
     * Forces the 'delivery_mode' attribute to match the durability setting:
     * 2 (persistent in AMQP) when durable messaging is enabled, 1 (non-persistent) otherwise.
     * Any 'delivery_mode' supplied by the caller is overwritten.
     *
     * @param array $attributes
     *
     * @return array The attributes with 'delivery_mode' set.
     */
    private function enhanceDeliveryMode(array $attributes)
    {
        $attributes['delivery_mode'] = $this->useDurableMessaging ? 2 : 1;

        return $attributes;
    }

    /**
     * Declares a fanout exchange, durable when durable messaging is enabled.
     *
     * @param BrokerModel $broker
     * @param string      $exchangeName
     */
    private function declareExchange(BrokerModel $broker, $exchangeName)
    {
        $broker->declareExchange($exchangeName, AMQP_EX_TYPE_FANOUT, $this->useDurableMessaging ? AMQP_DURABLE : null);
    }
}
216