BaseConsumer::setupConsumer()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 16
ccs 14
cts 14
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 11
nc 2
nop 0
crap 2
1
<?php
2
3
namespace RabbitMqModule;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
use Zend\EventManager\EventManagerAwareInterface;
7
use Zend\EventManager\EventManagerAwareTrait;
8
9
/**
10
 * Class BaseConsumer
11
 * @package RabbitMqModule
12
 */
13
abstract class BaseConsumer extends BaseAmqp implements
14
    EventManagerAwareInterface
15
{
16
    use EventManagerAwareTrait;
17
18
    /**
19
     * @var string
20
     */
21
    protected $consumerTag;
22
    /**
23
     * @var callable
24
     */
25
    protected $callback;
26
    /**
27
     * @var bool
28
     */
29
    protected $forceStop = false;
30
    /**
31
     * @var int
32
     */
33
    protected $idleTimeout = 0;
34
    /**
35
     * @var bool
36
     */
37
    protected $signalsEnabled = true;
38
39
    /**
40
     * @return bool
41
     */
42 8
    public function isSignalsEnabled()
43
    {
44 8
        return $this->signalsEnabled;
45
    }
46
47
    /**
48
     * @param bool $signalsEnabled
49
     *
50
     * @return $this
51
     */
52 2
    public function setSignalsEnabled($signalsEnabled = true)
53
    {
54 2
        $this->signalsEnabled = $signalsEnabled;
55
56 2
        return $this;
57
    }
58
59
    /**
60
     * @return string
61
     */
62 5
    public function getConsumerTag()
63
    {
64 5
        if (!$this->consumerTag) {
65 3
            $this->consumerTag = sprintf('PHPPROCESS_%s_%s', gethostname(), getmypid());
66 3
        }
67
68 5
        return $this->consumerTag;
69
    }
70
71
    /**
72
     * @param string $consumerTag
73
     *
74
     * @return $this
75
     */
76 3
    public function setConsumerTag($consumerTag)
77
    {
78 3
        $this->consumerTag = $consumerTag;
79
80 3
        return $this;
81
    }
82
83
    /**
84
     * @return callable
85
     */
86 9
    public function getCallback()
87
    {
88 9
        return $this->callback;
89
    }
90
91
    /**
92
     * @param callable $callback
93
     *
94
     * @return $this
95
     */
96 10
    public function setCallback($callback)
97
    {
98 10
        if (!is_callable($callback)) {
99 1
            throw new \InvalidArgumentException('Invalid callback provided');
100
        }
101 9
        $this->callback = $callback;
102
103 9
        return $this;
104
    }
105
106
    /**
107
     * @return int
108
     */
109 5
    public function getIdleTimeout()
110
    {
111 5
        return $this->idleTimeout;
112
    }
113
114
    /**
115
     * @param int $idleTimeout
116
     *
117
     * @return $this
118
     */
119 3
    public function setIdleTimeout($idleTimeout)
120
    {
121 3
        $this->idleTimeout = $idleTimeout;
122
123 3
        return $this;
124
    }
125
126
    /**
127
     * Start consumer.
128
     */
129 1
    public function start()
130
    {
131 1
        $this->setupConsumer();
132 1
        while (count($this->getChannel()->callbacks)) {
133 1
            $this->getChannel()->wait();
134 1
        }
135 1
    }
136
137 3
    protected function setupConsumer()
138
    {
139 3
        if ($this->isAutoSetupFabricEnabled()) {
140 3
            $this->setupFabric();
141 3
        }
142
143 3
        $this->getChannel()->basic_consume(
144 3
            $this->getQueueOptions()->getName(),
145 3
            $this->getConsumerTag(),
146 3
            false,
147 3
            false,
148 3
            false,
149 3
            false,
150 3
            [$this, 'processMessage']
151 3
        );
152 3
    }
153
154
    /**
155
     * Sets the qos settings for the current channel
156
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0.
157
     *
158
     * @param int  $prefetchSize
159
     * @param int  $prefetchCount
160
     * @param bool $global
161
     *
162
     * @return $this
163
     */
164 2
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
165
    {
166 2
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
167
168 2
        return $this;
169
    }
170
171
    /**
172
     * @return $this
173
     */
174 8
    protected function maybeStopConsumer()
175
    {
176
        // @codeCoverageIgnoreStart
177
        if (extension_loaded('pcntl') && $this->isSignalsEnabled()) {
178
            if (!function_exists('pcntl_signal_dispatch')) {
179
                throw new \BadFunctionCallException(
180
                    'Function \'pcntl_signal_dispatch\' is referenced in the php.ini'.
181
                    '\'disable_functions\' and can\'t be called.'
182
                );
183
            }
184
            pcntl_signal_dispatch();
185
        }
186
        // @codeCoverageIgnoreEnd
187
188 8
        if ($this->forceStop) {
189 1
            $this->stopConsuming();
190 1
        }
191
192 8
        return $this;
193
    }
194
195
    /**
196
     * @return $this
197
     */
198 1
    public function forceStopConsumer()
199
    {
200 1
        $this->forceStop = true;
201
202 1
        return $this;
203
    }
204
205
    /**
206
     * @return $this
207
     */
208 1
    public function stopConsuming()
209
    {
210 1
        $this->getChannel()->basic_cancel($this->getConsumerTag());
211
212 1
        return $this;
213
    }
214
215
    /**
216
     * @param AMQPMessage $message
217
     * @return void
218
     */
219
    abstract public function processMessage(AMQPMessage $message);
220
}
221