Passed
Push — master ( 802214...5abda7 )
by Tilita
01:20 queued 11s
created

ExchangeEntity::getConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\PublisherInterface;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Exception\AMQPChannelClosedException;
8
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
9
use PhpAmqpLib\Message\AMQPMessage;
10
11
/**
12
 * Class ExchangeEntity
13
 *
14
 * @package NeedleProject\LaravelRabbitMq\Entity
15
 * @author  Adrian Tilita <[email protected]>
16
 */
17
class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
18
{
19
    /**
20
     * @const int   Retry count when a Channel Closed exeption is thrown
21
     */
22
    const MAX_RETRIES = 3;
23
24
    /**
25
     * @const array Default connections parameters
26
     */
27
    const DEFAULTS = [
28
        'exchange_type'                => 'topic',
29
        // Whether to check if it exists or to verify existance using argument types (Throws PRECONDITION_FAILED)
30
        'passive'                      => false,
31
        // Entities with durable will be re-created uppon server restart
32
        'durable'                      => false,
33
        // Whether to delete it when no queues ar bind to it
34
        'auto_delete'                  => false,
35
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
36
        'internal'                     => false,
37
        // Whether to receive a Declare confirmation
38
        'nowait'                       => false,
39
        // Whether to auto create the entity before publishing/consuming it
40
        'auto_create'                  => false,
41
        // whether to "hide" the exception on re-declare.
42
        // if the `passive` filter is set true, this is redundant
43
        'throw_exception_on_redeclare' => true,
44
        // whether to throw on exception when trying to
45
        // bind to an in-existent queue/exchange
46
        'throw_exception_on_bind_fail' => true,
47
    ];
48
49
    /**
50
     * @var AMQPConnection
51
     */
52
    protected $connection;
53
54
    /**
55
     * @var string
56
     */
57
    protected $aliasName;
58
59
    /**
60
     * @var array
61
     */
62
    protected $attributes;
63
64
    /**
65
     * @var int 
66
     */
67
    protected $retryCount = 0;
68
69
    /**
70
     * ExchangeEntity constructor.
71
     *
72
     * @param AMQPConnection $connection
73
     * @param string $aliasName
74
     * @param array $attributes
75
     */
76 15
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
77
    {
78 15
        $this->connection = $connection;
79 15
        $this->aliasName  = $aliasName;
80 15
        $this->attributes = $attributes;
81 15
    }
82
83
    /**
84
     * @param AMQPConnection $connection
85
     * @param string $aliasName
86
     * @param array $exchangeDetails
87
     * @return ExchangeEntity
88
     */
89 15
    public static function createExchange(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
90
    {
91 15
        return new static(
92
            $connection,
93
            $aliasName,
94 15
            array_merge(self::DEFAULTS, $exchangeDetails)
95
        );
96
    }
97
98
    /**
99
     * @return string
100
     */
101 2
    public function getAliasName(): string
102
    {
103 2
        return $this->aliasName;
104
    }
105
106
    /**
107
     * @return AMQPConnection
108
     */
109 10
    protected function getConnection(): AMQPConnection
110
    {
111 10
        return $this->connection;
112
    }
113
114
    /**
115
     * @return AMQPChannel
116
     */
117 10
    protected function getChannel(): AMQPChannel
118
    {
119 10
        return $this->getConnection()->getChannel();
120
    }
121
122
    /**
123
     * Create the Queue
124
     */
125 6 View Code Duplication
    public function create()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
126
    {
127
        try {
128 6
            $this->getChannel()
129 6
                ->exchange_declare(
130 6
                    $this->attributes['name'],
131 6
                    $this->attributes['exchange_type'],
132 6
                    $this->attributes['passive'],
133 6
                    $this->attributes['durable'],
134 6
                    $this->attributes['auto_delete'],
135 6
                    $this->attributes['internal'],
136 6
                    $this->attributes['nowait']
137
                );
138 2
        } catch (AMQPProtocolChannelException $e) {
139
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
140 2
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
0 ignored issues
show
Unused Code Bug introduced by
The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 406 (integer) can never be identical. Maybe you want to use a loose comparison != instead?
Loading history...
141 1
                throw $e;
142
            }
143
            // a failure trigger channels closing process
144 1
            $this->getConnection()->reconnect();
145
        }
146 5
    }
147
148
    /**
149
     * @throws AMQPProtocolChannelException
150
     */
151 5 View Code Duplication
    public function bind()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
152
    {
153 5
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
154 1
            return;
155
        }
156 4
        foreach ($this->attributes['bind'] as $bindItem) {
157
            try {
158 4
                $this->getChannel()
159 4
                    ->queue_bind(
160 4
                        $bindItem['queue'],
161 4
                        $this->attributes['name'],
162 4
                        $bindItem['routing_key']
163
                    );
164
            } catch (AMQPProtocolChannelException $e) {
165
                // 404 is the code for trying to bind to an non-existing entity
166
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
0 ignored issues
show
Unused Code Bug introduced by
The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 404 (integer) can never be identical. Maybe you want to use a loose comparison != instead?
Loading history...
167
                    throw $e;
168
                }
169 4
                $this->getConnection()->reconnect();
170
            }
171
        }
172 4
    }
173
174
    /**
175
     * Delete the queue
176
     */
177 1
    public function delete()
178
    {
179 1
        $this->getChannel()->exchange_delete($this->attributes['name']);
180 1
    }
181
182
    /**
183
     * {@inheritdoc}
184
     */
185
    public function reconnect()
186
    {
187
        $this->getConnection()->reconnect();
188
    }
189
190
    /**
191
     * Publish a message
192
     *
193
     * @param string $message
194
     * @param string $routingKey
195
     * @return mixed|void
196
     * @throws AMQPProtocolChannelException
197
     */
198 5 View Code Duplication
    public function publish(string $message, string $routingKey = '')
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
199
    {
200 5
        if ($this->attributes['auto_create'] === true) {
201 3
            $this->create();
202 3
            $this->bind();
203
        }
204
        try {
205 5
            $this->getChannel()->basic_publish(
206 5
                new AMQPMessage($message),
207 5
                $this->attributes['name'],
208
                $routingKey,
209 5
                true
210
            );
211 4
            $this->retryCount = 0;
212 2
        } catch (AMQPChannelClosedException $exception) {
213 2
            $this->retryCount++;
214
            // Retry publishing with re-connect
215 2
            if ($this->retryCount < self::MAX_RETRIES) {
216 2
                $this->getConnection()->reconnect();
217 2
                $this->publish($message, $routingKey);
218 1
                return;
219
            }
220 1
            throw $exception;
221
        }
222 4
    }
223
}
224