Completed
Push — master ( 4f2d5a...a59eef )
by Gaetano
08:35
created

Consumer   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 340
Duplicated Lines 3.24 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 62.9%

Importance

Changes 0

Metric   Value
wmc      52
lcom     1
cbo      8
dl       11
loc      340
ccs      78
cts      124
cp       0.629
rs       7.44
c        0
b        0
f        0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A setLogger() 0 6 1
A setDebug() 11 11 4
A setMemoryLimit() 0 6 1
A setRoutingKey() 0 6 1
A setCallback() 0 9 2
A setQueueName() 0 6 1
A setRequestBatchSize() 0 6 1
A setRequestTimeout() 0 6 1
A setPollingInterval() 0 6 1
A setGCProbability() 0 6 1
F consume() 0 105 20
A matchRoutingKey() 0 17 5
A setQueueUrl() 0 6 1
A getQueueUrl() 0 4 1
A setHandleSignals() 0 4 1
A forceStop() 0 5 1
B maybeStopConsumer() 0 19 8

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
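As an illustration, the duplicated setDebug() flagged in the listing could be pulled into one shared place. The following is a minimal sketch, not the project's code: the trait name DebugToggleTrait, the enableTracing() hook and the two stub classes are hypothetical, and the stubs only record the options instead of touching a real SQS client. Unlike the listing, the sketch also stores the flag once tracing is enabled, which is an assumption about the intent.

```php
<?php

/**
 * Hypothetical trait: the single home for the debug toggle that is
 * currently repeated in more than one class.
 */
trait DebugToggleTrait
{
    protected $debug = false;

    /** Hook each class implements to switch tracing on for its own client. */
    abstract protected function enableTracing(array $options);

    public function setDebug($debug)
    {
        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            // true means "default options", an array carries explicit options
            $this->enableTracing(is_array($debug) ? $debug : array());
            // Assumption: remember the flag so a second call is a no-op
            $this->debug = $debug;
        }

        return $this;
    }
}

/** Stub standing in for the consumer; it only records the options. */
class SketchConsumer
{
    use DebugToggleTrait;

    public $traceCalls = array();

    protected function enableTracing(array $options)
    {
        $this->traceCalls[] = $options;
    }
}

/** Stub standing in for a second class that had the same method. */
class SketchProducer
{
    use DebugToggleTrait;

    public $traceCalls = array();

    protected function enableTracing(array $options)
    {
        $this->traceCalls[] = $options;
    }
}
```

In the real classes the hook would be the place where the trace middleware is added to the client's handler list, so only that one call stays class-specific.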

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like Consumer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
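In Consumer, the fields routingKey, routingKeyRegexp and routingAttribute share a prefix, which makes routing a plausible candidate. The sketch below shows what an extracted class might look like. The name RoutingKeyMatcher and its methods are hypothetical; the wildcard translation follows setRoutingKey() (`*` becomes `[^.]*`, `#` becomes `.*`, pattern left unanchored), but the literal parts are quoted separately from the wildcards, which is a choice made for this sketch.

```php
<?php

/**
 * Hypothetical class extracted from Consumer: it owns the routing key
 * and the regular expression derived from it.
 */
final class RoutingKeyMatcher
{
    private $routingKey;
    private $regexp;

    public function __construct($routingKey)
    {
        $this->routingKey = (string) $routingKey;

        // Split on the wildcards and keep them as separate pieces
        $pieces = preg_split('/([*#])/', $this->routingKey, -1, PREG_SPLIT_DELIM_CAPTURE);

        $pattern = '';
        foreach ($pieces as $piece) {
            if ($piece === '*') {
                $pattern .= '[^.]*';   // one word, no dots
            } elseif ($piece === '#') {
                $pattern .= '.*';      // any number of words
            } else {
                $pattern .= preg_quote($piece, '/');
            }
        }

        // Unanchored, like the pattern built in setRoutingKey()
        $this->regexp = '/' . $pattern . '/';
    }

    /** An empty routing key accepts every message key. */
    public function matches($messageKey)
    {
        if ($this->routingKey === '') {
            return true;
        }

        return preg_match($this->regexp, (string) $messageKey) === 1;
    }
}
```

Consumer would then hold one RoutingKeyMatcher instead of two fields, and matchRoutingKey() would shrink to extracting the attribute and delegating.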

While breaking up the class, it is a good idea to analyze how other classes use Consumer, and based on these observations, apply Extract Interface, too.
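A possible shape for such an interface is sketched here, assuming that some callers only tune polling. PollingConfigurableInterface, SketchPollingConsumer and applyPollingDefaults() are hypothetical names invented for the illustration; only the three setter names come from the listing.

```php
<?php

/**
 * Hypothetical interface: just the polling-related setters of Consumer.
 */
interface PollingConfigurableInterface
{
    public function setRequestBatchSize($amount);

    public function setRequestTimeout($timeout);

    public function setPollingInterval($intervalMs);
}

/** Stand-in for Consumer, reduced to the extracted methods. */
class SketchPollingConsumer implements PollingConfigurableInterface
{
    public $requestBatchSize = 1;
    public $requestTimeout = 0;
    public $pollingIntervalMs = 200000;

    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    public function setPollingInterval($intervalMs)
    {
        $this->pollingIntervalMs = $intervalMs;

        return $this;
    }
}

/** Client code depends on the narrow interface, not on the concrete class. */
function applyPollingDefaults(PollingConfigurableInterface $target)
{
    return $target
        ->setRequestBatchSize(10)
        ->setRequestTimeout(20)
        ->setPollingInterval(200000);
}
```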

<?php

namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;

use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface;
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
use Aws\Sqs\SqsClient;
use Aws\TraceMiddleware;
use Psr\Log\LoggerInterface;

/**
 * @todo support using short polling even when given a total timeout - even though it will complicate consume() even more than it already is
 */
class Consumer implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var  \Aws\Sqs\SqsClient */
    protected $client;
    protected $queueUrl;
    protected $queueName;
    protected $callback;

    protected $routingKey;
    protected $routingKeyRegexp;
    protected $logger;
    // The message attribute used to store content-type. To be kept in sync with the Producer
    protected $contentTypeAttribute = 'contentType';
    protected $routingAttribute = 'routingKey';
    protected $debug = false;
    protected $forceStop = false;
    protected $forceStopReason;
    protected $dispatchSignals = false;
    protected $memoryLimit = null;
    /** @var int $requestBatchSize how many messages to receive in each poll by default */
    protected $requestBatchSize = 1;
    /** @var int $requestTimeout how long to wait for messages in each request. Switches between long and short polling */
    protected $requestTimeout = 0;
    /** @var int the minimum interval between two queue polls - in microseconds, as passed to usleep() (200000 = 0.2 secs), despite the Ms suffix */
    protected $pollingIntervalMs = 200000;
    /** @var int $gcProbability the probability (in percent) of calling gc_collect_cycles at the end of every poll */
    protected $gcProbability = 1;

    const MAX_MESSAGES_PER_REQUEST = 10;
    const MAX_REQUEST_TIMEOUT = 20;

    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Enables debug. At the moment it cannot be disabled again
     * @param bool|array $debug
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug) {
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            $handlerList->interpose(new TraceMiddleware($debug === true ? [] : $debug));
Bug introduced by
It seems like $debug === true ? array() : $debug can also be of type boolean; however, Aws\TraceMiddleware::__construct() only seems to accept array. Consider adding an additional type check.

If a method or function can return multiple different values, and you are not sure that only a single type can be received in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.
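Applied to setDebug(), the suggested type check could be isolated in a small helper so that only an array ever reaches the middleware constructor. This is a sketch under assumptions: normalizeDebugOptions() is a hypothetical function name, and treating every non-array value as "use default options" is one possible policy, not something the project states.

```php
<?php

/**
 * Hypothetical helper: turns the bool|array debug argument into the array
 * that is handed on as middleware configuration.
 */
function normalizeDebugOptions($debug)
{
    // An explicit options array is passed through untouched
    if (is_array($debug)) {
        return $debug;
    }

    // true (or any other non-array value) falls back to default options
    return array();
}
```

The call in the listing would then read new TraceMiddleware(normalizeDebugOptions($debug)), with no boolean able to reach the constructor.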

        }

        return $this;
    }

    /**
     * @param int $limit MB
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * @param string $key
     * @return Consumer
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;
        // preg_quote() also escapes '#' since PHP 7.3, so both the escaped and the plain form are translated
        $this->routingKeyRegexp = '/'.str_replace(array('\*', '\#', '#'), array('[^.]*', '.*', '.*'), preg_quote($this->routingKey, '/')).'/';
        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return Consumer
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * The number of messages to download in every request to the SQS API.
     * Bigger numbers are better for performance, but there is a limit on the size of the response which SQS will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    public function setPollingInterval($intervalMs)
    {
        $this->pollingIntervalMs = $intervalMs;

        return $this;
    }

    public function setGCProbability($probability)
    {
        $this->gcProbability = $probability;

        return $this;
    }

    /**
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @param int $amount 0 for unlimited
     * @param int $timeout seconds 0 for unlimited. NB: any value > 0 activates 'long polling' mode
     * @return void
     */
    public function consume($amount, $timeout = 0)
    {
        if ($timeout > 0) {
            $endTime = time() + $timeout;
            $remainingTime = $timeout;
        }

        $received = 0;

        $receiveParams = array(
            'QueueUrl' => $this->queueUrl,
            'AttributeNames' => array('All'),
            'MessageAttributeNames' => array('All')
        );

        while(true) {
            $reqTime = microtime(true);

            if ($timeout > 0) {
                $wait = $remainingTime;
Bug introduced by
The variable $remainingTime does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
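
Applied to the start of consume(), fix 2 might look like the sketch below: both variables receive a default before the conditional, so they exist on every path. The function startDeadline() and its $now parameter are hypothetical and exist only to make the snippet runnable on its own; in the listing the same assignments would sit inline at the top of consume().

```php
<?php

/**
 * Hypothetical helper mirroring the first lines of consume().
 * Returns array($endTime, $remainingTime).
 */
function startDeadline($timeout, $now)
{
    // Defaults assigned unconditionally, so both variables are always defined
    $endTime = null;
    $remainingTime = null;

    if ($timeout > 0) {
        $endTime = $now + $timeout;
        $remainingTime = $timeout;
    }

    return array($endTime, $remainingTime);
}
```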
    
                if ($wait > static::MAX_REQUEST_TIMEOUT) {
                    $wait = static::MAX_REQUEST_TIMEOUT;
                }
            } else {
                $wait = $this->requestTimeout;
            }

            if ($wait > 0) {
                // according to the spec, this is maximum wait time. If messages are available sooner, they get delivered immediately
                $receiveParams['WaitTimeSeconds'] = $wait;
            } else {
                if (isset($receiveParams['WaitTimeSeconds'])) {
                    unset($receiveParams['WaitTimeSeconds']);
                }
            }

            if ($amount > 0) {
                $limit = $amount - $received;

                if ($limit >= static::MAX_MESSAGES_PER_REQUEST) {
                    $limit = static::MAX_MESSAGES_PER_REQUEST;
                }
            } else {
                $limit = $this->requestBatchSize;
            }

            $receiveParams['MaxNumberOfMessages'] = $limit;

            $result = $this->client->receiveMessage($receiveParams);
            $messages = $result->get('Messages');

            if (is_array($messages)) {
                foreach($messages as $message) {

                    // How we implement routing keys with SQS: since it is not supported natively, we check if the route
                    // matches after having downloaded the message. If it does not match, we just skip processing it.
                    // Since we will not call deleteMessage, SQS will requeue the message in a short time.
                    // This is far from optimal, but it might be better than nothing
                    if (! $this->matchRoutingKey($message)) {
                        continue;
                    }

                    $received++;

                    // removing the message from the queue is manual with SQS
                    $this->client->deleteMessage(array(
                        'QueueUrl' => $this->queueUrl,
                        'ReceiptHandle' => $message['ReceiptHandle']
                    ));

                    $data = $message['Body'];
                    unset($message['Body']);

                    $contentType = isset( $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] ) ?
                        $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

                    if ($contentType != '') {
                        $this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
                    } else {
                        if ($this->logger) {
                            $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
                        }

                        $this->callback->receive(new Message($data, $message, null, $this->queueName));
                    }
                }
            }

            $this->maybeStopConsumer();

            if ($amount > 0 && $received >= $amount) {
                return;
            }

            if ($timeout > 0 && ($remainingTime = ($endTime - time())) <= 0) {
Bug introduced by
The variable $endTime does not seem to be defined for all execution paths leading up to this point.

                return;
            }

            // observe MAX 5 requests per sec per queue by default: sleep for 0.2 secs in between requests
            // NB: both the elapsed time and the polling interval are expressed in microseconds here
            $passedMs = (microtime(true) - $reqTime) * 1000000;
            if ($passedMs < $this->pollingIntervalMs) {
                usleep((int)($this->pollingIntervalMs - $passedMs));
            }
        }
    }

    /**
     * Adopt the RabbitMQ routing key algorithm:
     * - split on dots
     * - * matches one word (q: also empty ones?)
     * - # matches any words
     *
     * @todo the current implementation is naive and does probably not match RabbitMq if the routing key is something like aaa.*b.ccc
     *       A better implementation would probably involve usage of a trie
     *       Some pointers on how to implement it fast: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-June/013564.html
     * @see setRoutingKey
     *
     * @param array $message
     * @return bool
     */
    protected function matchRoutingKey(array $message)
    {
        if ($this->routingKey === null || $this->routingKey === '') {
            return true;
        }
        if (!isset($message['MessageAttributes'][$this->routingAttribute]['StringValue'])) {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer has a routing key set, and it received a message without routing information. Processing it anyway');
            }
            return true;
        }

        // preg_match() returns an int (or false): cast it to honour the documented bool return type
        return (bool)preg_match(
            $this->routingKeyRegexp,
            $message['MessageAttributes'][$this->routingAttribute]['StringValue']
        );
    }

    /**
     * @param string $queueUrl the complete queue name as used by SQS
     * @return Consumer
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;
    }

    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
    }

    /**
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        if ($this->gcProbability > 0 && rand(1, 100) <= $this->gcProbability) {
            gc_collect_cycles();
        }

        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024) ) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
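
The throttling at the end of the consume() loop multiplies a microtime(true) difference by 1000000 and hands the result to usleep(), so the quantities involved are microseconds. The sketch below restates that arithmetic as a pure function to make the units explicit. remainingSleepMicros() and its parameter names are hypothetical; rounding to an integer before sleeping is a choice made here, not something taken from the listing.

```php
<?php

/**
 * Hypothetical helper: how long to sleep so that two polls are at least
 * $intervalMicros apart. Timestamps are seconds as returned by
 * microtime(true); the interval and the result are microseconds.
 */
function remainingSleepMicros($requestStartedAt, $now, $intervalMicros)
{
    // Seconds to microseconds
    $elapsedMicros = ($now - $requestStartedAt) * 1000000;

    if ($elapsedMicros >= $intervalMicros) {
        return 0;
    }

    // usleep() expects an integer number of microseconds
    return (int) round($intervalMicros - $elapsedMicros);
}
```

With the default interval of 200000, a request that took 0.05 seconds leaves roughly 0.15 seconds of sleep, which keeps the loop at about five polls per second.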