| @@ 93-117 (lines=25) @@ | ||
| 90 | * @param string $streamId |
|
| 91 | * @return EventStreamInterface |
|
| 92 | */ |
|
public function readFullStream($streamId)
{
    // Fetch every row of the events table whose stream_id equals $streamId.
    // NOTE(review): the query has no ORDER BY, so the rows arrive in whatever
    // order the database returns them - confirm callers do not rely on ordering.
    $sql = 'SELECT *
            FROM events
            WHERE stream_id = :streamId';

    $statement = $this->connection->prepare($sql);
    $statement->bindValue(':streamId', $streamId);
    $statement->execute();
    $rows = $statement->fetchAll();

    // Maps one fetched row onto a StoredEvent: occurred_on is parsed into a
    // DateTimeImmutable and version is rebuilt through Version::fromString().
    $toStoredEvent = static function ($row) {
        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            new \DateTimeImmutable($row['occurred_on']),
            Version::fromString($row['version'])
        );
    };

    // The stored events are handed to domainEventStreamFromStoredEvents(),
    // whose result is returned unchanged.
    return $this->domainEventStreamFromStoredEvents(
        array_map($toStoredEvent, $rows)
    );
}
|
| 118 | ||
| 119 | /** |
|
| 120 | * @return EventStreamInterface[] |
|
| @@ 140-166 (lines=27) @@ | ||
| 137 | * @param Version $version |
|
| 138 | * @return EventStreamInterface |
|
| 139 | */ |
|
protected function readStoredEventsOfTypeAndVersion($type, $version)
{
    // Fetch every row of the events table matching both the type and the version.
    $sql = 'SELECT *
            FROM events
            WHERE type = :type
            AND version = :version';

    $statement = $this->connection->prepare($sql);
    $statement->bindValue(':type', $type);
    // NOTE(review): $version is documented as a Version object and is bound
    // as-is; presumably it is string-castable - confirm against Version.
    $statement->bindValue(':version', $version);
    $statement->execute();
    $rows = $statement->fetchAll();

    // Maps one fetched row onto a StoredEvent: occurred_on is parsed into a
    // DateTimeImmutable and version is rebuilt through Version::fromString().
    $toStoredEvent = static function ($row) {
        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            new \DateTimeImmutable($row['occurred_on']),
            Version::fromString($row['version'])
        );
    };

    // The StoredEvent objects are wrapped directly in an EventStream.
    return new EventStream(array_map($toStoredEvent, $rows));
}
|
| 167 | ||
| 168 | /** |
|
| 169 | * @param string $streamId |
|
| @@ 94-118 (lines=25) @@ | ||
| 91 | * @param string $streamId |
|
| 92 | * @return EventStreamInterface |
|
| 93 | */ |
|
public function readFullStream($streamId)
{
    // Look up all rows in the events table that belong to the requested stream.
    // NOTE(review): there is no ORDER BY clause, so row order is left to the
    // database - confirm that consumers of the stream tolerate this.
    $query = 'SELECT *
              FROM events
              WHERE stream_id = :streamId';

    $statement = $this->connection->prepare($query);
    $statement->bindValue(':streamId', $streamId);
    $statement->execute();
    $records = $statement->fetchAll();

    // Builds a StoredEvent from a single fetched record; the occurred_on column
    // becomes a DateTimeImmutable and the version column goes through
    // Version::fromString().
    $hydrate = static function ($record) {
        return new StoredEvent(
            $record['id'],
            $record['stream_id'],
            $record['type'],
            $record['event'],
            $record['metadata'],
            new \DateTimeImmutable($record['occurred_on']),
            Version::fromString($record['version'])
        );
    };

    // Conversion to the returned stream is delegated to
    // domainEventStreamFromStoredEvents().
    return $this->domainEventStreamFromStoredEvents(
        array_map($hydrate, $records)
    );
}
|
| 119 | ||
| 120 | /** |
|
| 121 | * @return EventStreamInterface[] |
|
| @@ 209-235 (lines=27) @@ | ||
| 206 | * @param Version $version |
|
| 207 | * @return EventStreamInterface |
|
| 208 | */ |
|
protected function readStoredEventsOfTypeAndVersion($type, $version)
{
    // Look up all rows in the events table with the given type and version.
    $query = 'SELECT *
              FROM events
              WHERE type = :type
              AND version = :version';

    $statement = $this->connection->prepare($query);
    $statement->bindValue(':type', $type);
    // NOTE(review): the docblock declares $version as a Version object, yet it
    // is bound without conversion; presumably it casts to string - confirm.
    $statement->bindValue(':version', $version);
    $statement->execute();
    $records = $statement->fetchAll();

    // Builds a StoredEvent from a single fetched record; the occurred_on column
    // becomes a DateTimeImmutable and the version column goes through
    // Version::fromString().
    $hydrate = static function ($record) {
        return new StoredEvent(
            $record['id'],
            $record['stream_id'],
            $record['type'],
            $record['event'],
            $record['metadata'],
            new \DateTimeImmutable($record['occurred_on']),
            Version::fromString($record['version'])
        );
    };

    // Unlike readFullStream(), the stored events are wrapped directly in an
    // EventStream without passing through domainEventStreamFromStoredEvents().
    return new EventStream(array_map($hydrate, $records));
}
|
| 236 | ||
| 237 | public function initialize() |
|
| 238 | { |
|