Issues (48)

RabbitMq/BaseConsumer.php (1 issue)

Severity
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
7
abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
8
{
9
    /** @var int */
10
    protected $target;
11
12
    /** @var int */
13
    protected $consumed = 0;
14
15
    /** @var callable */
16
    protected $callback;
17
18
    /** @var bool */
19
    protected $forceStop = false;
20
21
    /** @var int */
22
    protected $idleTimeout = 0;
23
24
    /** @var int */
25
    protected $idleTimeoutExitCode;
26
27 15
    public function setCallback($callback)
28
    {
29 15
        $this->callback = $callback;
30 15
    }
31
32
    /**
33
     * @return callable
34
     */
35
    public function getCallback()
36
    {
37
        return $this->callback;
38
    }
39
40
    /**
41
     * @param int $msgAmount
42
     * @throws \ErrorException
43
     */
44
    public function start($msgAmount = 0)
45
    {
46
        $this->target = $msgAmount;
47
48
        $this->setupConsumer();
49
50
        while ($this->getChannel()->is_consuming()) {
51
            $this->getChannel()->wait();
52
        }
53
    }
54
55
    /**
56
     * Tell the server you are going to stop consuming.
57
     *
58
     * It will finish up the last message and not send you any more.
59
     */
60
    public function stopConsuming()
61
    {
62
        // This gets stuck and will not exit without the last two parameters set.
63
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
64
    }
65
66 18
    protected function setupConsumer()
67
    {
68 18
        if ($this->autoSetupFabric) {
69
            $this->setupFabric();
70
        }
71 18
        $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, $this->consumerOptions['no_ack'], false, false, [$this, 'processMessage']);
72 18
    }
73
74
    public function processMessage(AMQPMessage $msg)
0 ignored issues
show
The parameter $msg is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

74
    public function processMessage(/** @scrutinizer ignore-unused */ AMQPMessage $msg)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
75
    {
76
        //To be implemented by descendant classes
77
    }
78
79 50
    protected function maybeStopConsumer()
80
    {
81 50
        if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) {
82 50
            if (!function_exists('pcntl_signal_dispatch')) {
83
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
84
            }
85
86 50
            pcntl_signal_dispatch();
87
        }
88
89 50
        if ($this->forceStop || ($this->consumed == $this->target && $this->target > 0)) {
90
            $this->stopConsuming();
91
        }
92 50
    }
93
94
    public function setConsumerTag($tag)
95
    {
96
        $this->consumerTag = $tag;
97
    }
98
99 18
    public function getConsumerTag()
100
    {
101 18
        return $this->consumerTag;
102
    }
103
104
    public function forceStopConsumer()
105
    {
106
        $this->forceStop = true;
107
    }
108
109
    /**
110
     * Sets the qos settings for the current channel
111
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
112
     *
113
     * @param int $prefetchSize
114
     * @param int $prefetchCount
115
     * @param bool $global
116
     */
117
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
118
    {
119
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
120
    }
121
122 9
    public function setIdleTimeout($idleTimeout)
123
    {
124 9
        $this->idleTimeout = $idleTimeout;
125 9
    }
126
127
    /**
128
     * Set exit code to be returned when there is a timeout exception
129
     *
130
     * @param int|null $idleTimeoutExitCode
131
     */
132 5
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
133
    {
134 5
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;
135 5
    }
136
137 19
    public function getIdleTimeout()
138
    {
139 19
        return $this->idleTimeout;
140
    }
141
142
    /**
143
     * Get exit code to be returned when there is a timeout exception
144
     *
145
     * @return int|null
146
     */
147 7
    public function getIdleTimeoutExitCode()
148
    {
149 7
        return $this->idleTimeoutExitCode;
150
    }
151
152
    /**
153
     * Resets the consumed property.
154
     * Use when you want to call start() or consume() multiple times.
155
     */
156
    public function resetConsumed()
157
    {
158
        $this->consumed = 0;
159
    }
160
}
161