kaliop-uk /
kueueingbundle-stomp
This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | |||
| 3 | namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp; |
||
| 4 | |||
| 5 | use Kaliop\QueueingBundle\Queue\MessageConsumerInterface; |
||
| 6 | use Kaliop\QueueingBundle\Queue\ConsumerInterface; |
||
| 7 | use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface; |
||
| 8 | use Kaliop\QueueingBundle\Adapter\ForcedStopException; |
||
| 9 | use Psr\Log\LoggerInterface; |
||
| 10 | |||
/**
 * Queue consumer built on top of the Stomp adapter base class.
 *
 * It subscribes through the Stomp client to the destination returned by getFullQueueName() for the configured
 * routing key, reads frames in a loop and passes every MESSAGE frame to the configured callback.
 */
class Consumer extends Stomp implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var MessageConsumerInterface|null receiver of the consumed messages, see setCallback() */
    protected $callback;

    /** @var string|null routing key passed to getFullQueueName() when subscribing */
    protected $routingKey;

    /** @var LoggerInterface|null */
    protected $logger;

    /** @var bool true once subscribe() has run for the current client id / routing key */
    protected $subscribed = false;

    /** @var string|null stored by setQueueName(); not read inside this class (presumably used by the parent - TODO confirm) */
    protected $queueName;

    /** @var string|null first part of the client id, see setClientId() */
    protected $subscriptionName;

    /** @var string|null optional suffix of the client id, see setClientId() */
    protected $label;

    /** @var bool set by forceStop(); makes maybeStopConsumer() throw */
    protected $forceStop = false;

    /** @var string|null message used for the ForcedStopException */
    protected $forceStopReason;

    /** @var bool when true, maybeStopConsumer() calls pcntl_signal_dispatch() */
    protected $dispatchSignals = false;

    /** @var int|null memory limit in MB; null (or 0) disables the check */
    protected $memoryLimit = null;

    /**
     * @param LoggerInterface|null $logger
     * @return $this
     */
    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * @param int $limit MB
     * @return $this
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * Sets the subscription name and refreshes the client id derived from it.
     *
     * @param string $name
     * @return $this
     */
    public function setSubscriptionName($name)
    {
        $this->subscriptionName = $name;
        $this->setClientId();

        return $this;
    }

    /**
     * Sets the label and refreshes the client id derived from it.
     *
     * @param string $label
     * @return $this
     */
    public function setLabel($label)
    {
        $this->label = $label;
        $this->setClientId();

        return $this;
    }

    /**
     * Rebuilds the client id as "<subscriptionName>" or "<subscriptionName>_<label>" and, if it differs from the
     * one currently set on the client, stores it and marks the consumer as not subscribed.
     */
    protected function setClientId()
    {
        // loose comparisons are intentional: a null label must be treated like an empty one
        $newId = $this->subscriptionName . ($this->label != '' ? '_' . $this->label : '');
        if ($newId != $this->client->clientId) {
            $this->client->clientId = $newId;
            $this->subscribed = false;
        }
    }

    /**
     * NB: when changing this, you should change the subscription name as well, otherwise you will get an error for
     * trying to create a double subscription
     *
     * @param string $key
     * @return $this
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;
        $this->subscribed = false;

        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return $this
     * @throws \RuntimeException if $callback is not a MessageConsumerInterface
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to Stomp Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    /**
     * @param string $queueName
     * @return $this
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * Connects, subscribes (if not yet subscribed) and reads frames until one of the stop conditions is met.
     *
     * Every MESSAGE frame is acknowledged and then passed to the callback. The loop ends when $amount messages
     * have been consumed, when $timeout seconds have elapsed, or when a stop is forced.
     *
     * @param int $amount number of messages to consume before returning. The counter is decremented per message and
     *                    compared to 0, so a value <= 0 never matches and only the other stop conditions apply
     * @param int $timeout seconds; when > 0 it is used both as read timeout and as overall deadline
     * @return void
     * @throws \RuntimeException when the server sends an ERROR frame, or a message arrives and no callback is set
     * @throws ForcedStopException when a stop has been requested, see maybeStopConsumer()
     */
    public function consume($amount, $timeout=0)
    {
        $toConsume = $amount;
        // Initialised unconditionally so that both are defined on every execution path;
        // they are only taken into account when $timeout > 0
        $startTime = time();
        $remaining = $timeout;

        $this->connect();

        $this->subscribe();

        while (true) {
            if ($timeout > 0) {
                $this->client->setReadTimeout($remaining);
            }

            $message = $this->client->readFrame();

            if ($message !== false) {
                switch ($message->command) {
                    case 'MESSAGE':
                        // Fail before acknowledging: acking first and then crashing on a missing callback
                        // would lose the message
                        if ($this->callback === null) {
                            throw new \RuntimeException('Can not consume messages with Stomp Consumer, as no callback has been set');
                        }

                        $this->client->ack($message);
                        $this->callback->receive(new Message($message->body, $message->headers));

                        $toConsume--;
                        if ($toConsume == 0) {
                            return;
                        }
                        break;

                    case 'ERROR':
                        throw new \RuntimeException("Stomp server sent error frame: ".$message->body);

                    case 'RECEIPT':
                        // do nothing
                        break;
                }
            }

            $this->maybeStopConsumer();

            // recompute the time left; it is fed to setReadTimeout() on the next iteration
            if ($timeout > 0 && ($remaining = ($startTime + $timeout - time())) <= 0) {
                return;
            }
        }
    }

    /**
     * Returns the properties to send along with a Stomp command.
     *
     * At present the additional properties are returned unchanged for every command.
     *
     * @param array $additionalProperties
     * @param string $command
     * @return array
     */
    protected function getClientProperties(array $additionalProperties = array(), $command='')
    {
        $result = $additionalProperties;

        switch ($command) {
            case 'SUBSCRIBE':
                // no SUBSCRIBE-specific properties are added; a "persistent" header was left disabled here:
                //$result = array_merge(array('persistent' => 'true'), $result);
                break;
        }

        return $result;
    }

    /**
     * Subscribes the client to the destination for the current routing key, unless already subscribed.
     */
    protected function subscribe()
    {
        if (!$this->subscribed) {

            $this->client->subscribe(
                $this->getFullQueueName($this->routingKey),
                $this->getClientProperties(array(), 'SUBSCRIBE'),
                true
            );

            $this->subscribed = true;
        }
    }

    /**
     * Enables or disables signal dispatching in this consumer and forwards the setting to the Stomp client.
     *
     * @param bool $doHandle
     */
    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;
        $this->client->setHandleSignals($doHandle);
    }

    /**
     * Flags the consumer to stop at the next call of maybeStopConsumer() and forwards the request to the Stomp client.
     *
     * @param string $reason
     */
    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
        $this->client->forceStop($reason);
    }

    /**
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
     *
     * A stop is also forced when a memory limit is set and memory_get_usage(true) has reached it.
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        // memoryLimit is expressed in MB, memory_get_usage() in bytes
        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024)) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
||
| 228 |
If you define a variable conditionally, it can happen that it is not defined for all execution paths.
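Let’s take a look at an example:
function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;
        case 'bar':
            $x = 2;
            break;
    }
    // $x is potentially undefined here.
    echo $x;
}
In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.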
Available Fixes
Check for existence of the variable explicitly:
Define a default value for the variable:
Add a value for the missing path: