SignalExtension   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 156
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 17
eloc 41
c 1
b 0
f 0
dl 0
loc 156
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A onStart() 0 13 3
A description() 0 3 1
A onIdle() 0 4 2
A getSubscribedEvents() 0 6 1
A shouldBeStopped() 0 13 2
A onAfterMessageReceived() 0 4 2
A name() 0 3 1
A handleSignal() 0 36 5
1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ\Extension;
6
7
use BinaryCube\CarrotMQ\Event;
8
use BinaryCube\CarrotMQ\Exception\Exception;
9
10
use function vsprintf;
11
use function pcntl_signal;
12
use function extension_loaded;
13
use function pcntl_async_signals;
14
15
/**
16
 * Class SignalExtension
17
 */
18
class SignalExtension extends Extension
19
{
20
21
    /**
22
     * @var boolean
23
     */
24
    protected $interruptConsumption = false;
25
26
    /**
27
     * Returns an array of event names this subscriber wants to listen to.
28
     *
29
     * The array keys are event names and the value can be:
30
     *
31
     *  * The method name to call (priority defaults to 0)
32
     *  * An array composed of the method name to call and the priority
33
     *  * An array of arrays composed of the method names to call and respective
34
     *    priorities, or 0 if unset
35
     *
36
     * For instance:
37
     *
38
     *  * ['eventName' => 'methodName']
39
     *  * ['eventName' => ['methodName', $priority]]
40
     *  * ['eventName' => [['methodName1', $priority], ['methodName2']]]
41
     *
42
     * @return array The event names to listen to
43
     */
44
    public static function getSubscribedEvents()
45
    {
46
        return [
47
            Event\Consumer\Start::name()                => 'onStart',
48
            Event\Consumer\AfterMessageReceived::name() => 'onAfterMessageReceived',
49
            Event\Consumer\Idle::name()                 => 'onIdle',
50
        ];
51
    }
52
53
    /**
54
     * @return string
55
     */
56
    public static function name(): string
57
    {
58
        return 'SignalExtension';
59
    }
60
61
    /**
62
     * @return string
63
     */
64
    public static function description(): string
65
    {
66
        return '';
67
    }
68
69
    /**
70
     * @param Event\Consumer\Start $event
71
     *
72
     * @return void
73
     *
74
     * @throws Exception
75
     */
76
    public function onStart(Event\Consumer\Start $event): void
77
    {
78
        if (! extension_loaded('pcntl')) {
79
            throw new Exception('The pcntl extension is required in order to catch signals.');
80
        }
81
82
        pcntl_async_signals(true);
83
84
        foreach ([SIGTERM, SIGINT, SIGHUP, SIGQUIT] as $signal) {
85
            pcntl_signal($signal, [$this, 'handleSignal']);
86
        }
87
88
        $this->interruptConsumption = false;
89
    }
90
91
    /**
92
     * @param Event\Consumer\AfterMessageReceived $event
93
     *
94
     * @return void
95
     */
96
    public function onAfterMessageReceived(Event\Consumer\AfterMessageReceived $event): void
97
    {
98
        if ($this->shouldBeStopped()) {
99
            $event->interruptExecution();
100
        }
101
    }
102
103
    /**
104
     * @param Event\Consumer\Idle $event
105
     *
106
     * @return void
107
     */
108
    public function onIdle(Event\Consumer\Idle $event): void
109
    {
110
        if ($this->shouldBeStopped()) {
111
            $event->interruptExecution();
112
        }
113
    }
114
115
    /**
116
     * @param int $signal
117
     *
118
     * @return $this
119
     */
120
    public function handleSignal($signal)
121
    {
122
        $this
123
            ->logger
124
            ->debug(
125
                vsprintf('[%s] Caught signal: %s', [self::name(), $signal])
126
            );
127
128
        /*
129
        |--------------------------------------------------------------------------
130
        | Signal Map.
131
        |--------------------------------------------------------------------------
132
        | SIGTERM :: supervisor default stop
133
        | SIGQUIT :: kill -s QUIT
134
        | SIGINT  :: ctrl+c
135
        | SIGHUP  :: terminal is closed
136
        */
137
138
        switch ($signal) {
139
            case SIGTERM:
140
            case SIGQUIT:
141
            case SIGINT:
142
            case SIGHUP:
143
                $this
144
                    ->logger
145
                    ->debug(vsprintf('[%s] Interrupt consumption', [self::name()]));
146
147
                $this->interruptConsumption = true;
148
149
                break;
150
151
            default:
152
                break;
153
        }//end switch
154
155
        return $this;
156
    }
157
158
    /**
159
     * @return boolean
160
     */
161
    public function shouldBeStopped(): bool
162
    {
163
        if (false === $this->interruptConsumption) {
164
            return false;
165
        }
166
167
        $this
168
            ->logger
169
            ->debug(vsprintf('[%s] Interrupt execution', [self::name()]));
170
171
        $this->interruptConsumption = false;
172
173
        return true;
174
    }
175
176
}
177