Completed
Push — master ( f66822...7bafbd )
by Gaetano
03:48
created

Consumer::consume()   C

Complexity

Conditions 12
Paths 84

Size

Total Lines 51
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 156

Importance

Changes 6
Bugs 1 Features 2
Metric Value
c 6
b 1
f 2
dl 0
loc 51
ccs 0
cts 36
cp 0
rs 5.6668
cc 12
eloc 29
nc 84
nop 2
crap 156

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;
4
5
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
6
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
7
use Kaliop\Queueing\Plugins\KinesisBundle\Service\SequenceNumberStoreInterface;
8
use \Aws\Kinesis\KinesisClient;
9
10
class Consumer implements ConsumerInterface
11
{
12
    /** @var  \Aws\Kinesis\KinesisClient */
13
    protected $client;
14
    protected $shardId;
15
    protected $streamName;
16
    protected $callback;
17
    /** @var  \Kaliop\Queueing\Plugins\KinesisBundle\Service\SequenceNumberStoreInterface */
18
    protected $sequenceNumberStore;
19
    // allowed values: TRIM_HORIZON and LATEST
20
    protected $defaultShardIteratorType = 'TRIM_HORIZON';
21
    protected $requestBatchSize = 1;
22
23 1
    public function __construct(array $config)
24
    {
25 1
        $this->client = new KinesisClient($config);
26 1
    }
27
28
    /**
29
     * Does nothing
30
     * @param int $limit
31
     * @return Consumer
32
     */
33
    public function setMemoryLimit($limit)
34
    {
35
        return $this;
36
    }
37
38
    /**
39
     * @param string $key
40
     * @return Consumer
41
     * @todo if null and there is only 1 shard in the stream -> get it! Or allow asking for shard 1, 2, 3, ... instead of using the Id
42
     */
43
    public function setRoutingKey($key)
44
    {
45
        $this->shardId = $key;
46
47
        return $this;
48
    }
49
50
    /**
51
     * @param MessageConsumerInterface $callback
52
     * @return Consumer
53
     */
54
    public function setCallback($callback)
55
    {
56
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
57
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
58
        }
59
        $this->callback = $callback;
60
61
        return $this;
62
    }
63
64
    /**
65
     * @param SequenceNumberStoreInterface $store
66
     * @return Consumer
67
     */
68 1
    public function setSequenceNumberStore(SequenceNumberStoreInterface $store)
69
    {
70 1
        $this->sequenceNumberStore = $store;
71
72 1
        return $this;
73
    }
74
75
    /**
76
     * The number of messages to download in every request to the Kinesis API.
77
     * Bigger numbers are better for performances, but there is a limit on the size of the response which Kinesis will send.
78
     * @param int $amount
79
     * @return Consumer
80
     */
81 1
    public function setRequestBatchSize($amount)
82
    {
83 1
        $this->requestBatchSize = $amount;
84
85 1
        return $this;
86
    }
87
88
    /**
89
     * Use this to decide what happens when the Consumer starts getting messages from a shard, and it does not
90
     * have stored a pointer to the last consumed message.
91
     *
92
     * @param string $type either LATEST (discard messages already in the shard) or TRIM_HORIZON (get all messages in the shard)
93
     * @return Consumer
94
     */
95 1
    public function setDefaultShardIteratorType($type)
96
    {
97 1
        $this->defaultShardIteratorType = $type;
98
99 1
        return $this;
100
    }
101
102
    /**
103
     * @see http://docs.aws.amazon.com/aws-sdk-php/v2/api/class-Aws.Kinesis.KinesisClient.html#_getRecords
104
     * Will throw an exception if $amount is > 10.000
105
     *
106
     * @param int $amount
107
     * @param int $timeout
108
     * @return nothing
109
     */
110
    public function consume($amount, $timeout=0)
111
    {
112
        $iterator = $this->getInitialMessageIterator();
113
114
        $limit = ($amount > 0) ? $amount : $this->requestBatchSize;
115
        if ($timeout > 0) {
116
            $startTime = time();
117
            $remaining = $timeout;
0 ignored issues
show
Unused Code introduced by
$remaining is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
118
        }
119
120
        while(true) {
121
            $reqTime = microtime(true);
122
            $result = $this->client->getRecords(array(
123
                'ShardIterator' => $iterator,
124
                'Limit' => $limit,
125
            ));
126
127
            $records = $result->get('Records');
128
129
            if (count($records) && $this->sequenceNumberStore) {
130
                $last = end($records);
131
                $this->sequenceNumberStore->save($this->streamName, $this->shardId, $last['SequenceNumber']);
132
            }
133
134
            foreach($records as $record) {
135
                $data = $record['Data'];
136
                unset($record['Data']);
137
                $this->callback->receive(new Message($data, $record));
138
            }
139
140
            if ($amount > 0) {
141
                return;
142
            }
143
144
            if ($timeout > 0 && ($remaining = ($startTime + $timeout - time())) <= 0) {
0 ignored issues
show
Bug introduced by
The variable $startTime does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
145
                return;
146
            }
147
148
            $iterator = $result->get('NextShardIterator');
149
            if ($iterator == null) {
150
                // shard is closed
151
                return;
152
            }
153
154
            // observe MAX 5 requests per sec per shard: sleep for 0.2 secs in between requests
155
            $passedMs = (microtime(true) - $reqTime) * 1000000;
156
            if ($passedMs < 200000) {
157
                usleep(200000 - $passedMs);
158
            }
159
        }
160
    }
161
162
    /**
163
     * Builds an iterator to start getting messages from the shard based on both injected config and the fact that
164
     * the store has a value for the last Sequence Number previously read
165
     */
166
    protected function getInitialMessageIterator()
167
    {
168
        $start = null;
169
        if ($this->sequenceNumberStore) {
170
            $start = $this->sequenceNumberStore->fetch($this->streamName, $this->shardId);
171
        }
172
173
        $iteratorOptions = array(
174
            'StreamName' => $this->streamName,
175
            'ShardId' => $this->shardId
176
        );
177
178
        if ($start == null) {
0 ignored issues
show
Bug introduced by
It seems like you are loosely comparing $start of type string|null against null; this is ambiguous if the string can be empty. Consider using a strict comparison === instead.
Loading history...
179
            $iteratorOptions['ShardIteratorType'] = $this->defaultShardIteratorType;
180
        } else {
181
            $iteratorOptions['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER';
182
            $iteratorOptions['StartingSequenceNumber'] = $start;
183
        }
184
185
        return $this->client->getShardIterator($iteratorOptions)->get('ShardIterator');
186
    }
187
188
    /**
189
     * @param string $queueName
190
     * @return Consumer
191
     */
192
    public function setQueueName($queueName)
193
    {
194
        $this->streamName = $queueName;
195
196
        return $this;
197
    }
198
}