|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace RayRutjes\GetEventStore\Client\Http\Feed; |
|
4
|
|
|
|
|
5
|
|
|
use RayRutjes\GetEventStore\Client\Http\HttpClient; |
|
6
|
|
|
use RayRutjes\GetEventStore\Client\Http\ReadEventStreamViaPersistentSubscriptionRequestFactory; |
|
7
|
|
|
use RayRutjes\GetEventStore\Client\Http\ReadEventStreamViaPersistentSubscriptionResponseInspector; |
|
8
|
|
|
use RayRutjes\GetEventStore\StreamId; |
|
9
|
|
|
|
|
10
|
|
|
/**
 * Iterates over the feed pages of a persistent subscription, one HTTP read per page.
 *
 * Iteration starts at the subscription head URI
 * ("/subscriptions/{stream}/{group}") and then follows each page's "previous"
 * link until a page has no such link or reports itself as the head of the
 * stream. Every value yielded by the iterator is a whole feed page, not a
 * single event.
 */
final class EventStreamViaPersistentSubscriptionFeedIterator implements \Iterator
{
    /**
     * @var StreamId
     */
    private $streamId;

    /**
     * @var HttpClient
     */
    private $client;

    /**
     * Page the iterator currently points at, or null when the iterator is
     * not (or no longer) positioned on a page.
     *
     * @var EventStreamViaPersistentSubscriptionFeed|null
     */
    private $currentFeed;

    /**
     * URI of the first page to read; built once in the constructor.
     *
     * @var string
     */
    private $headUri;

    /**
     * Becomes true after the first successful rewind().
     *
     * @var bool
     */
    private $initialized = false;

    /**
     * Zero-based index of the current page since the last rewind().
     *
     * @var int
     */
    private $currentKey = 0;

    /**
     * @var string
     */
    private $groupName;

    /**
     * Passed as-is to the request factory for every page read.
     *
     * @var int
     */
    private $batchSize;

    /**
     * @param StreamId   $streamId
     * @param string     $groupName
     * @param HttpClient $client
     * @param int        $batchSize
     */
    public function __construct(StreamId $streamId, string $groupName, HttpClient $client, int $batchSize = 1)
    {
        $this->streamId = $streamId;
        $this->groupName = $groupName;
        $this->client = $client;
        $this->headUri = sprintf('/subscriptions/%s/%s', $this->streamId->toString(), $this->groupName);
        $this->batchSize = $batchSize;
    }

    /**
     * Sends a read request for the given URI and returns the feed collected
     * by the response inspector.
     *
     * @param string $uri
     *
     * @return EventStreamViaPersistentSubscriptionFeed
     */
    private function readEventStreamFeed(string $uri): EventStreamViaPersistentSubscriptionFeed
    {
        $factory = new ReadEventStreamViaPersistentSubscriptionRequestFactory($uri, $this->batchSize);
        $inspector = new ReadEventStreamViaPersistentSubscriptionResponseInspector();
        $this->client->send($factory->buildRequest(), $inspector);

        return $inspector->getFeed();
    }

    /**
     * Returns the current feed page, performing the initial read on demand
     * when the iterator has never been rewound.
     *
     * @return EventStreamViaPersistentSubscriptionFeed
     *
     * @throws \OutOfBoundsException when the iterator was already initialised
     *                               and has no current page left.
     */
    public function current(): EventStreamViaPersistentSubscriptionFeed
    {
        if (null !== $this->currentFeed) {
            return $this->currentFeed;
        }

        if (true === $this->initialized) {
            throw new \OutOfBoundsException('Stream overflow.');
        }

        // First access without an explicit rewind(): load the head page now.
        $this->rewind();

        return $this->currentFeed;
    }

    /**
     * Advances to the next page, if any.
     *
     * When there is no current page (iterator never rewound, or already
     * exhausted) this is a no-op: there is no "previous" link to follow, and
     * dereferencing the missing feed would otherwise raise a fatal Error.
     */
    #[\ReturnTypeWillChange]
    public function next()
    {
        if (null === $this->currentFeed) {
            return;
        }

        $this->currentKey++;
        $this->readForward();
    }

    /**
     * Replaces the current page with the one behind its "previous" link, or
     * with null when the current page has no such link or is the head of the
     * stream. Must only be called while a current page is set.
     */
    private function readForward()
    {
        $feed = $this->currentFeed;

        if (!$feed->hasPreviousLink() || $feed->isHeadOfStream()) {
            // Nothing further to read: mark the iterator as exhausted.
            $this->currentFeed = null;

            return;
        }

        $this->currentFeed = $this->readEventStreamFeed($feed->getPreviousLink()->getUri());
    }

    /**
     * @return int
     */
    public function key(): int
    {
        return $this->currentKey;
    }

    /**
     * @return bool true while the iterator points at a feed page.
     */
    public function valid(): bool
    {
        return null !== $this->currentFeed;
    }

    /**
     * Resets the key to 0 and reads the head page of the subscription.
     *
     * The iterator is flagged as initialised only after the read returned,
     * so a failed first read leaves it in its pristine state.
     */
    #[\ReturnTypeWillChange]
    public function rewind()
    {
        $this->currentKey = 0;
        $this->currentFeed = $this->readEventStreamFeed($this->headUri);
        $this->initialized = true;
    }
}
|
136
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.