Completed
Branch master (5b18a9)
by Marwan
06:51
created

AbstractWorker::setChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
c 0
b 0
f 0
nc 1
nop 1
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
1
<?php
2
/**
3
 * @author Marwan Al-Soltany <[email protected]>
4
 * @copyright Marwan Al-Soltany 2020
5
 * For the full copyright and license information, please view
6
 * the LICENSE file that was distributed with this source code.
7
 */
8
9
namespace MAKS\AmqpAgent\Worker;
10
11
use PhpAmqpLib\Connection\AMQPStreamConnection;
12
use PhpAmqpLib\Channel\AMQPChannel;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use PhpAmqpLib\Wire\AMQPTable;
15
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
16
use PhpAmqpLib\Exception\AMQPTimeoutException;
17
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
18
use MAKS\AmqpAgent\Worker\AbstractWorkerInterface;
19
use MAKS\AmqpAgent\Worker\WorkerCommandTrait;
20
use MAKS\AmqpAgent\Worker\WorkerMutationTrait;
21
use MAKS\AmqpAgent\Exception\AmqpAgentException;
22
use MAKS\AmqpAgent\Exception\MethodDoesNotExistException;
23
use MAKS\AmqpAgent\Exception\PropertyDoesNotExistException;
24
25
/**
26
 * An abstract class implementing the basic functionality of a worker.
27
 * @since 1.0.0
28
 * @api
29
 */
30
abstract class AbstractWorker implements AbstractWorkerInterface
31
{
32
    use WorkerMutationTrait;
33
    use WorkerCommandTrait;
34
35
    /**
     * The default connection options that the worker should use when no overrides are provided.
     * Filled by the constructor from the passed overrides, falling back to self::CONNECTION_OPTIONS.
     * @var array
     */
39
    protected $connectionOptions;
40
41
    /**
     * The default channel options that the worker should use when no overrides are provided.
     * Filled by the constructor from the passed overrides, falling back to self::CHANNEL_OPTIONS.
     * @var array
     */
45
    protected $channelOptions;
46
47
    /**
     * The default queue options that the worker should use when no overrides are provided.
     * Filled by the constructor from the passed overrides, falling back to self::QUEUE_OPTIONS.
     * @var array
     */
51
    protected $queueOptions;
52
53
    /**
     * The default connection of the worker. It is null until a connection is set and after self::disconnect() closed the opened connections.
     * @var AMQPStreamConnection|null
     */
57
    public $connection;
58
59
    /**
     * The default channel of the worker. It is null until a channel is set and after self::disconnect() closed the opened channels.
     * @var AMQPChannel|null
     */
63
    public $channel;
64
65
    /**
     * All opened connections of the worker (every connection created via self::getNewConnection() is appended here).
     * @var AMQPStreamConnection[]
     */
69
    public $connections = [];
70
71
    /**
     * All opened channels of the worker (every channel created via self::getNewChannel() is appended here).
     * @var AMQPChannel[]
     */
75
    public $channels = [];
76
77
78
    /**
     * AbstractWorker object constructor.
     * Resolves every known connection, channel, and queue option: the value from the passed overrides is used
     * when it is set and not null, otherwise the matching default from the class constants is taken.
     * @param array $connectionOptions [optional] The overrides for the default connection options of the worker.
     * @param array $channelOptions [optional] The overrides for the default channel options of the worker.
     * @param array $queueOptions [optional] The overrides for the default queue options of the worker.
     */
    public function __construct(array $connectionOptions = [], array $channelOptions = [], array $queueOptions = [])
    {
        $connectionKeys = [
            'host',
            'port',
            'user',
            'password',
            'vhost',
            'insist',
            'login_method',
            'login_response',
            'locale',
            'connection_timeout',
            'read_write_timeout',
            'context',
            'keepalive',
            'heartbeat',
            'channel_rpc_timeout',
            'ssl_protocol'
        ];
        $this->connectionOptions = [];
        foreach ($connectionKeys as $name) {
            $this->connectionOptions[$name] = $connectionOptions[$name] ?? self::CONNECTION_OPTIONS[$name];
        }

        $channelKeys = ['channel_id'];
        $this->channelOptions = [];
        foreach ($channelKeys as $name) {
            $this->channelOptions[$name] = $channelOptions[$name] ?? self::CHANNEL_OPTIONS[$name];
        }

        $queueKeys = [
            'queue',
            'passive',
            'durable',
            'exclusive',
            'auto_delete',
            'nowait',
            'arguments',
            'ticket'
        ];
        $this->queueOptions = [];
        foreach ($queueKeys as $name) {
            $this->queueOptions[$name] = $queueOptions[$name] ?? self::QUEUE_OPTIONS[$name];
        }
    }
120
121
    /**
     * Disconnects the worker from RabbitMQ server (via self::disconnect()) before the object gets destroyed.
     */
    public function __destruct()
    {
        $this->disconnect();
    }
128
129
    /**
     * Reads a class member that is not publicly accessible using the public property access notation.
     * @param string $member Property name.
     * @return mixed The current value of the property.
     * @throws PropertyDoesNotExistException If the class has no property with the passed name.
     */
    public function __get(string $member)
    {
        if (!property_exists($this, $member)) {
            throw new PropertyDoesNotExistException(
                sprintf( // @codeCoverageIgnore
                    // PHPUnit reports the line above as uncovered although the entire block is tested.
                    'The requested property with the name "%s" does not exist!',
                    $member
                )
            );
        }

        return $this->{$member};
    }
150
151
    /**
     * Sets a class member via public property assignment notation.
     * Only keys that already exist in the targeted property get overwritten, unknown keys are silently ignored.
     * @param string $member Property name.
     * @param array $array Array of overrides. The array type here is important, because only *Options properties should be overwritable.
     * @return void
     * @throws PropertyDoesNotExistException If the property does not exist, is the protected "mutation" property, or does not hold an array.
     */
    public function __set(string $member, array $array)
    {
        // A member that does not hold an array has no keys to override, so it is treated as immutable
        // instead of failing inside an array function with a warning or a TypeError.
        $isMutable = property_exists($this, $member)
            && $member !== 'mutation'
            && is_array($this->{$member});

        if (!$isMutable) {
            throw new PropertyDoesNotExistException(
                sprintf( // @codeCoverageIgnore
                    // PHPUnit reports the line above as uncovered although the entire block is tested.
                    'A property with the name "%s" is immutable or does not exist!',
                    $member
                )
            );
        }

        foreach ($array as $key => $value) {
            // Exact key lookup: a loose in_array() would let the integer key 0 match
            // any non-numeric option name on PHP < 8 and inject an unknown key.
            if (array_key_exists($key, $this->{$member})) {
                $this->{$member}[$key] = $value;
            }
        }
    }
181
182
    /**
     * Throws an exception for calls to undefined methods.
     * @param string $function Function name.
     * @param array $arguments Function arguments.
     * @return mixed Never returns, the method always throws.
     * @throws MethodDoesNotExistException
     */
    public function __call(string $function, array $arguments)
    {
        // Arrays and objects without __toString() cannot be joined by implode() (notice or fatal error),
        // which would mask the intended exception. They are described by their type instead;
        // everything else is cast to string exactly as implode() would do.
        $printable = array_map(
            static function ($argument): string {
                if (is_array($argument)) {
                    return 'array';
                }

                if (is_object($argument) && !method_exists($argument, '__toString')) {
                    return get_class($argument);
                }

                return (string)$argument;
            },
            $arguments
        );

        throw new MethodDoesNotExistException(
            sprintf( // @codeCoverageIgnore
                // PHPUnit reports the line above as uncovered although the entire block is tested.
                'The called method "%s" with the parameters "%s" does not exist!',
                $function,
                implode(', ', $printable)
            )
        );
    }
200
201
    /**
     * Throws an exception for calls to undefined static methods.
     * @param string $function Function name.
     * @param array $arguments Function arguments.
     * @return mixed Never returns, the method always throws.
     * @throws MethodDoesNotExistException
     */
    public static function __callStatic(string $function, array $arguments)
    {
        // Arrays and objects without __toString() cannot be joined by implode() (notice or fatal error),
        // which would mask the intended exception. They are described by their type instead;
        // everything else is cast to string exactly as implode() would do.
        $printable = array_map(
            static function ($argument): string {
                if (is_array($argument)) {
                    return 'array';
                }

                if (is_object($argument) && !method_exists($argument, '__toString')) {
                    return get_class($argument);
                }

                return (string)$argument;
            },
            $arguments
        );

        throw new MethodDoesNotExistException(
            sprintf( // @codeCoverageIgnore
                // PHPUnit reports the line above as uncovered although the entire block is tested.
                'The called static method "%s" with the parameters "%s" does not exist!',
                $function,
                implode(', ', $printable)
            )
        );
    }
219
220
221
    /**
     * Closes the connection or the channel or both with RabbitMQ server.
     * Connections and channels are closed directly, for a message the channel stored in its delivery info is closed.
     * Valid objects are closed even when other passed values are invalid; the exception is thrown after all values were processed.
     * @param AMQPStreamConnection|AMQPChannel|AMQPMessage ...$object The object that should be used to close the channel or the connection.
     * @return bool True on success.
     * @throws AMQPInvalidArgumentException If at least one of the passed values is not of a supported type.
     */
    public static function shutdown(...$object): bool
    {
        $givenTypes = [];
        $allValid = true;

        foreach ($object as $candidate) {
            $givenTypes[] = is_object($candidate) ? get_class($candidate) : gettype($candidate);

            $isMessage = $candidate instanceof AMQPMessage;
            $isClosable = $candidate instanceof AMQPStreamConnection || $candidate instanceof AMQPChannel;

            if (!$isClosable && !$isMessage) {
                $allValid = false;
                continue;
            }

            try {
                if ($isMessage) {
                    $candidate->delivery_info['channel']->close();
                } else {
                    $candidate->close();
                }
            } catch (AMQPConnectionClosedException $e) {
                // No need to throw the exception here as it's extraneous. This error
                // happens when a channel gets closed multiple times in different ways.
            }
        }

        if (!$allValid) {
            throw new AMQPInvalidArgumentException(
                sprintf( // @codeCoverageIgnore
                    // PHPUnit reports the line above as uncovered although the entire block is tested.
                    'The passed parameter must be of type %s, %s or %s or a combination of them. Given parameter(s) has/have the type(s): %s!',
                    AMQPStreamConnection::class,
                    AMQPChannel::class,
                    AMQPMessage::class,
                    implode(', ', $givenTypes)
                )
            );
        }

        return true;
    }
269
270
271
    /**
     * Makes sure the worker has a default connection and a default channel.
     * A new connection is opened only if no default connection is set, and a new channel only if no default channel is set.
     * @return self
     */
    public function connect(): self
    {
        $needsConnection = empty($this->connection);
        if ($needsConnection) {
            $this->connection = $this->getNewConnection();
        }

        $needsChannel = empty($this->channel);
        if ($needsChannel) {
            $this->channel = $this->getNewChannel();
        }

        return $this;
    }
287
288
    /**
     * Closes all channels and connections tracked by the worker and resets its defaults.
     * The default channel/connection is only reset to null when at least one tracked channel/connection was closed.
     * @return self
     */
    public function disconnect(): self
    {
        if (count($this->channels) > 0) {
            foreach ($this->channels as $openChannel) {
                $openChannel->close();
            }

            $this->channels = [];
            $this->channel = null;
        }

        if (count($this->connections) > 0) {
            foreach ($this->connections as $openConnection) {
                $openConnection->close();
            }

            $this->connections = [];
            $this->connection = null;
        }

        return $this;
    }
312
313
    /**
     * Drops the current connections and channels and connects again, i.e. runs self::disconnect() followed by self::connect().
     * @return self
     */
    public function reconnect(): self
    {
        $this->disconnect();
        $this->connect();

        return $this;
    }
324
325
    /**
     * Declares a queue on the default channel of the worker's connection with RabbitMQ server.
     * The passed overrides are applied to the queue options only for the duration of this call. The value returned by
     * self::mutateClassMember() is passed back to it afterwards (presumably restoring the previous values), and this
     * now also happens when declaring the queue fails.
     * @param array|null $parameters [optional] The overrides for the default queue options of the worker.
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
     * @return self
     * @throws AMQPTimeoutException
     */
    public function queue(?array $parameters = null, ?AMQPChannel $_channel = null): self
    {
        $changes = null;
        if ($parameters) {
            $changes = $this->mutateClassMember('queueOptions', $parameters);
        }

        // NOTE(review): no guard here, the worker is expected to be connected (or a channel passed) before calling this method.
        $channel = $_channel ?? $this->channel;

        try {
            $channel->queue_declare(
                $this->queueOptions['queue'],
                $this->queueOptions['passive'],
                $this->queueOptions['durable'],
                $this->queueOptions['exclusive'],
                $this->queueOptions['auto_delete'],
                $this->queueOptions['nowait'],
                $this->queueOptions['arguments'],
                $this->queueOptions['ticket']
            );
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
            AmqpAgentException::rethrowException($error, __METHOD__ . '() failed!'); // @codeCoverageIgnore
        } finally {
            // Revert the temporary overrides even if the declaration threw,
            // otherwise they would stick as the worker's defaults.
            if ($changes) {
                $this->mutateClassMember('queueOptions', $changes);
            }
        }

        return $this;
    }
362
363
    /**
     * Wraps the passed array in an AMQPTable object.
     * @param array $array An array of the options that should be turned into an arguments object.
     * @return AMQPTable
     */
    public function arguments(array $array): AMQPTable
    {
        $table = new AMQPTable($array);

        return $table;
    }
372
373
    /**
     * Returns the default connection of the worker, or null if no default connection is currently set.
     * @return AMQPStreamConnection|null
     */
    public function getConnection(): ?AMQPStreamConnection
    {
        return $this->connection;
    }
381
382
    /**
     * Sets the passed connection as the default connection of the worker.
     * The connection is not added to the connections array of the worker.
     * @param AMQPStreamConnection $connection The connection that should be used as the default connection of the worker.
     * @return self
     */
    public function setConnection(AMQPStreamConnection $connection): self
    {
        $this->connection = $connection;

        return $this;
    }
392
393
    /**
     * Opens a new connection to RabbitMQ server and returns it. Connections returned by this method are pushed to the connections array and are not set as default automatically.
     * The passed overrides are applied to the connection options only for the duration of this call. The value returned by
     * self::mutateClassMember() is passed back to it afterwards (presumably restoring the previous values), and this
     * now also happens when opening the connection fails.
     * @param array|null $parameters [optional] The overrides for the default connection options of the worker.
     * @return AMQPStreamConnection
     */
    public function getNewConnection(?array $parameters = null): AMQPStreamConnection
    {
        $changes = null;
        if ($parameters) {
            $changes = $this->mutateClassMember('connectionOptions', $parameters);
        }

        try {
            $connection = new AMQPStreamConnection(
                $this->connectionOptions['host'],
                $this->connectionOptions['port'],
                $this->connectionOptions['user'],
                $this->connectionOptions['password'],
                $this->connectionOptions['vhost'],
                $this->connectionOptions['insist'],
                $this->connectionOptions['login_method'],
                $this->connectionOptions['login_response'],
                $this->connectionOptions['locale'],
                $this->connectionOptions['connection_timeout'],
                $this->connectionOptions['read_write_timeout'],
                $this->connectionOptions['context'],
                $this->connectionOptions['keepalive'],
                $this->connectionOptions['heartbeat'],
                $this->connectionOptions['channel_rpc_timeout'],
                $this->connectionOptions['ssl_protocol']
            );
        } finally {
            // Revert the temporary overrides even if connecting threw,
            // otherwise they would stick as the worker's defaults.
            if ($changes) {
                $this->mutateClassMember('connectionOptions', $changes);
            }
        }

        $this->connections[] = $connection;

        return $connection;
    }
429
430
    /**
     * Returns the default channel of the worker, or null if no default channel is currently set.
     * @return AMQPChannel|null
     */
    public function getChannel(): ?AMQPChannel
    {
        return $this->channel;
    }
438
439
    /**
     * Sets the passed channel as the default channel of the worker.
     * The channel is not added to the channels array of the worker.
     * @param AMQPChannel $channel The channel that should be used as the default channel of the worker.
     * @return self
     */
    public function setChannel(AMQPChannel $channel): self
    {
        $this->channel = $channel;

        return $this;
    }
449
450
    /**
     * Returns a new channel on the passed connection of the worker. If no connection is passed, it uses the default connection. If the worker is not connected, it returns null.
     * Channels returned by this method are pushed to the channels array and are not set as default automatically.
     * The passed overrides are applied to the channel options only for the duration of this call. The value returned by
     * self::mutateClassMember() is passed back to it afterwards (presumably restoring the previous values), and this
     * now also happens when opening the channel fails.
     * @param array|null $parameters [optional] The overrides for the default channel options of the worker.
     * @param AMQPStreamConnection|null $_connection [optional] The connection that should be used instead of the default worker's connection.
     * @return AMQPChannel|null
     */
    public function getNewChannel(?array $parameters = null, ?AMQPStreamConnection $_connection = null): ?AMQPChannel
    {
        $changes = null;
        if ($parameters) {
            $changes = $this->mutateClassMember('channelOptions', $parameters);
        }

        $connection = $_connection ?? $this->connection;
        $channel = null;

        try {
            if ($connection !== null) {
                $channel = $connection->channel(
                    $this->channelOptions['channel_id']
                );
                $this->channels[] = $channel;
            }
        } finally {
            // Revert the temporary overrides even if opening the channel threw,
            // otherwise they would stick as the worker's defaults.
            if ($changes) {
                $this->mutateClassMember('channelOptions', $changes);
            }
        }

        return $channel;
    }
478
479
    /**
     * Fetches a channel object identified by the passed id (channel_id). If not found, it returns null.
     * Null is also returned when no connection is passed and the worker has no default connection.
     * @param int $channleId The id of the channel wished to be fetched.
     * @param AMQPStreamConnection|null $_connection [optional] The connection that should be used instead of the default worker's connection.
     * @return AMQPChannel|null
     */
    public function getChannelById(int $channleId, ?AMQPStreamConnection $_connection = null): ?AMQPChannel
    {
        $connection = $_connection ?? $this->connection;

        // Without a connection there is nothing to look up; bail out instead of reading a property on null.
        if ($connection === null) {
            return null;
        }

        return $connection->channels[$channleId] ?? null;
    }
496
}
497