LimitConsumedMessagesExtension   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 103
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 8
eloc 22
c 1
b 0
f 0
dl 0
loc 103
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A description() 0 3 1
A getSubscribedEvents() 0 4 1
A onAfterMessageReceived() 0 6 2
A name() 0 3 1
A shouldBeStopped() 0 19 2
A __construct() 0 6 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ\Extension;
6
7
use BinaryCube\CarrotMQ\Event;
8
9
use function vsprintf;
10
11
/**
 * Class LimitConsumedMessagesExtension
 *
 * Counts the messages seen through the AfterMessageReceived event and asks
 * the event to interrupt execution once the configured limit is reached.
 */
class LimitConsumedMessagesExtension extends Extension
{

    /**
     * Number of received messages after which execution is interrupted.
     *
     * @var int
     */
    protected $limit;

    /**
     * Number of AfterMessageReceived events handled by this instance so far.
     *
     * @var int
     */
    protected $consumed;

    /**
     * Returns the events this extension listens to.
     *
     * The only subscription maps the AfterMessageReceived event name to the
     * onAfterMessageReceived() handler (no explicit priority is given).
     *
     * @return array<string, string> Event name => handler method name.
     */
    public static function getSubscribedEvents()
    {
        return [
            Event\Consumer\AfterMessageReceived::name() => 'onAfterMessageReceived',
        ];
    }

    /**
     * Name of this extension; also used as the prefix of its log entries.
     *
     * @return string
     */
    public static function name(): string
    {
        return 'LimitConsumedMessagesExtension';
    }

    /**
     * Description of this extension (currently an empty string).
     *
     * @return string
     */
    public static function description(): string
    {
        return '';
    }

    /**
     * Constructor.
     *
     * Runs the parent constructor, stores the limit and starts the counter at
     * zero. The limit is not validated: with a value of 0 or less,
     * shouldBeStopped() already returns true before any message is counted,
     * so execution is interrupted on the first received message.
     *
     * @param int $limit Number of messages to receive before interrupting.
     */
    public function __construct(int $limit)
    {
        parent::__construct();

        $this->limit    = $limit;
        $this->consumed = 0;
    }

    /**
     * Counts the received message and interrupts execution when the limit is hit.
     *
     * @param Event\Consumer\AfterMessageReceived $event
     *
     * @return void
     */
    public function onAfterMessageReceived(Event\Consumer\AfterMessageReceived $event): void
    {
        // Count first, then check, so the limit-th message is the one that triggers the stop.
        $this->consumed++;

        if ($this->shouldBeStopped()) {
            $event->interruptExecution();
        }
    }

    /**
     * Tells whether the consumed-message counter has reached the limit.
     *
     * Returns false while fewer than $limit messages have been counted.
     * Otherwise a debug entry is written and true is returned; the entry is
     * written on every call made after the limit has been reached, not only
     * on the first one.
     *
     * @return bool
     */
    public function shouldBeStopped(): bool
    {
        if ($this->consumed < $this->limit) {
            return false;
        }

        // NOTE(review): $this->logger is not declared in this class — presumably
        // provided by the parent Extension; confirm it is always set.
        $this
            ->logger
            ->debug(
                vsprintf(
                    '[%s] Interrupt execution. Reached the limit of %s',
                    [
                        self::name(),
                        $this->limit,
                    ]
                )
            );

        return true;
    }

}
120