Consumer::handle()   B
last analyzed

Complexity

Conditions 6
Paths 9

Size

Total Lines 51
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 23
c 1
b 0
f 0
dl 0
loc 51
rs 8.9297
cc 6
nc 9
nop 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

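Commonly applied refactorings include:

- Extract Method
- If many parameters or temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object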

1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ;
6
7
use Throwable;
use Interop\Amqp;
use LogicException;
use Psr\Log\LoggerInterface;
use BinaryCube\CarrotMQ\Event;
use BinaryCube\CarrotMQ\Entity;
use BinaryCube\CarrotMQ\Support\Collection;
use BinaryCube\CarrotMQ\Extension\Extension;
use BinaryCube\CarrotMQ\Exception\Exception;
use BinaryCube\CarrotMQ\Support\DispatcherAwareTrait;
use BinaryCube\CarrotMQ\Collection\ExtensionRepository;
use BinaryCube\CarrotMQ\Support\AutoWireAwareTrait;
use BinaryCube\CarrotMQ\Exception\StopConsumerException;

use function gettype;
use function vsprintf;
use function getmypid;
use function microtime;
use function get_class;
use function is_object;
use function gethostname;
use function is_subclass_of;
27
28
/**
 * Class Consumer
 *
 * Consumes messages from a single queue: it mounts a processor, subscribes to
 * the queue, hands every received message to the processor and settles the
 * message according to the status the processor returns. Start, Idle,
 * MessageReceived, AfterMessageReceived and End events are dispatched along
 * the way so that registered extensions can observe or interrupt the run.
 */
class Consumer extends Core implements ConsumerInterface
{
    use AutoWireAwareTrait;
    use DispatcherAwareTrait;

    /**
     * @const array Default consumer parameters
     */
    public const DEFAULTS = [
        // In seconds; multiplied by 1e3 before being handed to the subscription consumer.
        'receive_timeout' => 30,

        'qos' => [
            'enabled'        => false,
            'prefetch_size'  => 0,
            'prefetch_count' => 0,
            'global'         => false,
        ],
    ];

    /**
     * @var string
     */
    protected $tag;

    /**
     * @var Entity\Queue
     */
    protected $queue;

    /**
     * @var Processor\Processor
     */
    protected $processor;

    /**
     * @var ExtensionRepository
     */
    protected $extensions;

    /**
     * @var array
     */
    protected $config;

    /**
     * Constructor.
     *
     * The given configuration is merged over {@see Consumer::DEFAULTS}. When no
     * tag has been predefined on the instance, one is generated from the host
     * name, the process id and the queue name.
     *
     * @param string               $id
     * @param Entity\Queue         $queue
     * @param Processor\Processor  $processor
     * @param Container            $container
     * @param array                $config
     * @param LoggerInterface|null $logger
     */
    public function __construct(
        string $id,
        Entity\Queue $queue,
        Processor\Processor $processor,
        Container $container,
        array $config = [],
        ?LoggerInterface $logger = null
    ) {
        parent::__construct($id, $container, $logger);

        $this->id         = $id;
        $this->queue      = $queue;
        $this->container  = $container;
        $this->processor  = $processor;
        $this->config     = Collection::make(static::DEFAULTS)->merge($config)->all();
        $this->extensions = new ExtensionRepository();

        // Keep a tag that is already set (e.g. a property default in a subclass).
        if (empty($this->tag)) {
            $this->tag = vsprintf(
                '[host: %s]-[pid: %s]-[queue: %s]',
                [gethostname(), getmypid(), $this->queue->name()]
            );
        }

        $this->queue->setLogger($this->logger);
    }

    /**
     * Get or set the tag.
     *
     * Called with a non-null value it stores the tag and returns the consumer;
     * called without one it returns the current tag.
     *
     * @param null|string $name
     *
     * @return $this|string
     */
    public function tag($name = null)
    {
        if (null === $name) {
            return $this->tag;
        }

        $this->tag = $name;

        return $this;
    }

    /**
     * @return ExtensionRepository
     */
    public function extensions(): ExtensionRepository
    {
        return $this->extensions;
    }

    /**
     * @return Entity\Queue
     */
    public function queue(): Entity\Queue
    {
        return $this->queue;
    }

    /**
     * The connection of the consumed queue.
     *
     * @return Connection
     */
    public function connection(): Connection
    {
        return $this->queue->connection();
    }

    /**
     * Validates the registered extensions, gives each one the logger and
     * subscribes it to the dispatcher, then auto-wires the container.
     *
     * @return $this
     *
     * @throws Exception When a registered extension is not an Extension instance.
     */
    protected function prepare()
    {
        // NOTE(review): presumably (re)initialises the dispatcher; confirm in DispatcherAwareTrait.
        $this->dispatcher(true);

        foreach ($this->extensions->all() as $extension) {
            if (! ($extension instanceof Extension)) {
                throw new Exception(
                    vsprintf(
                        'The given class, "%s", is not an instance of "%s"',
                        [
                            // get_class() only accepts objects; fall back to the value's type.
                            is_object($extension) ? get_class($extension) : gettype($extension),
                            Extension::class,
                        ]
                    )
                );
            }

            $extension->setLogger($this->logger);

            $this->dispatcher->addSubscriber($extension);
        }

        $this->queue->setLogger($this->logger);

        $this->autoWire($this->container);

        return $this;
    }

    /**
     * Subscribes to the queue and keeps receiving messages until an event
     * interrupts the execution, a StopConsumerException is raised, or any
     * other error occurs (which is re-thrown after the consumer was ended).
     *
     * @return int Always 0.
     *
     * @throws Throwable
     */
    public function consume(): int
    {
        $qos            = $this->config['qos'];
        $receiveTimeout = (int) ($this->config['receive_timeout'] * 1e3);

        $this->logger->debug(
            vsprintf(
                'Consumer "%s" start consuming queue "%s" ("%s")',
                [$this->id(), $this->queue->id(), $this->queue->name()]
            )
        );

        $this->prepare();

        $context = $this->queue->context(true);

        if ($qos['enabled']) {
            $context->setQos((int) $qos['prefetch_size'], (int) $qos['prefetch_count'], (bool) $qos['global']);
        }

        $consumer = $context->createConsumer($this->queue->model());

        /**
         * @var Amqp\AmqpSubscriptionConsumer $subscription
         */
        $subscription = $context->createSubscriptionConsumer();

        /*
        |--------------------------------------------------------------------------
        | Consumer tag
        |--------------------------------------------------------------------------
        |
        */
        $consumer->setConsumerTag($this->tag);

        /*
        |--------------------------------------------------------------------------
        | Mounting the processor.
        |--------------------------------------------------------------------------
        |
        */
        $this->processor->mount($this->queue, $context, $consumer, $this->container, $this->logger);

        /*
        |--------------------------------------------------------------------------
        | Message Receiver
        |--------------------------------------------------------------------------
        |
        */
        $subscription->subscribe(
            $consumer,
            function ($message, $consumer) use ($context) {
                return $this->handle($message, $consumer, $context);
            }
        );

        $startTime = microtime(true);

        /*
        |--------------------------------------------------------------------------
        | Start Event
        |--------------------------------------------------------------------------
        |
        */
        $startEvent = new Event\Consumer\Start($this->queue, $context, $startTime);

        $this->dispatcher->dispatch($startEvent, Event\Consumer\Start::name());

        if ($startEvent->isExecutionInterrupted()) {
            $this->end($this->queue, $context, $startTime, $startEvent->exitStatus(), $subscription);

            return 0;
        }

        while (true) {
            try {
                $subscription->consume($receiveTimeout);

                /*
                |--------------------------------------------------------------------------
                | Idle Event.
                |--------------------------------------------------------------------------
                |
                | Dispatched every time the subscription returns from consume().
                |
                */
                $idleEvent = new Event\Consumer\Idle($this->queue, $context);

                $this->dispatcher->dispatch($idleEvent, Event\Consumer\Idle::name());

                if ($idleEvent->isExecutionInterrupted()) {
                    $this->end($this->queue, $context, $startTime, $idleEvent->exitStatus(), $subscription);
                    break;
                }
            } catch (StopConsumerException $exception) {
                // Raised by handle() when an extension interrupts after a message.
                $this->end($this->queue, $context, $startTime, null, $subscription);
                break;
            } catch (Throwable $exception) {
                // End the consumer first, then let the caller see the failure.
                $this->end($this->queue, $context, $startTime, 0, $subscription);
                throw $exception;
            }
        }

        return 0;
    }

    /**
     * Processes one received message: dispatches the MessageReceived event,
     * runs the processor, settles the message according to the returned status
     * and dispatches the AfterMessageReceived event.
     *
     * @param Amqp\AmqpMessage  $message
     * @param Amqp\AmqpConsumer $consumer
     * @param Amqp\AmqpContext  $context
     *
     * @return bool Always true.
     *
     * @throws StopConsumerException When the AfterMessageReceived event interrupts the execution.
     * @throws LogicException        When the processor returns an unsupported status.
     */
    private function handle($message, $consumer, $context): bool
    {
        $receivedAt = microtime(true);

        /*
        |--------------------------------------------------------------------------
        | Message Received Event.
        |--------------------------------------------------------------------------
        |
        */
        $messageReceivedEvent = new Event\Consumer\MessageReceived($this->queue, $context, $message, $receivedAt);

        $this->dispatcher->dispatch($messageReceivedEvent, Event\Consumer\MessageReceived::name());

        $result = $this->processor->process($message, $context);

        switch ($result) {
            case Processor\Processor::ACK:
                $consumer->acknowledge($message);
                break;

            case Processor\Processor::REJECT:
                $consumer->reject($message, false);
                break;

            case Processor\Processor::REQUEUE:
                $consumer->reject($message, true);
                break;

            case Processor\Processor::SELF_ACK:
                // Nothing to settle here for this status.
                break;

            default:
                throw new LogicException(vsprintf('Status is not supported: %s', [$result]));
        }

        /*
        |--------------------------------------------------------------------------
        | After Message Received Event.
        |--------------------------------------------------------------------------
        |
        */
        $afterMessageReceived = new Event\Consumer\AfterMessageReceived(
            $this->queue,
            $context,
            $message,
            $receivedAt,
            $result
        );

        $this->dispatcher->dispatch($afterMessageReceived, Event\Consumer\AfterMessageReceived::name());

        if ($afterMessageReceived->isExecutionInterrupted()) {
            throw new StopConsumerException();
        }

        return true;
    }

    /**
     * Dispatches the End event and releases everything consume() set up: the
     * processor is unmounted, the subscription (if any) is unsubscribed and the
     * queue connection is closed.
     *
     * The cleanup is best-effort: a failing step is logged and does not prevent
     * the following steps from running. It also runs when dispatching the End
     * event throws; that exception is propagated once the cleanup is done.
     *
     * @param Entity\Queue                       $queue
     * @param Amqp\AmqpContext                   $context
     * @param float                              $startTime
     * @param int|null                           $exitStatus
     * @param Amqp\AmqpSubscriptionConsumer|null $subscription
     *
     * @return $this
     */
    private function end(
        Entity\Queue $queue,
        Amqp\AmqpContext $context,
        float $startTime,
        ?int $exitStatus = null,
        ?Amqp\AmqpSubscriptionConsumer $subscription = null
    ) {
        $endTime = microtime(true);

        $endEvent = new Event\Consumer\End($queue, $context, $startTime, $endTime, $exitStatus);

        // Cleanup steps, keyed by the description used when a step fails.
        $cleanup = [
            'unmount the processor' => function (): void {
                $this->processor->unmount();
            },
            'unsubscribe the subscription consumer' => function () use ($subscription): void {
                if (null !== $subscription) {
                    $subscription->unsubscribeAll();
                }
            },
            'close the connection' => function (): void {
                $this->queue->connection()->close();
            },
        ];

        try {
            $this->dispatcher->dispatch($endEvent, Event\Consumer\End::name());
        } finally {
            foreach ($cleanup as $action => $step) {
                try {
                    $step();
                } catch (Throwable $exception) {
                    $this->logger->warning(
                        vsprintf(
                            'Consumer "%s" failed to %s: %s',
                            [$this->id(), $action, $exception->getMessage()]
                        )
                    );
                }
            }
        }

        $this->logger->debug(
            vsprintf(
                'Consumer "%s" stop consuming queue "%s" ("%s")',
                [$this->id(), $this->queue->id(), $this->queue->name()]
            )
        );

        return $this;
    }
}
416