Completed
Push — master ( 1eebc9...96a719 )
by Gaetano
04:19
created

Consumer::maybeStopConsumer()   B

Complexity

Conditions 6
Paths 8

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 2
Metric Value
c 2
b 0
f 2
dl 0
loc 14
rs 8.8571
cc 6
eloc 7
nc 8
nop 0
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp;
4
5
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
6
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
7
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface;
8
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
9
use Psr\Log\LoggerInterface;
10
11
class Consumer extends Stomp implements ConsumerInterface, SignalHandlingConsumerInterface
12
{
13
    protected $callback;
14
    protected $routingKey;
15
    protected $logger;
16
    protected $subscribed = false;
17
    protected $queueName;
18
    protected $subscriptionName;
19
    protected $label;
20
    protected $forceStop = false;
21
    protected $forceStopReason;
22
    protected $dispatchSignals = false;
23
    protected $memoryLimit = null;
24
25
    public function setLogger(LoggerInterface $logger = null)
26
    {
27
        $this->logger = $logger;
28
29
        return $this;
30
    }
31
32
    /**
33
     * @param int $limit MB
34
     * @return $this
35
     */
36
    public function setMemoryLimit($limit)
37
    {
38
        $this->memoryLimit = $limit;
39
40
        return $this;
41
    }
42
43
    /**
44
     * @param string $name
45
     * @return $this
46
     */
47
    public function setSubscriptionName($name)
48
    {
49
        $this->subscriptionName = $name;
50
        $this->setClientId();
51
52
        return $this;
53
    }
54
55
    public function setLabel($label)
56
    {
57
        $this->label = $label;
58
        $this->setClientId();
59
60
        return $this;
61
    }
62
63
    protected function setClientId()
64
    {
65
        $newId = $this->subscriptionName . ($this->label != '' ? '_' . $this->label : '');
66
        if ($newId != $this->client->clientId) {
67
            $this->client->clientId = $newId;
68
            $this->subscribed = false;
69
        }
70
    }
71
72
    /**
73
     * NB: when changing this, you should change the subscription name as well, otherwise you will get an error for
74
     * trying to create a double subscription
75
     *
76
     * @param string $key
77
     * @return $this
78
     */
79
    public function setRoutingKey($key)
80
    {
81
        $this->routingKey = (string)$key;
82
        $this->subscribed = false;
83
84
        return $this;
85
    }
86
87
    /**
88
     * @param MessageConsumerInterface $callback
89
     * @return $this
90
     */
91
    public function setCallback($callback)
92
    {
93
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
94
            throw new \RuntimeException('Can not set callback to Stomp Consumer, as it is not a MessageConsumerInterface');
95
        }
96
        $this->callback = $callback;
97
98
        return $this;
99
    }
100
101
    /**
102
     * @param string $queueName
103
     * @return $this
104
     */
105
    public function setQueueName($queueName)
106
    {
107
        $this->queueName = $queueName;
108
109
        return $this;
110
    }
111
112
    /**
113
     * @param int $amount
114
     * @param int $timeout seconds
115
     * @return nothing
116
     */
117
    public function consume($amount, $timeout=0)
118
    {
119
        $toConsume = $amount;
120
        if ($timeout > 0) {
121
            $startTime = time();
122
            $remaining = $timeout;
123
        }
124
125
        $this->connect();
126
127
        $this->subscribe();
128
129
        while(true) {
130
            if ($timeout > 0) {
131
                $this->client->setReadTimeout($remaining);
0 ignored issues
show
Bug introduced by
The variable $remaining does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
132
            }
133
134
            $message = $this->client->readFrame();
135
136
            if ($message !== false) {
137
                switch($message->command)
138
                {
139
                    case 'MESSAGE':
140
                        $this->client->ack($message);
141
                        $this->callback->receive(new Message($message->body, $message->headers));
142
143
                        $toConsume--;
144
                        if ($toConsume == 0) {
145
                            return;
146
                        }
147
                        break;
148
149
                    case 'ERROR':
150
                        throw new \RuntimeException("Stomp server sent error frame: ".$message->body);
151
152
                    case 'RECEIPT':
153
                        // do nothing
154
                }
155
            }
156
157
            $this->maybeStopConsumer();
158
159
            if ($timeout > 0 && ($remaining = ($startTime + $timeout - time())) <= 0) {
0 ignored issues
show
Bug introduced by
The variable $startTime does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
160
                return;
161
            }
162
163
        }
164
    }
165
166
    protected function getClientProperties(array $additionalProperties = array(), $command='')
167
    {
168
        $result = $additionalProperties;
169
170
        switch($command)
171
        {
172
            case 'SUBSCRIBE';
0 ignored issues
show
Coding Style introduced by
case statements should not use curly braces.

As per the PSR-2 coding standard, case statements should not be wrapped in curly braces. There is no need for braces, since each case is terminated by the next break.

switch ($expr) {
    case "A": { //wrong
        doSomething();
        break;
    }
    case "B": //right
        doSomething();
        break;
}

To learn more about the PSR-2 coding standard, please refer to the PHP-Fig.

Loading history...
173
                //$result = array_merge(array('persistent' => 'true'), $result);
0 ignored issues
show
Unused Code Comprehensibility introduced by
64% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
174
                break;
175
        }
176
177
        return $result;
178
    }
179
180
    protected function subscribe()
181
    {
182
        if (!$this->subscribed) {
183
184
            $this->client->subscribe(
185
                $this->getFullQueueName($this->routingKey),
186
                $this->getClientProperties(array(), 'SUBSCRIBE'),
187
                true
188
            );
189
190
            $this->subscribed = true;
191
        }
192
    }
193
194
    public function setHandleSignals($doHandle)
195
    {
196
        $this->dispatchSignals = $doHandle;
197
        $this->client->setHandleSignals($doHandle);
198
    }
199
200
201
    public function forceStop($reason = '')
202
    {
203
        $this->forceStop = true;
204
        $this->forceStopReason = $reason;
205
        $this->client->forceStop($reason);
206
    }
207
208
    /**
209
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
210
     *
211
     * @throws ForcedStopException
212
     */
213
    protected function maybeStopConsumer()
214
    {
215
        if ($this->dispatchSignals) {
216
            pcntl_signal_dispatch();
217
        }
218
219
        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024)) {
220
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
221
        }
222
223
        if ($this->forceStop) {
224
            throw new ForcedStopException($this->forceStopReason);
225
        }
226
    }
227
}
228