php-service-bus /
module-sagas
This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include, or for example
via PHP's auto-loading mechanism.
| 1 | <?php |
||
| 2 | |||
| 3 | /** |
||
| 4 | * Saga pattern implementation module. |
||
| 5 | * |
||
| 6 | * @author Maksim Masiukevich <[email protected]> |
||
| 7 | * @license MIT |
||
| 8 | * @license https://opensource.org/licenses/MIT |
||
| 9 | */ |
||
| 10 | |||
| 11 | declare(strict_types = 1); |
||
| 12 | |||
| 13 | namespace ServiceBus\Sagas\Module; |
||
| 14 | |||
| 15 | use Amp\Success; |
||
| 16 | use ServiceBus\Mutex\InMemory\InMemoryMutexFactory; |
||
| 17 | use function Amp\call; |
||
| 18 | use function ServiceBus\Common\datetimeInstantiator; |
||
| 19 | use function ServiceBus\Common\invokeReflectionMethod; |
||
| 20 | use function ServiceBus\Common\now; |
||
| 21 | use function ServiceBus\Common\readReflectionPropertyValue; |
||
| 22 | use function ServiceBus\Sagas\createMutexKey; |
||
| 23 | use Amp\Promise; |
||
| 24 | use ServiceBus\Common\Context\ServiceBusContext; |
||
| 25 | use ServiceBus\Mutex\Lock; |
||
| 26 | use ServiceBus\Mutex\MutexFactory; |
||
| 27 | use ServiceBus\Sagas\Configuration\SagaMetadata; |
||
| 28 | use ServiceBus\Sagas\Module\Exceptions\CantSaveUnStartedSaga; |
||
| 29 | use ServiceBus\Sagas\Module\Exceptions\SagaMetaDataNotFound; |
||
| 30 | use ServiceBus\Sagas\Saga; |
||
| 31 | use ServiceBus\Sagas\SagaId; |
||
| 32 | use ServiceBus\Sagas\Store\Exceptions\LoadedExpiredSaga; |
||
| 33 | use ServiceBus\Sagas\Store\SagasStore; |
||
| 34 | |||
| 35 | /** |
||
| 36 | * Sagas provider. |
||
| 37 | */ |
||
| 38 | final class SagasProvider |
||
| 39 | { |
||
| 40 | /** @var SagasStore */ |
||
| 41 | private $sagaStore; |
||
| 42 | |||
| 43 | /** @var MutexFactory */ |
||
| 44 | private $mutexFactory; |
||
| 45 | |||
| 46 | /** |
||
| 47 | * Sagas meta data. |
||
| 48 | * |
||
| 49 | * @psalm-var array<string, \ServiceBus\Sagas\Configuration\SagaMetadata> |
||
| 50 | * |
||
| 51 | * @var SagaMetadata[] |
||
| 52 | */ |
||
| 53 | private $sagaMetaDataCollection = []; |
||
| 54 | |||
| 55 | /** @var Lock[] */ |
||
| 56 | private $lockCollection = []; |
||
| 57 | |||
/**
 * @param SagasStore        $sagaStore    Store used to load and persist sagas.
 * @param MutexFactory|null $mutexFactory Factory for per-saga mutexes; when omitted,
 *                                        an InMemoryMutexFactory is created.
 */
public function __construct(SagasStore $sagaStore, ?MutexFactory $mutexFactory = null)
{
    if ($mutexFactory === null)
    {
        $mutexFactory = new InMemoryMutexFactory();
    }

    $this->mutexFactory = $mutexFactory;
    $this->sagaStore    = $sagaStore;
}
|
| 63 | |||
/**
 * Drop the collection of held locks together with the provider.
 */
public function __destruct()
{
    // Removes the property itself, dropping this object's references to any Lock instances still stored.
    unset($this->lockCollection);
}
|
| 68 | |||
/**
 * Create a new saga, start it with the given command and persist it.
 *
 * A mutex for the saga id is acquired first and is always released before the
 * returned promise settles, whether the saga was stored or an error occurred.
 *
 * @param SagaId            $id      Identifier of the saga; its sagaClass property names the class to instantiate.
 * @param object            $command Command passed to the saga's start() method.
 * @param ServiceBusContext $context Context used to deliver the messages produced by the saga.
 *
 * @return Promise<\ServiceBus\Sagas\Saga|null>
 *
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagasStoreInteractionFailed Database interaction error
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagaSerializationError Error while serializing saga
 * @throws \ServiceBus\Sagas\Module\Exceptions\SagaMetaDataNotFound
 * @throws \ServiceBus\Sagas\Store\Exceptions\DuplicateSaga The specified saga has already been added
 */
public function start(SagaId $id, object $command, ServiceBusContext $context): Promise
{
    return call(
        function () use ($id, $command, $context): \Generator
        {
            yield from $this->setupMutex($id);

            try
            {
                $sagaClass = $id->sagaClass;
                $metadata  = $this->extractSagaMetaData($sagaClass);

                /** @var \DateTimeImmutable $expiresAt */
                $expiresAt = datetimeInstantiator($metadata->expireDateModifier);

                /** @var Saga $newSaga */
                $newSaga = new $sagaClass($id, $expiresAt);
                $newSaga->start($command);

                // true => insert a new record rather than update an existing one.
                yield from $this->doStore($newSaga, $context, true);

                return $newSaga;
            }
            finally
            {
                yield from $this->releaseMutex($id);
            }
        }
    );
}
||
| 108 | |||
/**
 * Load saga.
 *
 * A mutex for the saga id is acquired before the store is queried. What happens
 * to the lock afterwards depends on the outcome:
 *  - a non-expired saga is returned: the lock is kept in the lock collection and
 *    is released later by save();
 *  - no saga is found (promise resolves with null): the lock is released;
 *  - the saga is expired, or any error occurs: the lock is released before the
 *    promise fails.
 *
 * @param SagaId            $id      Identifier of the saga to load.
 * @param ServiceBusContext $context Context used if an expired saga has to be closed and saved.
 *
 * @return Promise<\ServiceBus\Sagas\Saga|null>
 *
 * @throws \ServiceBus\Sagas\Store\Exceptions\LoadedExpiredSaga Expired saga loaded
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagasStoreInteractionFailed Database interaction error
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagaSerializationError Error while deserializing saga
 */
public function obtain(SagaId $id, ServiceBusContext $context): Promise
{
    return call(
        function () use ($id, $context): \Generator
        {
            yield from $this->setupMutex($id);

            // Only the "live saga returned" path hands the lock over to the caller.
            $keepLock = false;

            try
            {
                /** @var Saga|null $saga */
                $saga = yield $this->sagaStore->obtain($id);

                if ($saga === null)
                {
                    return null;
                }

                /** Non-expired saga */
                if ($saga->expireDate() > now())
                {
                    $keepLock = true;

                    return $saga;
                }

                yield from $this->doCloseExpired($saga, $context);

                throw new LoadedExpiredSaga(
                    \sprintf('Unable to load the saga (ID: "%s") whose lifetime has expired', $id->toString())
                );
            }
            finally
            {
                /**
                 * doCloseExpired() releases the lock only when it actually saves the saga
                 * (status "in progress"). Releasing here covers the remaining expired and
                 * error paths; releaseMutex() does nothing when the lock is already gone.
                 */
                if ($keepLock === false)
                {
                    yield from $this->releaseMutex($id);
                }
            }
        }
    );
}
||
| 156 | |||
/**
 * Persist changes of an already started saga.
 *
 * The stored copy is loaded first: if none exists the save is rejected, and if
 * its state hash equals the hash of the given saga nothing is written. The lock
 * registered for the saga id (if any) is released in every case.
 *
 * @param Saga              $saga    Saga to persist.
 * @param ServiceBusContext $context Context used to deliver the messages produced by the saga.
 *
 * @return Promise<void>
 *
 * @throws \ServiceBus\Sagas\Module\Exceptions\CantSaveUnStartedSaga Attempt to save un-started saga
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagasStoreInteractionFailed Database interaction error
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagaSerializationError Error while serializing saga
 */
public function save(Saga $saga, ServiceBusContext $context): Promise
{
    return call(
        function () use ($saga, $context): \Generator
        {
            try
            {
                /** @var Saga|null $stored */
                $stored = yield $this->sagaStore->obtain($saga->id());

                if ($stored === null)
                {
                    throw CantSaveUnStartedSaga::create($saga);
                }

                /** Identical state hash: the saga has not been updated, nothing to write */
                if ($stored->stateHash() === $saga->stateHash())
                {
                    return;
                }

                // false => update the existing record.
                yield from $this->doStore($saga, $context, false);
            }
            finally
            {
                yield from $this->releaseMutex($saga->id());
            }
        }
    );
}
||
| 196 | |||
/**
 * Mark an expired saga as expired and save it.
 *
 * Nothing is done unless the saga's current status reports inProgress().
 * The status property and the makeExpired() method are accessed through
 * the reflection helpers.
 *
 * @throws \ServiceBus\Sagas\Module\Exceptions\CantSaveUnStartedSaga Attempt to save un-started saga
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagasStoreInteractionFailed Database interaction error
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagaSerializationError Error while serializing saga
 */
private function doCloseExpired(Saga $saga, ServiceBusContext $context): \Generator
{
    /** @var \ServiceBus\Sagas\SagaStatus $status */
    $status = readReflectionPropertyValue($saga, 'status');

    if ($status->inProgress() !== true)
    {
        return;
    }

    invokeReflectionMethod($saga, 'makeExpired');

    yield $this->save($saga, $context);
}
|
| 216 | |||
/**
 * Write the saga to the store and deliver the messages it produced.
 *
 * The messages are read from the saga before the store call; they are handed to
 * the context for delivery only after the store call has completed.
 *
 * @param Saga              $saga    Saga to persist.
 * @param ServiceBusContext $context Context whose delivery() is called for every message.
 * @param bool              $isNew   true to save a new record, false to update an existing one.
 *
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagaSerializationError Error while serializing saga
 * @throws \ServiceBus\Sagas\Store\Exceptions\DuplicateSaga The specified saga has already been added
 * @throws \ServiceBus\Sagas\Store\Exceptions\SagasStoreInteractionFailed Database interaction error
 */
private function doStore(Saga $saga, ServiceBusContext $context, bool $isNew): \Generator
{
    /**
     * @psalm-var array<int, object> $messages
     *
     * @var object[] $messages
     */
    $messages = invokeReflectionMethod($saga, 'messages');

    if ($isNew === true)
    {
        yield $this->sagaStore->save($saga);
    }
    else
    {
        yield $this->sagaStore->update($saga);
    }

    $deliveries = [];

    foreach ($messages as $message)
    {
        $deliveries[] = $context->delivery($message);
    }

    // Wait for all deliveries together; skip the yield when there is nothing to send.
    if ($deliveries !== [])
    {
        yield $deliveries;
    }
}
|
| 249 | |||
/**
 * Look up the metadata registered for a saga class.
 *
 * @param string $sagaClass Saga class name used as the key of the metadata collection.
 *
 * @throws \ServiceBus\Sagas\Module\Exceptions\SagaMetaDataNotFound No metadata registered for the class
 */
private function extractSagaMetaData(string $sagaClass): SagaMetadata
{
    $metadata = $this->sagaMetaDataCollection[$sagaClass] ?? null;

    if ($metadata === null)
    {
        throw SagaMetaDataNotFound::create($sagaClass);
    }

    return $metadata;
}
||
| 264 | |||
/**
 * Register metadata for the specified saga class, replacing any previous entry.
 * Not called from inside this class: invoked from the infrastructure layer
 * using Reflection API.
 *
 * @noinspection PhpUnusedPrivateMethodInspection
 *
 * @see SagaMessagesRouterConfigurator::configure
 *
 * @param string       $sagaClass Saga class name used as the collection key.
 * @param SagaMetadata $metadata  Metadata to store for that class.
 */
private function appendMetaData(string $sagaClass, SagaMetadata $metadata): void
{
    $this->sagaMetaDataCollection[$sagaClass] = $metadata;
}
|
| 277 | |||
/**
 * Acquire the mutex for a saga and remember the resulting lock.
 *
 * The lock is stored in the lock collection under the key produced by
 * createMutexKey(), which is the same key releaseMutex() looks up.
 */
private function setupMutex(SagaId $id): \Generator
{
    $key = createMutexKey($id);

    /** @var Lock $acquired */
    $acquired = yield $this->mutexFactory->create($key)->acquire();

    $this->lockCollection[$key] = $acquired;
}
|
| 292 | |||
/**
 * Release the lock held for a saga, if there is one.
 *
 * Does nothing when no lock is registered under the saga's mutex key, so it is
 * safe to call more than once for the same saga.
 */
private function releaseMutex(SagaId $id): \Generator
{
    $key = createMutexKey($id);

    if (\array_key_exists($key, $this->lockCollection) === false)
    {
        return;
    }

    /** @var Lock $heldLock */
    $heldLock = $this->lockCollection[$key];

    // Forget the lock before releasing it, so a repeated call finds nothing to release.
    unset($this->lockCollection[$key]);

    yield $heldLock->release();
}
|
| 310 | } |
||
| 311 |
This check looks for private methods that have been defined, but are not used inside the class.