Client   A
last analyzed

Complexity

Total Complexity 32

Size/Duplication

Total Lines 263
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 89
dl 0
loc 263
rs 9.84
c 0
b 0
f 0
wmc 32

13 Methods

Rating   Name   Duplication   Size   Complexity  
A closeChannel() 0 7 2
A read() 0 25 5
A closeConnection() 0 7 2
A __construct() 0 18 1
A declareQueue() 0 15 2
A getQueue() 0 3 2
A getChannel() 0 3 2
A declareExchange() 0 17 2
A getConnection() 0 3 1
A bindQueue() 0 9 4
A __destruct() 0 8 4
A createSslContext() 0 13 2
A publish() 0 20 3
1
<?php
2
3
namespace CmdrSharp\AmqpRouteMessenger;
4
5
use PhpAmqpLib\Connection\AMQPStreamConnection;
6
use PhpAmqpLib\Connection\AMQPSSLConnection;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Log;
10
11
/**
 * Thin wrapper around a php-amqplib connection that declares a direct
 * exchange, declares and binds a broker-named queue, publishes messages
 * routed by correlation id and reads a single reply back.
 *
 * Connection settings are read from the `amqproutemessenger.*` configuration
 * keys through the global config() helper.
 */
class Client implements ClientInterface
{
    /** @var AMQPStreamConnection Connection created by the constructor. */
    protected $connection;

    /** @var AMQPChannel|null Channel opened lazily by declareExchange() / declareQueue(). */
    protected $channel;

    /** @var string|null Exchange name given to the most recent declareExchange() call. */
    protected $exchange_name;

    /** @var string|null Queue name returned by the most recent declareQueue() call. */
    protected $queue_name;

    /** @var mixed Body captured by the consumer callback in read(); null until a message arrives. */
    private $response;

    /**
     * Opens the AMQP connection using the `amqproutemessenger.*` configuration.
     *
     * The connection class is always AMQPSSLConnection; the SSL options passed
     * to it are empty unless a CA file is configured (see createSslContext()).
     */
    public function __construct()
    {
        $this->connection = new AMQPSSLConnection(
            config('amqproutemessenger.host', 'localhost'),
            config('amqproutemessenger.port', '5672'),
            config('amqproutemessenger.login', 'guest'),
            config('amqproutemessenger.password', 'guest'),
            config('amqproutemessenger.vhost', '/'),
            $this->createSslContext(),
            [
                'insist' => config('amqproutemessenger.insist', false),
                'login_method' => config('amqproutemessenger.login_method', 'AMQPLAIN'),
                'login_response' => null,
                'locale' => config('amqproutemessenger.locale', 'en_US'),
                'connection_timeout' => config('amqproutemessenger.connection_timeout', 3.0),
                'read_write_timeout' => config('amqproutemessenger.read_write_timeout', 3.0),
                'keepalive' => config('amqproutemessenger.keepalive', false),
                'heartbeat' => config('amqproutemessenger.heartbeat', 0)
            ]
        );
    }

    /**
     * Closes the channel (if one was opened) and then the connection (if it
     * is still connected) when the client is destroyed.
     */
    public function __destruct()
    {
        if ($this->channel) {
            $this->channel->close();
        }

        // The connection is unset when the constructor itself failed, hence the extra check.
        if ($this->connection && $this->connection->isConnected()) {
            $this->connection->close();
        }
    }

    /**
     * Builds the SSL options handed to the connection.
     *
     * @return array `cafile` and `verify_peer` when `amqproutemessenger.ca_file`
     *               is configured, otherwise an empty array.
     */
    private function createSslContext(): array
    {
        $ca_file = config('amqproutemessenger.ca_file', null);
        $verify_peer = config('amqproutemessenger.verify_peer', false);

        if (null === $ca_file) {
            return [];
        }

        return [
            'cafile' => $ca_file,
            'verify_peer' => $verify_peer
        ];
    }

    /**
     * Returns the current channel, opening one on the connection first if
     * none has been opened yet.
     *
     * @return AMQPChannel
     */
    private function ensureChannel(): AMQPChannel
    {
        if (!$this->channel) {
            $this->channel = $this->connection->channel();
        }

        return $this->channel;
    }

    /**
     * Declares a `direct` exchange and remembers its name for later
     * bindQueue() / publish() calls.
     *
     * @param string $exchange_name Name of the exchange to declare.
     * @param bool $passive Forwarded as the `passive` flag of the declaration.
     * @return ClientInterface This client, for chaining.
     */
    public function declareExchange(string $exchange_name, bool $passive = false): ClientInterface
    {
        $channel = $this->ensureChannel();

        $this->exchange_name = $exchange_name;

        // The two trailing flags are the 4th/5th positional arguments of
        // exchange_declare() (durable, auto_delete in php-amqplib); both are off.
        $channel->exchange_declare(
            $this->exchange_name,
            'direct',
            $passive,
            false,
            false
        );

        return $this;
    }

    /**
     * Declares a queue with an empty name and stores the name returned by
     * the declaration.
     *
     * @param bool $passive Forwarded as the `passive` flag of the declaration.
     * @return ClientInterface This client, for chaining.
     */
    public function declareQueue(bool $passive = false): ClientInterface
    {
        // Positional flags after $passive: durable = false, exclusive = true,
        // auto_delete = false (php-amqplib argument order).
        $declared = $this->ensureChannel()->queue_declare(
            "",
            $passive,
            false,
            true,
            false
        );

        // Only the first element (the queue name) of the result is needed.
        list($this->queue_name) = $declared;

        return $this;
    }

    /**
     * Binds the declared queue to the declared exchange, using the
     * correlation id as the routing key.
     *
     * @param string $correlation_id Routing key for the binding.
     * @return ClientInterface This client, for chaining.
     * @throws \ErrorException When no exchange or no queue has been declared yet.
     */
    public function bindQueue(string $correlation_id): ClientInterface
    {
        if (!$this->channel || !$this->exchange_name) {
            throw new \ErrorException('An exchange has not been defined.');
        }

        // Reported separately so the message names the piece that is actually missing.
        if (!$this->queue_name) {
            throw new \ErrorException('A queue has not been defined.');
        }

        $this->channel->queue_bind($this->queue_name, $this->exchange_name, $correlation_id);

        return $this;
    }

    /**
     * Publishes a message to the declared exchange, routed by correlation id,
     * and waits for the pending acknowledgements before returning.
     *
     * @param string $correlation_id Routing key for the message.
     * @param string $message Message body.
     * @return bool False when the NACK handler was invoked while waiting, true otherwise.
     * @throws \ErrorException When no exchange has been declared yet.
     */
    public function publish(string $correlation_id, string $message): bool
    {
        if (!$this->channel || !$this->exchange_name) {
            throw new \ErrorException('An exchange has not been defined.');
        }

        // Flipped by the NACK handler so the outcome can be reported to the caller.
        $confirmed = true;

        $this->channel->set_ack_handler(function () {
            Log::info('AMQP Message ACK');
        });

        $this->channel->set_nack_handler(function () use (&$confirmed) {
            Log::error('AMQP Message NACK');
            $confirmed = false;
        });

        $this->channel->confirm_select();

        $this->channel->basic_publish(new AMQPMessage($message), $this->exchange_name, $correlation_id);
        $this->channel->wait_for_pending_acks();

        return $confirmed;
    }

    /**
     * Consumes from the declared queue and returns the body of the first
     * message delivered during this call.
     *
     * @param int $timeout Passed unchanged to AMQPChannel::wait().
     *                     NOTE(review): php-amqplib appears to treat 0 as "no timeout" - confirm.
     * @return string Body of the received message.
     * @throws \ErrorException When no queue has been declared yet, or when the
     *                         channel has no consumers left and nothing was received.
     */
    public function read(int $timeout = 0): string
    {
        if (!$this->queue_name || !$this->channel) {
            throw new \ErrorException('A queue, or exchange, have not been defined.');
        }

        // Drop the body captured by a previous call; otherwise the loop below
        // would be skipped and the stale body returned.
        $this->response = null;

        $callback = function ($msg) {
            $this->response = $msg->body;
        };

        // Positional flags: no_local = false, no_ack = true, exclusive = false,
        // nowait = false (php-amqplib argument order).
        $this->channel->basic_consume(
            $this->queue_name,
            '',
            false,
            true,
            false,
            false,
            $callback
        );

        while ($this->response === null && count($this->channel->callbacks)) {
            $this->channel->wait(null, false, $timeout);
        }

        // The loop can also end because no consumer callbacks remain; a null
        // return would then violate the declared string return type.
        if ($this->response === null) {
            throw new \ErrorException('No response was received from the queue.');
        }

        return $this->response;
    }

    /**
     * Closes the connection if it is currently connected.
     *
     * @return ClientInterface This client, for chaining.
     */
    public function closeConnection(): ClientInterface
    {
        if ($this->connection->isConnected()) {
            $this->connection->close();
        }

        return $this;
    }

    /**
     * Closes the channel if one has been opened.
     *
     * @return ClientInterface This client, for chaining.
     */
    public function closeChannel(): ClientInterface
    {
        if ($this->channel) {
            $this->channel->close();
        }

        return $this;
    }

    /**
     * Returns the connection created by the constructor.
     *
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * Returns the channel opened by declareExchange() / declareQueue().
     *
     * @return AMQPChannel
     * @throws \ErrorException When no channel has been opened yet (the declared
     *                         return type cannot represent "no channel").
     */
    public function getChannel(): AMQPChannel
    {
        if (!$this->channel) {
            throw new \ErrorException('A channel has not been opened.');
        }

        return $this->channel;
    }

    /**
     * Returns the name of the declared queue.
     *
     * @return string The queue name, or an empty string when no queue has been declared.
     */
    public function getQueue(): string
    {
        // Explicit cast: an unset name becomes '' without relying on implicit
        // bool-to-string coercion of the return value.
        return (string) $this->queue_name;
    }
}
276