Completed
Push — master ( ca729a...c6cc13 )
by Constantin
02:37
created

DefaultCommandDispatcher::canExecuteCommand()   A

Complexity

Conditions 2
Paths 3

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 10
c 0
b 0
f 0
ccs 6
cts 6
cp 1
rs 9.4285
cc 2
eloc 7
nc 3
nop 1
crap 2
1
<?php
2
/**
3
 * Copyright (c) 2017 Constantin Galbenu <[email protected]>
4
 */
5
6
namespace Gica\Cqrs\Command\CommandDispatcher;
7
8
use Gica\Cqrs\Aggregate\AggregateRepository;
9
use Gica\Cqrs\Command;
10
use Gica\Cqrs\Command\CommandApplier;
11
use Gica\Cqrs\Command\CommandDispatcher;
12
use Gica\Cqrs\Command\CommandSubscriber;
13
use Gica\Cqrs\Command\CommandWithMetadata;
14
use Gica\Cqrs\Command\MetadataWrapper as CommandMetadataFactory;
15
use Gica\Cqrs\Command\ValueObject\CommandHandlerAndAggregate;
16
use Gica\Cqrs\Event\EventDispatcher;
17
use Gica\Cqrs\Event\EventsApplier\EventsApplierOnAggregate;
18
use Gica\Cqrs\Event\EventWithMetaData;
19
use Gica\Cqrs\Event\MetaData;
20
use Gica\Cqrs\Event\MetadataFactory as EventMetadataFactory;
21
use Gica\Cqrs\Event\ScheduledEvent;
22
use Gica\Cqrs\FutureEventsStore;
23
use Gica\Cqrs\Scheduling\CommandScheduler;
24
use Gica\Cqrs\Scheduling\ScheduledCommand;
25
use Gica\Types\Guid;
26
27
class DefaultCommandDispatcher implements CommandDispatcher
28
{
29
    const MAXIMUM_SAVE_RETRIES = 50;
30
31
    /**
32
     * @var CommandSubscriber
33
     */
34
    private $commandSubscriber;
35
    /**
36
     * @var EventDispatcher
37
     */
38
    private $eventDispatcher;
39
    /**
40
     * @var CommandApplier
41
     */
42
    private $commandApplier;
43
    /**
44
     * @var AggregateRepository
45
     */
46
    private $aggregateRepository;
47
    /**
48
     * @var ConcurrentProofFunctionCaller
49
     */
50
    private $concurrentProofFunctionCaller;
51
    /**
52
     * @var FutureEventsStore|null
53
     */
54
    private $futureEventsStore;
55
    /**
56
     * @var EventsApplierOnAggregate
57
     */
58
    private $eventsApplierOnAggregate;
59
    /**
60
     * @var CommandScheduler|null
61
     */
62
    private $commandScheduler;
63
    /**
64
     * @var EventMetadataFactory
65
     */
66
    private $eventMetadataFactory;
67
    /**
68
     * @var CommandMetadataFactory
69
     */
70
    private $commandMetadataFactory;
71
72 4
    public function __construct(
73
        CommandSubscriber $commandSubscriber,
74
        EventDispatcher $eventDispatcher,
75
        CommandApplier $commandApplier,
76
        AggregateRepository $aggregateRepository,
77
        ConcurrentProofFunctionCaller $functionCaller,
78
        EventsApplierOnAggregate $eventsApplier,
79
        EventMetadataFactory $eventMetadataFactory,
80
        CommandMetadataFactory $commandMetadataFactory,
81
        ?FutureEventsStore $futureEventsStore = null,
82
        ?CommandScheduler $commandScheduler = null
83
    )
84
    {
85 4
        $this->commandSubscriber = $commandSubscriber;
86 4
        $this->eventDispatcher = $eventDispatcher;
87 4
        $this->commandApplier = $commandApplier;
88 4
        $this->aggregateRepository = $aggregateRepository;
89 4
        $this->concurrentProofFunctionCaller = $functionCaller;
90 4
        $this->futureEventsStore = $futureEventsStore;
91 4
        $this->eventsApplierOnAggregate = $eventsApplier;
92 4
        $this->commandScheduler = $commandScheduler;
93 4
        $this->eventMetadataFactory = $eventMetadataFactory;
94 4
        $this->commandMetadataFactory = $commandMetadataFactory;
95 4
    }
96
97 4
    public function dispatchCommand(Command $command, $metadata = null)
98
    {
99 4
        $command = $this->commandMetadataFactory->wrapCommandWithMetadata($command, $metadata);
100
101
        /** @var EventWithMetaData[] $eventsWithMetaData */
102
        /** @var ScheduledCommand[] $scheduledCommands */
103
104 4
        list($eventsWithMetaData, $futureEventsWithMeta, $scheduledCommands, $aggregateClass) = $this->concurrentProofFunctionCaller->executeFunction(function () use ($command) {
105 4
            return $this->tryDispatchCommandAndSaveAggregate($command);
106 4
        }, $this->getMaximumCommandRetryCount());
107
108 3
        foreach ($eventsWithMetaData as $eventWithMetaData) {
109 3
            $this->eventDispatcher->dispatchEvent($eventWithMetaData);
110
        }
111
112 3
        if ($this->futureEventsStore && !empty($futureEventsWithMeta)) {
113 1
            $this->futureEventsStore->scheduleEvents($futureEventsWithMeta);
114
        }
115
116 3
        if ($this->commandScheduler && !empty($scheduledCommands)) {
117 1
            foreach ($scheduledCommands as $scheduledCommand) {
118 1
                $this->commandScheduler->scheduleCommand($scheduledCommand, $aggregateClass, $command->getAggregateId(), $metadata);
119
            }
120
        }
121 3
    }
122
123 4
    private function tryDispatchCommandAndSaveAggregate(CommandWithMetadata $command)
124
    {
125 4
        $handlerAndAggregate = $this->loadCommandHandlerAndAggregate($command);
126
127 4
        list($eventsForNow, $eventsForTheFuture, $scheduledCommands) = $this->applyCommandAndReturnMessages($command, $handlerAndAggregate);
128
129 3
        $eventsForNow = $this->aggregateRepository->saveAggregate($command->getAggregateId(), $handlerAndAggregate->getAggregate(), $eventsForNow);
130
131 3
        return [$eventsForNow, $eventsForTheFuture, $scheduledCommands, $handlerAndAggregate->getCommandHandler()->getHandlerClass()];
132
    }
133
134 4
    private function loadCommandHandlerAndAggregate(CommandWithMetadata $command): CommandHandlerAndAggregate
135
    {
136 4
        $handler = $this->commandSubscriber->getHandlerForCommand($command->getCommand());
137
138 4
        $aggregate = $this->aggregateRepository->loadAggregate($handler->getHandlerClass(), $command->getAggregateId());
139
140 4
        return new CommandHandlerAndAggregate($handler, $aggregate);
141
    }
142
143 3
    private function decorateEventWithMetaData($event, MetaData $metaData): EventWithMetaData
144
    {
145 3
        return new EventWithMetaData($event, $metaData->withEventId(Guid::generate()));
146
    }
147
148
    /**
149
     * @param CommandWithMetadata $command
150
     * @param CommandHandlerAndAggregate $handlerAndAggregate
151
     * @return array
152
     */
153 4
    private function applyCommandAndReturnMessages(CommandWithMetadata $command, CommandHandlerAndAggregate $handlerAndAggregate)
154
    {
155 4
        $aggregate = $handlerAndAggregate->getAggregate();
156 4
        $handler = $handlerAndAggregate->getCommandHandler();
157
158 4
        $metaData = $this->eventMetadataFactory->factoryEventMetadata($command, $aggregate);
159
160 4
        $newMessageGenerator = $this->commandApplier->applyCommand($aggregate, $command->getCommand(), $handler->getMethodName());
161
162
        /** @var EventWithMetaData[] $eventsWithMetaData */
163 4
        $eventsWithMetaData = [];
164
165
        /** @var EventWithMetaData[] $scheduledEvents */
166 4
        $scheduledEvents = [];
167
168
        /** @var ScheduledCommand[] $scheduledCommands */
169 4
        $scheduledCommands = [];
170
171 4
        foreach ($newMessageGenerator as $message) {
172 3
            if ($this->isScheduledCommand($message)) {
173 1
                $scheduledCommands[] = $message;
174
            } else {
175 3
                $eventWithMetaData = $this->decorateEventWithMetaData($message, $metaData);
176 3
                if (!$this->isScheduledEvent($message)) {
177 3
                    $this->eventsApplierOnAggregate->applyEventsOnAggregate($aggregate, [$eventWithMetaData]);
178 3
                    $eventsWithMetaData[] = $eventWithMetaData;
179
                } else {
180 1
                    $scheduledEvents[] = $eventWithMetaData;
181
                }
182
            }
183
        }
184
185 3
        return [$eventsWithMetaData, $scheduledEvents, $scheduledCommands];
186
    }
187
188 3
    private function isScheduledEvent($event): bool
189
    {
190 3
        return $event instanceof ScheduledEvent;
191
    }
192
193 3
    private function isScheduledCommand($message): bool
194
    {
195 3
        return $message instanceof ScheduledCommand;
196
    }
197
198 4
    protected function getMaximumCommandRetryCount(): int
199
    {
200 4
        return self::MAXIMUM_SAVE_RETRIES;
201
    }
202
}