GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( fea049...d4e790 )
by herry
05:50 queued 05:42
created

AMQPManager::getExchangeTyp()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 3
c 1
b 0
f 0
nc 2
nop 0
dl 0
loc 6
rs 10
1
<?php
2
/**
3
 * This file is part of the mucts.com.
4
 *
5
 * This source file is subject to the MIT license that is bundled
6
 * with this source code in the file LICENSE.
7
 *
8
 * @version 1.0
9
 * @author herry<[email protected]>
10
 * @copyright © 2020  MuCTS.com All Rights Reserved.
11
 */
12
13
namespace MuCTS\Laravel\AMQP;
14
15
16
use Closure;
17
use ErrorException;
18
use Exception;
19
use Illuminate\Container\Container;
20
use Illuminate\Foundation\Application;
21
use Illuminate\Support\Arr;
22
use Illuminate\Support\Collection;
23
use InvalidArgumentException;
24
use PhpAmqpLib\Channel\AMQPChannel;
25
use PhpAmqpLib\Connection\AbstractConnection;
26
use PhpAmqpLib\Connection\AMQPStreamConnection;
27
use PhpAmqpLib\Message\AMQPMessage;
28
29
/**
 * Class AMQPManager
 *
 * Fluent helper around php-amqplib: select a configured connection, set the
 * queue / exchange / routing key, then call consume() or publish().
 *
 * Nothing is cached here: every consume(), publish(), __call() and __get()
 * builds a new connection through getConnection().
 *
 * @mixin AMQPStreamConnection
 * @package MuCTS\Laravel\AMQP
 */
class AMQPManager
{
    /**
     * Application container; only its 'config' entry is read by this class.
     *
     * @var Application|Container
     */
    private $app;

    /**
     * Name of the selected connection; null means "use the configured default".
     *
     * @var string|null
     */
    private ?string $connectionName = null;

    /**
     * Queue name.
     *
     * @var string
     */
    private string $queue = '';

    /**
     * Exchange name.
     *
     * @var string
     */
    private string $exchange = '';

    /**
     * Exchange type (passed as the type argument of exchange_declare).
     *
     * @var string
     */
    private string $exchangeType = '';

    /**
     * Consumer identifier (consumer tag).
     *
     * @var string
     */
    private string $consumerTag = '';

    /**
     * Routing key used for binding and publishing.
     *
     * @var string
     */
    private string $routeKey = '';

    /**
     * Whether the consumer is registered in automatic-acknowledgement mode
     * (passed as the no_ack argument of basic_consume).
     *
     * @var bool
     */
    private bool $autoAck = false;

    /**
     * AMQPManager constructor.
     *
     * @param Application|Container $app
     */
    public function __construct($app)
    {
        $this->app = $app;
    }

    /**
     * Get the default connection name from the 'amqp.default' config entry.
     *
     * @return mixed
     */
    public function getDefaultConnection()
    {
        return $this->app['config']['amqp.default'];
    }

    /**
     * Select the connection used by subsequent calls.
     *
     * @param string|null $name Connection name; null or empty selects the default.
     * @return $this
     * @throws Exception
     */
    public function connection(?string $name = null)
    {
        $this->connectionName = $name ?: $this->getDefaultConnection();

        return $this;
    }

    /**
     * Get the configuration for a connection from 'amqp.connections'.
     *
     * @param string $name Connection name; empty falls back to the default.
     * @return array
     * @throws InvalidArgumentException When the connection is not configured.
     */
    protected function configuration(string $name): array
    {
        $name = $name ?: $this->getDefaultConnection();
        $connections = $this->app['config']['amqp.connections'];
        $config = Arr::get($connections, $name);

        if (is_null($config)) {
            throw new InvalidArgumentException("RabbitMQ connection [{$name}] not configured.");
        }

        return $config;
    }

    /**
     * Make the amqp connection instance.
     *
     * @param string $name
     * @return AMQPStreamConnection
     * @throws Exception
     */
    protected function makeConnection(string $name)
    {
        $config = $this->configuration($name);

        // A single-host config (has a top-level 'host' key) is wrapped so that
        // create_connection() always receives a list of host definitions.
        if (isset($config['host'])) {
            $config = [$config];
        }

        return AMQPStreamConnection::create_connection($config);
    }

    /**
     * Build a connection for the selected (or default) connection name.
     *
     * @return mixed|AMQPStreamConnection
     * @throws Exception
     */
    protected function getConnection()
    {
        $name = $this->connectionName ?: $this->getDefaultConnection();

        return $this->makeConnection($name);
    }

    /**
     * Set the queue name.
     *
     * @param string $queue
     * @return $this
     */
    public function setQueue(string $queue)
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Get the queue name.
     *
     * @return string
     * @throws InvalidArgumentException When no queue name has been set.
     */
    public function getQueue(): string
    {
        // Strict comparison: empty() would also reject the valid name "0".
        if ($this->queue === '') {
            throw new InvalidArgumentException("'queue' key is required.");
        }

        return $this->queue;
    }

    /**
     * Set the exchange name.
     *
     * @param string $exchange
     * @return $this
     */
    public function setExchange(string $exchange)
    {
        $this->exchange = $exchange;

        return $this;
    }

    /**
     * Get the exchange name.
     *
     * @return string
     * @throws InvalidArgumentException When no exchange name has been set.
     */
    public function getExchange(): string
    {
        // Strict comparison: empty() would also reject the valid name "0".
        if ($this->exchange === '') {
            throw new InvalidArgumentException("'exchange' key is required.");
        }

        return $this->exchange;
    }

    /**
     * Set the exchange type.
     *
     * @param string $exchangeType
     * @return $this
     */
    public function setExchangeType(string $exchangeType)
    {
        $this->exchangeType = $exchangeType;

        return $this;
    }

    /**
     * Get the exchange type.
     *
     * The method name is misspelled; it is kept because existing callers use
     * it. New code should prefer getExchangeType().
     *
     * @return string
     * @throws InvalidArgumentException When no exchange type has been set.
     */
    public function getExchangeTyp(): string
    {
        if ($this->exchangeType === '') {
            throw new InvalidArgumentException("'exchange type' key is required.");
        }

        return $this->exchangeType;
    }

    /**
     * Get the exchange type (correctly spelled alias of getExchangeTyp()).
     *
     * Delegates to getExchangeTyp() so that a subclass overriding the original
     * method still takes effect.
     *
     * @return string
     * @throws InvalidArgumentException When no exchange type has been set.
     */
    public function getExchangeType(): string
    {
        return $this->getExchangeTyp();
    }

    /**
     * Set the routing key.
     *
     * @param string $routeKey
     * @return $this
     */
    public function setRouteKey(string $routeKey)
    {
        $this->routeKey = $routeKey;

        return $this;
    }

    /**
     * Get the routing key (may be an empty string).
     *
     * @return string
     */
    public function getRouteKey(): string
    {
        return $this->routeKey;
    }

    /**
     * Set the consumer identifier.
     *
     * @param string $consumerTag
     * @return AMQPManager
     */
    public function setConsumerTag(string $consumerTag)
    {
        $this->consumerTag = $consumerTag;

        return $this;
    }

    /**
     * Get the consumer identifier.
     *
     * No validation is performed: an empty tag is returned as-is.
     *
     * @return string
     */
    public function getConsumerTag(): string
    {
        return $this->consumerTag;
    }

    /**
     * Set the auto-ack flag.
     *
     * @param bool $autoAsk True to consume in automatic-acknowledgement mode.
     * @return $this
     */
    public function setAutoAck(bool $autoAsk)
    {
        $this->autoAck = $autoAsk;

        return $this;
    }

    /**
     * Get the auto-ack flag.
     *
     * @return bool
     */
    public function getAutoAck(): bool
    {
        return $this->autoAck;
    }

    /**
     * Consume messages from the configured queue until the channel has no
     * registered consumers left.
     *
     * Each delivery is wrapped in a Message and handed to $processMessage.
     * Channel and connection are closed by a shutdown function registered
     * here, not when this method returns.
     *
     * @param callable $processMessage Receives a Message instance.
     * @throws ErrorException
     * @throws Exception
     */
    public function consume(callable $processMessage)
    {
        $connection = $this->getConnection();
        $channel = $connection->channel();

        $this->declareTopology($channel);

        /**
         * queue: queue to read from
         * consumer_tag: consumer identifier
         * no_local: false
         * no_ack: the auto-ack flag, see https://www.rabbitmq.com/confirms.html
         * exclusive: false
         * nowait: false
         * callback: invoked for every delivery
         */
        $channel->basic_consume(
            $this->getQueue(),
            $this->getConsumerTag(),
            false,
            $this->getAutoAck(),
            false,
            false,
            static function ($message) use ($processMessage) {
                /** @var AMQPMessage $message */
                return $processMessage(new Message($message));
            }
        );

        register_shutdown_function($this->shutdown(), $channel, $connection);

        // Loop as long as the channel has callbacks registered
        while ($channel->is_consuming()) {
            $channel->wait();
        }
    }

    /**
     * Build the shutdown callback that closes a channel and then its connection.
     *
     * @return Closure Accepts (AMQPChannel $channel, AbstractConnection $connection).
     */
    public function shutdown()
    {
        /**
         * @param AMQPChannel $channel
         * @param AbstractConnection $connection
         */
        return static function ($channel, $connection) {
            $channel->close();
            $connection->close();
        };
    }

    /**
     * Publish one message to the configured exchange with the routing key.
     *
     * The queue, exchange and binding are declared first. The channel and
     * connection are always closed, even when declaring or publishing throws.
     *
     * @param string|array|object|Collection|AMQPMessage|Message $message
     * @throws Exception
     */
    public function publish($message)
    {
        $connection = $this->getConnection();

        try {
            $channel = $connection->channel();

            try {
                $this->declareTopology($channel);

                $channel->basic_publish(
                    $this->toAmqpMessage($message),
                    $this->getExchange(),
                    $this->getRouteKey()
                );
            } finally {
                $channel->close();
            }
        } finally {
            $connection->close();
        }
    }

    /**
     * Dynamically pass method calls to a connection.
     *
     * A new connection is built for every forwarded call.
     *
     * @param string $method
     * @param array $parameters
     * @return mixed
     * @throws Exception
     */
    public function __call($method, $parameters)
    {
        return $this->getConnection()->{$method}(...$parameters);
    }

    /**
     * Dynamically read a property from a connection.
     *
     * A new connection is built for every forwarded read.
     *
     * @param $name
     * @return mixed
     * @throws Exception
     */
    public function __get($name)
    {
        return $this->getConnection()->{$name};
    }

    /**
     * Declare the queue and the exchange and bind them with the routing key.
     *
     * Shared by consume() and publish() so both sides create the same binding.
     * All names are validated before the first declare call.
     *
     * @param AMQPChannel $channel
     * @return void
     * @throws InvalidArgumentException When queue, exchange or exchange type is unset.
     */
    private function declareTopology(AMQPChannel $channel): void
    {
        $queue = $this->getQueue();
        $exchange = $this->getExchange();
        $exchangeType = $this->getExchangeTyp();

        // queue_declare(name, passive: false, durable: true, exclusive: false, auto_delete: false)
        $channel->queue_declare($queue, false, true, false, false);

        // exchange_declare(name, type, passive: false, durable: true, auto_delete: false)
        $channel->exchange_declare($exchange, $exchangeType, false, true, false);

        $channel->queue_bind($queue, $exchange, $this->getRouteKey());
    }

    /**
     * Convert a publish() payload into an AMQPMessage.
     *
     * AMQPMessage instances are passed through untouched. Collections, arrays
     * and objects are JSON-encoded; anything else is used as the body as-is.
     *
     * @param string|array|object|Collection|AMQPMessage|Message $message
     * @return AMQPMessage
     * @throws Exception When JSON encoding fails.
     */
    private function toAmqpMessage($message): AMQPMessage
    {
        if ($message instanceof Message) {
            // NOTE(review): presumably getMessage() returns the wrapped
            // AMQPMessage - confirm against the Message class.
            $message = $message->getMessage();
        }

        if ($message instanceof AMQPMessage) {
            return $message;
        }

        if ($message instanceof Collection) {
            $message = $message->toJson();
        } elseif (is_array($message) || is_object($message)) {
            // Throw instead of silently publishing a `false` body.
            $message = json_encode($message, JSON_THROW_ON_ERROR);
        }

        return new AMQPMessage($message, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
    }
}