SnapshotStore::__construct()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 6
dl 0
loc 14
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Blixit\EventSourcing\Store\SnapshotStore;
6
7
use Blixit\EventSourcing\Aggregate\AggregateRootInterface;
8
use Blixit\EventSourcing\Event\EventInterface;
9
use Blixit\EventSourcing\Messaging\DispatcherInterface;
10
use Blixit\EventSourcing\Store\EventStore;
11
use Blixit\EventSourcing\Store\Persistence\EventPersisterException;
12
use Blixit\EventSourcing\Store\Persistence\EventPersisterInterface;
13
use ReflectionException;
14
use function serialize;
15
use function unserialize;
16
17
class SnapshotStore extends EventStore
18
{
19
    // define this parameter as a snapshot parameter
20
    public const STEP = 10;
21
22
    /** @var SnapshotPersisterInterface $snapshotPersister */
23
    private $snapshotPersister;
24
25
    /** @var SnapshotConfiguration $configuration */
26
    private $configuration;
27
28
    public function __construct(
29
        string $aggregateClass,
30
        EventPersisterInterface $eventPersister,
31
        string $streamStrategyClass,
32
        SnapshotPersisterInterface $snapshotPersister,
33
        ?SnapshotConfiguration $configuration,
34
        ?DispatcherInterface $messageBus
35
    ) {
36
        parent::__construct($aggregateClass, $eventPersister, $streamStrategyClass, $messageBus);
37
        $this->snapshotPersister = $snapshotPersister;
38
39
        $this->configuration = empty($configuration)
40
            ? new SnapshotConfiguration(self::STEP, Snapshot::class)
41
            : $configuration;
42
    }
43
44
    /**
45
     * @throws EventPersisterException
46
     * @throws ReflectionException
47
     */
48
    protected function writeLoopIteration(AggregateRootInterface &$aggregateRoot, EventInterface &$event) : void
49
    {
50
        parent::writeLoopIteration($aggregateRoot, $event);
51
        $snapshotAggregate = $this->buildAggregatelocally($aggregateRoot->getAggregateId());
52
        if ($event->getSequence() < $snapshotAggregate->getSequence() + $this->configuration->getSteps()) {
53
            return;
54
        }
55
        $this->snapshotPersister->snapshot($this->toSnapshot($aggregateRoot));
56
    }
57
58
    /**
59
     * @param mixed $aggregateId
60
     *
61
     * @throws ReflectionException
62
     */
63
    protected function buildAggregatelocally($aggregateId) : AggregateRootInterface
64
    {
65
        $streamName = (string) $this->getStreamNameForAggregateId($aggregateId);
66
        $snapshot   = $this->snapshotPersister->get($streamName);
67
        return $this->toAggregate($snapshot) ?? $this->getEmptyAggregate($aggregateId);
68
    }
69
70
    /**
71
     * @param mixed $aggregateId
72
     *
73
     * @throws ReflectionException
74
     */
75
    protected function buildAggregate($aggregateId) : AggregateRootInterface
76
    {
77
        return $this->buildAggregatelocally($aggregateId);
78
    }
79
80
    protected function toSnapshot(AggregateRootInterface $aggregateRoot) : SnapshotInterface
81
    {
82
        $streamName    = (string) $this->getStreamNameForAggregateId($aggregateRoot->getAggregateId());
83
        $snapshotClass = $this->configuration->getSnapshotClass();
84
        return new $snapshotClass($streamName, serialize($aggregateRoot));
85
    }
86
87
    protected function toAggregate(?SnapshotInterface $snapshot = null) : ?AggregateRootInterface
88
    {
89
        return empty($snapshot)
90
            ? null
91
            : unserialize($snapshot->getPayload());
92
    }
93
}
94