ExchangeEntity::reconnect()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\Interpreter\EntityArgumentsInterpreter;
6
use NeedleProject\LaravelRabbitMq\PublisherInterface;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Exception\AMQPChannelClosedException;
9
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use PhpAmqpLib\Wire\AMQPTable;
12
13
/**
14
 * Class ExchangeEntity
15
 *
16
 * @package NeedleProject\LaravelRabbitMq\Entity
17
 * @author  Adrian Tilita <[email protected]>
18
 */
19
class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
20
{
21
    /**
22
     * @const int   Retry count when a Channel Closed exception is thrown
23
     */
24
    const MAX_RETRIES = 3;
25
26
    /**
27
     * @const array Default connections parameters
28
     */
29
    const DEFAULTS = [
30
        'exchange_type'                => 'topic',
31
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
32
        'passive'                      => false,
33
        // Durable entities will be re-created upon server restart
34
        'durable'                      => false,
35
        // Whether to delete it when no queues are bound to it
36
        'auto_delete'                  => false,
37
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
38
        'internal'                     => false,
39
        // Whether to receive a Declare confirmation
40
        'nowait'                       => false,
41
        // Additional arguments for exchange creation
42
        'arguments'                    => [],
43
        // Whether to auto create the entity before publishing/consuming it
44
        'auto_create'                  => false,
45
        // whether to "hide" the exception on re-declare.
46
        // if the `passive` filter is set true, this is redundant
47
        'throw_exception_on_redeclare' => true,
48
        // whether to throw an exception when trying to
49
        // bind to a non-existent queue/exchange
50
        'throw_exception_on_bind_fail' => true,
51
        // no idea what it represents - @todo - find documentation that states its role
52
        'ticket'                       => null
53
    ];
54
55
    /**
56
     * @var AMQPConnection
57
     */
58
    protected $connection;
59
60
    /**
61
     * @var string
62
     */
63
    protected $aliasName;
64
65
    /**
66
     * @var array
67
     */
68
    protected $attributes;
69
70
    /**
71
     * @var int
72
     */
73
    protected $retryCount = 0;
74
75
    /**
     * Store the collaborators of this exchange entity.
     *
     * The attributes are kept exactly as passed in; no defaults are merged
     * here (createExchange() does that before calling this constructor).
     *
     * @param AMQPConnection $connection Connection wrapper used to reach the channel
     * @param string         $aliasName  Alias later returned by getAliasName()
     * @param array          $attributes Exchange attributes, stored unchanged
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->attributes = $attributes;
        $this->aliasName  = $aliasName;
        $this->connection = $connection;
    }
88
89
    /**
     * Named constructor that fills in self::DEFAULTS for every attribute
     * the caller did not provide.
     *
     * Values in $exchangeDetails take precedence over the defaults.
     *
     * @param AMQPConnection $connection      Connection wrapper handed to the new entity
     * @param string         $aliasName       Alias of the new entity
     * @param array          $exchangeDetails Caller-supplied attributes overriding the defaults
     * @return ExchangeEntity Instance of the class the method was called on (late static binding)
     */
    public static function createExchange(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        $attributes = array_merge(self::DEFAULTS, $exchangeDetails);

        return new static($connection, $aliasName, $attributes);
    }
103
104
    /**
     * Alias given to this entity at construction time.
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }
111
112
    /**
     * Connection wrapper injected through the constructor.
     *
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }
119
120
    /**
     * Fetch the channel from the underlying connection wrapper.
     *
     * The non-nullable return type means PHP raises a TypeError if the
     * connection wrapper ever hands back something that is not an AMQPChannel.
     *
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        $connection = $this->getConnection();

        return $connection->getChannel();
    }
127
128
    /**
     * Declare the exchange on the broker using the configured attributes.
     *
     * A protocol exception with reply code 406 is swallowed (followed by a
     * reconnect) only when `throw_exception_on_redeclare` is not strictly true;
     * every other protocol exception is re-thrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function create()
    {
        try {
            $channel = $this->getChannel();
            $channel->exchange_declare(
                $this->attributes['name'],
                $this->attributes['exchange_type'],
                $this->attributes['passive'],
                $this->attributes['durable'],
                $this->attributes['auto_delete'],
                $this->attributes['internal'],
                $this->attributes['nowait'],
                EntityArgumentsInterpreter::interpretArguments($this->attributes['arguments']),
                $this->attributes['ticket']
            );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure
            // (when redeclaring with different parameters)
            $isToleratedRedeclare = true !== $this->attributes['throw_exception_on_redeclare']
                && $e->amqp_reply_code === 406;

            if (!$isToleratedRedeclare) {
                throw $e;
            }

            // a failure triggers the channel closing process, so reconnect
            $this->getConnection()->reconnect();
        }
    }
157
158
    /**
     * Apply every binding listed under the `bind` attribute.
     *
     * Each item supplies a `queue` and a `routing_key`; this exchange's name
     * is passed as the second argument to queue_bind(). Does nothing when
     * `bind` is missing or empty.
     *
     * A protocol exception with reply code 404 is swallowed (followed by a
     * reconnect) only when `throw_exception_on_bind_fail` is not strictly true;
     * every other protocol exception is re-thrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function bind()
    {
        // empty() is also true when the key is not set at all
        if (empty($this->attributes['bind'])) {
            return;
        }

        foreach ($this->attributes['bind'] as $binding) {
            try {
                $channel = $this->getChannel();
                $channel->queue_bind(
                    $binding['queue'],
                    $this->attributes['name'],
                    $binding['routing_key']
                );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                $isToleratedMiss = true !== $this->attributes['throw_exception_on_bind_fail']
                    && $e->amqp_reply_code === 404;

                if (!$isToleratedMiss) {
                    throw $e;
                }

                $this->getConnection()->reconnect();
            }
        }
    }
183
184
    /**
     * Delete the exchange identified by the `name` attribute.
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->exchange_delete($this->attributes['name']);
    }
191
192
    /**
     * {@inheritdoc}
     *
     * Delegates to the reconnect() of the underlying connection wrapper.
     */
    public function reconnect()
    {
        $connection = $this->getConnection();
        $connection->reconnect();
    }
199
200
    /**
     * Publish a message to this exchange.
     *
     * When `auto_create` is strictly true the exchange is declared and its
     * bindings applied before publishing. If the channel turns out to be
     * closed, the connection is re-established and the publish is retried
     * with the same message, routing key and properties; once
     * self::MAX_RETRIES attempts have failed the last exception is re-thrown.
     *
     * @param string $message    Message body
     * @param string $routingKey Routing key handed to basic_publish()
     * @param array  $properties Per-message properties, combined with the entity
     *                           attributes by EntityArgumentsInterpreter::interpretProperties()
     * @return void
     * @throws AMQPProtocolChannelException
     * @throws AMQPChannelClosedException When every retry attempt failed
     */
    public function publish(string $message, string $routingKey = '', array $properties = [])
    {
        try {
            if ($this->attributes['auto_create'] === true) {
                $this->create();
                $this->bind();
            }
            $this->getChannel()->basic_publish(
                new AMQPMessage(
                    $message,
                    EntityArgumentsInterpreter::interpretProperties(
                        $this->attributes,
                        $properties
                    )
                ),
                $this->attributes['name'],
                $routingKey,
                // fourth argument of php-amqplib's basic_publish() is the `mandatory` flag
                true
            );
            // A successful publish clears the failure streak
            $this->retryCount = 0;
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            // Retry publishing with re-connect
            if ($this->retryCount < self::MAX_RETRIES) {
                $this->getConnection()->reconnect();
                // Forward $properties as well, otherwise the retried message
                // would be published without the caller's properties
                $this->publish($message, $routingKey, $properties);
                return;
            }
            // Retries exhausted: reset the counter so a later publish() call
            // starts with a full retry budget instead of failing immediately
            $this->retryCount = 0;
            throw $exception;
        }
    }
240
}
241