ExchangeEntity   A
last analyzed

Complexity

Total Complexity 22

Size/Duplication

Total Lines 219
Duplicated Lines 0 %

Test Coverage

Coverage 90.91%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 81
c 2
b 0
f 0
dl 0
loc 219
ccs 60
cts 66
cp 0.9091
rs 10
wmc 22

10 Methods

Rating   Name   Duplication   Size   Complexity  
B bind() 0 19 7
A reconnect() 0 3 1
A createExchange() 0 6 1
A delete() 0 3 1
A getChannel() 0 3 1
A getAliasName() 0 3 1
A getConnection() 0 3 1
A __construct() 0 5 1
A create() 0 24 4
A publish() 0 29 4
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\Interpreter\EntityArgumentsInterpreter;
6
use NeedleProject\LaravelRabbitMq\PublisherInterface;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Exception\AMQPChannelClosedException;
9
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use PhpAmqpLib\Wire\AMQPTable;
12
13
/**
14
 * Class ExchangeEntity
15
 *
16
 * @package NeedleProject\LaravelRabbitMq\Entity
17
 * @author  Adrian Tilita <[email protected]>
18
 */
19
class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
20
{
21
    /**
22
     * @const int   Retry count when a Channel Closed exception is thrown
23
     */
24
    const MAX_RETRIES = 3;
25
26
    /**
27
     * @const array Default connections parameters
28
     */
29
    const DEFAULTS = [
30
        'exchange_type'                => 'topic',
31
        // Whether to check if it exists or to verify existance using argument types (Throws PRECONDITION_FAILED)
32
        'passive'                      => false,
33
        // Entities with durable will be re-created uppon server restart
34
        'durable'                      => false,
35
        // Whether to delete it when no queues ar bind to it
36
        'auto_delete'                  => false,
37
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
38
        'internal'                     => false,
39
        // Whether to receive a Declare confirmation
40
        'nowait'                       => false,
41
        // Additional arguments for queue creation
42
        'arguments'                    => [],
43
        // Whether to auto create the entity before publishing/consuming it
44
        'auto_create'                  => false,
45
        // whether to "hide" the exception on re-declare.
46
        // if the `passive` filter is set true, this is redundant
47
        'throw_exception_on_redeclare' => true,
48
        // whether to throw on exception when trying to
49
        // bind to an in-existent queue/exchange
50
        'throw_exception_on_bind_fail' => true,
51
        // no ideea what it represents - @todo - find a documentation that states it's role
52
        'ticket'                       => null
53
    ];
54
55
    /**
56
     * @var AMQPConnection
57
     */
58
    protected $connection;
59
60
    /**
61
     * @var string
62
     */
63
    protected $aliasName;
64
65
    /**
66
     * @var array
67
     */
68
    protected $attributes;
69
70
    /**
71
     * @var int
72
     */
73
    protected $retryCount = 0;
74
75
    /**
76 15
     * ExchangeEntity constructor.
77
     *
78 15
     * @param AMQPConnection $connection
79 15
     * @param string $aliasName
80 15
     * @param array $attributes
81 15
     */
82
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
83
    {
84
        $this->connection = $connection;
85
        $this->aliasName  = $aliasName;
86
        $this->attributes = $attributes;
87
    }
88
89 15
    /**
90
     * @param AMQPConnection $connection
91 15
     * @param string $aliasName
92
     * @param array $exchangeDetails
93
     * @return ExchangeEntity
94 15
     */
95
    public static function createExchange(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
96
    {
97
        return new static(
98
            $connection,
99
            $aliasName,
100
            array_merge(self::DEFAULTS, $exchangeDetails)
101 2
        );
102
    }
103 2
104
    /**
105
     * @return string
106
     */
107
    public function getAliasName(): string
108
    {
109 10
        return $this->aliasName;
110
    }
111 10
112
    /**
113
     * @return AMQPConnection
114
     */
115
    protected function getConnection(): AMQPConnection
116
    {
117 10
        return $this->connection;
118
    }
119 10
120
    /**
121
     * @return AMQPChannel
122
     */
123
    protected function getChannel(): AMQPChannel
124
    {
125 6
        return $this->getConnection()->getChannel();
126
    }
127
128 6
    /**
129 6
     * Create the Queue
130 6
     */
131 6
    public function create()
132 6
    {
133 6
        try {
134 6
            $this->getChannel()
135 6
                ->exchange_declare(
136 6
                    $this->attributes['name'],
137
                    $this->attributes['exchange_type'],
138 2
                    $this->attributes['passive'],
139
                    $this->attributes['durable'],
140 2
                    $this->attributes['auto_delete'],
141 1
                    $this->attributes['internal'],
142
                    $this->attributes['nowait'],
143
                    EntityArgumentsInterpreter::interpretArguments(
144 1
                        $this->attributes['arguments']
145
                    ),
146 5
                    $this->attributes['ticket']
147
                );
148
        } catch (AMQPProtocolChannelException $e) {
149
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
150
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
151 5
                throw $e;
152
            }
153 5
            // a failure trigger channels closing process
154 1
            $this->getConnection()->reconnect();
155
        }
156 4
    }
157
158 4
    /**
159 4
     * @throws AMQPProtocolChannelException
160 4
     */
161 4
    public function bind()
162 4
    {
163
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
164
            return;
165
        }
166
        foreach ($this->attributes['bind'] as $bindItem) {
167
            try {
168
                $this->getChannel()
169 4
                    ->queue_bind(
170
                        $bindItem['queue'],
171
                        $this->attributes['name'],
172 4
                        $bindItem['routing_key']
173
                    );
174
            } catch (AMQPProtocolChannelException $e) {
175
                // 404 is the code for trying to bind to a non-existing entity
176
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
177 1
                    throw $e;
178
                }
179 1
                $this->getConnection()->reconnect();
180 1
            }
181
        }
182
    }
183
184
    /**
185
     * Delete the queue
186
     */
187
    public function delete()
188
    {
189
        $this->getChannel()->exchange_delete($this->attributes['name']);
190
    }
191
192
    /**
193
     * {@inheritdoc}
194
     */
195
    public function reconnect()
196
    {
197
        $this->getConnection()->reconnect();
198 5
    }
199
200 5
    /**
201 3
     * Publish a message
202 3
     *
203
     * @param string $message
204
     * @param string $routingKey
205 5
     * @param array $properties
206 5
     * @return mixed|void
207 5
     * @throws AMQPProtocolChannelException
208
     */
209 5
    public function publish(string $message, string $routingKey = '', array $properties = [])
210
    {
211 4
        try {
212 2
            if ($this->attributes['auto_create'] === true) {
213 2
                $this->create();
214
                $this->bind();
215 2
            }
216 2
            $this->getChannel()->basic_publish(
217 2
                new AMQPMessage(
218 1
                    $message,
219
                    EntityArgumentsInterpreter::interpretProperties(
220 1
                        $this->attributes,
221
                        $properties
222 4
                    )
223
                ),
224
                $this->attributes['name'],
225
                $routingKey,
226
                true
227
            );
228
            $this->retryCount = 0;
229
        } catch (AMQPChannelClosedException $exception) {
230
            $this->retryCount++;
231
            // Retry publishing with re-connect
232
            if ($this->retryCount < self::MAX_RETRIES) {
233
                $this->getConnection()->reconnect();
234
                $this->publish($message, $routingKey);
235
                return;
236
            }
237
            throw $exception;
238
        }
239
    }
240
}
241