DefaultUnitOfWork::isTransactional()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
/*
4
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
5
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
6
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
7
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
8
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
9
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
10
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
11
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
12
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
13
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
14
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
15
 *
16
 * The software is based on the Axon Framework project which is
17
 * licensed under the Apache 2.0 license. For more information on the Axon Framework
18
 * see <http://www.axonframework.org/>.
19
 *
20
 * This software consists of voluntary contributions made by many individuals
21
 * and is licensed under the MIT license. For more information, see
22
 * <http://www.governor-framework.org/>.
23
 */
24
25
namespace Governor\Framework\UnitOfWork;
26
27
use Governor\Framework\Domain\AggregateRootInterface;
28
use Governor\Framework\EventHandling\EventBusInterface;
29
use Governor\Framework\Domain\EventMessageInterface;
30
use Governor\Framework\Domain\DomainEventMessageInterface;
31
32
/**
 * Default unit of work implementation.
 *
 * Keeps track of the aggregates registered with it and queues events per event bus until the unit of
 * work is committed. When a transaction manager is supplied, a transaction is started in doStart(),
 * committed in doCommit() and rolled back in doRollback().
 *
 * @author    "David Kalosi" <[email protected]>
 * @license   <a href="http://www.opensource.org/licenses/mit-license.php">MIT License</a>
 */
class DefaultUnitOfWork extends NestableUnitOfWork
{
    /**
     * Dispatcher state: no event publication is in progress.
     */
    const STATUS_READY = 0;

    /**
     * Dispatcher state: publishEvents() is currently handing events to the event buses.
     */
    const STATUS_DISPATCHING = 1;

    /**
     * Registered aggregates keyed by spl_object_hash(); each entry is
     * [AggregateRootInterface, SaveAggregateCallbackInterface].
     *
     * @var array
     */
    private $registeredAggregates = [];

    /**
     * Maps each event bus (the attached object) to the list of events queued for it (the attached data).
     *
     * @var \SplObjectStorage
     */
    private $eventsToPublish;

    /**
     * @var UnitOfWorkListenerCollection
     */
    private $listeners;

    /**
     * One of the STATUS_* constants.
     *
     * @var int
     */
    private $dispatcherStatus;

    /**
     * Optional transaction manager; null means this unit of work is not transactional.
     *
     * @var TransactionManagerInterface|null
     */
    private $transactionManager;

    /**
     * Whatever TransactionManagerInterface::startTransaction() returned, or null when no transaction
     * has been started.
     *
     * @var mixed
     */
    private $transaction;

    /**
     * @param TransactionManagerInterface|null $transactionManager Transaction manager used to bind a
     *                                                             transaction to this unit of work, or
     *                                                             null for a non-transactional one.
     */
    public function __construct(TransactionManagerInterface $transactionManager = null)
    {
        parent::__construct();

        $this->listeners = new UnitOfWorkListenerCollection();
        $this->eventsToPublish = new \SplObjectStorage();
        $this->dispatcherStatus = self::STATUS_READY;
        $this->transactionManager = $transactionManager;
    }

    /**
     * Creates a new DefaultUnitOfWork, starts it and returns it.
     *
     * @param TransactionManagerInterface|null $transactionManager
     * @return UnitOfWorkInterface
     */
    public static function startAndGet(TransactionManagerInterface $transactionManager = null)
    {
        $uow = new DefaultUnitOfWork($transactionManager);
        $uow->start();

        return $uow;
    }

    /**
     * Tells whether a transaction manager was supplied to this unit of work.
     *
     * @return bool
     */
    public function isTransactional()
    {
        return null !== $this->transactionManager;
    }

    /**
     * Publishes the queued events, commits the inner units of work and, when transactional, commits the
     * backing transaction. Listeners are notified before the transaction commit and after the commit.
     */
    protected function doCommit()
    {
        $this->publishEvents();
        $this->commitInnerUnitOfWork();

        if ($this->isTransactional()) {
            // Hand listeners the transaction that is about to be committed (previously null was passed).
            $this->notifyListenersPrepareTransactionCommit($this->transaction);
            $this->transactionManager->commitTransaction($this->transaction);
        }

        $this->notifyListenersAfterCommit();
    }

    /**
     * Discards registered aggregates and queued events, rolls back the backing transaction if one was
     * started, and notifies the listeners of the rollback.
     *
     * @param \Exception|null $ex The exception that caused the rollback, if any.
     */
    protected function doRollback(\Exception $ex = null)
    {
        $this->registeredAggregates = [];
        $this->eventsToPublish = new \SplObjectStorage();

        try {
            if (null !== $this->transaction) {
                $this->transactionManager->rollbackTransaction($this->transaction);
            }
        } finally {
            // Listeners are notified even when the transaction rollback itself throws.
            $this->notifyListenersRollback($ex);
        }
    }

    /**
     * Notifies the registered listeners that this unit of work is being rolled back.
     *
     * @param \Exception|null $ex The exception that caused the rollback, if any.
     */
    protected function notifyListenersRollback(\Exception $ex = null)
    {
        $this->listeners->onRollback($this, $ex);
    }

    /**
     * Registers an aggregate with this unit of work.
     *
     * If an aggregate of the same class and identifier is already registered, that instance is returned
     * and the given one is ignored. Otherwise the aggregate is stored together with its save callback,
     * and a callback is attached to it so that events it registers are queued for the given event bus.
     *
     * @param AggregateRootInterface         $aggregateRoot
     * @param EventBusInterface              $eventBus              Bus the aggregate's events are queued for.
     * @param SaveAggregateCallbackInterface $saveAggregateCallback Invoked from saveAggregates().
     * @return AggregateRootInterface The aggregate instance tracked by this unit of work.
     */
    public function registerAggregate(
        AggregateRootInterface $aggregateRoot,
        EventBusInterface $eventBus,
        SaveAggregateCallbackInterface $saveAggregateCallback
    ) {
        $similarAggregate = $this->findSimilarAggregate(
            get_class($aggregateRoot),
            $aggregateRoot->getIdentifier()
        );

        if (null !== $similarAggregate) {
            $this->logger->info(
                "Ignoring aggregate registration. An aggregate of same type and identifier was already ".
                "registered in this Unit Of Work: type [{aggregate}], identifier [{identifier}]",
                ['aggregate' => get_class($aggregateRoot), 'identifier' => $aggregateRoot->getIdentifier()]
            );

            return $similarAggregate;
        }

        // The closure is created inside the class, so it may call the private helpers on $this.
        $eventRegistrationCallback = new UoWEventRegistrationCallback(
            function (DomainEventMessageInterface $event) use ($eventBus) {
                $event = $this->invokeEventRegistrationListeners($event);
                $this->eventsToPublishOn($event, $eventBus);

                return $event;
            }
        );

        $this->registeredAggregates[spl_object_hash($aggregateRoot)] = [
            $aggregateRoot,
            $saveAggregateCallback
        ];

        $this->logger->debug(
            "Registering aggregate {aggregate}",
            ['aggregate' => get_class($aggregateRoot)]
        );

        // listen for new events registered in the aggregate
        $aggregateRoot->addEventRegistrationCallback($eventRegistrationCallback);

        return $aggregateRoot;
    }

    /**
     * Adds a listener to this unit of work's listener collection.
     *
     * @param mixed $listener
     */
    public function registerListener($listener)
    {
        $this->listeners->add($listener);
    }

    /**
     * Looks up an already registered aggregate with exactly the given class name and an identifier
     * that is strictly equal (===) to the given one.
     *
     * @param string $aggregateType Class name as returned by get_class().
     * @param mixed  $identifier
     * @return AggregateRootInterface|null The matching aggregate, or null when none is registered.
     */
    private function findSimilarAggregate($aggregateType, $identifier)
    {
        foreach ($this->registeredAggregates as $aggregateEntry) {
            $aggregate = $aggregateEntry[0];

            if (get_class($aggregate) === $aggregateType && $aggregate->getIdentifier() === $identifier) {
                return $aggregate;
            }
        }

        return null;
    }

    /**
     * Queues an event for publication on the given event bus.
     *
     * @param EventMessageInterface $event
     * @param EventBusInterface     $eventBus
     */
    private function eventsToPublishOn(
        EventMessageInterface $event,
        EventBusInterface $eventBus
    ) {
        if (!$this->eventsToPublish->contains($eventBus)) {
            $this->eventsToPublish->attach($eventBus, [$event]);

            return;
        }

        $events = $this->eventsToPublish->offsetGet($eventBus);
        $events[] = $event;
        $this->eventsToPublish->offsetSet($eventBus, $events);
    }

    /**
     * Passes the event through the listeners' onEventRegistered() hook and returns its result.
     *
     * @param EventMessageInterface $event
     * @return EventMessageInterface The event as returned by the listener collection.
     */
    private function invokeEventRegistrationListeners(EventMessageInterface $event)
    {
        return $this->listeners->onEventRegistered($this, $event);
    }

    /**
     * Runs the event through the registration listeners and queues the result for the given event bus.
     *
     * @param EventMessageInterface $event
     * @param EventBusInterface     $eventBus
     */
    public function publishEvent(
        EventMessageInterface $event,
        EventBusInterface $eventBus
    ) {
        $event = $this->invokeEventRegistrationListeners($event);
        $this->eventsToPublishOn($event, $eventBus);
    }

    /**
     * Publishes all registered events to their respective event bus.
     *
     * A re-entrant call made while a dispatch is already running returns immediately; the running
     * dispatch keeps draining the queues until no bus has pending events, so events queued during
     * publication are published as well. The dispatcher status is reset even when a bus throws.
     */
    protected function publishEvents()
    {
        $this->logger->debug("Publishing events to the event bus");
        if (self::STATUS_DISPATCHING === $this->dispatcherStatus) {
            // this prevents events from overtaking each other
            $this->logger->debug(
                "UnitOfWork is already in the dispatch process. ".
                "That process will publish events instead. Aborting..."
            );

            return;
        }

        $this->dispatcherStatus = self::STATUS_DISPATCHING;

        try {
            do {
                $publishedAny = false;

                // Work on a snapshot of the buses: handlers may queue further events (or attach new
                // buses) while we publish, and the storage's internal iterator is shared state.
                foreach (iterator_to_array($this->eventsToPublish, false) as $bus) {
                    // The storage is replaced on rollback; skip buses that are no longer tracked.
                    if (!$this->eventsToPublish->contains($bus)) {
                        continue;
                    }

                    $events = $this->eventsToPublish->offsetGet($bus);
                    if (empty($events)) {
                        continue;
                    }

                    foreach ($events as $event) {
                        $this->logger->debug(
                            "Publishing event [{event}] to event bus [{bus}]",
                            ['event' => $event->getPayloadType(), 'bus' => get_class($bus)]
                        );
                    }

                    // clear and send
                    $this->eventsToPublish->offsetSet($bus, []);
                    $bus->publish($events);
                    $publishedAny = true;
                }
            } while ($publishedAny);
        } finally {
            // Never leave the dispatcher stuck in DISPATCHING when a bus throws.
            $this->dispatcherStatus = self::STATUS_READY;
        }

        $this->logger->debug("All events successfully published.");
    }

    /**
     * Starts a transaction through the transaction manager when this unit of work is transactional.
     */
    protected function doStart()
    {
        if ($this->isTransactional()) {
            $this->transaction = $this->transactionManager->startTransaction();
        }
    }

    /**
     * Notifies the listeners that the commit is being prepared, handing them the registered aggregates
     * and all events currently queued for publication.
     */
    protected function notifyListenersPrepareCommit()
    {
        $list = [];

        foreach ($this->registeredAggregates as $aggregateEntry) {
            $list[] = $aggregateEntry[0];
        }

        $this->listeners->onPrepareCommit($this, $list, $this->eventsToPublish());
    }

    /**
     * Notifies all registered listeners that the backing transaction is about to be committed.
     *
     * @param mixed $transaction The object representing the transaction that is about to be committed
     */
    protected function notifyListenersPrepareTransactionCommit($transaction)
    {
        $this->listeners->onPrepareTransactionCommit($this, $transaction);
    }

    /**
     * Collects the events queued for every event bus into a single flat list.
     *
     * @return array
     */
    private function eventsToPublish()
    {
        $events = [];

        $this->eventsToPublish->rewind();
        while ($this->eventsToPublish->valid()) {
            $events = array_merge($events, $this->eventsToPublish->getInfo());
            $this->eventsToPublish->next();
        }

        return $events;
    }

    /**
     * Notifies the registered listeners that this unit of work is being cleaned up.
     */
    protected function notifyListenersCleanup()
    {
        $this->listeners->onCleanup($this);
    }

    /**
     * Notifies the registered listeners that this unit of work has been committed.
     */
    protected function notifyListenersAfterCommit()
    {
        $this->listeners->afterCommit($this);
    }

    /**
     * Invokes the save callback of every registered aggregate and then clears the registration list.
     *
     * The list is only cleared when every callback returned without throwing.
     */
    protected function saveAggregates()
    {
        $this->logger->debug("Persisting changes to aggregates");
        foreach ($this->registeredAggregates as $aggregateEntry) {
            list ($aggregate, $callback) = $aggregateEntry;

            $this->logger->debug(
                "Persisting changes to [{aggregate}], identifier: [{id}]",
                [
                    'aggregate' => get_class($aggregate),
                    'id' => $aggregate->getIdentifier()
                ]
            );

            $callback->save($aggregate);
        }
        $this->logger->debug("Aggregates successfully persisted");
        $this->registeredAggregates = [];
    }
}
340