tryDispatchCommandAndSaveAggregate()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 10
c 0
b 0
f 0
ccs 5
cts 5
cp 1
rs 9.4285
cc 1
eloc 5
nc 1
nop 1
crap 1
1
<?php
2
/**
3
 * Copyright (c) 2017 Constantin Galbenu <[email protected]>
4
 */
5
6
namespace Gica\Cqrs\Command\CommandDispatcher;
7
8
use Gica\Cqrs\Aggregate\AggregateRepository;
9
use Gica\Cqrs\Command;
10
use Gica\Cqrs\Command\CommandApplier;
11
use Gica\Cqrs\Command\CommandDispatcher;
12
use Gica\Cqrs\Command\CommandMetadata;
13
use Gica\Cqrs\Command\CommandSubscriber;
14
use Gica\Cqrs\Command\CommandWithMetadata;
15
use Gica\Cqrs\Command\MetadataWrapper as CommandMetadataFactory;
16
use Gica\Cqrs\Command\ValueObject\CommandHandlerAndAggregate;
17
use Gica\Cqrs\Event\EventDispatcher;
18
use Gica\Cqrs\Event\EventsApplier\EventsApplierOnAggregate;
19
use Gica\Cqrs\Event\EventWithMetaData;
20
use Gica\Cqrs\Event\MetaData;
21
use Gica\Cqrs\Event\MetadataFactory as EventMetadataFactory;
22
use Gica\Cqrs\Event\ScheduledEvent;
23
use Gica\Cqrs\FutureEventsStore;
24
use Gica\Cqrs\Scheduling\CommandScheduler;
25
use Gica\Cqrs\Scheduling\ScheduledCommand;
26
use Gica\Types\Guid;
27
28
/**
 * Dispatches a command to the aggregate that handles it, persists the
 * resulting events and then publishes them.
 *
 * Flow of dispatchCommand():
 *  1. the command is wrapped together with its metadata;
 *  2. inside the concurrent-proof function caller the aggregate is loaded,
 *     the command is applied on it and the new events are saved;
 *  3. only after that call returns are the saved events dispatched, the
 *     future events stored and the scheduled commands scheduled.
 */
class DefaultCommandDispatcher implements CommandDispatcher
{
    /**
     * Value handed to the concurrent-proof function caller as its second
     * argument (see getMaximumCommandRetryCount()).
     */
    const MAXIMUM_SAVE_RETRIES = 50;

    /**
     * @var CommandSubscriber
     */
    private $commandSubscriber;

    /**
     * @var EventDispatcher
     */
    private $eventDispatcher;

    /**
     * @var CommandApplier
     */
    private $commandApplier;

    /**
     * @var AggregateRepository
     */
    private $aggregateRepository;

    /**
     * @var ConcurrentProofFunctionCaller
     */
    private $concurrentProofFunctionCaller;

    /**
     * @var FutureEventsStore|null
     */
    private $futureEventsStore;

    /**
     * @var EventsApplierOnAggregate
     */
    private $eventsApplierOnAggregate;

    /**
     * @var CommandScheduler|null
     */
    private $commandScheduler;

    /**
     * @var EventMetadataFactory
     */
    private $eventMetadataFactory;

    /**
     * @var CommandMetadataFactory
     */
    private $commandMetadataFactory;

    /**
     * @param CommandSubscriber $commandSubscriber resolves the handler of a command
     * @param EventDispatcher $eventDispatcher receives every saved event
     * @param CommandApplier $commandApplier invokes the handler method on the aggregate
     * @param AggregateRepository $aggregateRepository loads and saves aggregates
     * @param ConcurrentProofFunctionCaller $functionCaller runs the load/apply/save step
     * @param EventsApplierOnAggregate $eventsApplier applies new events on the aggregate
     * @param EventMetadataFactory $eventMetadataFactory builds the metadata attached to new events
     * @param CommandMetadataFactory $commandMetadataFactory wraps a command with its metadata
     * @param FutureEventsStore|null $futureEventsStore optional; when null, scheduled events are not stored
     * @param CommandScheduler|null $commandScheduler optional; when null, scheduled commands are not scheduled
     */
    public function __construct(
        CommandSubscriber $commandSubscriber,
        EventDispatcher $eventDispatcher,
        CommandApplier $commandApplier,
        AggregateRepository $aggregateRepository,
        ConcurrentProofFunctionCaller $functionCaller,
        EventsApplierOnAggregate $eventsApplier,
        EventMetadataFactory $eventMetadataFactory,
        CommandMetadataFactory $commandMetadataFactory,
        ?FutureEventsStore $futureEventsStore = null,
        ?CommandScheduler $commandScheduler = null
    )
    {
        $this->commandSubscriber = $commandSubscriber;
        $this->eventDispatcher = $eventDispatcher;
        $this->commandApplier = $commandApplier;
        $this->aggregateRepository = $aggregateRepository;
        $this->concurrentProofFunctionCaller = $functionCaller;
        $this->futureEventsStore = $futureEventsStore;
        $this->eventsApplierOnAggregate = $eventsApplier;
        $this->commandScheduler = $commandScheduler;
        $this->eventMetadataFactory = $eventMetadataFactory;
        $this->commandMetadataFactory = $commandMetadataFactory;
    }

    /**
     * Applies the command on its aggregate, saves the produced events and
     * publishes the resulting messages.
     *
     * @param Command $command the command to dispatch
     * @param CommandMetadata|null $metadata optional metadata; declared explicitly
     *        nullable because implicit nullability via "= null" is deprecated
     */
    public function dispatchCommand(Command $command, ?CommandMetadata $metadata = null)
    {
        $command = $this->commandMetadataFactory->wrapCommandWithMetadata($command, $metadata);

        /** @var EventWithMetaData[] $eventsWithMetaData */
        /** @var ScheduledCommand[] $scheduledCommands */
        // The load/apply/save step is delegated to the function caller together
        // with a maximum count.
        // NOTE(review): presumably the caller re-runs the closure on concurrent
        // modification, up to that count - confirm in ConcurrentProofFunctionCaller.
        [$eventsWithMetaData, $futureEventsWithMeta, $scheduledCommands, $aggregateClass] =
            $this->concurrentProofFunctionCaller->executeFunction(
                function () use ($command) {
                    return $this->tryDispatchCommandAndSaveAggregate($command);
                },
                $this->getMaximumCommandRetryCount()
            );

        // Events are published only after the save step has returned.
        foreach ($eventsWithMetaData as $eventWithMetaData) {
            $this->eventDispatcher->dispatchEvent($eventWithMetaData);
        }

        if ($this->futureEventsStore && !empty($futureEventsWithMeta)) {
            $this->futureEventsStore->scheduleEvents($futureEventsWithMeta);
        }

        if ($this->commandScheduler && !empty($scheduledCommands)) {
            foreach ($scheduledCommands as $scheduledCommand) {
                // NOTE(review): the caller-supplied $metadata (possibly null) is
                // forwarded here, not the metadata of the wrapped command - confirm
                // this is intended.
                $this->commandScheduler->scheduleCommand(
                    $scheduledCommand,
                    $aggregateClass,
                    $command->getAggregateId(),
                    $metadata
                );
            }
        }
    }

    /**
     * Single attempt: loads the aggregate, applies the command and saves the
     * events that must be persisted now.
     *
     * @param CommandWithMetadata $command
     * @return array [events returned by saveAggregate(), scheduled events,
     *               scheduled commands, handler class]
     */
    private function tryDispatchCommandAndSaveAggregate(CommandWithMetadata $command)
    {
        $handlerAndAggregate = $this->loadCommandHandlerAndAggregate($command);

        [$eventsForNow, $eventsForTheFuture, $scheduledCommands] =
            $this->applyCommandAndReturnMessages($command, $handlerAndAggregate);

        // The repository's return value replaces the list of events that is
        // handed back to dispatchCommand().
        $eventsForNow = $this->aggregateRepository->saveAggregate(
            $command->getAggregateId(),
            $handlerAndAggregate->getAggregate(),
            $eventsForNow
        );

        return [
            $eventsForNow,
            $eventsForTheFuture,
            $scheduledCommands,
            $handlerAndAggregate->getCommandHandler()->getHandlerClass(),
        ];
    }

    /**
     * Resolves the handler subscribed to the command and loads the aggregate
     * identified by the handler class and the command's aggregate id.
     *
     * @param CommandWithMetadata $command
     * @return CommandHandlerAndAggregate
     */
    private function loadCommandHandlerAndAggregate(CommandWithMetadata $command): CommandHandlerAndAggregate
    {
        $handler = $this->commandSubscriber->getHandlerForCommand($command->getCommand());

        $aggregate = $this->aggregateRepository->loadAggregate(
            $handler->getHandlerClass(),
            $command->getAggregateId()
        );

        return new CommandHandlerAndAggregate($handler, $aggregate);
    }

    /**
     * Wraps an event with a copy of the metadata that carries a newly
     * generated event id.
     *
     * @param mixed $event
     * @param MetaData $metaData
     * @return EventWithMetaData
     */
    private function decorateEventWithMetaData($event, MetaData $metaData): EventWithMetaData
    {
        return new EventWithMetaData($event, $metaData->withEventId(Guid::generate()));
    }

    /**
     * Applies the command on the aggregate and sorts the produced messages
     * into events for now, scheduled events and scheduled commands.
     *
     * @param CommandWithMetadata $command
     * @param CommandHandlerAndAggregate $handlerAndAggregate
     * @return array [EventWithMetaData[] events applied on the aggregate,
     *               EventWithMetaData[] scheduled events (not applied),
     *               ScheduledCommand[] scheduled commands]
     */
    private function applyCommandAndReturnMessages(CommandWithMetadata $command, CommandHandlerAndAggregate $handlerAndAggregate)
    {
        $aggregate = $handlerAndAggregate->getAggregate();
        $handler = $handlerAndAggregate->getCommandHandler();

        $metaData = $this->eventMetadataFactory->factoryEventMetadata($command, $aggregate);

        $newMessageGenerator = $this->commandApplier->applyCommand(
            $aggregate,
            $command->getCommand(),
            $handler->getMethodName()
        );

        /** @var EventWithMetaData[] $eventsWithMetaData */
        $eventsWithMetaData = [];

        /** @var EventWithMetaData[] $scheduledEvents */
        $scheduledEvents = [];

        /** @var ScheduledCommand[] $scheduledCommands */
        $scheduledCommands = [];

        foreach ($newMessageGenerator as $message) {
            if ($this->isScheduledCommand($message)) {
                // Scheduled commands are passed through without metadata.
                $scheduledCommands[] = $message;
                continue;
            }

            $eventWithMetaData = $this->decorateEventWithMetaData($message, $metaData);

            if ($this->isScheduledEvent($message)) {
                // Scheduled events are collected but not applied on the aggregate.
                $scheduledEvents[] = $eventWithMetaData;
                continue;
            }

            // Each event is applied before the next message is fetched.
            // NOTE(review): presumably the handler is a generator that may read
            // the updated aggregate state between yields - confirm.
            $this->eventsApplierOnAggregate->applyEventsOnAggregate($aggregate, [$eventWithMetaData]);
            $eventsWithMetaData[] = $eventWithMetaData;
        }

        return [$eventsWithMetaData, $scheduledEvents, $scheduledCommands];
    }

    /**
     * @param mixed $event
     * @return bool true when the message is a ScheduledEvent
     */
    private function isScheduledEvent($event): bool
    {
        return $event instanceof ScheduledEvent;
    }

    /**
     * @param mixed $message
     * @return bool true when the message is a ScheduledCommand
     */
    private function isScheduledCommand($message): bool
    {
        return $message instanceof ScheduledCommand;
    }

    /**
     * Count passed to the concurrent-proof function caller; protected so that
     * subclasses can override it.
     *
     * @return int
     */
    protected function getMaximumCommandRetryCount(): int
    {
        return self::MAXIMUM_SAVE_RETRIES;
    }
}