AbstractWorker::__destruct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 1
c 2
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
/**
4
 * @author Marwan Al-Soltany <[email protected]>
5
 * @copyright Marwan Al-Soltany 2020
6
 * For the full copyright and license information, please view
7
 * the LICENSE file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace MAKS\AmqpAgent\Worker;
13
14
use PhpAmqpLib\Connection\AMQPStreamConnection;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use PhpAmqpLib\Wire\AMQPTable;
18
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
19
use PhpAmqpLib\Exception\AMQPTimeoutException;
20
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
21
use MAKS\AmqpAgent\Worker\AbstractWorkerInterface;
22
use MAKS\AmqpAgent\Worker\WorkerCommandTrait;
23
use MAKS\AmqpAgent\Worker\WorkerMutationTrait;
24
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait;
25
use MAKS\AmqpAgent\Exception\PropertyDoesNotExistException;
26
use MAKS\AmqpAgent\Exception\AmqpAgentException as Exception;
27
use MAKS\AmqpAgent\Config\AbstractWorkerParameters as Parameters;
28
29
/**
30
 * An abstract class implementing the basic functionality of a worker.
31
 * @since 1.0.0
32
 * @api
33
 */
34
abstract class AbstractWorker implements AbstractWorkerInterface
35
{
36
    use MagicMethodsExceptionsTrait {
37
        __get as private __get_MMET;
38
        __set as private __set_MMET;
39
    }
40
    use WorkerMutationTrait;
41
    use WorkerCommandTrait;
42
43
    /**
44
     * The default connection options that the worker should use when no overrides are provided.
45
     * @var array
46
     */
47
    protected $connectionOptions;
48
49
    /**
50
     * The default channel options that the worker should use when no overrides are provided.
51
     * @var array
52
     */
53
    protected $channelOptions;
54
55
    /**
56
     * The default queue options that the worker should use when no overrides are provided.
57
     * @var array
58
     */
59
    protected $queueOptions;
60
61
    /**
62
     * The default connection of the worker.
63
     * @var AMQPStreamConnection
64
     */
65
    public $connection;
66
67
    /**
68
     * The default channel of the worker.
69
     * @var AMQPChannel
70
     */
71
    public $channel;
72
73
    /**
74
     * All opened connections of the worker.
75
     * @var AMQPStreamConnection[]
76
     */
77
    public $connections = [];
78
79
    /**
80
     * All opened channels of the the worker.
81
     * @var AMQPChannel[]
82
     */
83
    public $channels = [];
84
85
86
    /**
87
     * AbstractWorker object constructor.
88
     * @param array $connectionOptions [optional] The overrides for the default connection options of the worker.
89
     * @param array $channelOptions [optional] The overrides for the default channel options of the worker.
90
     * @param array $queueOptions [optional] The overrides for the default queue options of the worker.
91
     */
92 53
    public function __construct(
93
        array $connectionOptions = [],
94
        array $channelOptions = [],
95
        array $queueOptions = []
96
    ) {
97 53
        $this->connectionOptions = Parameters::patch($connectionOptions, 'CONNECTION_OPTIONS');
98 53
        $this->channelOptions    = Parameters::patch($channelOptions, 'CHANNEL_OPTIONS');
99 53
        $this->queueOptions      = Parameters::patch($queueOptions, 'QUEUE_OPTIONS');
100 53
    }
101
102
    /**
103
     * Closes the connection with RabbitMQ server before destroying the object.
104
     */
105 50
    public function __destruct()
106
    {
107 50
        $this->disconnect();
108 50
    }
109
110
    /**
111
     * Gets a class member via public property access notation.
112
     * @param string $member Property name.
113
     * @return mixed
114
     * @throws PropertyDoesNotExistException
115
     */
116 4
    public function __get(string $member)
117
    {
118 4
        $isMember = property_exists($this, $member);
119 4
        if ($isMember) {
120 3
            return $this->{$member};
121
        }
122
123 1
        $this->__get_MMET($member);
124
    }
125
126
    /**
127
     * Sets a class member via public property assignment notation.
128
     * @param string $member Property name.
129
     * @param array $array Array of overrides. The array type here is important, because only *Options properties should be overridable.
130
     * @return void
131
     * @throws PropertyDoesNotExistException
132
     */
133 4
    public function __set(string $member, array $array)
134
    {
135 4
        $isMember = property_exists($this, $member);
136 4
        $notProtected = $member !== 'mutation';
137
138 4
        if ($isMember && $notProtected) {
139 3
            $acceptedKeys = array_keys($this->{$member});
140 3
            foreach ($array as $key => $value) {
141 3
                if (in_array($key, $acceptedKeys)) {
142 3
                    $this->{$member}[$key] = $value;
143
                }
144
            }
145 3
            return;
146
        }
147
148 1
        $this->__set_MMET($member, $array);
149
    }
150
151
152
    /**
153
     * Closes the connection or the channel or both with RabbitMQ server.
154
     * @param AMQPStreamConnection|AMQPChannel|AMQPMessage ...$object The object that should be used to close the channel or the connection.
155
     * @return bool True on success.
156
     * @throws Exception
157
     */
158 7
    public static function shutdown(...$object): bool
159
    {
160 7
        $successful = true;
161 7
        $parameters = [];
162
163 7
        foreach ($object as $class) {
164 7
            $parameters[] = is_object($class) ? get_class($class) : gettype($class);
165
            if (
166 7
                $class instanceof AMQPStreamConnection ||
167 7
                $class instanceof AMQPChannel ||
168 7
                $class instanceof AMQPMessage
169
            ) {
170
                try {
171 6
                    if (!($class instanceof AMQPMessage)) {
172 2
                        $class->close();
173 2
                        continue;
174
                    }
175 4
                    $class->getChannel()->close();
176 4
                } catch (AMQPConnectionClosedException $e) {
177
                    // No need to throw the exception here as it's extraneous. This error
178
                    // happens when a channel gets closed multiple times in different ways.
179
                }
180
            } else {
181 5
                $successful = false;
182
            }
183
        }
184
185 7
        if ($successful) {
186 6
            return $successful;
187
        }
188
189 1
        throw new Exception(
190 1
            sprintf(
191 1
                'The passed parameter must be of type %s, %s or %s or a combination of them. Given parameter(s) has/have the type(s): %s!',
192 1
                AMQPStreamConnection::class,
193 1
                AMQPChannel::class,
194 1
                AMQPMessage::class,
195 1
                implode(', ', $parameters)
196
            )
197
        );
198
    }
199
200
    /**
201
     * Returns an AMQPTable object.
202
     * @param array $array An array of the option wished to be turn into the an arguments object.
203
     * @return AMQPTable
204
     */
205 1
    public static function arguments(array $array): AMQPTable
206
    {
207 1
        return new AMQPTable($array);
208
    }
209
210
211
    /**
212
     * Establishes a connection with RabbitMQ server and opens a channel for the worker in the opened connection, it also sets both of them as defaults.
213
     * @return self
214
     */
215 38
    public function connect()
216
    {
217 38
        if (empty($this->connection)) {
218 37
            $this->connection = $this->getNewConnection();
219
        }
220
221 38
        if (empty($this->channel)) {
222 37
            $this->channel = $this->getNewChannel();
223
        }
224
225 38
        return $this;
226
    }
227
228
    /**
229
     * Closes all open channels and connections with RabbitMQ server.
230
     * @return self
231
     */
232 51
    public function disconnect()
233
    {
234 51
        if (count($this->channels)) {
235 36
            foreach ($this->channels as $channel) {
236 36
                $channel->close();
237
            }
238 36
            $this->channel = null;
239 36
            $this->channels = [];
240
        }
241
242 51
        if (count($this->connections)) {
243 36
            foreach ($this->connections as $connection) {
244 36
                $connection->close();
245
            }
246 36
            $this->connection = null;
247 36
            $this->connections = [];
248
        }
249
250 51
        return $this;
251
    }
252
253
    /**
254
     * Executes `self::disconnect()` and `self::connect()` respectively. Note that this method will not restore old channels.
255
     * @return self
256
     */
257 1
    public function reconnect()
258
    {
259 1
        $this->disconnect();
260 1
        $this->connect();
261
262 1
        return $this;
263
    }
264
265
    /**
266
     * Declares a queue on the default channel of the worker's connection with RabbitMQ server.
267
     * @param array $parameters [optional] The overrides for the default queue options of the worker.
268
     * @param AMQPChannel $_channel [optional] The channel that should be used instead of the default worker's channel.
269
     * @return self
270
     * @throws AMQPTimeoutException
271
     */
272 20
    public function queue(?array $parameters = null, ?AMQPChannel $_channel = null)
273
    {
274 20
        $changes = null;
275 20
        if ($parameters) {
276 7
            $changes = $this->mutateClassMember('queueOptions', $parameters);
277
        }
278
279 20
        $channel = $_channel ?: $this->channel;
280
281
        try {
282 20
            $channel->queue_declare(
283 20
                $this->queueOptions['queue'],
284 20
                $this->queueOptions['passive'],
285 20
                $this->queueOptions['durable'],
286 20
                $this->queueOptions['exclusive'],
287 20
                $this->queueOptions['auto_delete'],
288 20
                $this->queueOptions['nowait'],
289 20
                $this->queueOptions['arguments'],
290 20
                $this->queueOptions['ticket']
291
            );
292
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
293
            Exception::rethrow($error); // @codeCoverageIgnore
294
        }
295
296 20
        if ($changes) {
297 7
            $this->mutateClassMember('queueOptions', $changes);
298
        }
299
300 20
        return $this;
301
    }
302
303
    /**
304
     * Returns the default connection of the worker. If the worker is not connected, it returns null.
305
     * @since 1.1.0
306
     * @return AMQPStreamConnection|null
307
     */
308 4
    public function getConnection(): ?AMQPStreamConnection
309
    {
310 4
        return $this->connection;
311
    }
312
313
    /**
314
     * Sets the passed connection as the default connection of the worker.
315
     * @since 1.1.0
316
     * @param AMQPStreamConnection $connection The connection that should be as the default connection of the worker.
317
     * @return self
318
     */
319 1
    public function setConnection(AMQPStreamConnection $connection)
320
    {
321 1
        $this->connection = $connection;
322 1
        return $this;
323
    }
324
325
    /**
326
     * Opens a new connection to RabbitMQ server and returns it. Connections returned by this method pushed to connections array and are not set as default automatically.
327
     * @since 1.1.0
328
     * @param array|null $parameters
329
     * @return AMQPStreamConnection
330
     */
331 37
    public function getNewConnection(array $parameters = null): AMQPStreamConnection
332
    {
333 37
        $changes = null;
334 37
        if ($parameters) {
335 1
            $changes = $this->mutateClassMember('connectionOptions', $parameters);
336
        }
337
338 37
        $this->connections[] = $connection = new AMQPStreamConnection(
339 37
            $this->connectionOptions['host'],
340 37
            $this->connectionOptions['port'],
341 37
            $this->connectionOptions['user'],
342 37
            $this->connectionOptions['password'],
343 37
            $this->connectionOptions['vhost'],
344 37
            $this->connectionOptions['insist'],
345 37
            $this->connectionOptions['login_method'],
346 37
            $this->connectionOptions['login_response'],
347 37
            $this->connectionOptions['locale'],
348 37
            $this->connectionOptions['connection_timeout'],
349 37
            $this->connectionOptions['read_write_timeout'],
350 37
            $this->connectionOptions['context'],
351 37
            $this->connectionOptions['keepalive'],
352 37
            $this->connectionOptions['heartbeat'],
353 37
            $this->connectionOptions['channel_rpc_timeout'],
354 37
            $this->connectionOptions['ssl_protocol']
355
        );
356
357 37
        if ($changes) {
358 1
            $this->mutateClassMember('connectionOptions', $changes);
359
        }
360
361 37
        return $connection;
362
    }
363
364
    /**
365
     * Returns the default channel of the worker. If the worker is not connected, it returns null.
366
     * @return AMQPChannel|null
367
     */
368 11
    public function getChannel(): ?AMQPChannel
369
    {
370 11
        return $this->channel;
371
    }
372
373
    /**
374
     * Sets the passed channel as the default channel of the worker.
375
     * @since 1.1.0
376
     * @param AMQPChannel $channel The channel that should be as the default channel of the worker.
377
     * @return self
378
     */
379 1
    public function setChannel(AMQPChannel $channel)
380
    {
381 1
        $this->channel = $channel;
382 1
        return $this;
383
    }
384
385
    /**
386
     * Returns a new channel on the the passed connection of the worker. If no connection is passed, it uses the default connection. If the worker is not connected, it returns null.
387
     * @param array|null $parameters [optional] The overrides for the default channel options of the worker.
388
     * @param AMQPStreamConnection|null $_connection [optional] The connection that should be used instead of the default worker's connection.
389
     * @return AMQPChannel|null
390
     */
391 37
    public function getNewChannel(array $parameters = null, ?AMQPStreamConnection $_connection = null): ?AMQPChannel
392
    {
393 37
        $changes = null;
394 37
        if ($parameters) {
395 2
            $changes = $this->mutateClassMember('channelOptions', $parameters);
396
        }
397
398 37
        $connection = $_connection ?: $this->connection;
399
400 37
        $channel = null;
401 37
        if (isset($connection)) {
402 37
            $this->channels[] = $channel = $connection->channel(
403 37
                $this->channelOptions['channel_id']
404
            );
405
        }
406
407 37
        if ($changes) {
408 2
            $this->mutateClassMember('channelOptions', $changes);
409
        }
410
411 37
        return $channel;
412
    }
413
414
    /**
415
     * Fetches a channel object identified by the passed id (channel_id). If not found, it returns null.
416
     * @param int $channelId The id of the channel wished to be fetched.
417
     * @param AMQPStreamConnection|null $_connection [optional] The connection that should be used instead of the default worker's connection.
418
     * @return AMQPChannel|null
419
     */
420 2
    public function getChannelById(int $channelId, ?AMQPStreamConnection $_connection = null): ?AMQPChannel
421
    {
422 2
        $connection = $_connection ?: $this->connection;
423 2
        $channels = $connection->channels;
424
425 2
        if (array_key_exists($channelId, $channels)) {
426 1
            return $channels[$channelId];
427
        }
428
429 1
        return null;
430
    }
431
}
432