Completed
Push — master ( 9186fa...c4ad81 )
by Tilita
16s queued 14s
created

ExchangeEntity   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 204
Duplicated Lines 0 %

Test Coverage

Coverage 90.77%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 72
dl 0
loc 204
ccs 59
cts 65
cp 0.9077
rs 10
c 1
b 0
f 0
wmc 22

10 Methods

Rating   Name   Duplication   Size   Complexity  
B bind() 0 19 7
A reconnect() 0 3 1
A publish() 0 23 4
A createExchange() 0 6 1
A delete() 0 3 1
A getChannel() 0 3 1
A getAliasName() 0 3 1
A getConnection() 0 3 1
A __construct() 0 5 1
A create() 0 20 4
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\PublisherInterface;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Exception\AMQPChannelClosedException;
8
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
9
use PhpAmqpLib\Message\AMQPMessage;
10
11
/**
 * Class ExchangeEntity
 *
 * Represents one exchange reachable through the injected connection: it can
 * declare the exchange, bind queues to it, delete it and publish messages to it.
 *
 * @package NeedleProject\LaravelRabbitMq\Entity
 * @author  Adrian Tilita <[email protected]>
 */
class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
{
    /**
     * @const int   Upper bound for publish attempts when a Channel Closed exception is thrown
     */
    const MAX_RETRIES = 3;

    /**
     * @const array Default exchange attributes, overridden by the details given to createExchange()
     */
    const DEFAULTS = [
        'exchange_type'                => 'topic',
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
        'passive'                      => false,
        // Entities with durable will be re-created upon server restart
        'durable'                      => false,
        // Whether to delete it when no queues are bound to it
        'auto_delete'                  => false,
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
        'internal'                     => false,
        // Whether to receive a Declare confirmation
        'nowait'                       => false,
        // Whether to auto create the entity before publishing/consuming it
        'auto_create'                  => false,
        // Whether to "hide" the exception on re-declare.
        // If the `passive` filter is set true, this is redundant
        'throw_exception_on_redeclare' => true,
        // Whether to throw an exception when trying to
        // bind to a non-existent queue/exchange
        'throw_exception_on_bind_fail' => true,
    ];

    /**
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * @var string
     */
    protected $aliasName;

    /**
     * @var array
     */
    protected $attributes;

    /**
     * @var int Number of consecutive failed publish attempts
     */
    protected $retryCount = 0;

    /**
     * ExchangeEntity constructor.
     *
     * The attributes are stored as given; use createExchange() to have them
     * merged with the DEFAULTS.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->connection = $connection;
        $this->aliasName  = $aliasName;
        $this->attributes = $attributes;
    }

    /**
     * Build an entity whose attributes are the DEFAULTS overridden by $exchangeDetails.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $exchangeDetails
     * @return ExchangeEntity
     */
    public static function createExchange(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        return new static(
            $connection,
            $aliasName,
            array_merge(self::DEFAULTS, $exchangeDetails)
        );
    }

    /**
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }

    /**
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        return $this->getConnection()->getChannel();
    }

    /**
     * Declare the exchange using the configured attributes.
     *
     * A 406 protocol error is swallowed (followed by a reconnect) only when
     * `throw_exception_on_redeclare` is not true; anything else is re-thrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function create()
    {
        try {
            $this->getChannel()
                ->exchange_declare(
                    $this->attributes['name'],
                    $this->attributes['exchange_type'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['auto_delete'],
                    $this->attributes['internal'],
                    $this->attributes['nowait']
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
                throw $e;
            }
            // a failure triggers the channel closing process, so a fresh connection is needed
            $this->getConnection()->reconnect();
        }
    }

    /**
     * Bind every queue listed under the `bind` attribute to this exchange.
     *
     * Does nothing when no bindings are configured. A 404 protocol error is
     * swallowed (followed by a reconnect) only when `throw_exception_on_bind_fail`
     * is not true; the remaining bindings are still processed afterwards.
     *
     * @throws AMQPProtocolChannelException
     */
    public function bind()
    {
        // empty() also covers the "key not set" case
        if (empty($this->attributes['bind'])) {
            return;
        }
        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $bindItem['queue'],
                        $this->attributes['name'],
                        $bindItem['routing_key']
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
                    throw $e;
                }
                $this->getConnection()->reconnect();
            }
        }
    }

    /**
     * Delete the exchange
     */
    public function delete()
    {
        $this->getChannel()->exchange_delete($this->attributes['name']);
    }

    /**
     * {@inheritdoc}
     */
    public function reconnect()
    {
        $this->getConnection()->reconnect();
    }

    /**
     * Publish a message
     *
     * When `auto_create` is true the exchange is declared and bound before each
     * attempt. If the channel turns out to be closed, the connection is
     * re-established and the publish is attempted again, up to MAX_RETRIES
     * attempts in total; after that the last exception is re-thrown.
     *
     * @param string $message
     * @param string $routingKey
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     * @throws AMQPChannelClosedException When the channel is still closed after the last attempt
     */
    public function publish(string $message, string $routingKey = '')
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        try {
            $this->getChannel()->basic_publish(
                new AMQPMessage($message),
                $this->attributes['name'],
                $routingKey,
                // fourth argument of basic_publish is the "mandatory" flag
                true
            );
            $this->retryCount = 0;
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            if ($this->retryCount >= self::MAX_RETRIES) {
                // Give up, but reset the counter first: otherwise every later
                // publish() on this instance would fail without any retry.
                $this->retryCount = 0;
                throw $exception;
            }
            // Retry publishing with re-connect
            $this->getConnection()->reconnect();
            $this->publish($message, $routingKey);
        }
    }
}
224