Consumer::consume()   B
last analyzed

Complexity

Conditions 11
Paths 42

Size

Total Lines 48

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 48
rs 7.3166
c 0
b 0
f 0
cc 11
nc 42
nop 2

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
<?php
2
3
namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp;
4
5
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
6
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
7
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface;
8
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
9
use Psr\Log\LoggerInterface;
10
11
class Consumer extends Stomp implements ConsumerInterface, SignalHandlingConsumerInterface
12
{
13
    protected $callback;
14
    protected $routingKey;
15
    protected $logger;
16
    protected $subscribed = false;
17
    protected $queueName;
18
    protected $subscriptionName;
19
    protected $label;
20
    protected $forceStop = false;
21
    protected $forceStopReason;
22
    protected $dispatchSignals = false;
23
    protected $memoryLimit = null;
24
25
    /**
     * Stores the logger on this consumer.
     *
     * @param LoggerInterface|null $logger the logger to store; null is accepted and is stored as-is
     * @return $this
     */
    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;
        return $this;
    }
31
32
    /**
     * Stores the memory limit that maybeStopConsumer() checks while consuming.
     *
     * The check only runs when the stored value is greater than zero, so null (the default)
     * or 0 leaves it disabled.
     *
     * @param int $limit limit in MB
     * @return $this
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;
        return $this;
    }
42
43
    /**
     * Stores the subscription name and recomputes the client id derived from it.
     *
     * @param string $name
     * @return $this
     */
    public function setSubscriptionName($name)
    {
        $this->subscriptionName = $name;

        // The client id is built from subscription name + label, so it has to be refreshed
        $this->setClientId();

        return $this;
    }
54
55
    /**
     * Stores the label and recomputes the client id derived from it.
     *
     * @param string $label appended to the subscription name (after an underscore) when not empty
     * @return $this
     */
    public function setLabel($label)
    {
        $this->label = $label;

        // The client id is built from subscription name + label, so it has to be refreshed
        $this->setClientId();

        return $this;
    }
62
63
    /**
     * Rebuilds the client id as "<subscriptionName>" or "<subscriptionName>_<label>" and pushes it to the client.
     *
     * When the resulting id differs from the one currently set on the client, the consumer is flagged as
     * not subscribed, so that the next call to subscribe() subscribes again.
     */
    protected function setClientId()
    {
        $suffix = '';
        if ($this->label != '') {
            $suffix = '_' . $this->label;
        }
        $clientId = $this->subscriptionName . $suffix;

        if ($clientId == $this->client->clientId) {
            // nothing changed: keep the current subscription state
            return;
        }

        $this->client->clientId = $clientId;
        $this->subscribed = false;
    }
71
72
    /**
     * Stores the routing key (cast to string) and flags the consumer as not subscribed.
     *
     * NB: when changing this, you should change the subscription name as well, otherwise you will get an error for
     * trying to create a double subscription
     *
     * @param string $key
     * @return $this
     */
    public function setRoutingKey($key)
    {
        // a new routing key means a new destination: force a new subscription on next consume
        $this->subscribed = false;
        $this->routingKey = (string)$key;

        return $this;
    }
86
87
    /**
     * Stores the object which will receive every consumed message.
     *
     * @param MessageConsumerInterface $callback
     * @return $this
     * @throws \RuntimeException when $callback does not implement MessageConsumerInterface
     */
    public function setCallback($callback)
    {
        if ($callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            $this->callback = $callback;

            return $this;
        }

        throw new \RuntimeException('Can not set callback to Stomp Consumer, as it is not a MessageConsumerInterface');
    }
100
101
    /**
     * Stores the queue name on this consumer.
     *
     * @param string $queueName
     * @return $this
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;
        return $this;
    }
111
112
    /**
     * Connects, subscribes (if not yet subscribed) and then reads frames in a loop, handing every MESSAGE
     * frame to the callback.
     *
     * The loop ends when either:
     * - $amount messages have been consumed (the counter is decremented per message and compared to 0, so an
     *   $amount of 0 or less never matches and means "no limit on the number of messages"), or
     * - $timeout is greater than 0 and that many seconds have elapsed since the call started, or
     * - an exception is thrown (see below).
     *
     * @param int $amount number of messages to consume before returning
     * @param int $timeout seconds; 0 (or less) means no time limit
     * @return void
     * @throws \RuntimeException when the server sends an ERROR frame
     * @throws ForcedStopException when a stop has been requested or the memory limit is reached
     */
    public function consume($amount, $timeout=0)
    {
        $toConsume = $amount;
        $hasTimeout = ($timeout > 0);

        // Always defined, even if only used when a timeout is active: avoids conditionally-defined variables
        $startTime = time();
        $remaining = $timeout;

        $this->connect();

        $this->subscribe();

        while (true) {
            if ($hasTimeout) {
                // never wait for a frame longer than the time left in our budget
                $this->client->setReadTimeout($remaining);
            }

            $message = $this->client->readFrame();

            if ($message !== false && $this->processReceivedFrame($message)) {
                $toConsume--;
                if ($toConsume == 0) {
                    return;
                }
            }

            // safe point to stop: the current frame, if any, has been fully handled
            $this->maybeStopConsumer();

            if ($hasTimeout) {
                $remaining = $startTime + $timeout - time();
                if ($remaining <= 0) {
                    return;
                }
            }
        }
    }

    /**
     * Handles a single frame read from the client.
     *
     * @param object $frame the frame, as returned by the client's readFrame() call
     * @return bool true when the frame was a MESSAGE that has been delivered to the callback, false for any
     *              frame which is ignored (e.g. RECEIPT)
     * @throws \RuntimeException when the frame is an ERROR frame
     */
    protected function processReceivedFrame($frame)
    {
        switch ($frame->command) {
            case 'MESSAGE':
                // NB: the frame is acknowledged before the callback runs, so if the callback throws
                // the message has already been acked
                $this->client->ack($frame);
                $this->callback->receive(new Message($frame->body, $frame->headers));
                return true;

            case 'ERROR':
                throw new \RuntimeException("Stomp server sent error frame: ".$frame->body);
        }

        // RECEIPT and any other frame type: nothing to do
        return false;
    }
165
166
    /**
     * Builds the set of properties to send along with a given Stomp command.
     *
     * At the moment the properties passed in are returned unchanged for every command; the switch is kept
     * as the place where command-specific defaults can be added.
     *
     * @param array $additionalProperties
     * @param string $command name of the command the properties are for, e.g. 'SUBSCRIBE'
     * @return array
     */
    protected function getClientProperties(array $additionalProperties = array(), $command='')
    {
        $result = $additionalProperties;

        switch ($command) {
            case 'SUBSCRIBE':
                // no extra properties are added for now
                //$result = array_merge(array('persistent' => 'true'), $result);
                break;
        }

        return $result;
    }
179
180
    /**
     * Subscribes the client to the destination built from the current routing key, unless this consumer
     * is already flagged as subscribed.
     */
    protected function subscribe()
    {
        if ($this->subscribed) {
            return;
        }

        $destination = $this->getFullQueueName($this->routingKey);
        $properties = $this->getClientProperties(array(), 'SUBSCRIBE');

        // NOTE(review): the meaning of the 3rd argument is defined by the client class - confirm there
        $this->client->subscribe($destination, $properties, true);

        $this->subscribed = true;
    }
193
194
    /**
     * Enables or disables signal dispatching, both in this consumer (see maybeStopConsumer()) and in the
     * underlying client.
     *
     * @param bool $doHandle
     */
    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;

        // keep the client in sync with the consumer
        $this->client->setHandleSignals($doHandle);
    }
199
200
201
    /**
     * Flags this consumer as having to stop and forwards the request to the underlying client.
     *
     * The flag is acted upon by maybeStopConsumer(), which throws a ForcedStopException carrying $reason.
     *
     * @param string $reason
     */
    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;

        // propagate the stop request to the client as well
        $this->client->forceStop($reason);
    }
207
208
    /**
     * Dispatches pending signals (when signal handling is enabled), requests a stop if the configured memory
     * limit has been reached, and throws if a stop has been requested.
     * To be called at execution points when there is no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        // Only check memory when a limit is set and no stop is pending yet
        if (!$this->forceStop && $this->memoryLimit > 0) {
            // the limit is expressed in MB
            $limitInBytes = $this->memoryLimit * 1024 * 1024;
            if (memory_get_usage(true) >= $limitInBytes) {
                $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
            }
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
227
}
228