| @@ 41-72 (lines=32) @@ | ||
| 38 | * @param int $count |
|
| 39 | * @return EventStreamInterface |
|
| 40 | */ |
|
public function readStreamEventsForward($streamId, $start = 1, $count = null)
{
    // When the caller gives no count, fall back to the class-level maximum
    // (presumably meaning "no practical limit" -- confirm against the constant).
    $limit = isset($count) ? $count : self::MAX_UNSIGNED_BIG_INT;

    // $start is 1-based (its default is 1), while SQL OFFSET is 0-based.
    $offset = (int) $start - 1;

    // NOTE(review): the query has no ORDER BY, so the row order is whatever
    // the database returns -- confirm whether callers rely on a specific order.
    $statement = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE stream_id = :streamId
         LIMIT :limit
         OFFSET :offset'
    );
    $statement->bindValue(':streamId', $streamId);
    $statement->bindValue(':limit', $limit, \PDO::PARAM_INT);
    $statement->bindValue(':offset', $offset, \PDO::PARAM_INT);
    $statement->execute();

    // Turns one fetched row into a StoredEvent, parsing the timestamp and
    // version columns into their object forms.
    $toStoredEvent = function ($row) {
        $occurredOn = new \DateTimeImmutable($row['occurred_on']);
        $version = Version::fromString($row['version']);

        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            $occurredOn,
            $version
        );
    };

    $rows = $statement->fetchAll();

    return $this->domainEventStreamFromStoredEvents(
        array_map($toStoredEvent, $rows)
    );
}
|
| 73 | ||
| 74 | /** |
|
| 75 | * @param string $streamId |
|
| @@ 43-74 (lines=32) @@ | ||
| 40 | * @param int $count |
|
| 41 | * @return EventStreamInterface |
|
| 42 | */ |
|
public function readStreamEventsForward($streamId, $start = 1, $count = null)
{
    // When the caller gives no count, fall back to the class-level maximum
    // (presumably meaning "no practical limit" -- confirm against the constant).
    $limit = isset($count) ? $count : self::MAX_UNSIGNED_BIG_INT;

    // $start is 1-based (its default is 1), while SQL OFFSET is 0-based.
    $offset = (int) $start - 1;

    // NOTE(review): the query has no ORDER BY, so the row order is whatever
    // the database returns -- confirm whether callers rely on a specific order.
    $query = $this->connection->prepare(
        'SELECT *
         FROM events
         WHERE stream_id = :streamId
         LIMIT :limit
         OFFSET :offset'
    );
    $query->bindValue(':streamId', $streamId);
    $query->bindValue(':limit', $limit, \PDO::PARAM_INT);
    $query->bindValue(':offset', $offset, \PDO::PARAM_INT);
    $query->execute();

    // Turns one fetched row into a StoredEvent, parsing the timestamp and
    // version columns into their object forms.
    $hydrate = function ($row) {
        $occurredOn = new \DateTimeImmutable($row['occurred_on']);
        $version = Version::fromString($row['version']);

        return new StoredEvent(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            $occurredOn,
            $version
        );
    };

    $rows = $query->fetchAll();

    return $this->domainEventStreamFromStoredEvents(
        array_map($hydrate, $rows)
    );
}
|
| 75 | ||
| 76 | /** |
|
| 77 | * @param string $streamId |
|