GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

AMQPManager::getConnection()   A
last analyzed

Complexity

Conditions 2
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 2
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 4
rs 10
1
<?php
2
/**
3
 * This file is part of the mucts.com.
4
 *
5
 * This source file is subject to the MIT license that is bundled
6
 * with this source code in the file LICENSE.
7
 *
8
 * @version 1.0
9
 * @author herry<[email protected]>
10
 * @copyright © 2020  MuCTS.com All Rights Reserved.
11
 */
12
13
namespace MuCTS\Laravel\AMQP;
14
15
16
use Closure;
17
use ErrorException;
18
use Exception;
19
use Illuminate\Container\Container;
20
use Illuminate\Foundation\Application;
21
use Illuminate\Support\Arr;
22
use Illuminate\Support\Collection;
23
use InvalidArgumentException;
24
use PhpAmqpLib\Channel\AMQPChannel;
25
use PhpAmqpLib\Connection\AbstractConnection;
26
use PhpAmqpLib\Connection\AMQPStreamConnection;
27
use PhpAmqpLib\Message\AMQPMessage;
28
29
/**
30
 * Class AMQPManager
31
 *
32
 * @mixin AMQPStreamConnection
33
 * @package MuCTS\Laravel\AMQP
34
 */
35
class AMQPManager
{
    /**
     * Container (or array-accessible object) that exposes the `config` entry
     * from which the `amqp.default` and `amqp.connections` settings are read.
     *
     * @var Application|Container
     */
    private $app;

    /**
     * Name of the configured AMQP connection to use; null means "use the default".
     *
     * @var string|null
     */
    private ?string $connection = null;

    /**
     * Queue name.
     *
     * @var string
     */
    private string $queue = '';

    /**
     * Exchange name.
     *
     * @var string
     */
    private string $exchange = '';

    /**
     * Exchange type (passed as-is to `exchange_declare`).
     *
     * @var string
     */
    private string $exchangeType = '';

    /**
     * Consumer identifier (consumer tag).
     *
     * @var string
     */
    private string $consumerTag = '';

    /**
     * Routing key used when binding the queue and when publishing.
     *
     * @var string
     */
    private string $routeKey = '';

    /**
     * Whether the consumer is registered in automatic-acknowledgement mode.
     *
     * @var bool
     */
    private bool $autoAck = false;

    /**
     * AMQPManager constructor.
     *
     * @param Application|Container $app
     */
    public function __construct($app)
    {
        $this->app = $app;
    }

    /**
     * Get the name of the default connection (`amqp.default` config value).
     *
     * @return mixed
     */
    public function getDefaultConnection()
    {
        return $this->app['config']['amqp.default'];
    }

    /**
     * Select the connection to use for subsequent operations.
     *
     * A null or empty name selects the default connection.
     *
     * @param string|null $name
     * @return $this
     * @throws Exception
     */
    public function connection(?string $name = null)
    {
        $this->connection = $name ?: $this->getDefaultConnection();

        return $this;
    }

    /**
     * Get the configuration for a connection.
     *
     * An empty name falls back to the default connection.
     *
     * @param string $name
     * @return array
     * @throws InvalidArgumentException when no entry exists under `amqp.connections`
     */
    protected function configuration(string $name): array
    {
        $name = $name ?: $this->getDefaultConnection();
        $config = Arr::get($this->app['config']['amqp.connections'], $name);

        if ($config === null) {
            throw new InvalidArgumentException("RabbitMQ connection [{$name}] not configured.");
        }

        return $config;
    }

    /**
     * Make the amqp connection instance.
     *
     * @param string $name
     * @return AMQPStreamConnection
     * @throws Exception
     */
    protected function makeConnection(string $name)
    {
        $config = $this->configuration($name);

        // A single-host configuration (one with a top-level 'host' key) is
        // wrapped so that create_connection() always receives a list of hosts.
        if (isset($config['host'])) {
            $config = [$config];
        }

        return AMQPStreamConnection::create_connection($config);
    }

    /**
     * Create a connection for the selected (or default) connection name.
     *
     * A new connection is made on every call; this class does not cache it.
     *
     * @return mixed|AMQPStreamConnection
     * @throws Exception
     */
    protected function getConnection()
    {
        return $this->makeConnection($this->connection ?: $this->getDefaultConnection());
    }

    /**
     * Open a connection and channel, then declare and bind the queue and exchange.
     *
     * The same declarations are used by both the consumer and the publisher, so
     * there is always a queue to consume from and an exchange to publish to.
     *
     * @return array `[connection, channel]`
     * @throws InvalidArgumentException when queue, exchange or exchange type is unset
     * @throws Exception
     */
    protected function startConnection()
    {
        // Resolve (and validate) all names before opening a socket, so that a
        // missing setting cannot leave an opened connection behind.
        $queue = $this->getQueue();
        $exchange = $this->getExchange();
        $exchangeType = $this->getExchangeType();
        $routeKey = $this->getRouteKey();

        $connection = $this->getConnection();
        $channel = $connection->channel();

        /*
         * name: $queue
         * passive: false
         * durable: true - the queue will survive server restarts
         * exclusive: false - the queue can be accessed in other channels
         * auto_delete: false - the queue won't be deleted once the channel is closed
         */
        $channel->queue_declare($queue, false, true, false, false);

        /*
         * name: $exchange
         * type: $exchangeType
         * passive: false
         * durable: true - the exchange will survive server restarts
         * auto_delete: false - the exchange won't be deleted once the channel is closed
         */
        $channel->exchange_declare($exchange, $exchangeType, false, true, false);

        $channel->queue_bind($queue, $exchange, $routeKey);

        return [$connection, $channel];
    }

    /**
     * Set the queue name.
     *
     * @param string $queue
     * @return $this
     */
    public function setQueue(string $queue)
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Get the queue name.
     *
     * @return string
     * @throws InvalidArgumentException when no queue name has been set
     */
    public function getQueue(): string
    {
        // Strict comparison rather than empty(): the name '0' is not "missing".
        if ($this->queue === '') {
            throw new InvalidArgumentException("'queue' key is required.");
        }

        return $this->queue;
    }

    /**
     * Set the exchange name.
     *
     * @param string $exchange
     * @return $this
     */
    public function setExchange(string $exchange)
    {
        $this->exchange = $exchange;

        return $this;
    }

    /**
     * Get the exchange name.
     *
     * @return string
     * @throws InvalidArgumentException when no exchange name has been set
     */
    public function getExchange(): string
    {
        if ($this->exchange === '') {
            throw new InvalidArgumentException("'exchange' key is required.");
        }

        return $this->exchange;
    }

    /**
     * Set the exchange type.
     *
     * @param string $exchangeType
     * @return $this
     */
    public function setExchangeType(string $exchangeType)
    {
        $this->exchangeType = $exchangeType;

        return $this;
    }

    /**
     * Get the exchange type.
     *
     * @return string
     * @throws InvalidArgumentException when no exchange type has been set
     */
    public function getExchangeType(): string
    {
        if ($this->exchangeType === '') {
            throw new InvalidArgumentException("'exchange type' key is required.");
        }

        return $this->exchangeType;
    }

    /**
     * Get the exchange type (original, misspelled accessor kept for callers).
     *
     * @deprecated use getExchangeType()
     * @return string
     * @throws InvalidArgumentException when no exchange type has been set
     */
    public function getExchangeTyp(): string
    {
        return $this->getExchangeType();
    }

    /**
     * Set the routing key.
     *
     * @param string $routeKey
     * @return $this
     */
    public function setRouteKey(string $routeKey)
    {
        $this->routeKey = $routeKey;

        return $this;
    }

    /**
     * Get the routing key (may be an empty string).
     *
     * @return string
     */
    public function getRouteKey(): string
    {
        return $this->routeKey;
    }

    /**
     * Set the consumer identifier.
     *
     * @param string $consumerTag
     * @return AMQPManager
     */
    public function setConsumerTag(string $consumerTag)
    {
        $this->consumerTag = $consumerTag;

        return $this;
    }

    /**
     * Get the consumer identifier (may be an empty string).
     *
     * @return string
     */
    public function getConsumerTag(): string
    {
        return $this->consumerTag;
    }

    /**
     * Set whether messages are acknowledged automatically.
     *
     * @param bool $autoAsk
     * @return $this
     */
    public function setAutoAck(bool $autoAsk)
    {
        $this->autoAck = $autoAsk;

        return $this;
    }

    /**
     * Get whether messages are acknowledged automatically.
     *
     * @return bool
     */
    public function getAutoAck(): bool
    {
        return $this->autoAck;
    }

    /**
     * Consume messages from the configured queue.
     *
     * Each delivery is wrapped in a Message before being handed to the
     * callback. The method blocks for as long as the channel is consuming.
     *
     * @param callable $processMessage invoked with a Message for every delivery
     * @throws ErrorException
     * @throws Exception
     */
    public function consume(callable $processMessage)
    {
        /** @var AMQPStreamConnection $connection */
        /** @var AMQPChannel $channel */
        [$connection, $channel] = $this->startConnection();

        /*
         * queue: Queue from where to get the messages
         * consumer_tag: Consumer identifier
         * no_local: Don't receive messages published by this consumer.
         * no_ack: If set to true, automatic acknowledgement mode will be used
         *         by this consumer. See https://www.rabbitmq.com/confirms.html
         * exclusive: Request exclusive consumer access, meaning only this
         *            consumer can access the queue
         * nowait:
         * callback: A PHP Callback
         */
        $channel->basic_consume(
            $this->getQueue(),
            $this->getConsumerTag(),
            false,
            $this->getAutoAck(),
            false,
            false,
            static function ($message) use ($processMessage) {
                /** @var AMQPMessage $message */
                return $processMessage(new Message($message));
            }
        );

        // Close the channel and connection when the PHP process shuts down.
        register_shutdown_function($this->shutdown(), $channel, $connection);

        // Loop as long as the channel has callbacks registered
        while ($channel->is_consuming()) {
            $channel->wait();
        }
    }

    /**
     * Build the callback that closes a channel and its connection.
     *
     * The returned closure takes `(AMQPChannel $channel, AbstractConnection $connection)`.
     *
     * @return Closure
     */
    public function shutdown()
    {
        return static function ($channel, $connection) {
            $channel->close();
            $connection->close();
        };
    }

    /**
     * Publish a message to the configured exchange with the configured routing key.
     *
     * The channel and connection are closed afterwards, even when publishing fails.
     *
     * @param string|array|object|Collection|AMQPMessage|Message $message
     * @throws Exception
     */
    public function publish($message)
    {
        // Convert first: an unencodable payload then fails before any
        // connection is opened.
        $amqpMessage = $this->toAmqpMessage($message);

        /** @var AMQPStreamConnection $connection */
        /** @var AMQPChannel $channel */
        [$connection, $channel] = $this->startConnection();

        try {
            $channel->basic_publish($amqpMessage, $this->getExchange(), $this->getRouteKey());
        } finally {
            // Nested so the connection is closed even if closing the channel throws.
            try {
                $channel->close();
            } finally {
                $connection->close();
            }
        }
    }

    /**
     * Normalise any supported payload into an AMQPMessage.
     *
     * @param string|array|object|Collection|AMQPMessage|Message $message
     * @return AMQPMessage
     * @throws Exception when an array/object payload cannot be JSON-encoded
     */
    private function toAmqpMessage($message): AMQPMessage
    {
        // Unwrap the package's Message wrapper; previously the result of
        // getMessage() was discarded and the wrapper itself was JSON-encoded.
        // NOTE(review): assumes getMessage() returns the wrapped AMQPMessage - confirm.
        if ($message instanceof Message) {
            $message = $message->getMessage();
        }

        if ($message instanceof AMQPMessage) {
            return $message;
        }

        if ($message instanceof Collection) {
            $message = $message->toJson();
        } elseif (is_array($message) || is_object($message)) {
            // Throw instead of silently publishing `false` as the body.
            $message = json_encode($message, JSON_THROW_ON_ERROR);
        }

        return new AMQPMessage($message, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
    }

    /**
     * Dynamically pass methods to the connection.
     *
     * A new connection is created for every forwarded call.
     *
     * @param string $method
     * @param array $parameters
     * @return mixed
     * @throws Exception
     */
    public function __call($method, $parameters)
    {
        return $this->getConnection()->{$method}(...$parameters);
    }

    /**
     * Dynamically read a property from the connection.
     *
     * A new connection is created for every forwarded read.
     *
     * @param string $name
     * @return mixed
     * @throws Exception
     */
    public function __get($name)
    {
        return $this->getConnection()->{$name};
    }
}