findLastSnapshot()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 11
ccs 7
cts 7
cp 1
rs 9.4286
cc 1
eloc 7
nc 1
nop 3
crap 1
1
<?php
2
/**
 * This file is part of the SmartGecko(c) business platform.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
8
9
namespace Governor\Framework\EventStore\Mongo;
10
11
12
use Governor\Framework\Domain\DomainEventMessageInterface;
13
use Governor\Framework\Serializer\SerializerInterface;
14
15
/**
 * Storage strategy that writes one document per domain event message and
 * builds the Mongo queries used to read events and snapshots back.
 */
class DocumentPerEventStorageStrategy implements StorageStrategyInterface
{

    // Sort directions handed to the cursor's sort() call.
    const ORDER_ASC = 1;
    const ORDER_DESC = -1;

    /**
     * Builds the documents to store for a commit: one document per message.
     *
     * @param string $type The aggregate's type identifier
     * @param SerializerInterface $eventSerializer The serializer passed to EventEntry for each message
     * @param DomainEventMessageInterface[] $messages The messages contained in this commit
     * @return array list of documents (one per message, in the order of $messages)
     */
    public function createDocuments($type, SerializerInterface $eventSerializer, array $messages)
    {
        $toDocument = function ($message) use ($type, $eventSerializer) {
            return EventEntry::fromDomainEvent($type, $message, $eventSerializer)->asDBObject();
        };

        // array_map() keeps the keys of $messages; array_values() re-indexes so the
        // result is always a plain 0..n-1 list.
        return array_values(array_map($toDocument, $messages));
    }

    /**
     * Turns a stored <code>entry</code> back into its domain event messages.
     *
     * The entry is wrapped in an EventEntry, which receives all remaining arguments
     * unchanged and produces the messages.
     *
     * @param array $entry The stored document to read
     * @param string $aggregateIdentifier The aggregate identifier used to query events
     * @param SerializerInterface $serializer The serializer to deserialize events with
     * @param mixed $upcasterChain The upcaster chain forwarded to the entry (TODO: no concrete type yet)
     * @param bool $skipUnknownTypes If unknown event types should be skipped
     * @return DomainEventMessageInterface[] the messages contained in the entry
     */
    public function extractEventMessages(
        array $entry,
        $aggregateIdentifier,
        SerializerInterface $serializer,
        $upcasterChain,
        $skipUnknownTypes
    ) {
        $eventEntry = EventEntry::fromDbObject($entry);

        return $eventEntry->getDomainEvents($aggregateIdentifier, $serializer, $upcasterChain, $skipUnknownTypes);
    }

    /**
     * Opens a cursor over the events of a single aggregate, ordered by ascending
     * sequence number.
     *
     * The query itself is built by EventEntry::forAggregate() from the aggregate type,
     * the aggregate identifier and the first sequence number. Each returned document
     * is meant to be passed to {@link #extractEventMessages}.
     *
     * @param \MongoCollection $collection The collection to search for events
     * @param string $aggregateType The type identifier of the aggregate to query
     * @param string $aggregateIdentifier The identifier of the aggregate to query
     * @param int $firstSequenceNumber The sequence number of the first event to return
     * @return \MongoCursor a cursor over the matching event documents
     */
    public function findEvents(
        \MongoCollection $collection,
        $aggregateType,
        $aggregateIdentifier,
        $firstSequenceNumber
    ) {
        $query = EventEntry::forAggregate($aggregateType, $aggregateIdentifier, $firstSequenceNumber);
        $ordering = [EventEntry::SEQUENCE_NUMBER_PROPERTY => self::ORDER_ASC];

        return $collection->find($query)->sort($ordering);
    }

    /**
     * Opens a cursor over all events in <code>collection</code> matching <code>criteria</code>,
     * ordered by ascending time stamp and then by ascending sequence number.
     *
     * @param \MongoCollection $collection The collection to search for events
     * @param array $criteria The criteria to match against the events (empty matches without filtering)
     * @return \MongoCursor a cursor for the documents representing matched events
     */
    public function findEventsByCriteria(\MongoCollection $collection, array $criteria = [])
    {
        $cursor = $collection->find($criteria);

        return $cursor->sort([
            EventEntry::TIME_STAMP_PROPERTY => self::ORDER_ASC,
            EventEntry::SEQUENCE_NUMBER_PROPERTY => self::ORDER_ASC
        ]);
    }

    /**
     * Opens a cursor on the most recent snapshot entry of an aggregate.
     *
     * Entries are filtered by aggregate identifier and aggregate type, sorted by
     * descending sequence number and limited to a single result. The result is meant
     * to be passed to {@link #extractEventMessages}.
     *
     * @param \MongoCollection $collection The collection to find the last snapshot event in
     * @param string $aggregateType The type identifier of the aggregate to find a snapshot for
     * @param string $aggregateIdentifier The identifier of the aggregate to find a snapshot for
     * @return \MongoCursor a cursor yielding at most one entry
     */
    public function findLastSnapshot(\MongoCollection $collection, $aggregateType, $aggregateIdentifier)
    {
        $cursor = $collection->find([
            EventEntry::AGGREGATE_IDENTIFIER_PROPERTY => $aggregateIdentifier,
            EventEntry::AGGREGATE_TYPE_PROPERTY => $aggregateType
        ]);
        $newestFirst = $cursor->sort([EventEntry::SEQUENCE_NUMBER_PROPERTY => self::ORDER_DESC]);

        return $newestFirst->limit(1);
    }

    /**
     * Ensures the indexes used by this strategy exist on both collections.
     *
     * Creates a unique (identifier, type, sequence number) index on the events and
     * snapshots collections, and a non-unique (time stamp, sequence number) index on
     * the events collection.
     *
     * NOTE(review): MongoCollection::ensureIndex() is documented as deprecated in favour
     * of createIndex() since driver 1.5.0 - confirm the minimum supported driver version
     * before switching.
     *
     * @param \MongoCollection $eventsCollection The collection containing the event documents
     * @param \MongoCollection $snapshotsCollection The collection containing the snapshot documents
     */
    public function ensureIndexes(\MongoCollection $eventsCollection, \MongoCollection $snapshotsCollection)
    {
        // The same key set and options are applied to both the events and the snapshots collection.
        $aggregateKeys = [
            EventEntry::AGGREGATE_IDENTIFIER_PROPERTY => 1,
            EventEntry::AGGREGATE_TYPE_PROPERTY => 1,
            EventEntry::SEQUENCE_NUMBER_PROPERTY => 1
        ];
        $uniqueAggregate = [
            'unique' => true,
            'name' => 'uniqueAggregateIndex'
        ];

        $eventsCollection->ensureIndex($aggregateKeys, $uniqueAggregate);

        // Matches the ordering used by findEventsByCriteria().
        $eventsCollection->ensureIndex(
            [
                EventEntry::TIME_STAMP_PROPERTY => 1,
                EventEntry::SEQUENCE_NUMBER_PROPERTY => 1
            ],
            [
                'unique' => false,
                'name' => 'orderedEventStreamIndex'
            ]
        );

        $snapshotsCollection->ensureIndex($aggregateKeys, $uniqueAggregate);
    }

}