Completed
Push — master ( 613d98...fa04c9 )
by Rémi
20:40
created

PeclAmqpDriver   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 285
Duplicated Lines 6.67 %

Coupling/Cohesion

Components 1
Dependencies 3

Importance

Changes 0
Metric Value
wmc 26
lcom 1
cbo 3
dl 19
loc 285
rs 10
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A declareSimpleQueue() 0 13 2
A declareExchange() 0 9 1
A bindQueue() 0 5 1
A declareAndBindQueue() 0 5 1
A deleteQueue() 0 4 1
A deleteExchange() 0 4 1
A publish() 0 10 1
B consume() 0 30 4
A ack() 0 5 1
A nack() 0 5 2
A close() 0 4 1
A getChannel() 0 9 2
A getQueue() 0 9 2
A getExchange() 0 9 2
A getMessageProperties() 19 19 3

How to fix   Duplicated Code   

Duplicated Code

Duplicated code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace Burrow\Driver;
4
5
use Burrow\Driver;
6
use Burrow\Exception\ConsumerException;
7
use Burrow\Exception\TimeoutException;
8
use Burrow\Message;
9
10
/**
11
 * @codeCoverageIgnore
12
 */
13
class PeclAmqpDriver implements Driver
14
{
15
    /** Key of the delivery mode entry in the publish property array. */
    const DELIVERY_MODE = 'delivery_mode';

    /** Key of the content type entry in the publish property array. */
    const CONTENT_TYPE = 'content_type';

    /** Key of the application headers entry in the publish property array. */
    const APPLICATION_HEADERS = 'headers';

    /** Key of the correlation id entry in the publish property array. */
    const CORRELATION_ID = 'correlation_id';

    /** Key of the reply-to entry in the publish property array. */
    const REPLY_TO = 'reply_to';

    /**
     * Connection received by the constructor.
     *
     * @var \AMQPConnection
     */
    private $connection;

    /**
     * Channel created on first use by getChannel(); unset until then.
     *
     * @var \AMQPChannel
     */
    private $channel;
26
27
    /**
     * Stores the AMQP connection this driver operates on.
     *
     * Only the reference is kept here; connecting and opening a channel are
     * deferred to getChannel().
     *
     * @param \AMQPConnection $connection
     */
    public function __construct(\AMQPConnection $connection)
    {
        $this->connection = $connection;
    }
36
37
    /**
     * Declare a queue, durable by default or exclusive on request
     *
     * Any $type other than self::QUEUE_EXCLUSIVE results in a durable queue.
     *
     * @param string $queueName Name of the queue; when empty, no name is set before declaring
     * @param string $type      self::QUEUE_DURABLE or self::QUEUE_EXCLUSIVE
     *
     * @return string Name reported by the queue object once it has been declared
     */
    public function declareSimpleQueue($queueName = '', $type = self::QUEUE_DURABLE)
    {
        $flags = ($type === self::QUEUE_EXCLUSIVE) ? AMQP_EXCLUSIVE : AMQP_DURABLE;

        $declared = $this->getQueue($queueName);
        $declared->setFlags($flags);
        $declared->declareQueue();

        return $declared->getName();
    }
58
59
    /**
     * Declare a durable exchange of the given type
     *
     * @param  string $exchangeName Name of the exchange; when empty, no name is set before declaring
     * @param  string $type         Exchange type, fanout by default
     *
     * @return string Name reported by the exchange object once it has been declared
     */
    public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT)
    {
        $declared = $this->getExchange($exchangeName);

        $declared->setType($type);
        $declared->setFlags(AMQP_DURABLE);
        $declared->declareExchange();

        return $declared->getName();
    }
76
77
    /**
     * Bind a queue to an exchange using the given routing key
     *
     * The queue is not declared here; only the binding is issued.
     *
     * @param  string $exchange   Name of the exchange to bind to
     * @param  string $queueName  Name of the queue being bound
     * @param  string $routingKey Routing key of the binding, empty by default
     *
     * @return void
     */
    public function bindQueue($exchange, $queueName, $routingKey = '')
    {
        $this->getQueue($queueName)->bind($exchange, $routingKey);
    }
91
92
    /**
     * Declare a queue, then bind it to an exchange
     *
     * The queue is declared with the default type of declareSimpleQueue()
     * and afterwards bound through bindQueue().
     *
     * @param  string $exchange   Name of the exchange to bind to
     * @param  string $queueName  Name of the queue to declare and bind
     * @param  string $routingKey Routing key of the binding, empty by default
     *
     * @return void
     */
    public function declareAndBindQueue($exchange, $queueName, $routingKey = '')
    {
        // Step 1: make sure the queue exists.
        $this->declareSimpleQueue($queueName);

        // Step 2: attach it to the exchange.
        $this->bindQueue($exchange, $queueName, $routingKey);
    }
106
107
    /**
     * Delete the queue with the given name
     *
     * @param string $queueName Name of the queue to delete
     *
     * @return void
     */
    public function deleteQueue($queueName)
    {
        $doomed = $this->getQueue($queueName);
        $doomed->delete();
    }
118
119
    /**
     * Delete the exchange with the given name
     *
     * @param string $exchangeName Name of the exchange to delete
     *
     * @return void
     */
    public function deleteExchange($exchangeName)
    {
        $doomed = $this->getExchange($exchangeName);
        $doomed->delete();
    }
130
131
    /**
     * Publish a message to an exchange
     *
     * The message body and routing key are sent as they are; the remaining
     * message attributes are translated by getMessageProperties().
     *
     * @param string  $exchangeName Name of the target exchange
     * @param Message $message      Message to publish
     *
     * @return void
     */
    public function publish($exchangeName, Message $message)
    {
        // The exchange is resolved first, exactly as before, so that the
        // connection is established before the message is read.
        $target = $this->getExchange($exchangeName);

        $body       = $message->getBody();
        $routingKey = $message->getRoutingKey();
        $properties = self::getMessageProperties($message);

        $target->publish($body, $routingKey, AMQP_NOPARAM, $properties);
    }
149
150
    /**
     * Consume a queue, handing every received message to a callback
     *
     * The read timeout is applied to the connection and the channel prefetch
     * count is set to 1 before consuming starts. Each envelope received from
     * the extension is converted into a Burrow Message carrying its delivery
     * tag and the name of the consumed queue; the value returned by $callback
     * is handed back to the extension.
     *
     * @param string   $queueName Name of the queue to consume
     * @param callable $callback  Invoked with a Message for every delivery
     * @param int      $timeout   Read timeout applied to the connection
     * @param bool     $autoAck   Whether to consume with the AMQP_AUTOACK flag
     *
     * @return void
     *
     * @throws TimeoutException  When the extension reports 'Consumer timeout exceed'
     * @throws ConsumerException For any other \AMQPQueueException
     */
    public function consume($queueName, callable $callback, $timeout = 0, $autoAck = true)
    {
        $this->connection->setReadTimeout($timeout);
        $this->getChannel()->setPrefetchCount(1);
        $source = $this->getQueue($queueName);

        $consumeFlags = AMQP_NOPARAM;
        if ($autoAck) {
            $consumeFlags = AMQP_AUTOACK;
        }

        // Adapter between the extension envelope and the Burrow message model.
        $handler = function (\AMQPEnvelope $envelope) use ($callback, $queueName) {
            $wrapped = new Message(
                $envelope->getBody(),
                $envelope->getRoutingKey(),
                $envelope->getHeaders(),
                $envelope->getCorrelationId(),
                $envelope->getReplyTo()
            );
            $wrapped->setDeliveryTag($envelope->getDeliveryTag());
            $wrapped->setQueue($queueName);

            return $callback($wrapped);
        };

        try {
            $source->consume($handler, $consumeFlags);
        } catch (\AMQPQueueException $failure) {
            // A timeout is told apart from other failures by its message only.
            if ($failure->getMessage() === 'Consumer timeout exceed') {
                throw TimeoutException::build($failure, $timeout);
            }

            throw ConsumerException::build($failure);
        }
    }
190
191
    /**
     * Acknowledge a message on the queue it was consumed from
     *
     * The queue name and the delivery tag are both read from the message.
     *
     * @param Message $message Message to acknowledge
     *
     * @return void
     */
    public function ack(Message $message)
    {
        $this->getQueue($message->getQueue())->ack($message->getDeliveryTag());
    }
203
204
    /**
     * Negatively acknowledge a message on the queue it was consumed from
     *
     * @param Message $message Message whose processing failed
     * @param bool    $requeue When true the AMQP_REQUEUE flag is passed, otherwise AMQP_NOPARAM
     *
     * @return void
     */
    public function nack(Message $message, $requeue = true)
    {
        $flags = $requeue ? AMQP_REQUEUE : AMQP_NOPARAM;

        $origin = $this->getQueue($message->getQueue());
        $origin->nack($message->getDeliveryTag(), $flags);
    }
217
218
    /**
     * Close the connection
     *
     * The cached channel is dropped as well: it belongs to the connection
     * that has just been closed, so keeping it would make every later call
     * reuse it instead of letting getChannel() reconnect and open a new one.
     *
     * @return void
     */
    public function close()
    {
        $this->connection->disconnect();
        $this->channel = null;
    }
227
228
    /**
     * Returns the channel, connecting and creating it on first use
     *
     * Once created, the same channel instance is returned on every call.
     *
     * @return \AMQPChannel
     */
    private function getChannel()
    {
        if (null !== $this->channel) {
            return $this->channel;
        }

        $this->connection->connect();
        $this->channel = new \AMQPChannel($this->connection);

        return $this->channel;
    }
240
241
    /**
     * Builds a queue object on the driver's channel
     *
     * A new \AMQPQueue is created on every call; nothing is declared on the
     * broker here.
     *
     * @param string $queueName Name to assign; an empty value leaves the queue unnamed
     *
     * @return \AMQPQueue
     */
    private function getQueue($queueName)
    {
        $queue = new \AMQPQueue($this->getChannel());

        // Compare against the empty string instead of relying on truthiness:
        // "0" is falsy in PHP, yet it must still be applied as a queue name.
        if ((string) $queueName !== '') {
            $queue->setName($queueName);
        }

        return $queue;
    }
255
256
    /**
     * Builds an exchange object on the driver's channel
     *
     * A new \AMQPExchange is created on every call; nothing is declared on
     * the broker here.
     *
     * @param string $exchangeName Name to assign; an empty value leaves the exchange unnamed
     *
     * @return \AMQPExchange
     */
    private function getExchange($exchangeName)
    {
        $exchange = new \AMQPExchange($this->getChannel());

        // Compare against the empty string instead of relying on truthiness:
        // "0" is falsy in PHP, yet it must still be applied as an exchange name.
        if ((string) $exchangeName !== '') {
            $exchange->setName($exchangeName);
        }

        return $exchange;
    }
270
271
    /**
     * Builds the property array passed to the exchange when publishing
     *
     * Delivery mode, content type and headers are always present; the
     * correlation id and the reply-to address are added only when the
     * message provides a non-null value for them.
     *
     * NOTE(review): the code-quality report flags this method as duplicated
     * elsewhere in the project; consider extracting it into a single shared
     * class or operation.
     *
     * @param Message $message Message whose attributes are translated
     *
     * @return array Properties keyed by the class constants
     */
    private static function getMessageProperties(Message $message)
    {
        $properties = [
            self::DELIVERY_MODE       => 2,
            self::CONTENT_TYPE        => 'text/plain',
            self::APPLICATION_HEADERS => $message->getHeaders()
        ];

        $correlationId = $message->getCorrelationId();
        if (null !== $correlationId) {
            $properties[self::CORRELATION_ID] = $correlationId;
        }

        $replyTo = $message->getReplyTo();
        if (null !== $replyTo) {
            $properties[self::REPLY_TO] = $replyTo;
        }

        return $properties;
    }
297
}
298