| @@ 78-102 (lines=25) @@ | ||
| 75 | * @param string $streamId |
|
| 76 | * @return EventStreamInterface |
|
| 77 | */ |
|
public function readFullStream($streamId)
{
    // Look up every persisted row whose stream_id matches the requested stream.
    // NOTE(review): the query has no ORDER BY, so rows come back in whatever
    // order the database returns them — confirm callers do not rely on ordering.
    $statement = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE stream_id = :streamId'
    );
    $statement->bindValue(':streamId', $streamId);
    $statement->execute();

    // Hydrates a single fetched row into a StoredEvent; the occurred_on and
    // version columns are converted into their object forms first.
    $toStoredEvent = static function ($row) {
        $occurredOn = new \DateTimeImmutable($row['occurred_on']);
        $eventVersion = Version::fromString($row['version']);

        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            $occurredOn,
            $eventVersion
        );
    };

    // Hand the hydrated events to the helper that builds the returned stream.
    return $this->domainEventStreamFromStoredEvents(
        array_map($toStoredEvent, $statement->fetchAll())
    );
}
|
| 103 | ||
| 104 | /** |
|
| 105 | * @return EventStreamInterface[] |
|
| @@ 125-151 (lines=27) @@ | ||
| 122 | * @param Version $version |
|
| 123 | * @return EventStreamInterface |
|
| 124 | */ |
|
protected function readStoredEventsOfTypeAndVersion($type, $version)
{
    // Select the rows that match both the given event type and the given version.
    $statement = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE type = :type
         AND version = :version'
    );
    $statement->bindValue(':type', $type);
    // NOTE(review): $version is bound exactly as received; presumably the
    // driver can stringify it — confirm against the Version class.
    $statement->bindValue(':version', $version);
    $statement->execute();

    // Hydrates a single fetched row into a StoredEvent; the occurred_on and
    // version columns are converted into their object forms first.
    $toStoredEvent = static function ($row) {
        $occurredOn = new \DateTimeImmutable($row['occurred_on']);
        $eventVersion = Version::fromString($row['version']);

        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            $occurredOn,
            $eventVersion
        );
    };

    // The stored events are wrapped directly in an EventStream.
    return new EventStream(array_map($toStoredEvent, $statement->fetchAll()));
}
|
| 152 | ||
| 153 | /** |
|
| 154 | * @param string $streamId |
|
| @@ 80-104 (lines=25) @@ | ||
| 77 | * @param string $streamId |
|
| 78 | * @return EventStreamInterface |
|
| 79 | */ |
|
public function readFullStream($streamId)
{
    // Fetch all rows recorded for the stream identified by $streamId.
    // NOTE(review): no ORDER BY is present, so the row order is left to the
    // database — confirm that consumers of the stream do not depend on it.
    $query = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE stream_id = :streamId'
    );
    $query->bindValue(':streamId', $streamId);
    $query->execute();
    $rows = $query->fetchAll();

    // Builds a StoredEvent from one row, converting the timestamp and version
    // columns into objects before construction.
    $hydrate = static function ($record) {
        $occurredOn = new \DateTimeImmutable($record['occurred_on']);
        $recordVersion = Version::fromString($record['version']);

        return new StoredEvent(
            $record['id'],
            $record['stream_id'],
            $record['type'],
            $record['event'],
            $record['metadata'],
            $occurredOn,
            $recordVersion
        );
    };

    // Delegate to the helper that turns stored events into the returned stream.
    return $this->domainEventStreamFromStoredEvents(array_map($hydrate, $rows));
}
|
| 105 | ||
| 106 | /** |
|
| 107 | * @return EventStreamInterface[] |
|
| @@ 195-221 (lines=27) @@ | ||
| 192 | * @param Version $version |
|
| 193 | * @return EventStreamInterface |
|
| 194 | */ |
|
protected function readStoredEventsOfTypeAndVersion($type, $version)
{
    // Fetch the rows whose type and version columns both match the arguments.
    $query = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE type = :type
         AND version = :version'
    );
    $query->bindValue(':type', $type);
    // NOTE(review): $version is passed to the binding unchanged; presumably
    // it can be converted to a string by the driver — confirm.
    $query->bindValue(':version', $version);
    $query->execute();
    $rows = $query->fetchAll();

    // Builds a StoredEvent from one row, converting the timestamp and version
    // columns into objects before construction.
    $hydrate = static function ($record) {
        $occurredOn = new \DateTimeImmutable($record['occurred_on']);
        $recordVersion = Version::fromString($record['version']);

        return new StoredEvent(
            $record['id'],
            $record['stream_id'],
            $record['type'],
            $record['event'],
            $record['metadata'],
            $occurredOn,
            $recordVersion
        );
    };

    // Return the hydrated events wrapped in an EventStream.
    return new EventStream(array_map($hydrate, $rows));
}
|
| 222 | } |
|
| 223 | ||