This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS; |
||
4 | |||
5 | use Kaliop\QueueingBundle\Queue\MessageConsumerInterface; |
||
6 | use Kaliop\QueueingBundle\Queue\ConsumerInterface; |
||
7 | use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface; |
||
8 | use Kaliop\QueueingBundle\Adapter\ForcedStopException; |
||
9 | use Aws\Sqs\SqsClient; |
||
10 | use Aws\TraceMiddleware; |
||
11 | use Psr\Log\LoggerInterface; |
||
12 | |||
/**
 * Consumes messages from an Amazon SQS queue and hands each of them to a MessageConsumerInterface callback.
 *
 * Routing keys are not supported natively by SQS: they are emulated on the client side, see matchRoutingKey().
 *
 * @todo support using short polling even when given a total timeout - even though it will complicate consume() even more than it already is
 */
class Consumer implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var \Aws\Sqs\SqsClient */
    protected $client;
    /** @var string the complete queue name as used by SQS */
    protected $queueUrl;
    /** @var string the queue name which is passed on to every Message built by this consumer */
    protected $queueName;
    /** @var \Kaliop\QueueingBundle\Queue\MessageConsumerInterface */
    protected $callback;

    /** @var string the routing key, as given to setRoutingKey() */
    protected $routingKey;
    /** @var string the regular expression built out of the routing key by setRoutingKey() */
    protected $routingKeyRegexp;
    /** @var LoggerInterface|null */
    protected $logger;
    // The message attribute used to store content-type. To be kept in sync with the Producer
    protected $contentTypeAttribute = 'contentType';
    // The message attribute used to store the routing key
    protected $routingAttribute = 'routingKey';
    /** @var bool|array false until debug tracing has been activated via setDebug() */
    protected $debug = false;
    /** @var bool set by forceStop(), checked by maybeStopConsumer() */
    protected $forceStop = false;
    /** @var string the message of the ForcedStopException thrown by maybeStopConsumer() */
    protected $forceStopReason;
    /** @var bool whether maybeStopConsumer() has to call pcntl_signal_dispatch() */
    protected $dispatchSignals = false;
    /** @var int|null memory limit in MB; null (or 0) for no limit */
    protected $memoryLimit = null;
    // NB: when changing the defaults below, alter as well KaliopQueueingPluginsSQSExtension::load
    /** @var int $requestBatchSize how many messages to receive in each poll by default */
    protected $requestBatchSize = 1;
    /** @var int $requestTimeout how long to wait for messages in each request. Switches between long and short polling */
    protected $requestTimeout = 0;
    /** @var int the minimum interval between two queue polls - in microseconds */
    protected $pollingIntervalUs = 200000;
    /** @var int $gcProbability the probability (in percent) of calling gc_collect_cycles at the end of every poll */
    protected $gcProbability = 1;

    // NB: these limits are currently not enforced by consume(): see the comments in there
    const MAX_MESSAGES_PER_REQUEST = 10;
    const MAX_REQUEST_TIMEOUT = 20;

    /**
     * @param array $config passed as-is to the SqsClient constructor
     */
    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    /**
     * @param LoggerInterface|null $logger pass null to disable logging
     * @return $this
     */
    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Enables debug. At the moment can not disable it
     *
     * @param bool|array $debug true to trace with the default options, or an array of options for TraceMiddleware
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug)
    {
        if ($debug == $this->debug) {
            return $this;
        }

        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            // TraceMiddleware only accepts an array of options: any other truthy value means "use the defaults"
            $handlerList->interpose(new TraceMiddleware(is_array($debug) ? $debug : array()));
            // remember that tracing is active, otherwise every further call would stack up one more middleware.
            // NB: this is only recorded when enabling, since the middleware can not be removed
            $this->debug = $debug;
        }

        return $this;
    }

    /**
     * @param int $limit MB
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * Sets the routing key, and builds the regular expression used by matchRoutingKey() out of it:
     * '*' is converted to '[^.]*', '#' to '.*', everything else is matched literally.
     *
     * NOTE(review): the regexp is not anchored, so a key matches when it is found anywhere within the routing
     * information of the message. This is kept as-is for backward compatibility - confirm whether it is desired.
     *
     * @param string $key
     * @return Consumer
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;

        // Quote the literal parts one by one instead of quoting the whole key and replacing the wildcards
        // afterwards: since PHP 7.3 preg_quote() escapes '#' as well, which made the '#' wildcard end up as '\.*'
        $regexp = '';
        foreach (preg_split('/([*#])/', $this->routingKey, -1, PREG_SPLIT_DELIM_CAPTURE) as $part) {
            if ($part === '*') {
                $regexp .= '[^.]*';
            } elseif ($part === '#') {
                $regexp .= '.*';
            } else {
                $regexp .= preg_quote($part, '/');
            }
        }
        $this->routingKeyRegexp = '/' . $regexp . '/';

        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return Consumer
     * @throws \RuntimeException if $callback is not a MessageConsumerInterface
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    /**
     * @param string $queueName
     * @return Consumer
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * The number of messages to download in every request to the SQS API.
     * Bigger numbers are better for performances, but there is a limit on the size of the response which SQS will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    /**
     * How long to wait for messages in each request, in seconds. Any value > 0 makes consume() use long polling.
     * @param int $timeout
     * @return Consumer
     */
    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    /**
     * The minimum interval between two queue polls.
     * @param int $intervalUs microseconds
     * @return Consumer
     */
    public function setPollingInterval($intervalUs)
    {
        $this->pollingIntervalUs = $intervalUs;

        return $this;
    }

    /**
     * The probability of calling gc_collect_cycles at the end of every poll.
     * @param int $probability compared against a random number between 1 and 100; 0 to never collect
     * @return Consumer
     */
    public function setGCProbability($probability)
    {
        $this->gcProbability = $probability;

        return $this;
    }

    /**
     * Polls the queue in a loop, passing every matching message to the callback, until either $amount messages
     * have been processed, $timeout seconds have elapsed, or a stop is forced.
     *
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @param int $amount 0 for unlimited
     * @param int $timeout seconds 0 for unlimited. NB: any value > 0 activates 'long polling' mode
     * @return void
     * @throws \RuntimeException if no callback has been set
     * @throws ForcedStopException when a stop has been requested, see maybeStopConsumer()
     */
    public function consume($amount, $timeout = 0)
    {
        // Fail before fetching anything: messages are deleted from the queue before being handed to the callback,
        // so finding out later that there is no callback would lose a message
        if (!$this->callback) {
            throw new \RuntimeException('Can not consume messages from SQS, as no callback has been set');
        }

        $hasDeadline = ($timeout > 0);
        // always defined, even though they are only used when there is a deadline
        $endTime = $hasDeadline ? time() + $timeout : null;
        $remainingTime = $hasDeadline ? $timeout : null;

        $received = 0;

        $receiveParams = array(
            'QueueUrl' => $this->queueUrl,
            'AttributeNames' => array('All'),
            'MessageAttributeNames' => array('All'),
        );

        while (true) {
            $reqTime = microtime(true);

            // never wait past the overall deadline
            $wait = $this->requestTimeout;
            if ($hasDeadline && $remainingTime < $wait) {
                $wait = $remainingTime;
            }

            // NB: $wait is deliberately not capped to MAX_REQUEST_TIMEOUT: we leave it up to the API to fail

            if ($wait > 0) {
                // according to the spec, this is maximum wait time. If messages are available sooner, they get delivered immediately
                $receiveParams['WaitTimeSeconds'] = $wait;
            } else {
                unset($receiveParams['WaitTimeSeconds']);
            }

            // never ask for more messages than the ones still wanted
            $limit = $this->requestBatchSize;
            if ($amount > 0 && ($amount - $received) < $limit) {
                $limit = $amount - $received;
            }

            // NB: $limit is deliberately not capped to MAX_MESSAGES_PER_REQUEST: we leave it up to the API to fail

            $receiveParams['MaxNumberOfMessages'] = $limit;

            $result = $this->client->receiveMessage($receiveParams);
            $messages = $result->get('Messages');

            if (is_array($messages)) {
                foreach ($messages as $message) {

                    // How we implement routing keys with SQS: since it is not supported natively, we check if the route
                    // matches after having downloaded the message. If it does not match, we just skip processing it.
                    // Since we will not call deleteMessage, SQS will requeue the message in a short time.
                    // This is far from optimal, but it might be better than nothing
                    if (! $this->matchRoutingKey($message)) {
                        continue;
                    }

                    $received++;

                    $this->handleMessage($message);
                }
            }

            $this->maybeStopConsumer();

            if ($amount > 0 && $received >= $amount) {
                return;
            }

            if ($hasDeadline) {
                $remainingTime = $endTime - time();
                if ($remainingTime <= 0) {
                    return;
                }
            }

            // observe MAX 5 requests per sec per queue by default: sleep for 0.2 secs in between requests
            $passedUs = (microtime(true) - $reqTime) * 1000000;
            if ($passedUs < $this->pollingIntervalUs) {
                // usleep() wants an integer: passing it a float with a fractional part is deprecated since PHP 8.1
                usleep((int)($this->pollingIntervalUs - $passedUs));
            }
        }
    }

    /**
     * Removes one received message from the queue, then passes it to the callback.
     *
     * @param array $message one element of the 'Messages' list returned by receiveMessage
     * @return void
     */
    protected function handleMessage(array $message)
    {
        // removing the message from the queue is manual with SQS
        $this->client->deleteMessage(array(
            'QueueUrl' => $this->queueUrl,
            'ReceiptHandle' => $message['ReceiptHandle'],
        ));

        // the body is passed on separately from the rest of the message data
        $data = $message['Body'];
        unset($message['Body']);

        $contentType = isset($message['MessageAttributes'][$this->contentTypeAttribute]['StringValue']) ?
            $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

        if ((string)$contentType === '') {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
            }
            // null is what gets passed on to Message when no content-type is known
            $contentType = null;
        }

        $this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
    }

    /**
     * Adopt the RabbitMQ routing key algorithm:
     * - split on dots
     * - * matches one word (q: also empty ones?)
     * - # matches any words
     *
     * Messages are always accepted when no routing key is set, or when they carry no routing information.
     *
     * @todo the current implementation is naive and does probably not match RabbitMq if the routing key is something like aaa.*b.ccc
     *       A better implementation would probably involve usage of a trie
     *       Some pointers on how to implement it fast: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-June/013564.html
     * @see setRoutingKey
     *
     * @param array $message
     * @return bool
     */
    protected function matchRoutingKey(array $message)
    {
        if ($this->routingKey === null || $this->routingKey === '') {
            return true;
        }

        if (!isset($message['MessageAttributes'][$this->routingAttribute]['StringValue'])) {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer has a routing key set, and it received a message without routing information. Processing it anyway');
            }
            return true;
        }

        // preg_match returns 1, 0 or false: convert that to the boolean promised by the docblock
        return preg_match(
            $this->routingKeyRegexp,
            $message['MessageAttributes'][$this->routingAttribute]['StringValue']
        ) === 1;
    }

    /**
     * @param string $queueUrl the complete queue name as used by SQS
     * @return Consumer
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    /**
     * @param bool $doHandle when true, pending signals get dispatched at the end of every poll
     */
    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;
    }

    /**
     * Asks the consumer to stop: the request is honoured the next time maybeStopConsumer() runs.
     *
     * @param string $reason used as message of the ForcedStopException
     */
    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
    }

    /**
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        // run the garbage collector only once in a while
        if ($this->gcProbability > 0 && rand(1, 100) <= $this->gcProbability) {
            gc_collect_cycles();
        }

        // going over the memory limit is handled as a forced stop, unless a stop has been requested already
        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024)) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
||
368 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.