1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis; |
4
|
|
|
|
5
|
|
|
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface; |
6
|
|
|
use Kaliop\QueueingBundle\Queue\ConsumerInterface; |
7
|
|
|
use Kaliop\Queueing\Plugins\KinesisBundle\Service\SequenceNumberStoreInterface; |
8
|
|
|
use \Aws\Kinesis\KinesisClient; |
9
|
|
|
|
10
|
|
|
/**
 * Reads messages from a single shard of an AWS Kinesis stream and hands each of them to a
 * MessageConsumerInterface callback.
 *
 * The stream is set via setQueueName() and the shard via setRoutingKey(). If a sequence number store
 * is injected, the sequence number of the last record received is saved to it, and read back to decide
 * where to resume from on the next call to consume().
 */
class Consumer implements ConsumerInterface
{
    /** @var \Aws\Kinesis\KinesisClient */
    protected $client;
    /** @var string Id of the shard to read from; set via setRoutingKey() */
    protected $shardId;
    /** @var string Name of the Kinesis stream to read from; set via setQueueName() */
    protected $streamName;
    /** @var \Kaliop\QueueingBundle\Queue\MessageConsumerInterface */
    protected $callback;
    /** @var \Kaliop\Queueing\Plugins\KinesisBundle\Service\SequenceNumberStoreInterface */
    protected $sequenceNumberStore;
    // allowed values: TRIM_HORIZON and LATEST
    protected $defaultShardIteratorType = 'TRIM_HORIZON';
    /** @var int Number of records asked for in each getRecords call when consume() gets no positive $amount */
    protected $requestBatchSize = 1;

    /**
     * @param array $config passed as is to the constructor of the KinesisClient
     */
    public function __construct(array $config)
    {
        $this->client = new KinesisClient($config);
    }

    /**
     * Does nothing
     * @param int $limit
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        return $this;
    }

    /**
     * Sets the Id of the shard to read from.
     *
     * @param string $key
     * @return Consumer
     * @todo if null and there is only 1 shard in the stream -> get it! Or allow asking for shard 1, 2, 3, ... instead of using the Id
     */
    public function setRoutingKey($key)
    {
        $this->shardId = $key;

        return $this;
    }

    /**
     * Sets the object which will receive every message read from the shard.
     *
     * @param MessageConsumerInterface $callback
     * @return Consumer
     * @throws \RuntimeException if $callback is not a MessageConsumerInterface
     */
    public function setCallback($callback)
    {
        // The parameter is deliberately left untyped in the signature and checked by hand instead
        if (! $callback instanceof MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to Kinesis Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    /**
     * Sets the store used to remember the sequence number of the last record received from each shard.
     *
     * @param SequenceNumberStoreInterface $store
     * @return Consumer
     */
    public function setSequenceNumberStore(SequenceNumberStoreInterface $store)
    {
        $this->sequenceNumberStore = $store;

        return $this;
    }

    /**
     * The number of messages to download in every request to the Kinesis API.
     * Bigger numbers are better for performances, but there is a limit on the size of the response which Kinesis will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    /**
     * Use this to decide what happens when the Consumer starts getting messages from a shard, and it does not
     * have stored a pointer to the last consumed message.
     *
     * @param string $type either LATEST (discard messages already in the shard) or TRIM_HORIZON (get all messages in the shard)
     * @return Consumer
     */
    public function setDefaultShardIteratorType($type)
    {
        $this->defaultShardIteratorType = $type;

        return $this;
    }

    /**
     * Reads records from the shard and passes each one, wrapped in a Message, to the callback.
     *
     * - if $amount is > 0, a single request is made with $amount as limit, then the method returns
     * - otherwise requests are made in a loop, each limited to the configured request batch size, until
     *   either $timeout seconds have passed (when $timeout > 0) or the shard returns no next iterator
     *
     * NB: the sequence number of the last record of each batch is saved to the store *before* the
     * callback is invoked for the records of that batch.
     * NB: a callback must have been set via setCallback() before any record is received.
     *
     * @see http://docs.aws.amazon.com/aws-sdk-php/v2/api/class-Aws.Kinesis.KinesisClient.html#_getRecords
     * Will throw an exception if $amount is > 10.000
     *
     * @param int $amount
     * @param int $timeout in seconds; 0 means no time limit
     * @return void
     */
    public function consume($amount, $timeout=0)
    {
        $iterator = $this->getInitialMessageIterator();

        $limit = ($amount > 0) ? $amount : $this->requestBatchSize;
        // only needed (and only read) when a timeout is in effect
        $startTime = ($timeout > 0) ? time() : null;
        // observe MAX 5 requests per sec per shard: leave at least 0.2 secs in between requests
        $minRequestIntervalUs = 200000;

        while (true) {
            $reqTime = microtime(true);
            $result = $this->client->getRecords(array(
                'ShardIterator' => $iterator,
                'Limit' => $limit,
            ));

            $records = $result->get('Records');
            if (!$records) {
                // defensive: treat a missing/null 'Records' element the same as an empty batch
                $records = array();
            }

            if (count($records) && $this->sequenceNumberStore) {
                $last = end($records);
                $this->sequenceNumberStore->save($this->streamName, $this->shardId, $last['SequenceNumber']);
            }

            foreach ($records as $record) {
                // the payload goes in the Message body, all the remaining record fields are passed along as 2nd argument
                $data = $record['Data'];
                unset($record['Data']);
                $this->callback->receive(new Message($data, $record));
            }

            if ($amount > 0) {
                return;
            }

            if ($timeout > 0 && time() >= $startTime + $timeout) {
                return;
            }

            $iterator = $result->get('NextShardIterator');
            if ($iterator == null) {
                // shard is closed
                return;
            }

            $elapsedUs = (microtime(true) - $reqTime) * 1000000;
            if ($elapsedUs < $minRequestIntervalUs) {
                // usleep() wants an integer: cast explicitly instead of relying on implicit float conversion
                usleep((int) ($minRequestIntervalUs - $elapsedUs));
            }
        }
    }

    /**
     * Builds an iterator to start getting messages from the shard based on both injected config and the fact that
     * the store has a value for the last Sequence Number previously read
     *
     * @return string the shard iterator, to be used for the first getRecords call
     */
    protected function getInitialMessageIterator()
    {
        $start = null;
        if ($this->sequenceNumberStore) {
            $start = $this->sequenceNumberStore->fetch($this->streamName, $this->shardId);
        }

        $iteratorOptions = array(
            'StreamName' => $this->streamName,
            'ShardId' => $this->shardId
        );

        // Loose comparison kept on purpose: what the store returns when it has no value is not defined here
        if ($start == null) {
            $iteratorOptions['ShardIteratorType'] = $this->defaultShardIteratorType;
        } else {
            $iteratorOptions['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER';
            $iteratorOptions['StartingSequenceNumber'] = $start;
        }

        return $this->client->getShardIterator($iteratorOptions)->get('ShardIterator');
    }

    /**
     * Sets the name of the Kinesis stream to read from.
     *
     * @param string $queueName
     * @return Consumer
     */
    public function setQueueName($queueName)
    {
        $this->streamName = $queueName;

        return $this;
    }
}
This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.
Both the `$myVar` assignment in line 1 and the `$higher` assignment in line 2 are dead. The first because `$myVar` is never used, and the second because `$higher` is always overwritten on every possible execution path.