| @@ 45-76 (lines=32) @@ | ||
| 42 | * @param int $count |
|
| 43 | * @return EventStreamInterface |
|
| 44 | */ |
|
public function readStreamEventsForward($streamId, $start = 1, $count = null)
{
    // Without an explicit $count the read is capped only by the class constant.
    $limit = isset($count) ? $count : self::MAX_UNSIGNED_BIG_INT;

    // A $start of 1 maps to OFFSET 0, i.e. the first row returned for the stream.
    $offset = (int) $start - 1;

    // NOTE(review): the query carries no ORDER BY, so which rows LIMIT/OFFSET
    // select depends on the order the database happens to return them in;
    // confirm whether an explicit ordering is intended.
    $statement = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE stream_id = :streamId
         LIMIT :limit
         OFFSET :offset'
    );
    $statement->bindValue(':streamId', $streamId);
    // Both paging values are bound as integers so they are not sent as quoted strings.
    $statement->bindValue(':offset', $offset, \PDO::PARAM_INT);
    $statement->bindValue(':limit', $limit, \PDO::PARAM_INT);
    $statement->execute();

    // Turns one fetched row into a StoredEvent, parsing the timestamp and version columns.
    $toStoredEvent = function ($row) {
        $occurredOn = new \DateTimeImmutable($row['occurred_on']);
        $version = Version::fromString($row['version']);

        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            $occurredOn,
            $version
        );
    };

    return $this->domainEventStreamFromStoredEvents(
        array_map($toStoredEvent, $statement->fetchAll())
    );
}
|
| 77 | ||
| 78 | /** |
|
| 79 | * @param string $streamId |
|
| @@ 46-77 (lines=32) @@ | ||
| 43 | * @param int $count |
|
| 44 | * @return EventStreamInterface |
|
| 45 | */ |
|
public function readStreamEventsForward($streamId, $start = 1, $count = null)
{
    // A missing $count falls back to the class-level maximum row count.
    $maxRows = isset($count) ? $count : self::MAX_UNSIGNED_BIG_INT;

    // NOTE(review): there is no ORDER BY in this query, so the rows picked by
    // LIMIT/OFFSET follow whatever order the database returns; confirm that
    // this is intended.
    $query = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE stream_id = :streamId
         LIMIT :limit
         OFFSET :offset'
    );
    $query->bindValue(':streamId', $streamId);
    // $start = 1 becomes OFFSET 0; both paging values are bound as integers.
    $query->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
    $query->bindValue(':limit', $maxRows, \PDO::PARAM_INT);
    $query->execute();

    $rows = $query->fetchAll();

    // Maps a fetched row onto a StoredEvent; occurred_on and version are
    // converted from their stored string form.
    $hydrate = function ($record) {
        return new StoredEvent(
            $record['id'],
            $record['stream_id'],
            $record['type'],
            $record['event'],
            $record['metadata'],
            new \DateTimeImmutable($record['occurred_on']),
            Version::fromString($record['version'])
        );
    };
    $storedEvents = array_map($hydrate, $rows);

    return $this->domainEventStreamFromStoredEvents($storedEvents);
}
|
| 78 | ||
| 79 | /** |
|
| 80 | * @param string $streamId |
|