Completed
Push — master ( 4c6e24...29c278 )
by Gaetano
06:28
created

Consumer::setRoutingKey()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 4
cts 4
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;
4
5
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
6
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
7
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface;
8
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
9
use Aws\Sqs\SqsClient;
10
use Aws\TraceMiddleware;
11
use Psr\Log\LoggerInterface;
12
13
/**
 * Consumes messages from an AWS SQS queue and hands each of them over to a MessageConsumerInterface callback.
 *
 * @todo support using short polling even when given a total timeout - even though it will complicate consume() even more than it already is
 */
class Consumer implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var  \Aws\Sqs\SqsClient */
    protected $client;
    /** @var string the complete queue url as used by SQS */
    protected $queueUrl;
    /** @var string the queue name, passed on to every Message created */
    protected $queueName;
    /** @var MessageConsumerInterface */
    protected $callback;

    /** @var string|null the routing key set via setRoutingKey(); null or '' means every message is accepted */
    protected $routingKey;
    /** @var string|null the regexp built from $routingKey by setRoutingKey() */
    protected $routingKeyRegexp;
    /** @var LoggerInterface|null */
    protected $logger;
    // The message attribute used to store content-type. To be kept in sync with the Producer
    protected $contentTypeAttribute = 'contentType';
    // The message attribute read by matchRoutingKey() to get the routing key of a message
    protected $routingAttribute = 'routingKey';
    /** @var bool|array the debug config; truthy once the trace middleware has been installed */
    protected $debug = false;
    protected $forceStop = false;
    protected $forceStopReason;
    protected $dispatchSignals = false;
    /** @var int|null memory limit in MB; checked in maybeStopConsumer() */
    protected $memoryLimit = null;
    // NB: when changing the defaults below, alter as well KaliopQueueingPluginsSQSExtension::load
    /** @var int $requestBatchSize how many messages to receive in each poll by default */
    protected $requestBatchSize = 1;
    /** @var int $requestTimeout how long to wait for messages in each request. Switches between long and short polling */
    protected $requestTimeout = 0;
    /** @var int the minimum interval between two queue polls - in microseconds */
    protected $pollingIntervalUs = 200000;
    /** @var int $gcProbability the probability (in percent) of calling gc_collect_cycles at the end of every poll */
    protected $gcProbability = 1;

    const MAX_MESSAGES_PER_REQUEST = 10;
    const MAX_REQUEST_TIMEOUT = 20;

    /**
     * @param array $config the configuration passed as-is to the SqsClient constructor
     */
    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    /**
     * @param LoggerInterface|null $logger
     * @return Consumer
     */
    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Enables debug, by installing the AWS trace middleware on the client. At the moment it can not be disabled.
     * Calling this again after debug has been enabled has no effect, so the middleware is never installed twice.
     *
     * @param bool|array $debug true to enable with default options, an array to enable with the given
     *                          TraceMiddleware options, a falsy value to do nothing
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug)
    {
        if (!$debug || $this->debug) {
            return $this;
        }

        // TraceMiddleware only accepts an array of options: any non-array truthy value means "default options"
        $handlerList = $this->client->getHandlerList();
        $handlerList->interpose(new TraceMiddleware(is_array($debug) ? $debug : []));
        // remember that the middleware is in place, to avoid stacking a second one on later calls
        $this->debug = $debug;

        return $this;
    }

    /**
     * @param int $limit MB
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * Sets the routing key used to filter received messages, using RabbitMQ-style wildcards:
     * '*' matches a single dot-delimited word, '#' matches any sequence of characters.
     * The whole routing key of a message has to match, not just a part of it.
     *
     * @param string $key
     * @return Consumer
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;
        // Since PHP 7.3 preg_quote escapes '#' as well, so both its escaped and unescaped forms are translated.
        // strtr is used instead of str_replace so that already-substituted text is never re-processed.
        $this->routingKeyRegexp = '/\A' . strtr(
            preg_quote($this->routingKey, '/'),
            array('\*' => '[^.]*', '\#' => '.*', '#' => '.*')
        ) . '\z/';

        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return Consumer
     * @throws \RuntimeException if $callback is not a MessageConsumerInterface
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    /**
     * @param string $queueName
     * @return Consumer
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * The number of messages to download in every request to the SQS API.
     * Bigger numbers are better for performances, but there is a limit on the size of the response which SQS will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    /**
     * @param int $timeout seconds to wait for messages in each request. Any value > 0 activates 'long polling'
     * @return Consumer
     */
    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    /**
     * @param int $intervalUs the minimum interval between two queue polls, in microseconds
     * @return Consumer
     */
    public function setPollingInterval($intervalUs)
    {
        $this->pollingIntervalUs = $intervalUs;

        return $this;
    }

    /**
     * @param int $probability percentage (0-100) of polls after which gc_collect_cycles is called
     * @return Consumer
     */
    public function setGCProbability($probability)
    {
        $this->gcProbability = $probability;

        return $this;
    }

    /**
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @param int $amount 0 for unlimited
     * @param int $timeout seconds 0 for unlimited. NB: any value > 0 activates 'long polling' mode
     * @return void
     * @throws ForcedStopException when a stop has been requested (signal, forceStop() or memory limit)
     */
    public function consume($amount, $timeout = 0)
    {
        // only used when a total timeout is given, but always defined so that no code path reads an undefined variable
        $endTime = null;
        $remainingTime = null;
        if ($timeout > 0) {
            $endTime = time() + $timeout;
            $remainingTime = $timeout;
        }

        $received = 0;

        $receiveParams = array(
            'QueueUrl' => $this->queueUrl,
            'AttributeNames' => array('All'),
            'MessageAttributeNames' => array('All')
        );

        while (true) {
            $reqTime = microtime(true);

            // per-request wait: the configured request timeout, capped by the time left before the total timeout.
            // Values above static::MAX_REQUEST_TIMEOUT are not clamped: we leave it up to the API to fail
            $wait = $this->requestTimeout;
            if ($timeout > 0 && $remainingTime < $wait) {
                $wait = $remainingTime;
            }

            if ($wait > 0) {
                // according to the spec, this is maximum wait time. If messages are available sooner, they get delivered immediately
                $receiveParams['WaitTimeSeconds'] = $wait;
            } else {
                unset($receiveParams['WaitTimeSeconds']);
            }

            // per-request batch size: the configured one, capped by the number of messages still to be received.
            // Values above static::MAX_MESSAGES_PER_REQUEST are not clamped: we leave it up to the API to fail
            $limit = $this->requestBatchSize;
            if ($amount > 0 && ($amount - $received) < $limit) {
                $limit = $amount - $received;
            }

            $receiveParams['MaxNumberOfMessages'] = $limit;

            $result = $this->client->receiveMessage($receiveParams);
            $messages = $result->get('Messages');

            if (is_array($messages)) {
                foreach ($messages as $message) {

                    // How we implement routing keys with SQS: since it is not supported natively, we check if the route
                    // matches after having downloaded the message. If it does not match, we just skip processing it.
                    // Since we will not call deleteMessage, SQS will requeue the message in a short time.
                    // This is far from optimal, but it might be better than nothing
                    if (! $this->matchRoutingKey($message)) {
                        continue;
                    }

                    $received++;

                    // removing the message from the queue is manual with SQS.
                    // NB: it is deleted before the callback runs, so a callback failure does not make SQS redeliver it
                    $this->client->deleteMessage(array(
                        'QueueUrl' => $this->queueUrl,
                        'ReceiptHandle' => $message['ReceiptHandle']
                    ));

                    $data = $message['Body'];
                    unset($message['Body']);

                    $contentType = isset($message['MessageAttributes'][$this->contentTypeAttribute]['StringValue']) ?
                        $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

                    if ($contentType != '') {
                        $this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
                    } else {
                        if ($this->logger) {
                            $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
                        }

                        $this->callback->receive(new Message($data, $message, null, $this->queueName));
                    }
                }
            }

            $this->maybeStopConsumer();

            if ($amount > 0 && $received >= $amount) {
                return;
            }

            if ($timeout > 0) {
                $remainingTime = $endTime - time();
                if ($remainingTime <= 0) {
                    return;
                }
            }

            // observe MAX 5 requests per sec per queue by default: sleep for 0.2 secs in between requests
            $passedUs = (microtime(true) - $reqTime) * 1000000;
            if ($passedUs < $this->pollingIntervalUs) {
                // usleep takes an int: avoid the implicit lossy float conversion (deprecated as of PHP 8.1)
                usleep((int)($this->pollingIntervalUs - $passedUs));
            }
        }
    }

    /**
     * Adopt the RabbitMQ routing key algorithm:
     * - split on dots
     * - * matches one word (q: also empty ones?)
     * - # matches any words
     *
     * Messages without a routing attribute are accepted (with a warning) when a routing key is set.
     *
     * @todo the current implementation is naive and does probably not match RabbitMq if the routing key is something like aaa.*b.ccc
     *       A better implementation would probably involve usage of a trie
     *       Some pointers on how to implement it fast: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-June/013564.html
     * @see setRoutingKey
     *
     * @param array $message
     * @return bool
     */
    protected function matchRoutingKey(array $message)
    {
        if ($this->routingKey === null || $this->routingKey === '') {
            return true;
        }
        if (!isset($message['MessageAttributes'][$this->routingAttribute]['StringValue'])) {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer has a routing key set, and it received a message without routing information. Processing it anyway');
            }
            return true;
        }

        // preg_match returns 1, 0 or false (on error): a regexp error is treated as "no match"
        return (bool)preg_match(
            $this->routingKeyRegexp,
            $message['MessageAttributes'][$this->routingAttribute]['StringValue']
        );
    }

    /**
     * @param string $queueUrl the complete queue name as used by SQS
     * @return Consumer
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    /**
     * @param bool $doHandle when true, pending signals are dispatched via pcntl_signal_dispatch() after every poll
     * @return Consumer
     */
    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;

        return $this;
    }

    /**
     * Requests the consumer to stop; the stop takes effect at the next call to maybeStopConsumer().
     *
     * @param string $reason
     */
    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
    }

    /**
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        if ($this->gcProbability > 0 && rand(1, 100) <= $this->gcProbability) {
            gc_collect_cycles();
        }

        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024) ) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
367