MongoEventStore::appendEvents()   B
last analyzed

Complexity

Conditions 4
Paths 5

Size

Total Lines 29
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 4.0023

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 29
ccs 18
cts 19
cp 0.9474
rs 8.5806
cc 4
eloc 17
nc 5
nop 2
crap 4.0023
1
<?php
2
/**
3
 * This file is part of the SmartGecko(c) business platform.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Governor\Framework\EventStore\Mongo;
10
11
use Governor\Framework\Domain\DomainEventMessageInterface;
12
use Governor\Framework\Domain\DomainEventStreamInterface;
13
use Governor\Framework\EventStore\EventStoreException;
14
use Governor\Framework\EventStore\EventStoreInterface;
15
use Governor\Framework\EventStore\EventVisitorInterface;
16
use Governor\Framework\EventStore\Management\CriteriaBuilderInterface;
17
use Governor\Framework\EventStore\Management\CriteriaInterface;
18
use Governor\Framework\EventStore\Management\EventStoreManagementInterface;
19
use Governor\Framework\EventStore\Mongo\Criteria\MongoCriteriaBuilder;
20
use Governor\Framework\EventStore\PartialEventStreamSupportInterface;
21
use Governor\Framework\EventStore\SnapshotEventStoreInterface;
22
use Governor\Framework\Serializer\SerializerInterface;
23
use Governor\Framework\Repository\ConcurrencyException;
24
use Governor\Framework\EventStore\EventStreamNotFoundException;
25
use Governor\Framework\Common\Logging\NullLogger;
26
use Psr\Log\LoggerInterface;
27
28
class MongoEventStore implements EventStoreInterface, EventStoreManagementInterface, SnapshotEventStoreInterface, PartialEventStreamSupportInterface
29
{
30
    /**
31
     * @var LoggerInterface
32
     */
33
    private $logger;
34
35
    /**
36
     * @var MongoTemplateInterface
37
     */
38
    private $mongoTemplate;
39
40
    /**
41
     * @var SerializerInterface
42
     */
43
    private $eventSerializer;
44
45
    /**
46
     * @var StorageStrategyInterface
47
     */
48
    private $storageStrategy;
49
50
51
    private $upcasterChain;
52
53
    /**
54
     * @param MongoTemplateInterface $mongoTemplate
55
     * @param SerializerInterface $eventSerializer
56
     * @param StorageStrategyInterface $storageStrategy
57
     */
58 10
    function __construct(
0 ignored issues
show
Best Practice introduced by
It is generally recommended to explicitly declare the visibility for methods.

Adding explicit visibility (private, protected, or public) is generally recommend to communicate to other developers how, and from where this method is intended to be used.

Loading history...
59
        MongoTemplateInterface $mongoTemplate,
60
        SerializerInterface $eventSerializer,
61
        StorageStrategyInterface $storageStrategy
62
    ) {
63 10
        $this->mongoTemplate = $mongoTemplate;
64 10
        $this->eventSerializer = $eventSerializer;
65 10
        $this->storageStrategy = $storageStrategy;
66
67 10
        $this->logger = new NullLogger();
68
69 10
        $this->storageStrategy->ensureIndexes(
70 10
            $this->mongoTemplate->domainEventCollection(),
71 10
            $this->mongoTemplate->snapshotEventCollection()
72 10
        );
73 10
    }
74
75
76
    /**
77
     * Append the events in the given {@link DomainEventStreamInterface stream} to the event store.
78
     *
79
     * @param string $type The type descriptor of the object to store
80
     * @param DomainEventStreamInterface $events The event stream containing the events to store
81
     * @throws ConcurrencyException if an error occurs while storing the events in the event stream
82
     */
83 7
    public function appendEvents($type, DomainEventStreamInterface $events)
84
    {
85 7
        if (!$events->hasNext()) {
86
            return;
87
        }
88
89 7
        $messages = [];
90
91 7
        while ($events->hasNext()) {
92 7
            $messages[] = $events->next();
93 7
        }
94
95
        try {
96 7
            $this->mongoTemplate->domainEventCollection()->batchInsert(
97 7
                $this->storageStrategy->createDocuments(
98 7
                    $type,
99 7
                    $this->eventSerializer,
100
                    $messages
101 7
                )
102 7
            );
103 7
        } catch (\MongoDuplicateKeyException $ex) {
104 1
            throw new ConcurrencyException(
105
                "Trying to insert an Event for an aggregate with a sequence "
106 1
                ."number that is already present in the Event Store", null, $ex
107 1
            );
108
        }
109
110 7
        $this->logger->debug("{num} events appended", ['num' => count($messages)]);
111 7
    }
112
113
    /**
114
     * @param string $type
115
     * @param string $identifier
116
     * @return \Governor\Framework\Domain\DomainEventMessageInterface[]
117
     */
118 4
    private function loadLastSnapshotEvent($type, $identifier)
119
    {
120 4
        $dbCursor = $this->storageStrategy->findLastSnapshot(
121 4
            $this->mongoTemplate->snapshotEventCollection(),
122 4
            $type,
123
            $identifier
124 4
        );
125
126 4
        if (!$dbCursor->hasNext()) {
127 2
            return [];
128
        }
129 2
        $first = $dbCursor->next();
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $first is correct as $dbCursor->next() (which targets MongoCursor::next()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
130
131 2
        return $this->storageStrategy->extractEventMessages(
132 2
            $first,
0 ignored issues
show
Documentation introduced by
$first is of type null, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
133 2
            $identifier,
134 2
            $this->eventSerializer,
135 2
            $this->upcasterChain,
136
            false
137 2
        );
138
    }
139
140
    /**
141
     * Read the events of the aggregate identified by the given type and identifier that allow the current aggregate
142
     * state to be rebuilt. Implementations may omit or replace events (e.g. by using snapshot events) from the stream
143
     * for performance purposes.
144
     *
145
     * @param string $type The type descriptor of the object to retrieve
146
     * @param mixed $identifier The unique aggregate identifier of the events to load
147
     * @return DomainEventStreamInterface an event stream containing the events of the aggregate
148
     *
149
     * @throws EventStoreException if an error occurs while reading the events in the event stream
150
     */
151 4
    public function readEvents($type, $identifier)
152
    {
153 4
        $snapshotSequenceNumber = -1;
154
155 4
        $lastSnapshotCommit = $this->loadLastSnapshotEvent($type, $identifier);
156
157 4
        if (null !== $lastSnapshotCommit && !empty($lastSnapshotCommit)) {
158 2
            $snapshotSequenceNumber = $lastSnapshotCommit[0]->getScn();
159 2
        }
160
161 4
        $dbCursor = $this->storageStrategy->findEvents(
162 4
            $this->mongoTemplate->domainEventCollection(),
163 4
            $type,
164 4
            $identifier,
165
            $snapshotSequenceNumber + 1
166 4
        );
167
168 4
        $stream = new CursorBackedDomainEventStream(
169 4
            $dbCursor,
170 4
            $lastSnapshotCommit,
171 4
            $identifier,
172 4
            null,
173 4
            false,
174 4
            $this->getCursorCallback()
175 4
        );
176
177 4
        if (!$stream->hasNext()) {
178 1
            throw new EventStreamNotFoundException($type, $identifier);
179
        }
180
181 3
        return $stream;
182
    }
183
184
    /**
185
     * @return callable
186
     */
187 6
    private function getCursorCallback()
188
    {
189 6
        $self = $this;
190
191
        $cb = function (array $entry, $identifier) use ($self) {
192 5
            return $self->storageStrategy->extractEventMessages(
193 5
                $entry,
194 5
                $identifier,
195 5
                $self->eventSerializer,
196 5
                $self->upcasterChain,
197
                false
198 5
            );
199 6
        };
200
201 6
        return $cb;
202
    }
203
204
    /**
205
     * Returns a Stream containing events for the aggregate identified by the given {@code type} and {@code
206
     * identifier}, starting at the event with the given {@code firstSequenceNumber} (included) up to and including the
207
     * event with given {@code lastSequenceNumber}.
208
     * If no event with given {@code lastSequenceNumber} exists, the returned stream will simply read until the end of
209
     * the aggregate's events.
210
     * <p/>
211
     * The returned stream will not contain any snapshot events.
212
     *
213
     * @param string $type The type identifier of the aggregate
214
     * @param string $identifier The identifier of the aggregate
215
     * @param int $firstSequenceNumber The sequence number of the first event to find
216
     * @param int|null $lastSequenceNumber The sequence number of the last event in the stream
217
     * @return DomainEventStreamInterface a Stream containing events for the given aggregate, starting at the given first sequence number
218
     * @throws EventStreamNotFoundException
219
     */
220 2
    public function readEventsWithinScn(
221
        $type,
222
        $identifier,
223
        $firstSequenceNumber,
224
        $lastSequenceNumber = null
225
    ) {
226 2
        $dbCursor = $this->storageStrategy->findEvents(
227 2
            $this->mongoTemplate->domainEventCollection(),
228 2
            $type,
229 2
            $identifier,
230
            $firstSequenceNumber
231 2
        );
232
233 2
        $stream = new CursorBackedDomainEventStream(
234 2
            $dbCursor, [], $identifier, $lastSequenceNumber,
235 2
            false, $this->getCursorCallback()
236 2
        );
237
238 2
        if (!$stream->hasNext()) {
239
            throw new EventStreamNotFoundException($type, $identifier);
240
        }
241
242 2
        return $stream;
243
    }
244
245
246
    /**
247
     * Loads all events available in the event store and calls
248
     * {@link \Governor\Framework\EventStore\EventVisitorInterface::doWithEvent}
249
     * for each event found. Events of a single aggregate are guaranteed to be ordered by their sequence number.
250
     * <p/>
251
     * Implementations are encouraged, though not required, to supply events in the absolute chronological order.
252
     * <p/>
253
     * Processing stops when the visitor throws an exception.
254
     *
255
     * @param EventVisitorInterface $visitor The visitor the receives each loaded event
256
     * @param CriteriaInterface $criteria The criteria describing the events to select.
257
     */
258 1
    public function visitEvents(
259
        EventVisitorInterface $visitor,
260
        CriteriaInterface $criteria = null
261
    ) {
262 1
        $params = isset($criteria) ? $criteria->asMongoObject() : [];
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Governor\Framework\Event...ement\CriteriaInterface as the method asMongoObject() does only exist in the following implementations of said interface: Governor\Framework\EventStore\Mongo\Criteria\AndX, Governor\Framework\Event...eria\CollectionCriteria, Governor\Framework\Event...e\Mongo\Criteria\Equals, Governor\Framework\Event...\Criteria\MongoCriteria, Governor\Framework\EventStore\Mongo\Criteria\OrX, Governor\Framework\Event...ria\SimpleMongoOperator.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
263
264 1
        $cursor = $this->storageStrategy->findEventsByCriteria(
265 1
            $this->mongoTemplate->domainEventCollection(),
266
            $params
267 1
        );
268
269
        //cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT);
270 1
        $self = $this;
271
272 1
        $cb = function (array $entry, $identifier) use ($self) {
273 1
            return $self->storageStrategy->extractEventMessages(
274 1
                $entry,
275 1
                $identifier,
276 1
                $self->eventSerializer,
277 1
                $self->upcasterChain,
278
                false
279 1
            );
280 1
        };
281
282 1
        $events = new CursorBackedDomainEventStream($cursor, [], null, null, true, $cb);
283
284
        try {
285 1
            while ($events->hasNext()) {
286 1
                $visitor->doWithEvent($events->next());
287 1
            }
288 1
        } finally {
289 1
            $events->close();
290
        }
291 1
    }
292
293
    /**
294
     * Returns a CriteriaBuilderInterface that allows the construction of criteria for this EventStore implementation
295
     *
296
     * @return CriteriaBuilderInterface a builder to create Criteria for this Event Store.
297
     */
298
    public function newCriteriaBuilder()
299
    {
300
        return new MongoCriteriaBuilder();
301
    }
302
303
    /**
304
     * Sets a logger instance on the object
305
     *
306
     * @param LoggerInterface $logger
307
     * @return null
308
     */
309
    public function setLogger(LoggerInterface $logger)
310
    {
311
        $this->logger = $logger;
312
    }
313
314
    /**
315
     * Append the given <code>snapshotEvent</code> to the snapshot event log for the given type <code>type</code>. The
316
     * sequence number of the <code>snapshotEvent</code> must be equal to the sequence number of the last regular
317
     * domain
318
     * event that is included in the snapshot.
319
     * <p/>
320
     * Implementations may choose to prune snapshots upon appending a new snapshot, in order to minimize storage space.
321
     *
322
     * @param string $type The type of aggregate the event belongs to
323
     * @param DomainEventMessageInterface $snapshotEvent The event summarizing one or more domain events for a specific aggregate.
324
     * @throws ConcurrencyException
325
     */
326 5
    public function appendSnapshotEvent(
327
        $type,
328
        DomainEventMessageInterface $snapshotEvent
329
    ) {
330 5
        $dbObject = $this->storageStrategy->createDocuments($type, $this->eventSerializer, [$snapshotEvent]);
331
332
        try {
333 5
            $this->mongoTemplate->snapshotEventCollection()->batchInsert($dbObject);
334 5
        } catch (\MongoDuplicateKeyException $ex) {
335 1
            throw new ConcurrencyException(
336
                "Trying to insert a SnapshotEvent with aggregate identifier and sequence "
337 1
                ."number that is already present in the Event Store", null, $ex
338 1
            );
339
        }
340
341 5
        $this->logger->debug("snapshot event of type {type} appended.", ['type' => $type]);
342
343 5
    }
344
345
}