Completed
Push — master ( 2d1606...ef10be )
by Marcus
03:21
created

Client   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 264
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 264
rs 9.3999
c 0
b 0
f 0
wmc 33

13 Methods

Rating   Name   Duplication   Size   Complexity  
A closeChannel() 0 7 2
B read() 0 25 5
A closeConnection() 0 7 2
A __construct() 0 18 1
A declareQueue() 0 15 2
A getQueue() 0 3 2
A getChannel() 0 3 2
A declareExchange() 0 17 2
A getConnection() 0 3 1
A bindQueue() 0 9 4
A __destruct() 0 8 4
A createSslContext() 0 15 3
A publish() 0 20 3
1
<?php
2
3
namespace CmdrSharp\AmqpRouteMessenger;
4
5
use PhpAmqpLib\Connection\AMQPStreamConnection;
6
use PhpAmqpLib\Connection\AMQPSSLConnection;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Log;
10
11
class Client implements ClientInterface
12
{
13
    /** @var AMQPStreamConnection */
14
    protected $connection;
15
16
    /** @var AMQPChannel */
17
    protected $channel;
18
19
    /** @var string */
20
    protected $exchange_name;
21
22
    /** @var string */
23
    protected $queue_name;
24
25
    /** @var mixed */
26
    private $response;
27
28
    /**
29
     * Factory constructor.
30
     */
31
    public function __construct()
32
    {
33
        $this->connection = new AMQPSSLConnection(
34
            config('amqproutemessenger.host', 'localhost'),
35
            config('amqproutemessenger.port', '5672'),
36
            config('amqproutemessenger.login', 'guest'),
37
            config('amqproutemessenger.password', 'guest'),
38
            config('amqproutemessenger.vhost', '/'),
39
            $this->createSslContext(),
40
            [
41
                'insist' => config('amqproutemessenger.insist', false),
42
                'login_method' => config('amqproutemessenger.login_method', 'AMQPLAIN'),
43
                'login_response' => null,
44
                'locale' => config('amqproutemessenger.locale', 'en_US'),
45
                'connection_timeout' => config('amqproutemessenger.connection_timeout', 3.0),
46
                'read_write_timeout' => config('amqproutemessenger.read_write_timeout', 3.0),
47
                'keepalive' => config('amqproutemessenger.keepalive', false),
48
                'hearttbeat' => config('amqproutemessenger.heartbeat', 0)
49
            ]
50
        );
51
    }
52
53
    /**
54
     * Factory destructor.
55
     */
56
    public function __destruct()
57
    {
58
        if ($this->channel) {
59
            $this->channel->close();
60
        }
61
62
        if ($this->connection && $this->connection->isConnected()) {
63
            $this->connection->close();
64
        }
65
    }
66
67
    /**
68
     * Generates an array of SSL Options if specified.
69
     *
70
     * @return array
71
     */
72
    private function createSslContext(): array
73
    {
74
        $ca_path = config('amqproutemessenger.ca_path', '');
75
        $ca_file = config('amqproutemessenger.ca_file', '');
76
        $verify_peer = config('amqproutemessenger.verify_peer', false);
77
78
        if (!empty($ca_path) && !empty($ca_file)) {
79
            return [
80
                'capath' => $ca_path,
81
                'cafile' => $ca_file,
82
                'verify_peer' => $verify_peer
83
            ];
84
        }
85
86
        return [];
87
    }
88
89
    /**
90
     * Declares an Exchange.
91
     *
92
     * @param string $exchange_name
93
     * @param bool $passive
94
     * @return Client
95
     */
96
    public function declareExchange(string $exchange_name, bool $passive = false): ClientInterface
97
    {
98
        if (!$this->channel) {
99
            $this->channel = $this->connection->channel();
100
        }
101
102
        $this->exchange_name = $exchange_name;
103
104
        $this->channel->exchange_declare(
105
            $this->exchange_name,
106
            'direct',
107
            $passive,
108
            false,
109
            false
110
        );
111
112
        return $this;
113
    }
114
115
    /**
116
     * Declares a randomly generated queue.
117
     *
118
     * @param bool $passive
119
     * @return Client
120
     */
121
    public function declareQueue(bool $passive = false): ClientInterface
122
    {
123
        if (!$this->channel) {
124
            $this->channel = $this->connection->channel();
125
        }
126
127
        list($this->queue_name, ,) = $this->channel->queue_declare(
128
            "",
129
            $passive,
130
            false,
131
            true,
132
            false
133
        );
134
135
        return $this;
136
    }
137
138
    /**
139
     * Binds a queue.
140
     *
141
     * @param string $correlation_id
142
     * @return Client
143
     * @throws \ErrorException
144
     */
145
    public function bindQueue(string $correlation_id): ClientInterface
146
    {
147
        if (!$this->channel || !$this->queue_name || !$this->exchange_name) {
148
            throw new \ErrorException('An exchange has not been defined.');
149
        }
150
151
        $this->channel->queue_bind($this->queue_name, $this->exchange_name, $correlation_id);
152
153
        return $this;
154
    }
155
156
    /**
157
     * Publishes a routed message.
158
     *
159
     * @param string $correlation_id
160
     * @param string $message
161
     * @return bool
162
     * @throws \ErrorException
163
     */
164
    public function publish(string $correlation_id, string $message): bool
165
    {
166
        if (!$this->channel || !$this->exchange_name) {
167
            throw new \ErrorException('An exchange has not been defined.');
168
        }
169
170
        $this->channel->set_ack_handler(function (AMQPMessage $message) {
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

170
        $this->channel->set_ack_handler(function (/** @scrutinizer ignore-unused */ AMQPMessage $message) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
171
            Log::info('AMQP Message ACK');
172
        });
173
174
        $this->channel->set_nack_handler(function (AMQPMessage $message) {
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

174
        $this->channel->set_nack_handler(function (/** @scrutinizer ignore-unused */ AMQPMessage $message) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
175
            Log::error('AMQP Message NACK');
176
        });
177
178
        $this->channel->confirm_select();
179
180
        $this->channel->basic_publish(new AMQPMessage($message), $this->exchange_name, $correlation_id);
181
        $this->channel->wait_for_pending_acks();
182
183
        return true;
184
    }
185
186
    /**
187
     * Reads from a routed exchange and returns the response.
188
     *
189
     * @return mixed
190
     * @throws \ErrorException
191
     */
192
    public function read(): string
193
    {
194
        if (!$this->queue_name || !$this->channel) {
195
            throw new \ErrorException('A queue, or exchange, have not been defined.');
196
        }
197
198
        $callback = function ($msg) {
199
            return $this->response = $msg->body;
200
        };
201
202
        $this->channel->basic_consume(
203
            $this->queue_name,
204
            '',
205
            false,
206
            true,
207
            false,
208
            false,
209
            $callback
210
        );
211
212
        while ($this->response === null && count($this->channel->callbacks)) {
213
            $this->channel->wait();
214
        }
215
216
        return $this->response;
217
    }
218
219
    /**
220
     * Closes a connection.
221
     *
222
     * @return $this
223
     */
224
    public function closeConnection(): ClientInterface
225
    {
226
        if ($this->connection->isConnected()) {
227
            $this->connection->close();
228
        }
229
230
        return $this;
231
    }
232
233
    /**
234
     * Closes a channel.
235
     *
236
     * @return $this
237
     */
238
    public function closeChannel(): ClientInterface
239
    {
240
        if ($this->channel) {
241
            $this->channel->close();
242
        }
243
244
        return $this;
245
    }
246
247
    /**
248
     * Returns the active connection.
249
     *
250
     * @return AMQPStreamConnection
251
     */
252
    public function getConnection(): AMQPStreamConnection
253
    {
254
        return $this->connection;
255
    }
256
257
    /**
258
     * Returns the channel.
259
     *
260
     * @return mixed
261
     */
262
    public function getChannel(): AMQPChannel
263
    {
264
        return $this->channel ?: false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->channel ?: false could return the type false which is incompatible with the type-hinted return PhpAmqpLib\Channel\AMQPChannel. Consider adding an additional type-check to rule them out.
Loading history...
265
    }
266
267
    /**
268
     * Returns the name of the queue declared.
269
     *
270
     * @return mixed
271
     */
272
    public function getQueue(): string
273
    {
274
        return $this->queue_name ?: false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->queue_name ?: false could return the type false which is incompatible with the type-hinted return string. Consider adding an additional type-check to rule them out.
Loading history...
275
    }
276
}
277