Completed
Push — master ( eb83dc...4c6e24 )
by Gaetano
03:52
created

Consumer   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 351
Duplicated Lines 3.13 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 52
lcom 1
cbo 8
dl 11
loc 351
ccs 0
cts 189
cp 0
rs 7.44
c 0
b 0
f 0

18 Methods

Rating  Name                   Duplication  Size  Complexity
A       __construct()          0            4     1
A       setLogger()            0            6     1
A       setDebug()             11           11    4
A       setMemoryLimit()       0            6     1
A       setRoutingKey()        0            6     1
A       setCallback()          0            9     2
A       setQueueName()         0            6     1
A       setRequestBatchSize()  0            6     1
A       setRequestTimeout()    0            6     1
A       setPollingInterval()   0            6     1
A       setGCProbability()     0            6     1
F       consume()              0            116   20
A       matchRoutingKey()      0            17    5
A       setQueueUrl()          0            6     1
A       getQueueUrl()          0            4     1
A       setHandleSignals()     0            4     1
A       forceStop()            0            5     1
B       maybeStopConsumer()    0            18    8


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
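
As a rough illustration of that restructuring, the sketch below moves a duplicated setDebug() into a shared trait. It is only a sketch under assumptions: the names DebugToggleTrait, enableTracing() and ExampleTracedClient are hypothetical and not part of the project, the AWS client is replaced by a stand-in, and recording the new value in $debug is an addition that the listed method does not make.

```php
<?php

// Hypothetical trait holding the logic that would otherwise be copy-pasted.
trait DebugToggleTrait
{
    protected $debug = false;

    /**
     * @param bool|array $debug true, or an array of tracing options
     * @return $this
     */
    public function setDebug($debug)
    {
        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            // Explicit type check: only arrays are passed on as options.
            $this->enableTracing(is_array($debug) ? $debug : array());
            // Assumption: remember the setting so repeated calls are no-ops.
            $this->debug = $debug;
        }

        return $this;
    }

    /** Each using class decides how tracing is attached to its client. */
    abstract protected function enableTracing(array $options);
}

// Hypothetical stand-in for one of the classes sharing the method.
final class ExampleTracedClient
{
    use DebugToggleTrait;

    public $tracing = array();

    protected function enableTracing(array $options)
    {
        $this->tracing[] = $options;
    }
}

$client = new ExampleTracedClient();
$client->setDebug(true)->setDebug(true); // the second call changes nothing
```

In the real classes, enableTracing() would presumably wrap the existing getHandlerList()/interpose() calls; that wiring is left out here.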

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like Consumer often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach is to look for fields or methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
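
For Consumer, the fields sharing the "routing" prefix are one candidate. The following is a minimal Extract Class sketch, not the project's code: RoutingKeyMatcher is a hypothetical name, and the pattern translation is copied from setRoutingKey() with one assumed adjustment, namely also replacing the escaped form of '#', because preg_quote() escapes that character since PHP 7.3.

```php
<?php

// Hypothetical class extracted from the routing-key fields of Consumer.
final class RoutingKeyMatcher
{
    private $routingKey;
    private $regexp;

    public function __construct($routingKey)
    {
        $this->routingKey = (string)$routingKey;
        // '*' becomes "one word", '#' becomes "any words".
        // '\#' covers PHP >= 7.3, where preg_quote() escapes '#'.
        $this->regexp = '/' . str_replace(
            array('\*', '\#', '#'),
            array('[^.]*', '.*', '.*'),
            preg_quote($this->routingKey, '/')
        ) . '/';
    }

    /**
     * @param string $messageKey routing key carried by the message
     * @return bool
     */
    public function matches($messageKey)
    {
        if ($this->routingKey === '') {
            return true; // no routing key set: accept everything
        }

        // Unanchored, like the pattern built by setRoutingKey().
        return preg_match($this->regexp, $messageKey) === 1;
    }
}

$matcher = new RoutingKeyMatcher('orders.*');
var_dump($matcher->matches('orders.created')); // bool(true)
var_dump($matcher->matches('billing.paid'));   // bool(false)
```

Consumer would then hold one matcher object instead of the two routing-key fields, and matchRoutingKey() would only extract the attribute from the message and delegate.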

While breaking up the class, it is a good idea to analyze how other classes use Consumer, and based on these observations, apply Extract Interface, too.
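
A small Extract Interface sketch follows, assuming some callers only need the queue URL accessors. QueueUrlAwareInterface, ExampleSqsConsumer and describeQueue() are hypothetical names used for illustration, and the URL is a placeholder.

```php
<?php

// Hypothetical narrow interface: only what a queue-URL-aware caller needs.
interface QueueUrlAwareInterface
{
    public function setQueueUrl($queueUrl);

    public function getQueueUrl();
}

// Hypothetical stand-in for the real Consumer.
final class ExampleSqsConsumer implements QueueUrlAwareInterface
{
    private $queueUrl;

    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    public function getQueueUrl()
    {
        return $this->queueUrl;
    }
}

// Callers type-hint the interface instead of the concrete class.
function describeQueue(QueueUrlAwareInterface $consumer)
{
    return 'Consuming from ' . $consumer->getQueueUrl();
}

$consumer = new ExampleSqsConsumer();
echo describeQueue($consumer->setQueueUrl('https://sqs.example.invalid/my-queue')), "\n";
```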

<?php

namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;

use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface;
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
use Aws\Sqs\SqsClient;
use Aws\TraceMiddleware;
use Psr\Log\LoggerInterface;

/**
 * @todo support using short polling even when given a total timeout - even though it will complicate consume() even more than it already is
 */
class Consumer implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var  \Aws\Sqs\SqsClient */
    protected $client;
    protected $queueUrl;
    protected $queueName;
    protected $callback;

    protected $routingKey;
    protected $routingKeyRegexp;
    protected $logger;
    // The message attribute used to store content-type. To be kept in sync with the Producer
    protected $contentTypeAttribute = 'contentType';
    protected $routingAttribute = 'routingKey';
    protected $debug = false;
    protected $forceStop = false;
    protected $forceStopReason;
    protected $dispatchSignals = false;
    protected $memoryLimit = null;
    // NB: when changing the defaults below, alter as well KaliopQueueingPluginsSQSExtension::load
    /** @var int $requestBatchSize how many messages to receive in each poll by default */
    protected $requestBatchSize = 1;
    /** @var int $requestTimeout how long to wait for messages in each request. Switches between long and short polling */
    protected $requestTimeout = 0;
    /** @var int the minimum interval between two queue polls - in microseconds */
    protected $pollingIntervalUs = 200000;
    /** @var int $gcProbability the probability of calling gc_collect_cycles at the end of every poll */
    protected $gcProbability = 1;

    const MAX_MESSAGES_PER_REQUEST = 10;
    const MAX_REQUEST_TIMEOUT = 20;

    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Enabled debug. At the moment can not disable it
     * @param bool|array $debug
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug) {

Duplication: This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            $handlerList->interpose(new TraceMiddleware($debug === true ? [] : $debug));

Bug: It seems like $debug === true ? array() : $debug can also be of type boolean; however, Aws\TraceMiddleware::__construct() only seems to accept an array. Maybe add an additional type check?

If a method or function can return multiple different values, and unless you are sure that you can only receive a single value in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

        }

        return $this;
    }

    /**
     * @param int $limit MB
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * @param string $key
     * @return Consumer
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;
        $this->routingKeyRegexp = '/'.str_replace(array('\*', '#'), array('[^.]*', '.*'), preg_quote($this->routingKey, '/')).'/';
        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return Consumer
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * The number of messages to download in every request to the SQS API.
     * Bigger numbers are better for performances, but there is a limit on the size of the response which SQS will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    public function setPollingInterval($intervalUs)
    {
        $this->pollingIntervalUs = $intervalUs;

        return $this;
    }

    public function setGCProbability($probability)
    {
        $this->gcProbability = $probability;

        return $this;
    }

    /**
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @param int $amount 0 for unlimited
     * @param int $timeout seconds 0 for unlimited. NB: any value > 0 activates 'long polling' mode
     * @return void
     */
    public function consume($amount, $timeout = 0)
    {
        if ($timeout > 0) {
            $endTime = time() + $timeout;
            $remainingTime = $timeout;
        }

        $received = 0;

        $receiveParams = array(
            'QueueUrl' => $this->queueUrl,
            'AttributeNames' => array('All'),
            'MessageAttributeNames' => array('All')
        );

        while(true) {
            $reqTime = microtime(true);

            if ($timeout > 0) {
                $wait = $remainingTime;

Bug: The variable $remainingTime does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    

                if ($wait > $this->requestTimeout) {
                    $wait = $this->requestTimeout;
                }
            } else {
                $wait = $this->requestTimeout;
            }

            // we leave it up to the API to fail
            //if ($wait > static::MAX_REQUEST_TIMEOUT) {
            //    $wait = static::MAX_REQUEST_TIMEOUT;
            //}

            if ($wait > 0) {
                // according to the spec, this is maximum wait time. If messages are available sooner, they get delivered immediately
                $receiveParams['WaitTimeSeconds'] = $wait;
            } else {
                if (isset($receiveParams['WaitTimeSeconds'])) {
                    unset($receiveParams['WaitTimeSeconds']);
                }
            }

            if ($amount > 0) {
                $limit = $amount - $received;

                if ($limit > $this->requestBatchSize) {
                    $limit = $this->requestBatchSize;
                }
            } else {
                $limit = $this->requestBatchSize;
            }

            // we leave it up to the API to fail
            //if ($limit > static::MAX_MESSAGES_PER_REQUEST) {
            //    $limit = static::MAX_MESSAGES_PER_REQUEST;
            //}

            $receiveParams['MaxNumberOfMessages'] = $limit;

            $result = $this->client->receiveMessage($receiveParams);
            $messages = $result->get('Messages');

            if (is_array($messages)) {
                foreach($messages as $message) {

                    // How we implement routing keys with SQS: since it is not supported natively, we check if the route
                    // matches after having downloaded the message. If it does not match, we just skip processing it.
                    // Since we will not call deleteMessage, SQS will requeue the message in a short time.
                    // This is far from optimal, but it might be better than nothing
                    if (! $this->matchRoutingKey($message)) {
                        continue;
                    }

                    $received++;

                    // removing the message from the queue is manual with SQS
                    $this->client->deleteMessage(array(
                        'QueueUrl' => $this->queueUrl,
                        'ReceiptHandle' => $message['ReceiptHandle']
                    ));

                    $data = $message['Body'];
                    unset($message['Body']);

                    $contentType = isset( $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] ) ?
                        $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

                    if ($contentType != '') {
                        $this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
                    } else {
                        if ($this->logger) {
                            $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
                        }

                        $this->callback->receive(new Message($data, $message, null, $this->queueName));
                    }
                }
            }

            $this->maybeStopConsumer();

            if ($amount > 0 && $received >= $amount) {
                return;
            }

            if ($timeout > 0 && ($remainingTime = ($endTime - time())) <= 0) {

Bug: The variable $endTime does not seem to be defined for all execution paths leading up to this point.

                return;
            }

            // observe MAX 5 requests per sec per queue by default: sleep for 0.2 secs in between requests
            $passedUs = (microtime(true) - $reqTime) * 1000000;
            if ($passedUs < $this->pollingIntervalUs) {
                usleep($this->pollingIntervalUs - $passedUs);
            }
        }
    }

    /**
     * Adopt the RabbitMQ routing key algorithm:
     * - split on dots
     * - * matches one word (q: also empty ones?)
     * - # matches any words
     *
     * @todo the current implementation is naive and does probably not match RabbitMq if the routing key is something like aaa.*b.ccc
     *       A better implementation would probably involve usage of a trie
     *       Some pointers on how to implement it fast: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-June/013564.html
     * @see setRoutingKey
     *
     * @param array $message
     * @return bool
     */
    protected function matchRoutingKey(array $message)
    {
        if ($this->routingKey === null || $this->routingKey === '') {
            return true;
        }
        if (!isset($message['MessageAttributes'][$this->routingAttribute]['StringValue'])) {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer has a routing key set, and it received a message without routing information. Processing it anyway');
            }
            return true;
        }

        return preg_match(
            $this->routingKeyRegexp,
            $message['MessageAttributes'][$this->routingAttribute]['StringValue']
        );
    }

    /**
     * @param string $queueUrl the complete queue name as used by SQS
     * @return Consumer
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;
    }


    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
    }

    /**
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        if ($this->gcProbability > 0 && rand(1, 100) <= $this->gcProbability) {
            gc_collect_cycles();
        }

        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024) ) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
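
The two "undefined variable" notes on consume(), together with its complexity rating, suggest moving the per-poll arithmetic into small helpers that take an explicit "no deadline" value. The sketch below is one possible shape under assumptions: nextWaitTime() and nextBatchLimit() are hypothetical names, and null is assumed to stand for "no overall timeout". It mirrors the branching in the listing and does not change the polling logic.

```php
<?php

/**
 * Seconds to pass as WaitTimeSeconds for the next poll.
 *
 * @param int $requestTimeout per-request timeout configured on the consumer
 * @param int|null $remainingTime seconds left before the overall deadline, null when there is none
 * @return int
 */
function nextWaitTime($requestTimeout, $remainingTime = null)
{
    if ($remainingTime === null) {
        return $requestTimeout;
    }

    // Never wait longer than the time left before the deadline.
    return min($remainingTime, $requestTimeout);
}

/**
 * Value to pass as MaxNumberOfMessages for the next poll.
 *
 * @param int $requestBatchSize configured batch size
 * @param int $amount total number of messages wanted, 0 for unlimited
 * @param int $received number of messages processed so far
 * @return int
 */
function nextBatchLimit($requestBatchSize, $amount, $received)
{
    if ($amount <= 0) {
        return $requestBatchSize;
    }

    // Do not ask for more messages than are still wanted.
    return min($amount - $received, $requestBatchSize);
}

var_dump(nextWaitTime(20));          // int(20)
var_dump(nextWaitTime(20, 5));       // int(5)
var_dump(nextBatchLimit(10, 12, 7)); // int(5)
```

Inside consume(), $remainingTime could then be initialised to null before the loop, so that every path defines it before it is read.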