Passed
Pull Request — master (#38)
by Aleksandr
07:19 queued 02:12
created

BaseConsumer::getIdleTimeout()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
7
abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
8
{
9
    /**
     * Message-count target assigned by start(). maybeStopConsumer() only
     * acts on it when it is greater than zero, and stops consuming once
     * $consumed reaches it.
     *
     * @var int
     */
10
    protected $target;
11
12
    /**
     * Counter compared against $target in maybeStopConsumer() and reset to 0
     * by resetConsumed(). Nothing in this class increments it; presumably
     * descendant classes do so as they process messages (TODO confirm).
     *
     * @var int
     */
13
    protected $consumed = 0;
14
15
    /**
     * Callback stored by setCallback() and returned by getCallback().
     * It is not invoked anywhere in this class.
     *
     * @var callable
     */
16
    protected $callback;
17
18
    /**
     * Set to true by forceStopConsumer(); when true, maybeStopConsumer()
     * stops consuming regardless of the message count.
     *
     * @var bool
     */
19
    protected $forceStop = false;
20
21
    /**
     * Idle timeout exposed through setIdleTimeout() / getIdleTimeout().
     * Not read elsewhere in this class; the unit is presumably seconds
     * (TODO confirm against the code that consumes it).
     *
     * @var int
     */
22
    protected $idleTimeout = 0;
23
24
    /**
     * Exit code to be returned when there is a timeout exception.
     * Has no default, so it stays null until setIdleTimeoutExitCode() is called.
     *
     * @var int|null
     */
25
    protected $idleTimeoutExitCode;
26
27
    /**
     * Group names exposed through setGroups() / getGroups().
     *
     * @var string[]
     */
28
    protected $groups = ['default'];
29
30 15
    /**
     * Stores the callback that getCallback() later hands back.
     *
     * @param callable $callback
     */
    public function setCallback($callback)
    {
        $this->callback = $callback;
    }
34
35
    /**
     * Returns the callback previously stored with setCallback().
     *
     * @return callable
     */
    public function getCallback()
    {
        return $this->callback;
    }
42
43
    /**
     * Registers the consumer and waits on the channel until it has no callbacks left.
     *
     * The requested amount is recorded as the stop target, setupConsumer() is
     * called, and then wait() is invoked on the channel for as long as the
     * channel still reports registered callbacks.
     *
     * @param int $msgAmount Target message count; with the default 0,
     *                       maybeStopConsumer() never stops on message count.
     * @throws \ErrorException
     */
    public function start($msgAmount = 0)
    {
        $this->target = $msgAmount;
        $this->setupConsumer();

        // The channel is re-fetched on every pass, exactly as before.
        while (count($this->getChannel()->callbacks) > 0) {
            $this->getChannel()->wait();
        }
    }
57
58
    /**
     * Tells the server that this consumer is going to stop consuming.
     *
     * The server finishes up the last message and does not send any more.
     */
    public function stopConsuming()
    {
        $channel = $this->getChannel();

        // Without the trailing arguments (false, true) this call gets stuck
        // and never exits.
        $channel->basic_cancel($this->getConsumerTag(), false, true);
    }
68
69 12
    /**
     * Prepares the channel for consuming.
     *
     * Runs setupFabric() first when the autoSetupFabric flag is set, then
     * registers processMessage() on the channel as the consumer callback for
     * the queue named in $this->queueOptions['name'].
     */
    public function setupConsumer()
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        $this->getChannel()->basic_consume(
            $this->queueOptions['name'],
            $this->getConsumerTag(),
            false, // no_local
            false, // no_ack
            false, // exclusive
            false, // nowait
            [$this, 'processMessage']
        );
    }
76
77
    /**
     * Hook registered as the consumer callback by setupConsumer().
     *
     * The base implementation is intentionally empty and leaves $msg unused;
     * descendant classes are expected to override it.
     *
     * @param AMQPMessage $msg
     */
    public function processMessage(/** @scrutinizer ignore-unused */ AMQPMessage $msg)
    {
        // To be implemented by descendant classes.
    }
81
82 42
    /**
     * Dispatches pending process signals, then stops consuming if a stop condition holds.
     *
     * Signals are dispatched only when the pcntl extension is loaded and the
     * AMQP_WITHOUT_SIGNALS constant is either undefined or falsy.
     *
     * Consuming is stopped when forceStopConsumer() has been called, or when a
     * positive target was set and the consumed counter has reached it. The
     * counter is compared with ">=" rather than "==" so that a counter that
     * has already passed the target (for example after calling start() again
     * with a smaller amount without resetConsumed()) still stops the consumer
     * instead of running forever.
     *
     * @throws \BadFunctionCallException When pcntl is loaded but
     *                                   pcntl_signal_dispatch() is unavailable.
     */
    protected function maybeStopConsumer()
    {
        $signalsEnabled = !(defined('AMQP_WITHOUT_SIGNALS') && AMQP_WITHOUT_SIGNALS);

        if (extension_loaded('pcntl') && $signalsEnabled) {
            if (!function_exists('pcntl_signal_dispatch')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            pcntl_signal_dispatch();
        }

        // A target of 0 (or an unset target) means "no message limit".
        $targetReached = $this->target > 0 && $this->consumed >= $this->target;

        if ($this->forceStop || $targetReached) {
            $this->stopConsuming();
        }
    }
96
97
    /**
     * Stores the consumer tag that getConsumerTag() returns.
     *
     * The consumerTag property is not declared in this class; presumably it
     * is declared on the parent class (TODO confirm).
     *
     * @param mixed $tag
     */
    public function setConsumerTag($tag)
    {
        $this->consumerTag = $tag;
    }
101
102 12
    /**
     * Returns the consumer tag passed to the channel by setupConsumer()
     * and stopConsuming().
     *
     * @return mixed Whatever was stored in the consumerTag property.
     */
    public function getConsumerTag()
    {
        return $this->consumerTag;
    }
106
107 1
    /**
     * Raises the force-stop flag.
     *
     * Nothing is cancelled here; the next maybeStopConsumer() call sees the
     * flag and calls stopConsuming().
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }
111
112
    /**
     * Applies quality-of-service settings to the current channel via basic_qos().
     *
     * Note carried over from the original comment: prefetchSize and global do
     * not work with RabbitMQ version <= 8.0 (TODO confirm the version bound).
     *
     * @param int  $prefetchSize
     * @param int  $prefetchCount
     * @param bool $global
     */
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
    {
        $channel = $this->getChannel();
        $channel->basic_qos($prefetchSize, $prefetchCount, $global);
    }
124
125 3
    /**
     * Stores the idle timeout returned by getIdleTimeout().
     *
     * @param int $idleTimeout
     */
    public function setIdleTimeout($idleTimeout)
    {
        $this->idleTimeout = $idleTimeout;
    }
129
130
    /**
     * Sets the exit code to be returned when there is a timeout exception.
     *
     * @param int|null $idleTimeoutExitCode
     */
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
    {
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;
    }
139
140 13
    /**
     * Returns the idle timeout stored by setIdleTimeout() (0 by default).
     *
     * @return int
     */
    public function getIdleTimeout()
    {
        return $this->idleTimeout;
    }
144
145
    /**
     * Returns the exit code to be used when there is a timeout exception.
     *
     * @return int|null Null when no exit code has been set.
     */
    public function getIdleTimeoutExitCode()
    {
        return $this->idleTimeoutExitCode;
    }
154
155
    /**
     * Replaces the list of group names returned by getGroups().
     *
     * @param string[] $groups
     */
    public function setGroups(array $groups)
    {
        $this->groups = $groups;
    }
159
160
    /**
     * Returns the group names stored on this consumer (['default'] unless changed).
     *
     * @return string[]
     */
    public function getGroups()
    {
        return $this->groups;
    }
167
168
    /**
     * Puts the consumed counter back to zero.
     *
     * Call this when start() or consume() is going to be invoked more than
     * once, so the message-count target is measured from zero again.
     */
    public function resetConsumed()
    {
        $this->consumed = 0;
    }
176
}
177