Consumer::consume()   F
last analyzed

Complexity

Conditions 20
Paths 434

Size

Total Lines 116

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 43
CRAP Score 21.545

Importance

Changes 0

Metric  Value
dl      0
loc     116
ccs     43
cts     51
cp      0.8431
rs      0.6288
c       0
b       0
f       0
cc      20
nc      434
nop     2
crap    21.545

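The crap value above can be recomputed from the other metrics. This assumes the usual CRAP (Change Risk Anti-Patterns) definition, CRAP = cc^2 * (1 - cov)^3 + cc, where cov is the covered fraction of statements. Here cov appears to be ccs/cts = 43/51, reported as cp 0.8431. The sketch below is minimal, and crapScore() is a hypothetical helper, not part of any analyzer API.

```php
<?php
// Hypothetical helper reproducing the CRAP score:
// CRAP = complexity^2 * (1 - coverage)^3 + complexity, with coverage as a fraction (0..1).
function crapScore(int $complexity, float $coverage): float
{
    return $complexity ** 2 * (1 - $coverage) ** 3 + $complexity;
}

// Coverage as reported: covered statements (ccs) over total statements (cts), rounded to 4 digits.
$coverage = round(43 / 51, 4);               // 0.8431
printf("%.3f\n", crapScore(20, $coverage));  // 21.545
```

Even at 100% coverage the score cannot fall below cc itself (20 here). That is why the report also points at the method's complexity, not only at its tests.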
How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
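Extract Method can be illustrated on consume() itself. Two inline calculations feed the ReceiveMessage call: the per-request wait time and the batch limit. Both could move into small named helpers. The sketch below is minimal: computeWaitTime() and computeBatchLimit() are hypothetical names, and null is assumed to stand for "no overall deadline".

```php
<?php
// Hypothetical helpers extracted from Consumer::consume(); the names are illustrative,
// not part of the Kaliop SQS plugin.

/**
 * Value for WaitTimeSeconds: the per-request timeout, capped by the seconds left
 * before the overall consume() deadline ($remainingTime null = no deadline).
 */
function computeWaitTime(int $requestTimeout, ?int $remainingTime): int
{
    if ($remainingTime === null) {
        return $requestTimeout;
    }
    return min($remainingTime, $requestTimeout);
}

/**
 * Value for MaxNumberOfMessages: the batch size, capped by the number of messages
 * still wanted when a total amount was requested ($amount 0 = unlimited).
 */
function computeBatchLimit(int $amount, int $received, int $batchSize): int
{
    if ($amount <= 0) {
        return $batchSize;
    }
    return min($amount - $received, $batchSize);
}

var_dump(computeWaitTime(20, 5));       // int(5)
var_dump(computeBatchLimit(12, 7, 10)); // int(5)
```

Inside the loop, consume() would then read `$receiveParams['MaxNumberOfMessages'] = computeBatchLimit($amount, $received, $this->requestBatchSize);`. Each branch that previously needed a comment now carries a name. In the class itself these helpers would more naturally be private methods.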

Commonly applied refactorings include:

<?php

namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;

use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface;
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
use Aws\Sqs\SqsClient;
use Aws\TraceMiddleware;
use Psr\Log\LoggerInterface;

/**
 * @todo support using short polling even when given a total timeout - even though it will complicate consume() even more than it already is
 */
class Consumer implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var  \Aws\Sqs\SqsClient */
    protected $client;
    protected $queueUrl;
    protected $queueName;
    protected $callback;

    protected $routingKey;
    protected $routingKeyRegexp;
    protected $logger;
    // The message attribute used to store content-type. To be kept in sync with the Producer
    protected $contentTypeAttribute = 'contentType';
    protected $routingAttribute = 'routingKey';
    protected $debug = false;
    protected $forceStop = false;
    protected $forceStopReason;
    protected $dispatchSignals = false;
    protected $memoryLimit = null;
    // NB: when changing the defaults below, update KaliopQueueingPluginsSQSExtension::load as well
    /** @var int $requestBatchSize how many messages to receive in each poll by default */
    protected $requestBatchSize = 1;
    /** @var int $requestTimeout how long to wait for messages in each request. Switches between long and short polling */
    protected $requestTimeout = 0;
    /** @var int the minimum interval between two queue polls - in microseconds */
    protected $pollingIntervalUs = 200000;
    /** @var int $gcProbability the probability, in percent, of calling gc_collect_cycles at the end of every poll */
    protected $gcProbability = 1;

    const MAX_MESSAGES_PER_REQUEST = 10;
    const MAX_REQUEST_TIMEOUT = 20;

    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Enables debug. At the moment it cannot be disabled again.
     * @param bool|array $debug
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug)
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

    {
        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            $handlerList->interpose(new TraceMiddleware($debug === true ? [] : $debug));
Bug introduced by
It seems like $debug === true ? array() : $debug can also be of type boolean; however, Aws\TraceMiddleware::__construct() only seems to accept an array. Maybe add an additional type check?

If a method or function can return multiple different values, and unless you are sure that you can only receive a single value in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}
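For Consumer::setDebug(), the suggested type check could be a small normalization step before the middleware is built. This is a sketch only. normalizeDebugConfig() is a hypothetical helper, and it assumes that any truthy non-array value should mean "default TraceMiddleware options". That is what `$debug === true ? [] : $debug` appears to intend.

```php
<?php
// Hypothetical helper for Consumer::setDebug(): always hand Aws\TraceMiddleware an array.
// Arrays pass through unchanged as middleware options; any other value
// (true, 1, 'yes', ...) falls back to the default (empty) options.
function normalizeDebugConfig($debug): array
{
    return is_array($debug) ? $debug : [];
}

echo count(normalizeDebugConfig(true)), "\n"; // 0
```

Within the `if ($debug)` branch, the call would become `new TraceMiddleware(normalizeDebugConfig($debug))`. A value such as 1 or 'yes' then no longer reaches the constructor.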

        }

        return $this;
    }

    /**
     * @param int $limit MB
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * @param string $key
     * @return Consumer
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;
        $this->routingKeyRegexp = '/'.str_replace(array('\*', '#'), array('[^.]*', '.*'), preg_quote($this->routingKey, '/')).'/';
        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return Consumer
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * The number of messages to download in every request to the SQS API.
     * Bigger numbers are better for performance, but there is a limit on the size of the response which SQS will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    public function setPollingInterval($intervalUs)
    {
        $this->pollingIntervalUs = $intervalUs;

        return $this;
    }

    public function setGCProbability($probability)
    {
        $this->gcProbability = $probability;

        return $this;
    }

    /**
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @param int $amount 0 for unlimited
     * @param int $timeout in seconds, 0 for unlimited. NB: any value > 0 activates 'long polling' mode
     * @return void
     */
    public function consume($amount, $timeout = 0)
    {
        if ($timeout > 0) {
            $endTime = time() + $timeout;
            $remainingTime = $timeout;
        }

        $received = 0;

        $receiveParams = array(
            'QueueUrl' => $this->queueUrl,
            'AttributeNames' => array('All'),
            'MessageAttributeNames' => array('All')
        );

        while(true) {
            $reqTime = microtime(true);

            if ($timeout > 0) {
                $wait = $remainingTime;
Bug introduced by
The variable $remainingTime does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
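In consume(), both flagged variables ($remainingTime here, $endTime further down) are only read when $timeout > 0. That is the same condition under which they are assigned, so the undefined read cannot happen at runtime. Fix 2 still silences the warning and makes the intent explicit. The sketch below is minimal, uses hypothetical helper names, and uses null for "no overall deadline".

```php
<?php
// Hypothetical helpers applying "define a default value" to consume()'s deadline.
// null means that no overall timeout was requested.

function computeEndTime(int $timeout, int $now): ?int
{
    $endTime = null;                  // default, defined on every path
    if ($timeout > 0) {
        $endTime = $now + $timeout;
    }
    return $endTime;
}

// Seconds left before the deadline, or null when there is none.
function remainingSeconds(?int $endTime, int $now): ?int
{
    return $endTime === null ? null : $endTime - $now;
}

$end = computeEndTime(30, 1000);
var_dump(remainingSeconds($end, 1010));                    // int(20)
var_dump(remainingSeconds(computeEndTime(0, 1000), 1010)); // NULL
```

The loop could then test `$remaining !== null && $remaining <= 0` on a value that always exists, instead of relying on `$timeout > 0` to guard the reads.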
    
                if ($wait > $this->requestTimeout) {
                    $wait = $this->requestTimeout;
                }
            } else {
                $wait = $this->requestTimeout;
            }

            // we leave it up to the API to fail
            //if ($wait > static::MAX_REQUEST_TIMEOUT) {
            //    $wait = static::MAX_REQUEST_TIMEOUT;
            //}

            if ($wait > 0) {
                // according to the spec, this is the maximum wait time. If messages are available sooner, they get delivered immediately
                $receiveParams['WaitTimeSeconds'] = $wait;
            } else {
                if (isset($receiveParams['WaitTimeSeconds'])) {
                    unset($receiveParams['WaitTimeSeconds']);
                }
            }

            if ($amount > 0) {
                $limit = $amount - $received;

                if ($limit > $this->requestBatchSize) {
                    $limit = $this->requestBatchSize;
                }
            } else {
                $limit = $this->requestBatchSize;
            }

            // we leave it up to the API to fail
            //if ($limit > static::MAX_MESSAGES_PER_REQUEST) {
            //    $limit = static::MAX_MESSAGES_PER_REQUEST;
            //}

            $receiveParams['MaxNumberOfMessages'] = $limit;

            $result = $this->client->receiveMessage($receiveParams);
            $messages = $result->get('Messages');

            if (is_array($messages)) {
                foreach($messages as $message) {

                    // How we implement routing keys with SQS: since it is not supported natively, we check if the route
                    // matches after having downloaded the message. If it does not match, we just skip processing it.
                    // Since we will not call deleteMessage, SQS will make the message visible again once its visibility timeout expires.
                    // This is far from optimal, but it might be better than nothing
                    if (! $this->matchRoutingKey($message)) {
                        continue;
                    }

                    $received++;

                    // removing the message from the queue is manual with SQS
                    $this->client->deleteMessage(array(
                        'QueueUrl' => $this->queueUrl,
                        'ReceiptHandle' => $message['ReceiptHandle']
                    ));

                    $data = $message['Body'];
                    unset($message['Body']);

                    $contentType = isset( $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] ) ?
                        $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

                    if ($contentType != '') {
                        $this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
                    } else {
                        if ($this->logger) {
                            $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
                        }

                        $this->callback->receive(new Message($data, $message, null, $this->queueName));
                    }
                }
            }

            $this->maybeStopConsumer();

            if ($amount > 0 && $received >= $amount) {
                return;
            }

            if ($timeout > 0 && ($remainingTime = ($endTime - time())) <= 0) {
Bug introduced by
The variable $endTime does not seem to be defined for all execution paths leading up to this point.

                return;
            }

            // observe at most 5 requests per second per queue by default: sleep for 0.2 seconds in between requests
            $passedUs = (microtime(true) - $reqTime) * 1000000;
            if ($passedUs < $this->pollingIntervalUs) {
                usleep($this->pollingIntervalUs - $passedUs);
            }
        }
    }

    /**
     * Adopts the RabbitMQ routing key algorithm:
     * - split on dots
     * - * matches one word (q: also empty ones?)
     * - # matches any number of words
     *
     * @todo the current implementation is naive and probably does not match RabbitMQ if the routing key is something like aaa.*b.ccc
     *       A better implementation would probably involve usage of a trie
     *       Some pointers on how to implement it fast: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-June/013564.html
     * @see setRoutingKey
     *
     * @param array $message
     * @return bool
     */
    protected function matchRoutingKey(array $message)
    {
        if ($this->routingKey === null || $this->routingKey === '') {
            return true;
        }
        if (!isset($message['MessageAttributes'][$this->routingAttribute]['StringValue'])) {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer has a routing key set, and it received a message without routing information. Processing it anyway');
            }
            return true;
        }

        return preg_match(
            $this->routingKeyRegexp,
            $message['MessageAttributes'][$this->routingAttribute]['StringValue']
        );
    }

    /**
     * @param string $queueUrl the complete queue name as used by SQS
     * @return Consumer
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;
    }

    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
    }

    /**
     * Dispatches signals and throws an exception if the user wants to stop. To be called only at execution points where stopping causes no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        if ($this->gcProbability > 0 && rand(1, 100) <= $this->gcProbability) {
            gc_collect_cycles();
        }

        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024) ) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
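The @todo on matchRoutingKey() says the regexp built in setRoutingKey() only approximates RabbitMQ. Two further limitations follow from how that regexp is built:

- The pattern has no ^ or $ anchors, so a binding such as b also matches a.b.c.
- On PHP 7.3 and later, preg_quote() also escapes '#'. The str_replace() then turns it into \.* (zero or more literal dots) rather than .*.

For comparison, the sketch below is a word-based matcher following RabbitMQ's topic exchange rules: * matches exactly one word, and # matches zero or more words. This is a swapped-in technique, not the plugin's code. It uses plain recursion rather than the trie the @todo mentions, and topicMatches() and matchWords() are hypothetical names.

```php
<?php
/**
 * Hypothetical RabbitMQ-style topic matcher (not part of the Kaliop plugin).
 * Binding and routing key are split on dots; '*' matches exactly one word,
 * '#' matches zero or more words, any other word must match literally.
 */
function topicMatches(string $binding, string $routingKey): bool
{
    return matchWords(explode('.', $binding), explode('.', $routingKey));
}

function matchWords(array $pattern, array $words): bool
{
    if (count($pattern) === 0) {
        return count($words) === 0;           // both exhausted: full match
    }
    $head = $pattern[0];
    $rest = array_slice($pattern, 1);
    if ($head === '#') {
        // '#' may swallow 0, 1, ... count($words) words
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($rest, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if (count($words) === 0) {
        return false;                         // pattern word left but no key word
    }
    if ($head !== '*' && $head !== $words[0]) {
        return false;                         // literal word mismatch
    }
    return matchWords($rest, array_slice($words, 1));
}

var_dump(topicMatches('a.*.c', 'a.b.c')); // bool(true)
var_dump(topicMatches('a.#.c', 'a.c'));   // bool(true)
var_dump(topicMatches('b', 'a.b.c'));     // bool(false)
```

matchRoutingKey() could then return topicMatches($this->routingKey, $message['MessageAttributes'][$this->routingAttribute]['StringValue']) instead of calling preg_match(). The recursion on # can backtrack heavily when a binding contains several # words, which is what a trie-based matcher avoids.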