BaseAggregateRepository::loadSnapshot()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 8
rs 9.4286
cc 2
eloc 4
nc 2
nop 1
1
<?php
2
3
namespace Domain\Aggregates;
4
5
use Domain\Eventing\EventStore;
6
use Domain\Eventing\EventBus;
7
use Domain\Identity\Identity;
8
use Domain\Snapshotting\SnapshotStore;
9
use Domain\Snapshotting\SnapshottingPolicy;
10
use Domain\Snapshotting\Policy\IntervalBased;
11
use Domain\Eventing\CommittedEvents;
12
13
/**
14
 * Base class for concrete repositories which contains some sane defaults
15
 *
16
 * @author Sebastiaan Hilbers <[email protected]>
17
 */
18
abstract class BaseAggregateRepository implements AggregateRepository
19
{
20
    /**
21
     * @var EventStore
22
     */
23
    private $eventStore;
24
25
    /**
26
     * @var EventBus
27
     */
28
    private $eventBus;
29
30
    /**
31
     * @var SnapshotStore
32
     */
33
    private $snapshotStore;
34
35
    /**
36
     * @var SnapshottingPolicy
37
     */
38
    private $snapshottingPolicy;
39
40
    /**
41
     * @param EventStore $eventStore
42
     * @param EventBus $eventBus
43
     * @param SnapshotStore $snapshotStore
44
     * @param SnapshottingPolicy $policy
45
     */
46
    public function __construct(
47
        EventStore $eventStore,
48
        EventBus $eventBus = null,
49
        SnapshotStore $snapshotStore = null,
50
        SnapshottingPolicy $policy = null
51
    ) {
52
        $this->eventStore = $eventStore;
53
        $this->eventBus = $eventBus;
54
        $this->snapshotStore = $snapshotStore;
55
56
        // default policy sets up a 100 event interval for snapshotting
57
        if (!is_null($snapshotStore) && is_null($policy)) {
58
            $policy = new IntervalBased(100);
59
        }
60
61
        $this->snapshottingPolicy = $policy;
62
    }
63
64
    /**
65
     * To store the updates made to an Aggregate, we only need to
66
     * commit the latest recorded events to the EventStore.
67
     *
68
     * An eventbus can handle eventual or direct consistency.
69
     *
70
     * Snapsnots could be made if the policy allows it and this repo is
71
     * constructed with a storage for storing snapshots.
72
     *
73
     * @param AggregateRoot $aggregate
74
     * @return CommittedEvents
75
     */
76
    public function save(AggregateRoot $aggregate)
77
    {
78
        $uncommitted = $aggregate->getChanges();
79
        $committedStream = $this->eventStore->commit($uncommitted);
80
81
        // consider eventual consistency
82
        if (!is_null($this->eventBus)) {
83
            $this->eventBus->publish($committedStream);
84
        }
85
86
        // do we need to create a snapshot at this point?
87
        if (!is_null($this->snapshotStore) && $this->snapshottingPolicy->shouldCreateSnapshot($aggregate)) {
88
            $this->saveSnapshot($aggregate);
89
        }
90
91
        // remove changes on aggregate from memory
92
        $aggregate->clearChanges();
93
94
        return $committedStream;
95
    }
96
97
    /**
98
     * Fetching a single Aggregate is extremely easy: all we need to do is
99
     * reconstitute it from its history! Compare that to the complexity
100
     * of traditional ORMs.
101
     *
102
     * @param Identity $aggregateId
103
     * @return AggregateRoot
104
     */
105
    public function get(Identity $aggregateId)
106
    {
107
        if (!is_null($this->snapshotStore)) {
108
            if (!($snapshot = $this->loadSnapshot($aggregateId))) {
109
                $missing = $this->eventStore->getAggregateHistoryFor($aggregateId, $snapshot->getVersion());
110
111
                if ($missing->count() > 0) {
112
                    $snapshot->correctMissingHistory($missing);
113
                }
114
115
                return $snapshot->getAggregate();
116
            }
117
        }
118
119
        $aggregateHistory = $this->eventStore->getAggregateHistoryFor($aggregateId);
120
        $fqn = $this->getAggregateRootFqcn();
121
122
        return $fqn::reconstituteFrom($aggregateHistory);
123
    }
124
125
    /**
126
     * Concrete repositories should define the Aggregate's FQCN
127
     *
128
     * @return string
129
     */
130
    abstract protected function getAggregateRootFqcn();
131
132
    /**
133
     * Load a aggregate snapshot from the storage
134
     *
135
     * @param \Domain\Identity\Identity $id
136
     * @return Snapshot
137
     * @throws \Exception when no snapshotStore was attached
138
     */
139
    public function loadSnapshot(Identity $id)
140
    {
141
        if (is_null($this->snapshotStore)) {
142
            throw new \Exception('Unable to get snapshot; No store attached');
143
        }
144
145
        return $this->snapshotStore->get($id);
146
    }
147
148
    /**
149
     * Save the aggregate state as single snapshot instead of multiple history events
150
     *
151
     * @param \Domain\Aggregates\AggregateRoot $aggregate
152
     * @return type
153
     * @throws \Exception when no snapshotStore was attached
154
     */
155
    public function saveSnapshot(AggregateRoot $aggregate)
156
    {
157
        if (is_null($this->snapshotStore)) {
158
            throw new \Exception('Unable to create snapshot; No store attached');
159
        }
160
161
        if ($aggregate->hasChanges()) {
162
            $aggregate = $aggregate::reconstituteFrom(
163
                new CommittedEvents(
164
                    $aggregate->getIdentity(),
165
                    $aggregate->getChanges()->getEvents()
166
                )
167
            );
168
        }
169
170
        return $this->snapshotStore->save($aggregate);
171
    }
172
}
173