Repository   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 147
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 7
Bugs 0 Features 5
Metric Value
wmc 15
c 7
b 0
f 5
lcom 1
cbo 9
dl 0
loc 147
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 1
A persist() 0 14 3
A find() 0 18 4
B publish() 0 24 3
A broadcast() 0 6 2
A assertInstanceOf() 0 10 2
1
<?php
2
/*
3
 * This file is part of the Borobudur-Event-Sourcing package.
4
 *
5
 * (c) Hexacodelabs <http://hexacodelabs.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 */
10
11
namespace Borobudur\EventSourcing\Repository;
12
13
use Borobudur\Broadcasting\Broadcaster;
14
use Borobudur\Broadcasting\BroadcastEventInterface;
15
use Borobudur\Bus\BusProxy;
16
use Borobudur\Cqrs\IdentifierInterface;
17
use Borobudur\Cqrs\Message\DomainEventInterface;
18
use Borobudur\EventSourcing\BroadcastEventMessage;
19
use Borobudur\EventSourcing\Domain\DomainEventStreamInterface;
20
use Borobudur\EventSourcing\Entity\AggregateRootInterface;
21
use Borobudur\EventSourcing\Exception\LogicException;
22
use Borobudur\EventSourcing\Snapshot\SnapshotInterface;
23
use Borobudur\EventSourcing\Storage\StorageInterface;
24
25
/**
 * Event-sourced repository bound to one aggregate root class and one table.
 *
 * Persisting an aggregate appends a snapshot (when the storage provides one)
 * and publishes the aggregate's uncommitted domain events; finding an
 * aggregate restores it from the snapshot and/or the stored events.
 *
 * @author      Iqbal Maulana <[email protected]>
 * @created     8/20/15
 */
class Repository implements RepositoryInterface
{
    /**
     * Event storage used to append and read domain events.
     *
     * @var StorageInterface
     */
    private $storage;

    /**
     * Snapshot store obtained from the storage; every use is guarded by a
     * null check, so it is treated as optional.
     *
     * @var SnapshotInterface|null
     */
    private $snapshot;

    /**
     * Bus on which each domain event payload is dispatched.
     *
     * @var BusProxy
     */
    private $eventBus;

    /**
     * Class name of the aggregate root handled by this repository.
     *
     * @var string
     */
    private $aggregateRootClass;

    /**
     * Table name passed to every storage and snapshot call.
     *
     * @var string
     */
    private $table;

    /**
     * Constructor.
     *
     * @param StorageInterface $storage            Event storage; also supplies the snapshot store.
     * @param BusProxy         $eventBus           Bus used to dispatch domain event payloads.
     * @param string           $aggregateRootClass Class name of the managed aggregate root.
     * @param string           $table              Table name used for events and snapshots.
     */
    public function __construct(StorageInterface $storage, BusProxy $eventBus, $aggregateRootClass, $table)
    {
        $this->storage = $storage;
        $this->snapshot = $storage->getSnapshot();
        $this->eventBus = $eventBus;
        $this->aggregateRootClass = $aggregateRootClass;
        $this->table = $table;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the list of dispatch results when the aggregate has uncommitted
     * events, or null when there is nothing to publish.
     *
     * @throws LogicException When $aggregateRoot is not an instance of the configured class.
     * @throws \Exception     Re-thrown from publish() when an event fails.
     */
    public function persist(AggregateRootInterface $aggregateRoot)
    {
        $this->assertInstanceOf($aggregateRoot, $this->aggregateRootClass);

        // NOTE(review): the snapshot is written before the events are
        // published, so it stays written even if publishing throws below.
        // Confirm this ordering is intended.
        if (null !== $this->snapshot) {
            $this->snapshot->append($aggregateRoot, $this->table);
        }

        $stream = $aggregateRoot->getUncommittedEvents();
        if ($stream->count()) {
            return $this->publish($stream);
        }

        return null;
    }

    /**
     * {@inheritdoc}
     *
     * Looks the aggregate up in the snapshot store first (when available).
     * Without a snapshot the aggregate is built from all of its stored events;
     * with one, only the events from the snapshot's version onward are replayed.
     */
    public function find(IdentifierInterface $id)
    {
        $class = $this->aggregateRootClass;

        $aggregate = null;
        if (null !== $this->snapshot) {
            $aggregate = $this->snapshot->find($id, $class, $this->table);
        }

        if (null === $aggregate) {
            // NOTE(review): the result of findEventFromBeginning() is passed to
            // build() unchecked; verify build() copes with "no events found".
            return $class::build($this->storage->findEventFromBeginning($this->table, $id));
        }

        $events = $this->storage->findEventFromVersion($this->table, $id, $aggregate->getVersion());
        if (null !== $events) {
            $aggregate->replay($events);
        }

        return $aggregate;
    }

    /**
     * Publish domain events in stream.
     *
     * Each event payload is dispatched on the event bus. A successful dispatch
     * is stored with status 'OK' and broadcast; a failed dispatch is stored with
     * status 'ERROR' together with the exception details, broadcast, and the
     * exception is re-thrown, which stops processing of the remaining events.
     *
     * @param DomainEventStreamInterface $stream
     *
     * @return array Dispatch results, one per event, in stream order.
     * @throws \Exception
     */
    private function publish(DomainEventStreamInterface $stream)
    {
        $results = array();
        foreach ($stream->all() as $domain) {
            // Only the dispatch is guarded: a failure while recording or
            // broadcasting a *successful* dispatch must not cause the same
            // event to be stored a second time with status 'ERROR'.
            try {
                $result = $this->eventBus->dispatch($domain->getPayload());
            } catch (\Exception $e) {
                $metadata = array(
                    'code'    => $e->getCode(),
                    'file'    => $e->getFile(),
                    'line'    => $e->getLine(),
                    'message' => $e->getMessage(),
                );
                $this->storage->append($this->table, $domain, 'ERROR', $metadata);
                $this->broadcast($domain->getPayload(), array('status' => 'ERROR', 'exception' => $metadata));

                throw $e;
            }

            $results[] = $result;
            $this->storage->append($this->table, $domain, 'OK');
            $this->broadcast($domain->getPayload(), array('status' => 'OK', 'exception' => null));
        }

        return $results;
    }

    /**
     * Broadcast event.
     *
     * Only events implementing BroadcastEventInterface are broadcast; any
     * other event is ignored.
     *
     * @param DomainEventInterface $event
     * @param array                $metadata Status information sent along with the event.
     */
    private function broadcast(DomainEventInterface $event, array $metadata)
    {
        if ($event instanceof BroadcastEventInterface) {
            Broadcaster::getInstance()->broadcast(new BroadcastEventMessage($event, $metadata));
        }
    }

    /**
     * Assert that aggregate root should be instance of $className
     *
     * @param AggregateRootInterface $aggregateRoot
     * @param string                 $className
     *
     * @throws LogicException When $aggregateRoot is not an instance of $className.
     */
    private function assertInstanceOf(AggregateRootInterface $aggregateRoot, $className)
    {
        if (!$aggregateRoot instanceof $className) {
            throw new LogicException(sprintf(
                'Class "%s" was expected to be instanceof of "%s" but is not.',
                get_class($aggregateRoot),
                $className
            ));
        }
    }
}
176