Completed
Push — master ( d97e26...f4067f )
by Marwan
21s queued 12s
created

Consumer::waitForAll()   B

Complexity

Conditions 10
Paths 48

Size

Total Lines 51
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 10

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 28
c 2
b 0
f 0
dl 0
loc 51
ccs 25
cts 25
cp 1
rs 7.6666
cc 10
nc 48
nop 2
crap 10

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * @author Marwan Al-Soltany <[email protected]>
5
 * @copyright Marwan Al-Soltany 2020
6
 * For the full copyright and license information, please view
7
 * the LICENSE file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace MAKS\AmqpAgent\Worker;
13
14
use PhpAmqpLib\Connection\AMQPStreamConnection;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use PhpAmqpLib\Exception\AMQPTimeoutException;
18
use PhpAmqpLib\Exception\AMQPRuntimeException;
19
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
20
use MAKS\AmqpAgent\Worker\AbstractWorker;
21
use MAKS\AmqpAgent\Worker\ConsumerInterface;
22
use MAKS\AmqpAgent\Worker\WorkerFacilitationInterface;
23
use MAKS\AmqpAgent\Exception\CallbackDoesNotExistException;
24
use MAKS\AmqpAgent\Exception\AmqpAgentException as Exception;
25
use MAKS\AmqpAgent\Config\ConsumerParameters as Parameters;
26
27
/**
28
 * A class specialized in consuming. Implementing only the methods needed for a consumer.
29
 *
30
 * Example:
31
 * ```
32
 * $consumer = new Consumer();
33
 * $consumer->connect();
34
 * $consumer->queue();
35
 * $consumer->qos();
36
 * $consumer->consume('SomeNamespace\SomeClass::someCallback');
37
 * $consumer->wait();
38
 * $consumer->disconnect();
39
 * ```
40
 *
41
 * @since 1.0.0
42
 * @api
43
 */
44
class Consumer extends AbstractWorker implements ConsumerInterface, WorkerFacilitationInterface
45
{
46
    /**
47
     * The full quality of service options that should be used for the worker.
48
     * @var array
49
     */
50
    protected $qosOptions;
51
52
    /**
53
     * The full wait options that should be used for the worker.
54
     * @var array
55
     */
56
    protected $waitOptions;
57
58
    /**
59
     * The full consume options that should be used for the worker.
60
     * @var array
61
     */
62
    protected $consumeOptions;
63
64
    /**
65
     * The full acknowledgment options that should be used for the worker.
66
     * @var array
67
     */
68
    protected $ackOptions;
69
70
    /**
71
     * The full unacknowledgment options that should be used for the worker.
72
     * @var array
73
     */
74
    protected $nackOptions;
75
76
77
    /**
78
     * Consumer object constructor.
79
     * @param array $connectionOptions [optional] The overrides for the default connection options of the worker.
80
     * @param array $channelOptions [optional] The overrides for the default channel options of the worker.
81
     * @param array $queueOptions [optional] The overrides for the default queue options of the worker.
82
     * @param array $qosOptions [optional] The overrides for the default quality of service options of the worker.
83
     * @param array $waitOptions [optional] The overrides for the default wait options of the worker.
84
     * @param array $consumeOptions [optional] The overrides for the default consume options of the worker.
85
     */
86 15
    public function __construct(
87
        array $connectionOptions = [],
88
        array $channelOptions = [],
89
        array $queueOptions = [],
90
        array $qosOptions = [],
91
        array $waitOptions = [],
92
        array $consumeOptions = []
93
    ) {
94 15
        $this->qosOptions     = Parameters::patch($qosOptions, 'QOS_OPTIONS');
95 15
        $this->waitOptions    = Parameters::patch($waitOptions, 'WAIT_OPTIONS');
96 15
        $this->consumeOptions = Parameters::patch($consumeOptions, 'CONSUME_OPTIONS');
97 15
        $this->ackOptions     = Parameters::ACK_OPTIONS;
98 15
        $this->nackOptions    = Parameters::NACK_OPTIONS;
99
100 15
        parent::__construct($connectionOptions, $channelOptions, $queueOptions);
101 15
    }
102
103
104
    /**
105
     * Acknowledges an AMQP message object.
106
     * Starting from v1.1.1, you can use php-amqplib AMQPMessage::ack() method instead.
107
     * @param AMQPMessage $_message The message object that should be acknowledged.
108
     * @param array|null $parameters [optional] The overrides for the default acknowledge options.
109
     * @return void
110
     * @throws AMQPRuntimeException
111
     */
112 3
    public static function ack(AMQPMessage $_message, ?array $parameters = null): void
113
    {
114 3
        $parameters = Parameters::patch($parameters ?? [], 'ACK_OPTIONS');
115
116
        /**
117
         * If a consumer dies without sending an acknowledgement the AMQP broker will redeliver it
118
         * to another consumer. If none are available at the time, the broker will wait until at
119
         * least one consumer is registered for the same queue before attempting redelivery
120
         */
121
        try {
122
            /** @var AMQPChannel */
123 3
            $channel = $_message->getChannel();
124 3
            $channel->basic_ack(
125 3
                $_message->getDeliveryTag(),
126 3
                $parameters['multiple']
127
            );
128
        } catch (AMQPRuntimeException $error) { // @codeCoverageIgnore
129
            Exception::rethrow($error); // @codeCoverageIgnore
130
        }
131 3
    }
132
133
    /**
134
     * Unacknowledges an AMQP message object.
135
     * Starting from v1.1.1, you can use php-amqplib AMQPMessage::nack() method instead.
136
     * @param AMQPChannel|null $_channel [optional] The channel that should be used. The method will try using the channel attached with the message if no channel was specified, although there is no guarantee this will work as this depends on the way the message was fetched.
137
     * @param AMQPMessage $_message The message object that should be unacknowledged.
138
     * @param array|null $parameters [optional] The overrides for the default exchange options.
139
     * @return void
140
     * @throws AMQPRuntimeException
141
     */
142 1
    public static function nack(?AMQPChannel $_channel, AMQPMessage $_message, ?array $parameters = null): void
143
    {
144 1
        $parameters = Parameters::patch($parameters ?? [], 'NACK_OPTIONS');
145
146
        try {
147
            /** @var AMQPChannel */
148 1
            $channel = $_channel ?? $_message->getChannel();
149 1
            $channel->basic_nack(
150 1
                $_message->getDeliveryTag(),
151 1
                $parameters['multiple'],
152 1
                $parameters['requeue']
153
            );
154
        } catch (AMQPRuntimeException $error) { // @codeCoverageIgnore
155
            Exception::rethrow($error); // @codeCoverageIgnore
156
        }
157 1
    }
158
159
    /**
160
     * Gets a message object from a channel, direct access to a queue.
161
     * @deprecated 1.0.0 Direct queue access is not recommended. Use `self::consume()` instead.
162
     * @param AMQPChannel $_channel The channel that should be used.
163
     * @param array|null $parameters [optional] The overrides for the default get options.
164
     * @return AMQPMessage|null
165
     * @throws AMQPTimeoutException
166
     */
167 3
    public static function get(AMQPChannel $_channel, ?array $parameters = null): ?AMQPMessage
168
    {
169 3
        $parameters = Parameters::patch($parameters ?? [], 'GET_OPTIONS');
170
171
        try {
172 3
            $return = $_channel->basic_get(
173 3
                $parameters['queue'],
174 3
                $parameters['no_ack'],
175 3
                $parameters['ticket']
176
            );
177
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
178
            Exception::rethrow($error); // @codeCoverageIgnore
179
        }
180
181 3
        return $return;
182
    }
183
184
    /**
185
     * Ends a queue consumer.
186
     * @param AMQPChannel $_channel The channel that should be used.
187
     * @param array|null $parameters [optional] The overrides for the default cancel options.
188
     * @return mixed
189
     * @throws AMQPTimeoutException
190
     */
191 1
    public static function cancel(AMQPChannel $_channel, ?array $parameters = null)
192
    {
193 1
        $parameters = Parameters::patch($parameters ?? [], 'CANCEL_OPTIONS');
194
195
        try {
196 1
            $return = $_channel->basic_cancel(
197 1
                $parameters['consumer_tag'],
198 1
                $parameters['nowait'],
199 1
                $parameters['noreturn']
200
            );
201
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
202
            Exception::rethrow($error); // @codeCoverageIgnore
203
        }
204
205 1
        return $return;
206
    }
207
208
    /**
209
     * Redelivers unacknowledged messages
210
     * @param AMQPChannel $_channel The channel that should be used.
211
     * @param array|null $parameters [optional] The overrides for the default recover options.
212
     * @return mixed
213
     * @throws AMQPTimeoutException
214
     */
215 1
    public static function recover(AMQPChannel $_channel, ?array $parameters = null)
216
    {
217 1
        $parameters = Parameters::patch($parameters ?? [], 'RECOVER_OPTIONS');
218
219
        try {
220 1
            $return = $_channel->basic_recover(
221 1
                $parameters['requeue']
222
            );
223
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
224
            Exception::rethrow($error); // @codeCoverageIgnore
225
        }
226
227 1
        return $return;
228
    }
229
230
    /**
231
     * Rejects an AMQP message object.
232
     * @deprecated Starting from v1.1.1, you can use php-amqplib native AMQPMessage::reject() method instead.
233
     * @param AMQPChannel $_channel The channel that should be used.
234
     * @param AMQPMessage $_message The message object that should be rejected.
235
     * @param array|null $parameters [optional] The overrides for the default reject options.
236
     * @return void
237
     * @throws AMQPRuntimeException
238
     */
239 1
    public static function reject(AMQPChannel $_channel, AMQPMessage $_message, ?array $parameters = null): void
240
    {
241 1
        $parameters = Parameters::patch($parameters ?? [], 'REJECT_OPTIONS');
242
243
        try {
244 1
            $_channel->basic_reject(
245 1
                $_message->getDeliveryTag(),
246 1
                $parameters['requeue']
247
            );
248
        } catch (AMQPRuntimeException $error) { // @codeCoverageIgnore
249
            Exception::rethrow($error); // @codeCoverageIgnore
250
        }
251 1
    }
252
253
254
    /**
255
     * Specifies the quality of service on the default channel of the worker's connection to RabbitMQ server.
256
     * @param array|null $parameters [optional] The overrides for the default quality of service options of the worker.
257
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
258
     * @return self
259
     */
260 10
    public function qos(?array $parameters = null, ?AMQPChannel $_channel = null)
261
    {
262 10
        $changes = null;
263 10
        if ($parameters) {
264 4
            $changes = $this->mutateClassMember('qosOptions', $parameters);
265
        }
266
267 10
        $channel = $_channel ?: $this->channel;
268
269 10
        $channel->basic_qos(
270 10
            $this->qosOptions['prefetch_size'],
271 10
            $this->qosOptions['prefetch_count'],
272 10
            $this->qosOptions['a_global']
273
        );
274
275 10
        if ($changes) {
276 4
            $this->mutateClassMember('qosOptions', $changes);
277
        }
278
279 10
        return $this;
280
    }
281
282
    /**
283
     * Consumes messages from the default channel of the worker's connection to RabbitMQ server.
284
     * @param callback|array|string|null $callback [optional] The callback that the consumer uses to process the messages.
285
     * @param array|null $variables [optional] The variables that should be passed to the callback.
286
     * @param array|null $parameters [optional] The overrides for the default exchange options of the worker.
287
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
288
     * @return self
289
     * @throws CallbackDoesNotExistException|AMQPTimeoutException
290
     */
291 8
    public function consume($callback = null, ?array $variables = null, ?array $parameters = null, ?AMQPChannel $_channel = null)
292
    {
293 8
        $changes = null;
294 8
        if ($parameters) {
295 4
            $changes = $this->mutateClassMember('consumeOptions', $parameters);
296
        }
297
298 8
        $channel = $_channel ?: $this->channel;
299
300 8
        $originalCallback = $this->consumeOptions['callback'];
301
302 8
        $callback = $callback ?: $originalCallback;
303
304 8
        if (is_callable($callback)) {
305 6
            if (is_array($callback) || is_string($callback)) {
306
                $this->consumeOptions['callback'] = function ($message) use ($callback, $variables) {
307 4
                    if ($variables) {
308 2
                        array_unshift($variables, $message);
309 2
                        call_user_func_array($callback, $variables);
310
                    } else {
311 2
                        call_user_func($callback, $message);
312
                    }
313 5
                };
314
            } else {
315
                $this->consumeOptions['callback'] = function ($message) use ($callback, $variables) {
316
                    // @codeCoverageIgnoreStart
317
                    if ($variables) {
318
                        $variables = array_values($variables);
319
                        $callback($message, ...$variables);
320
                    } else {
321
                        $callback($message);
322
                    }
323
                    // @codeCoverageIgnoreEnd
324 6
                };
325
            }
326
        } else {
327 2
            throw new CallbackDoesNotExistException(
328 2
                sprintf(
329 2
                    'The first parameter must be a valid callable, a callback, a variable containing a callback, a name of a function as string, a string like %s, or an array like %s. The given parameter (data-type: %s) was none of them.',
330 2
                    '"Foo\Bar\Baz::qux"',
331 2
                    '["Foo\Bar\Baz", "qux"]',
332 2
                    is_object($callback) ? get_class($callback) : gettype($callback)
333
                )
334
            );
335
        }
336
337
        try {
338 6
            $channel->basic_consume(
339 6
                $this->consumeOptions['queue'],
340 6
                $this->consumeOptions['consumer_tag'],
341 6
                $this->consumeOptions['no_local'],
342 6
                $this->consumeOptions['no_ack'],
343 6
                $this->consumeOptions['exclusive'],
344 6
                $this->consumeOptions['nowait'],
345 6
                $this->consumeOptions['callback'],
346 6
                $this->consumeOptions['ticket'],
347 6
                $this->consumeOptions['arguments']
348
            );
349
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
350
            Exception::rethrow($error); // @codeCoverageIgnore
351 6
        } finally {
352
            // reverting consumeOptions back to its state.
353 6
            $this->consumeOptions['callback'] = $originalCallback;
354
        }
355
356
357 6
        if ($changes) {
358 4
            $this->mutateClassMember('consumeOptions', $changes);
359
        }
360
361 6
        register_shutdown_function([__CLASS__, 'shutdown'], ...array_values(array_merge($this->channels, $this->connections)));
362
363 6
        return $this;
364
    }
365
366
    /**
367
     * Checks whether the default channel is consuming.
368
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
369
     * @return bool
370
     */
371 1
    public function isConsuming(?AMQPChannel $_channel = null): bool
372
    {
373 1
        $channel = $_channel ?: $this->channel;
374 1
        return $channel->is_consuming();
375
    }
376
377
    /**
378
     * Keeps the connection to RabbitMQ server alive as long as the default channel is in used.
379
     * @param array|null $parameters [optional] The overrides for the default exchange options of the worker.
380
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
381
     * @return self
382
     * @throws AMQPOutOfBoundsException|AMQPRuntimeException|AMQPTimeoutException
383
     */
384 3
    public function wait(?array $parameters = null, ?AMQPChannel $_channel = null)
385
    {
386 3
        $changes = null;
387 3
        if ($parameters) {
388 1
            $changes = $this->mutateClassMember('waitOptions', $parameters);
389
        }
390
391 3
        $channel = $_channel ?: $this->channel;
392
393 3
        ignore_user_abort(true);
394 3
        set_time_limit(0);
395
396 3
        while ($channel->is_consuming()) {
397
            try {
398 3
                $channel->wait(
399 3
                    $this->waitOptions['allowed_methods'],
400 3
                    $this->waitOptions['non_blocking'],
401 3
                    $this->waitOptions['timeout']
402
                );
403
            } catch (AMQPOutOfBoundsException | AMQPRuntimeException | AMQPTimeoutException $error) { // @codeCoverageIgnore
404
                Exception::rethrow($error); // @codeCoverageIgnore
405
            }
406
        }
407
408 3
        if ($changes) {
409 1
            $this->mutateClassMember('waitOptions', $changes);
410
        }
411
412 3
        return $this;
413
    }
414
415
    /**
416
     * Tries to keep the connection to RabbitMQ server alive as long as there are channels in used (default or not).
417
     * @param array|null $parameters [optional] The overrides for the default exchange options of the worker.
418
     * @param AMQPStreamConnection|null $_connection [optional] The connection that should be used instead of the default worker's connection.
419
     * @return self
420
     * @throws AMQPOutOfBoundsException|AMQPRuntimeException|AMQPTimeoutException
421
     */
422 1
    public function waitForAll(?array $parameters = null, ?AMQPStreamConnection $_connection = null)
423
    {
424 1
        $changes = null;
425 1
        if ($parameters) {
426 1
            $changes = $this->mutateClassMember('waitOptions', $parameters);
427
        }
428
429 1
        $connection = $_connection ?: $this->connection;
430
431
        // count must is >= 1 because the first channel is the connection itself.
432
        // this means there are always at least two, one connection and one channel.
433 1
        $active = count($connection->channels) > 1;
434
435 1
        ignore_user_abort(true);
436 1
        set_time_limit(0);
437
438 1
        while ($active) {
439
            try {
440 1
                $breaks = 0;
441
442 1
                foreach ($connection->channels as $channel) {
443 1
                    if ($channel instanceof AMQPChannel) {
444 1
                        if ($channel->is_consuming()) {
445 1
                            $channel->wait(
446 1
                                $this->waitOptions['allowed_methods'],
447 1
                                $this->waitOptions['non_blocking'],
448 1
                                $this->waitOptions['timeout']
449
                            );
450
                        } else {
451 1
                            $breaks++;
452
                        }
453
                    }
454
                }
455
456
                // refresh channels count to allow waiting for new channels.
457 1
                $count = count($connection->channels);
458
459 1
                if ($breaks === $count - 1) {
460 1
                    $active = false;
0 ignored issues
show
Unused Code introduced by
The assignment to $active is dead and can be removed.
Loading history...
461 1
                    break;
462
                }
463
            } catch (AMQPOutOfBoundsException | AMQPRuntimeException | AMQPTimeoutException $error) { // @codeCoverageIgnore
464
                Exception::rethrow($error); // @codeCoverageIgnore
465
            }
466
        }
467
468 1
        if ($changes) {
469 1
            $this->mutateClassMember('waitOptions', $changes);
470
        }
471
472 1
        return $this;
473
    }
474
475
    /**
476
     * Executes `self::connect()`, `self::queue()`, and `self::qos()` respectively (note that `self::wait()` needs to be executed after `self::consume()`).
477
     * @return self
478
     */
479 6
    public function prepare()
480
    {
481 6
        $this->connect();
482 6
        $this->queue();
483 6
        $this->qos();
484
485 6
        return $this;
486
    }
487
488
    /**
489
     * Executes `self::connect()`, `self::queue()`, `self::qos()`, `self::consume()`, `self::wait()`, and `self::disconnect()` respectively.
490
     * @param callback|array|string $callback The callback that the consumer should use to process the messages (same as `self::consume()`).
491
     * @return void
492
     * @throws Exception
493
     */
494 2
    public function work($callback): void
495
    {
496
        try {
497 2
            $this->prepare();
498 2
            $this->consume($callback);
499 1
            $this->wait();
500 1
            $this->disconnect();
501 1
        } catch (Exception $error) {
502 1
            Exception::rethrow($error, null, false);
503
        }
504 1
    }
505
}
506