Test Failed
Pull Request — master (#38)
by Aleksandr
14:31 queued 04:35
created

BaseConsumer::getCallback()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
ccs 0
cts 0
cp 0
crap 2
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
7
abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
8
{
9
    /** @var int */
10
    protected $target;
11
12
    /** @var int */
13
    protected $consumed = 0;
14
15
    /** @var callable */
16
    protected $callback;
17
18
    /** @var bool */
19
    protected $forceStop = false;
20
21
    /** @var int */
22
    protected $idleTimeout = 0;
23
24
    /** @var int */
25
    protected $idleTimeoutExitCode;
26
27 15
    /** @var string[] */
28
    protected $groups = ['default'];
29 15
30 15
    public function setCallback($callback)
31
    {
32
        $this->callback = $callback;
33
    }
34
35
    /**
36
     * @return callable
37
     */
38
    public function getCallback()
39
    {
40
        return $this->callback;
41
    }
42
43
    /**
44
     * @param int $msgAmount
45
     * @throws \ErrorException
46
     */
47
    public function start($msgAmount = 0)
48
    {
49
        $this->target = $msgAmount;
50
51
        $this->setupConsumer();
52
53
        while (count($this->getChannel()->callbacks)) {
54
            $this->getChannel()->wait();
55
        }
56
    }
57
58
    /**
59
     * Tell the server you are going to stop consuming.
60
     *
61
     * It will finish up the last message and not send you any more.
62
     */
63
    public function stopConsuming()
64
    {
65
        // This gets stuck and will not exit without the last two parameters set.
66 12
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
67
    }
68 12
69
    public function setupConsumer()
70
    {
71 12
        if ($this->autoSetupFabric) {
72 12
            $this->setupFabric();
73
        }
74
        $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage'));
75
    }
76
77
    public function processMessage(AMQPMessage $msg)
0 ignored issues
show
Unused Code introduced by
The parameter $msg is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

77
    public function processMessage(/** @scrutinizer ignore-unused */ AMQPMessage $msg)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
78
    {
79 42
        //To be implemented by descendant classes
80
    }
81 42
82 42
    protected function maybeStopConsumer()
83
    {
84
        if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) {
85
            if (!function_exists('pcntl_signal_dispatch')) {
86 42
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
87
            }
88
89 42
            pcntl_signal_dispatch();
90
        }
91
92 42
        if ($this->forceStop || ($this->consumed == $this->target && $this->target > 0)) {
93
            $this->stopConsuming();
94
        }
95
    }
96
97
    public function setConsumerTag($tag)
98
    {
99 12
        $this->consumerTag = $tag;
100
    }
101 12
102
    public function getConsumerTag()
103
    {
104 1
        return $this->consumerTag;
105
    }
106 1
107 1
    public function forceStopConsumer()
108
    {
109
        $this->forceStop = true;
110
    }
111
112
    /**
113
     * Sets the qos settings for the current channel
114
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
115
     *
116
     * @param int $prefetchSize
117
     * @param int $prefetchCount
118
     * @param bool $global
119
     */
120
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
121
    {
122 3
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
123
    }
124 3
125 3
    public function setIdleTimeout($idleTimeout)
126
    {
127
        $this->idleTimeout = $idleTimeout;
128
    }
129
130
    /**
131
     * Set exit code to be returned when there is a timeout exception
132 3
     *
133
     * @param int|null $idleTimeoutExitCode
134 3
     */
135 3
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
136
    {
137 13
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;
138
    }
139 13
140
    public function getIdleTimeout()
141
    {
142
        return $this->idleTimeout;
143
    }
144
145
    /**
146
     * Get exit code to be returned when there is a timeout exception
147 5
     *
148
     * @return int|null
149 5
     */
150
    public function getIdleTimeoutExitCode()
151
    {
152
        return $this->idleTimeoutExitCode;
153
    }
154
155
    public function setGroups(array $groups)
156
    {
157
        $this->groups = $groups;
158
    }
159
160
    /**
161
     * @return string[]
162
     */
163
    public function getGroups()
164
    {
165
        return $this->groups;
166
    }
167
168
    /**
169
     * Resets the consumed property.
170
     * Use when you want to call start() or consume() multiple times.
171
     */
172
    public function resetConsumed()
173
    {
174
        $this->consumed = 0;
175
    }
176
}
177