Issues (10)

src/Entity/ExchangeEntity.php (1 issue)

Severity
1
<?php
2
/**
3
 * Author: Joker
4
 * Date: 2020-05-08 13:57
5
 */
6
7
namespace JokerProject\LaravelAliyunAmqp\Entity;
8
9
use JokerProject\LaravelAliyunAmqp\AMQPConnection;
10
use JokerProject\LaravelAliyunAmqp\PublisherInterface;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Exception\AMQPChannelClosedException;
13
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
14
use PhpAmqpLib\Message\AMQPMessage;
15
16
/**
 * Class ExchangeEntity
 *
 * Wraps a single AMQP exchange: declares it, binds queues to it, deletes it
 * and publishes messages to it through the channel of the injected connection.
 *
 * @package JokerProject\LaravelAliyunAmqp\Entity
 */
class ExchangeEntity implements PublisherInterface, AMQPEntityInterface
{
    /**
     * @const int   Retry count when a Channel Closed exception is thrown
     */
    const MAX_RETRIES = 3;

    /**
     * @const array Default connections parameters
     */
    const DEFAULTS = [
        'exchange_type'                => 'topic',
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
        'passive'                      => false,
        // Entities with durable will be re-created upon server restart
        'durable'                      => false,
        // Whether to delete it when no queues are bound to it
        'auto_delete'                  => false,
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
        'internal'                     => false,
        // Whether to receive a Declare confirmation
        'nowait'                       => false,
        // Whether to auto create the entity before publishing/consuming it
        'auto_create'                  => false,
        // Whether to "hide" the exception on re-declare.
        // If the `passive` filter is set true, this is redundant
        'throw_exception_on_redeclare' => true,
        // Whether to throw an exception when trying to
        // bind to a non-existent queue/exchange
        'throw_exception_on_bind_fail' => true,
    ];

    /**
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * @var string
     */
    protected $aliasName;

    /**
     * @var array
     */
    protected $attributes;

    /**
     * Number of consecutive failed publish attempts in the current publish() call chain.
     *
     * @var int
     */
    protected $retryCount = 0;

    /**
     * ExchangeEntity constructor.
     *
     * Stores the given attributes as-is; use createExchange() to have them
     * merged over DEFAULTS.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->connection = $connection;
        $this->aliasName  = $aliasName;
        $this->attributes = $attributes;
    }

    /**
     * Build an entity whose attributes are the given details merged over DEFAULTS
     * (keys present in $exchangeDetails win).
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $exchangeDetails
     * @return ExchangeEntity
     */
    public static function createExchange(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        return new static(
            $connection,
            $aliasName,
            array_merge(self::DEFAULTS, $exchangeDetails)
        );
    }

    /**
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }

    /**
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        return $this->getConnection()->getChannel();
    }

    /**
     * Create the exchange
     *
     * Declares the exchange using the `name`, `exchange_type`, `passive`, `durable`,
     * `auto_delete`, `internal` and `nowait` attributes.
     *
     * @throws AMQPProtocolChannelException When the declare fails, unless the failure is a
     *                                      406 and `throw_exception_on_redeclare` is not true
     */
    public function create()
    {
        try {
            $this->getChannel()
                ->exchange_declare(
                    $this->attributes['name'],
                    $this->attributes['exchange_type'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['auto_delete'],
                    $this->attributes['internal'],
                    $this->attributes['nowait']
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters).
            // The reply code is cast to int so the strict comparison holds even if the library exposes it as a string.
            $isPreconditionFailure = (int) $e->amqp_reply_code === 406;
            if (true === $this->attributes['throw_exception_on_redeclare'] || !$isPreconditionFailure) {
                throw $e;
            }
            // A failure triggers the channel closing process, so a fresh connection is requested
            $this->getConnection()->reconnect();
        }
    }

    /**
     * Bind every entry of the `bind` attribute to this exchange.
     *
     * Each entry must provide a `queue` and a `routing_key`. Nothing happens when
     * the `bind` attribute is missing or empty.
     *
     * @throws AMQPProtocolChannelException When a bind fails, unless the failure is a
     *                                      404 and `throw_exception_on_bind_fail` is not true
     */
    public function bind()
    {
        // empty() also covers the "key not set" case
        if (empty($this->attributes['bind'])) {
            return;
        }
        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $bindItem['queue'],
                        $this->attributes['name'],
                        $bindItem['routing_key']
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity.
                // Cast to int so the strict comparison holds even if the code is exposed as a string.
                $isNotFound = (int) $e->amqp_reply_code === 404;
                if (true === $this->attributes['throw_exception_on_bind_fail'] || !$isNotFound) {
                    throw $e;
                }
                $this->getConnection()->reconnect();
            }
        }
    }

    /**
     * Delete the exchange
     */
    public function delete()
    {
        $this->getChannel()->exchange_delete($this->attributes['name']);
    }

    /**
     * {@inheritdoc}
     */
    public function reconnect()
    {
        $this->getConnection()->reconnect();
    }

    /**
     * Publish a message
     *
     * When `auto_create` is true the exchange is declared and bound before every
     * attempt. If the channel turns out to be closed, the connection is re-established
     * and the publish is retried; the exception is rethrown once MAX_RETRIES attempts
     * have failed.
     *
     * @param string $message
     * @param string $routingKey
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     * @throws AMQPChannelClosedException When the channel is still closed after MAX_RETRIES attempts
     */
    public function publish(string $message, string $routingKey = '')
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }
        try {
            $this->getChannel()->basic_publish(
                new AMQPMessage($message),
                $this->attributes['name'],
                $routingKey,
                true // php-amqplib's `mandatory` flag
            );
            $this->retryCount = 0;
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            // Retry publishing with re-connect
            if ($this->retryCount < self::MAX_RETRIES) {
                $this->getConnection()->reconnect();
                $this->publish($message, $routingKey);
                return;
            }
            // Reset the counter before giving up, otherwise every later publish() on this
            // instance would start with an exhausted retry budget and fail on the first error.
            $this->retryCount = 0;
            throw $exception;
        }
    }
}
228