Completed
Push — master ( 0961a5...b8d325 )
by Constantin
04:43
created

DefaultCommandDispatcher   B

Complexity

Total Complexity 20

Size/Duplication

Total Lines 187
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 16

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 20
lcom 1
cbo 16
dl 0
loc 187
ccs 65
cts 65
cp 1
rs 8.4614
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
B __construct() 0 24 1
C dispatchCommand() 0 25 7
A tryDispatchCommandAndSaveAggregate() 0 10 1
A canExecuteCommand() 0 10 2
A loadCommandHandlerAndAggregate() 0 8 1
A decorateEventWithMetaData() 0 4 1
B applyCommandAndReturnMessages() 0 34 4
A isScheduledEvent() 0 4 1
A isScheduledCommand() 0 4 1
A getMaximumCommandRetryCount() 0 4 1
1
<?php
2
/**
3
 * Copyright (c) 2017 Constantin Galbenu <[email protected]>
4
 */
5
6
namespace Gica\Cqrs\Command\CommandDispatcher;
7
8
use Gica\Cqrs\Aggregate\AggregateRepository;
9
use Gica\Cqrs\Command;
10
use Gica\Cqrs\Command\CommandApplier;
11
use Gica\Cqrs\Command\CommandDispatcher;
12
use Gica\Cqrs\Command\CommandSubscriber;
13
use Gica\Cqrs\Command\CommandWithMetadata;
14
use Gica\Cqrs\Command\MetadataWrapper as CommandMetadataFactory;
15
use Gica\Cqrs\Command\ValueObject\CommandHandlerAndAggregate;
16
use Gica\Cqrs\Event\EventDispatcher;
17
use Gica\Cqrs\Event\EventsApplier\EventsApplierOnAggregate;
18
use Gica\Cqrs\Event\EventWithMetaData;
19
use Gica\Cqrs\Event\MetaData;
20
use Gica\Cqrs\Event\MetadataFactory as EventMetadataFactory;
21
use Gica\Cqrs\Event\ScheduledEvent;
22
use Gica\Cqrs\FutureEventsStore;
23
use Gica\Cqrs\Scheduling\CommandScheduler;
24
use Gica\Cqrs\Scheduling\ScheduledCommand;
25
use Gica\Types\Guid;
26
27
/**
 * Command dispatcher that applies a command to its aggregate, persists the
 * resulting events and then publishes / schedules whatever was produced.
 *
 * The flow implemented here is:
 *  1. wrap the incoming command together with its metadata;
 *  2. hand the "load aggregate, apply command, save aggregate" step to the
 *     ConcurrentProofFunctionCaller, together with the maximum retry count;
 *  3. dispatch the saved events through the EventDispatcher;
 *  4. forward scheduled events and scheduled commands to the optional
 *     FutureEventsStore / CommandScheduler, when those were injected.
 */
class DefaultCommandDispatcher implements CommandDispatcher
{
    /**
     * Value returned by getMaximumCommandRetryCount() unless a subclass overrides it.
     */
    const MAXIMUM_SAVE_RETRIES = 50;

    /** @var CommandSubscriber */
    private $commandSubscriber;

    /** @var EventDispatcher */
    private $eventDispatcher;

    /** @var CommandApplier */
    private $commandApplier;

    /** @var AggregateRepository */
    private $aggregateRepository;

    /** @var ConcurrentProofFunctionCaller */
    private $concurrentProofFunctionCaller;

    /** @var FutureEventsStore|null */
    private $futureEventsStore;

    /** @var EventsApplierOnAggregate */
    private $eventsApplierOnAggregate;

    /** @var CommandScheduler|null */
    private $commandScheduler;

    /** @var EventMetadataFactory */
    private $eventMetadataFactory;

    /** @var CommandMetadataFactory */
    private $commandMetadataFactory;

    /**
     * @param CommandSubscriber             $commandSubscriber      resolves the handler for a command
     * @param EventDispatcher               $eventDispatcher        receives every saved event
     * @param CommandApplier                $commandApplier         invokes the handler method on the aggregate
     * @param AggregateRepository           $aggregateRepository    loads and saves aggregates
     * @param ConcurrentProofFunctionCaller $functionCaller         runs the load/apply/save step
     * @param EventsApplierOnAggregate      $eventsApplier          applies freshly yielded events on the aggregate
     * @param EventMetadataFactory          $eventMetadataFactory   builds the metadata attached to new events
     * @param CommandMetadataFactory        $commandMetadataFactory wraps a command with its metadata
     * @param FutureEventsStore|null        $futureEventsStore      optional; scheduled events are ignored without it
     * @param CommandScheduler|null         $commandScheduler       optional; scheduled commands are ignored without it
     */
    public function __construct(
        CommandSubscriber $commandSubscriber,
        EventDispatcher $eventDispatcher,
        CommandApplier $commandApplier,
        AggregateRepository $aggregateRepository,
        ConcurrentProofFunctionCaller $functionCaller,
        EventsApplierOnAggregate $eventsApplier,
        EventMetadataFactory $eventMetadataFactory,
        CommandMetadataFactory $commandMetadataFactory,
        ?FutureEventsStore $futureEventsStore = null,
        ?CommandScheduler $commandScheduler = null
    )
    {
        // Mandatory collaborators.
        $this->commandSubscriber = $commandSubscriber;
        $this->eventDispatcher = $eventDispatcher;
        $this->commandApplier = $commandApplier;
        $this->aggregateRepository = $aggregateRepository;
        $this->concurrentProofFunctionCaller = $functionCaller;
        $this->eventsApplierOnAggregate = $eventsApplier;
        $this->eventMetadataFactory = $eventMetadataFactory;
        $this->commandMetadataFactory = $commandMetadataFactory;

        // Optional collaborators; may stay null.
        $this->futureEventsStore = $futureEventsStore;
        $this->commandScheduler = $commandScheduler;
    }

    /**
     * Applies the command, saves the aggregate and publishes the outcome.
     *
     * @param Command    $command  the command to execute
     * @param mixed|null $metadata passed to the command metadata wrapper and,
     *                             unchanged, to the command scheduler
     */
    public function dispatchCommand(Command $command, $metadata = null)
    {
        $wrappedCommand = $this->commandMetadataFactory->wrapCommandWithMetadata($command, $metadata);

        $loadApplyAndSave = function () use ($wrappedCommand) {
            return $this->tryDispatchCommandAndSaveAggregate($wrappedCommand);
        };

        $outcome = $this->concurrentProofFunctionCaller->executeFunction(
            $loadApplyAndSave,
            $this->getMaximumCommandRetryCount()
        );

        /** @var EventWithMetaData[] $savedEvents */
        /** @var ScheduledCommand[] $scheduledCommands */
        [$savedEvents, $futureEvents, $scheduledCommands, $aggregateClass] = $outcome;

        foreach ($savedEvents as $savedEvent) {
            $this->eventDispatcher->dispatchEvent($savedEvent);
        }

        // Scheduled events are only forwarded when a store was injected.
        $hasFutureEvents = !empty($futureEvents);
        if ($this->futureEventsStore && $hasFutureEvents) {
            $this->futureEventsStore->scheduleEvents($futureEvents);
        }

        // Likewise, scheduled commands need an injected scheduler.
        if (!$this->commandScheduler || empty($scheduledCommands)) {
            return;
        }

        foreach ($scheduledCommands as $scheduledCommand) {
            $this->commandScheduler->scheduleCommand(
                $scheduledCommand,
                $aggregateClass,
                $wrappedCommand->getAggregateId(),
                $metadata
            );
        }
    }

    /**
     * One attempt of the load / apply / save cycle.
     *
     * @param CommandWithMetadata $command
     * @return array [events returned by saveAggregate, scheduled events, scheduled commands, handler class]
     */
    private function tryDispatchCommandAndSaveAggregate(CommandWithMetadata $command)
    {
        $handlerAndAggregate = $this->loadCommandHandlerAndAggregate($command);

        [$immediateEvents, $futureEvents, $scheduledCommands] = $this->applyCommandAndReturnMessages(
            $command,
            $handlerAndAggregate
        );

        // The repository's return value replaces the in-memory list of events.
        $savedEvents = $this->aggregateRepository->saveAggregate(
            $command->getAggregateId(),
            $handlerAndAggregate->getAggregate(),
            $immediateEvents
        );

        $handlerClass = $handlerAndAggregate->getCommandHandler()->getHandlerClass();

        return [$savedEvents, $futureEvents, $scheduledCommands, $handlerClass];
    }

    /**
     * Dry-runs the command against a freshly loaded aggregate without saving it.
     *
     * @param Command $command
     * @return bool false when any \Exception is thrown while wrapping, loading or applying; true otherwise
     */
    public function canExecuteCommand(Command $command): bool
    {
        try {
            $wrappedCommand = $this->commandMetadataFactory->wrapCommandWithMetadata($command, null);
            $handlerAndAggregate = $this->loadCommandHandlerAndAggregate($wrappedCommand);

            $this->applyCommandAndReturnMessages($wrappedCommand, $handlerAndAggregate);
        } catch (\Exception $exception) {
            return false;
        }

        return true;
    }

    /**
     * Resolves the handler subscribed to the command and loads the aggregate it targets.
     *
     * @param CommandWithMetadata $command
     * @return CommandHandlerAndAggregate
     */
    private function loadCommandHandlerAndAggregate(CommandWithMetadata $command): CommandHandlerAndAggregate
    {
        $commandHandler = $this->commandSubscriber->getHandlerForCommand($command->getCommand());

        $loadedAggregate = $this->aggregateRepository->loadAggregate(
            $commandHandler->getHandlerClass(),
            $command->getAggregateId()
        );

        return new CommandHandlerAndAggregate($commandHandler, $loadedAggregate);
    }

    /**
     * Pairs an event with the given metadata, stamped with a newly generated event id.
     *
     * @param mixed    $event
     * @param MetaData $metaData
     * @return EventWithMetaData
     */
    private function decorateEventWithMetaData($event, MetaData $metaData): EventWithMetaData
    {
        $eventId = Guid::generate();
        $metaDataWithId = $metaData->withEventId($eventId);

        return new EventWithMetaData($event, $metaDataWithId);
    }

    /**
     * Runs the command handler and sorts every message it produces into one of three lists.
     *
     * Events that are not scheduled are applied on the aggregate as soon as
     * they are produced, one at a time, before the next message is fetched.
     *
     * @param CommandWithMetadata $command
     * @param CommandHandlerAndAggregate $handlerAndAggregate
     * @return array [events for now, scheduled events, scheduled commands]
     */
    private function applyCommandAndReturnMessages(CommandWithMetadata $command, CommandHandlerAndAggregate $handlerAndAggregate)
    {
        $aggregate = $handlerAndAggregate->getAggregate();
        $handler = $handlerAndAggregate->getCommandHandler();

        $metaData = $this->eventMetadataFactory->factoryEventMetadata($command, $aggregate);

        $producedMessages = $this->commandApplier->applyCommand(
            $aggregate,
            $command->getCommand(),
            $handler->getMethodName()
        );

        /** @var EventWithMetaData[] $immediateEvents */
        $immediateEvents = [];

        /** @var EventWithMetaData[] $futureEvents */
        $futureEvents = [];

        /** @var ScheduledCommand[] $scheduledCommands */
        $scheduledCommands = [];

        foreach ($producedMessages as $message) {
            // Scheduled commands are collected as-is, without event metadata.
            if ($this->isScheduledCommand($message)) {
                $scheduledCommands[] = $message;
                continue;
            }

            $decoratedEvent = $this->decorateEventWithMetaData($message, $metaData);

            // Scheduled events are set aside and never applied on the aggregate here.
            if ($this->isScheduledEvent($message)) {
                $futureEvents[] = $decoratedEvent;
                continue;
            }

            $this->eventsApplierOnAggregate->applyEventsOnAggregate($aggregate, [$decoratedEvent]);
            $immediateEvents[] = $decoratedEvent;
        }

        return [$immediateEvents, $futureEvents, $scheduledCommands];
    }

    /**
     * @param mixed $event
     * @return bool true when the value is a ScheduledEvent instance
     */
    private function isScheduledEvent($event): bool
    {
        return $event instanceof ScheduledEvent;
    }

    /**
     * @param mixed $message
     * @return bool true when the value is a ScheduledCommand instance
     */
    private function isScheduledCommand($message): bool
    {
        return $message instanceof ScheduledCommand;
    }

    /**
     * Retry count handed to the ConcurrentProofFunctionCaller; subclasses may override it.
     *
     * @return int
     */
    protected function getMaximumCommandRetryCount(): int
    {
        return self::MAXIMUM_SAVE_RETRIES;
    }
}