Completed
Push — master ( 490944...72b95b )
by Gaetano
09:38
created

Consumer   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 192
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 18.18%

Importance

Changes 0
Metric Value
wmc 24
lcom 1
cbo 5
dl 0
loc 192
ccs 12
cts 66
cp 0.1818
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A setMemoryLimit() 0 4 1
A setRoutingKey() 0 6 1
A setCallback() 0 9 2
A setSequenceNumberStore() 0 6 1
A setRequestBatchSize() 0 6 1
A setDefaultShardIteratorType() 0 6 1
A setQueueName() 0 6 1
C consume() 0 52 12
A getInitialMessageIterator() 0 21 3
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;
4
5
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface;
6
use Kaliop\QueueingBundle\Queue\ConsumerInterface;
7
use Kaliop\Queueing\Plugins\KinesisBundle\Service\SequenceNumberStoreInterface;
8
use \Aws\Kinesis\KinesisClient;
9
10
class Consumer implements ConsumerInterface
11
{
12
    /** @var  \Aws\Kinesis\KinesisClient */
13
    protected $client;
14
    protected $shardId;
15
    protected $streamName;
16
    protected $callback;
17
    /** @var  \Kaliop\Queueing\Plugins\KinesisBundle\Service\SequenceNumberStoreInterface */
18
    protected $sequenceNumberStore;
19
    // allowed values: TRIM_HORIZON and LATEST
20
    protected $defaultShardIteratorType = 'TRIM_HORIZON';
21
    protected $requestBatchSize = 1;
22
23 1
    public function __construct(array $config)
24
    {
25 1
        $this->client = new KinesisClient($config);
26 1
    }
27
28
    /**
29
     * Does nothing
30
     * @param int $limit
31
     * @return Consumer
32
     */
33
    public function setMemoryLimit($limit)
34
    {
35
        return $this;
36
    }
37
38
    /**
39
     * @param string $key
40
     * @return Consumer
41
     * @todo if null and there is only 1 shard in the stream -> get it! Or allow asking for shard 1, 2, 3, ... instead of using the Id
42
     */
43
    public function setRoutingKey($key)
44
    {
45
        $this->shardId = $key;
46
47
        return $this;
48
    }
49
50
    /**
51
     * @param MessageConsumerInterface $callback
52
     * @return Consumer
53
     */
54
    public function setCallback($callback)
55
    {
56
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
57
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
58
        }
59
        $this->callback = $callback;
60
61
        return $this;
62
    }
63
64
    /**
65
     * @param SequenceNumberStoreInterface $store
66
     * @return Consumer
67
     */
68 1
    public function setSequenceNumberStore(SequenceNumberStoreInterface $store)
69
    {
70 1
        $this->sequenceNumberStore = $store;
71
72 1
        return $this;
73
    }
74
75
    /**
76
     * The number of messages to download in every request to the Kinesis API.
77
     * Bigger numbers are better for performances, but there is a limit on the size of the response which Kinesis will send.
78
     * @param int $amount
79
     * @return Consumer
80
     */
81 1
    public function setRequestBatchSize($amount)
82
    {
83 1
        $this->requestBatchSize = $amount;
84
85 1
        return $this;
86
    }
87
88
    /**
89
     * Use this to decide what happens when the Consumer starts getting messages from a shard, and it does not
90
     * have stored a pointer to the last consumed message.
91
     *
92
     * @param string $type either LATEST (discard messages already in the shard) or TRIM_HORIZON (get all messages in the shard)
93
     * @return Consumer
94
     */
95 1
    public function setDefaultShardIteratorType($type)
96
    {
97 1
        $this->defaultShardIteratorType = $type;
98
99 1
        return $this;
100
    }
101
102
    /**
103
     * @see http://docs.aws.amazon.com/aws-sdk-php/v2/api/class-Aws.Kinesis.KinesisClient.html#_getRecords
104
     * Will throw an exception if $amount is > 10.000
105
     *
106
     * @param int $amount
107
     * @param int $timeout
108
     * @return nothing
109
     *
110
     * @todo de-hardcode the usage of json encoding
111
     */
112
    public function consume($amount, $timeout=0)
113
    {
114
        $iterator = $this->getInitialMessageIterator();
115
116
        $limit = ($amount > 0) ? $amount : $this->requestBatchSize;
117
        if ($timeout > 0) {
118
            $startTime = time();
119
            $remaining = $timeout;
0 ignored issues
show
Unused Code introduced by
$remaining is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
120
        }
121
122
        while(true) {
123
            $reqTime = microtime(true);
124
            $result = $this->client->getRecords(array(
125
                'ShardIterator' => $iterator,
126
                'Limit' => $limit,
127
            ));
128
129
            $records = $result->get('Records');
130
131
            if (count($records) && $this->sequenceNumberStore) {
132
                $last = end($records);
133
                $this->sequenceNumberStore->save($this->streamName, $this->shardId, $last['SequenceNumber']);
134
            }
135
136
            foreach($records as $record) {
137
                $data = $record['Data'];
138
                unset($record['Data']);
139
                $this->callback->receive(new Message($data, $record, 'application/json', $this->streamName));
140
            }
141
142
            if ($amount > 0) {
143
                return;
144
            }
145
146
            if ($timeout > 0 && ($remaining = ($startTime + $timeout - time())) <= 0) {
0 ignored issues
show
Bug introduced by
The variable $startTime does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
147
                return;
148
            }
149
150
            $iterator = $result->get('NextShardIterator');
151
            if ($iterator == null) {
152
                // shard is closed
153
                return;
154
            }
155
156
            /// @todo use a parameter to decide the polling interval
157
            // observe MAX 5 requests per sec per shard: sleep for 0.2 secs in between requests
158
            $passedMs = (microtime(true) - $reqTime) * 1000000;
159
            if ($passedMs < 200000) {
160
                usleep(200000 - $passedMs);
161
            }
162
        }
163
    }
164
165
    /**
166
     * Builds an iterator to start getting messages from the shard based on both injected config and the fact that
167
     * the store has a value for the last Sequence Number previously read
168
     */
169
    protected function getInitialMessageIterator()
170
    {
171
        $start = null;
172
        if ($this->sequenceNumberStore) {
173
            $start = $this->sequenceNumberStore->fetch($this->streamName, $this->shardId);
174
        }
175
176
        $iteratorOptions = array(
177
            'StreamName' => $this->streamName,
178
            'ShardId' => $this->shardId
179
        );
180
181
        if ($start == null) {
0 ignored issues
show
Bug introduced by
It seems like you are loosely comparing $start of type string|null against null; this is ambiguous if the string can be empty. Consider using a strict comparison === instead.
Loading history...
182
            $iteratorOptions['ShardIteratorType'] = $this->defaultShardIteratorType;
183
        } else {
184
            $iteratorOptions['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER';
185
            $iteratorOptions['StartingSequenceNumber'] = $start;
186
        }
187
188
        return $this->client->getShardIterator($iteratorOptions)->get('ShardIterator');
189
    }
190
191
    /**
192
     * @param string $queueName
193
     * @return Consumer
194
     */
195
    public function setQueueName($queueName)
196
    {
197
        $this->streamName = $queueName;
198
199
        return $this;
200
    }
201
}