BrokerModel::produceQueueInstance()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 16

Duplication

Lines 16
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 16
loc 16
ccs 0
cts 10
cp 0
rs 9.7333
c 0
b 0
f 0
cc 3
nc 3
nop 1
crap 12
1
<?php
2
namespace PSB\Core\Transport\RabbitMq;
3
4
5
use AMQPChannel;
6
use AMQPConnection;
7
use AMQPExchange;
8
use AMQPQueue;
9
10
/**
11
 * TODO handle exceptions thrown by AMQP
12
 * TODO handle return values for any amqp operation (setters, publish, declare, etc.). Any false should be critical.
13
 */
14
class BrokerModel
15
{
16
    /**
17
     * @var AMQPConnection
18
     */
19
    private $connection;
20
21
    /**
22
     * @var AMQPChannel
23
     */
24
    private $channel;
25
26
    /**
27
     * @var AMQPExchange[]
28
     */
29
    private $declaredExchangeInstances = [];
30
31
    /**
32
     * @var AMQPQueue[]
33
     */
34
    private $declaredQueueInstances = [];
35
36
    /**
37
     * @var AMQPExchange[]
38
     */
39
    private $usedExchangeInstances = [];
40
41
    /**
42
     * @var AMQPQueue[]
43
     */
44
    private $usedQueueInstances = [];
45
46
    /**
47
     * @param AMQPConnection $connection
48
     */
49 3
    public function __construct(AMQPConnection $connection)
50
    {
51 3
        $this->connection = $connection;
52 3
    }
53
54
    /**
55
     * @param string $exchangeName
56
     * @param string $messageBody
57
     * @param string $routingKey
58
     * @param int    $flags
59
     * @param array  $attributes
60
     *
61
     * @return bool
62
     */
63
    public function publish(
64
        $exchangeName,
65
        $messageBody,
66
        $routingKey = '',
67
        $flags = AMQP_NOPARAM,
68
        array $attributes = []
69
    ) {
70
        $exchange = $this->produceExchangeInstance($exchangeName);
71
72
        $return = $exchange->publish(
73
            $messageBody,
74
            $routingKey,
75
            $flags,
76
            $attributes
77
        );
78
79
        return $return;
80
    }
81
82
    /**
83
     * @param string $exchangeName
84
     * @param string $exchangeType
85
     * @param int    $flags
86
     * @param array  $arguments
87
     */
88 View Code Duplication
    public function declareExchange($exchangeName, $exchangeType, $flags = null, $arguments = [])
89
    {
90
        $this->ensureChannel();
91
        if (!isset($this->declaredExchangeInstances[$exchangeName])) {
92
            $exchange = new AMQPExchange($this->channel);
93
            $this->declaredExchangeInstances[$exchangeName] = $exchange;
94
            if ($flags !== null) {
95
                $exchange->setFlags($flags);
96
            }
97
            $exchange->setName($exchangeName);
98
            $exchange->setType($exchangeType);
99
            $exchange->setArguments($arguments);
100
            $exchange->declareExchange();
101
        }
102
    }
103
104
    /**
105
     * @param string $queueName
106
     * @param int    $flags
107
     * @param array  $arguments
108
     */
109 View Code Duplication
    public function declareQueue($queueName, $flags = null, $arguments = [])
110
    {
111
        $this->ensureChannel();
112
113
        if (!isset($this->declaredQueueInstances[$queueName])) {
114
            $queue = new AMQPQueue($this->channel);
115
            $this->declaredQueueInstances[$queueName] = $queue;
116
            if ($flags !== null) {
117
                $queue->setFlags($flags);
118
            }
119
            $queue->setName($queueName);
120
            $queue->setArguments($arguments);
121
            $queue->declareQueue();
122
        }
123
    }
124
125
    /**
126
     * WARNING: amqp 1.4.0 on x64 is bugged and will randomly hang when attempting to bind exchanges
127
     *
128
     * @param string $destinationName
129
     * @param string $sourceName
130
     * @param string $routingKey
131
     */
132
    public function bindExchange($destinationName, $sourceName, $routingKey = '')
133
    {
134
        /**
135
         * Amqp 1.4.0 will throw when trying to bind exchanges without a routing key even if they are of type fanout.
136
         * Later versions do not, but upgrading from 1.4.0 would result in a duplication of bindings (there would
137
         * be one with a routing key and one without), so in order to avoid that we use a routing key for all versions.
138
         */
139
        if ($routingKey == '') {
140
            $routingKey = 'making-sure-there-is-a-routing-key';
141
        }
142
143
        $exchange = $this->produceExchangeInstance($destinationName);
144
        $exchange->bind($sourceName, $routingKey);
145
    }
146
147
    /**
148
     * @param string $destinationName
149
     * @param string $sourceName
150
     * @param string $routingKey
151
     *
152
     * @throws \AMQPChannelException
153
     * @throws \AMQPConnectionException
154
     * @throws \AMQPExchangeException
155
     */
156
    public function unbindExchange($destinationName, $sourceName, $routingKey = '')
157
    {
158
        // WARNING: only amqp 1.6.0 and above supports exchange unbinding
159
        if (phpversion('amqp') <= '1.4.0') {
160
            return;
161
        }
162
163
        $exchange = $this->produceExchangeInstance($destinationName);
164
165
        $exchange->unbind($sourceName, $routingKey);
166
    }
167
168
    /**
169
     * @param string $queueName
170
     * @param string $exchangeName
171
     * @param null   $routingKey
172
     * @param array  $arguments
173
     */
174
    public function bindQueue($queueName, $exchangeName, $routingKey = null, $arguments = [])
175
    {
176
        $queue = $this->produceQueueInstance($queueName);
177
        $queue->bind($exchangeName, $routingKey, $arguments);
178
    }
179
180
    /**
181
     * @param string $queueName
182
     * @param string $exchangeName
183
     * @param null   $routingKey
184
     * @param array  $arguments
185
     */
186
    public function unbindQueue($queueName, $exchangeName, $routingKey = null, $arguments = [])
187
    {
188
        $queue = $this->produceQueueInstance($queueName);
189
        $queue->unbind($exchangeName, $routingKey, $arguments);
190
    }
191
192
    /**
193
     * @param string $queueName
194
     */
195
    public function purgeQueue($queueName)
196
    {
197
        $queue = $this->produceQueueInstance($queueName);
198
        $queue->purge();
199
    }
200
201
    /**
202
     * @param string   $queueName
203
     * @param callable $callback
204
     */
205
    public function consume($queueName, callable $callback)
206
    {
207
        $queue = $this->produceQueueInstance($queueName);
208
        $queue->consume($callback);
209
    }
210
211
    /**
212
     * It returns a cached exchange or a new one if none exists.
213
     *
214
     * @param string $exchangeName
215
     *
216
     * @return AMQPExchange
217
     */
218 View Code Duplication
    private function produceExchangeInstance($exchangeName)
219
    {
220
        if (isset($this->declaredExchangeInstances[$exchangeName])) {
221
            return $this->declaredExchangeInstances[$exchangeName];
222
        }
223
224
        if (isset($this->usedExchangeInstances[$exchangeName])) {
225
            return $this->usedExchangeInstances[$exchangeName];
226
        }
227
228
        $this->ensureChannel();
229
        $exchange = new AMQPExchange($this->channel);
230
        $exchange->setName($exchangeName);
231
        $this->usedExchangeInstances[$exchangeName] = $exchange;
232
        return $exchange;
233
    }
234
235
    /**
236
     * It returns a cached queue or a new one if none exists.
237
     *
238
     * @param string $queueName
239
     *
240
     * @return AMQPQueue
241
     */
242 View Code Duplication
    private function produceQueueInstance($queueName)
243
    {
244
        if (isset($this->declaredQueueInstances[$queueName])) {
245
            return $this->declaredQueueInstances[$queueName];
246
        }
247
248
        if (isset($this->usedQueueInstances[$queueName])) {
249
            return $this->usedQueueInstances[$queueName];
250
        }
251
252
        $this->ensureChannel();
253
        $queue = new AMQPQueue($this->channel);
254
        $queue->setName($queueName);
255
        $this->usedQueueInstances[$queueName] = $queue;
256
        return $queue;
257
    }
258
259
    private function ensureChannel()
260
    {
261
        $this->ensureConnection();
262
        if (!$this->channel) {
263
            $this->channel = new AMQPChannel($this->connection);
264
        }
265
    }
266
267
    private function ensureConnection()
268
    {
269
        if (!$this->connection->isConnected()) {
270
            $this->connection->connect();
271
        }
272
    }
273
}
274