Passed
Push — v4.2 ( 50993c...c387ce )
by Masiukevich
10:37
created

IndexProvider   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 200
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 63
c 1
b 0
f 0
dl 0
loc 200
rs 10
wmc 16

8 Methods

Rating   Name   Duplication   Size   Complexity  
A update() 0 21 2
A setupMutex() 0 12 2
A get() 0 21 2
A remove() 0 18 2
A releaseMutex() 0 12 2
A __construct() 0 4 1
A add() 0 25 3
A has() 0 15 2
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\EventSourcing\Exceptions\IndexOperationFailed;
18
use ServiceBus\EventSourcing\Indexes\IndexKey;
19
use ServiceBus\EventSourcing\Indexes\IndexValue;
20
use ServiceBus\EventSourcing\Indexes\Store\IndexStore;
21
use ServiceBus\Mutex\InMemory\InMemoryMutexFactory;
22
use ServiceBus\Mutex\Lock;
23
use ServiceBus\Mutex\MutexFactory;
24
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
25
26
/**
27
 *
28
 */
29
final class IndexProvider
30
{
31
    /** @var IndexStore */
32
    private $store;
33
34
    /**
35
     * Mutex creator.
36
     *
37
     * @var MutexFactory
38
     */
39
    private $mutexFactory;
40
41
    /** @var Lock[] */
42
    private $lockCollection = [];
43
44
    public function __construct(IndexStore $store, ?MutexFactory $mutexFactory = null)
45
    {
46
        $this->store        = $store;
47
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
48
    }
49
50
    /**
51
     * Receive index value.
52
     *
53
     * Returns \ServiceBus\EventSourcing\Indexes\IndexValue|null
54
     *
55
     * @throws \ServiceBus\EventSourcing\Exceptions\IndexOperationFailed
56
     */
57
    public function get(IndexKey $indexKey): Promise
58
    {
59
        return call(
60
            function() use ($indexKey): \Generator
61
            {
62
                yield from $this->setupMutex($indexKey);
63
64
                try
65
                {
66
                    /** @var IndexValue|null $value */
67
                    $value = yield $this->store->find($indexKey);
68
69
                    return $value;
70
                }
71
                catch (\Throwable $throwable)
72
                {
73
                    throw IndexOperationFailed::fromThrowable($throwable);
74
                }
75
                finally
76
                {
77
                    yield from $this->releaseMutex($indexKey);
78
                }
79
            }
80
        );
81
    }
82
83
    /**
84
     * Is there a value in the index.
85
     *
86
     * @throws \ServiceBus\EventSourcing\Exceptions\IndexOperationFailed
87
     */
88
    public function has(IndexKey $indexKey): Promise
89
    {
90
        return call(
91
            function() use ($indexKey): \Generator
92
            {
93
                try
94
                {
95
                    /** @var IndexValue|null $value */
96
                    $value = yield $this->store->find($indexKey);
97
98
                    return null !== $value;
99
                }
100
                catch (\Throwable $throwable)
101
                {
102
                    throw IndexOperationFailed::fromThrowable($throwable);
103
                }
104
            }
105
        );
106
    }
107
108
    /**
109
     * Add a value to the index. If false, then the value already exists.
110
     *
111
     * @throws \ServiceBus\EventSourcing\Exceptions\IndexOperationFailed
112
     */
113
    public function add(IndexKey $indexKey, IndexValue $value): Promise
114
    {
115
        return call(
116
            function() use ($indexKey, $value): \Generator
117
            {
118
                yield from $this->setupMutex($indexKey);
119
120
                try
121
                {
122
                    /** @var int $affectedRows */
123
                    $affectedRows = yield $this->store->add($indexKey, $value);
124
125
                    return 0 !== $affectedRows;
126
                }
127
                catch (UniqueConstraintViolationCheckFailed $exception)
128
                {
129
                    return false;
130
                }
131
                catch (\Throwable $throwable)
132
                {
133
                    throw IndexOperationFailed::fromThrowable($throwable);
134
                }
135
                finally
136
                {
137
                    yield from $this->releaseMutex($indexKey);
138
                }
139
            }
140
        );
141
    }
142
143
    /**
144
     * Remove value from index.
145
     *
146
     * @throws \ServiceBus\EventSourcing\Exceptions\IndexOperationFailed
147
     */
148
    public function remove(IndexKey $indexKey): Promise
149
    {
150
        return call(
151
            function() use ($indexKey): \Generator
152
            {
153
                yield from $this->setupMutex($indexKey);
154
155
                try
156
                {
157
                    yield $this->store->delete($indexKey);
158
                }
159
                catch (\Throwable $throwable)
160
                {
161
                    throw IndexOperationFailed::fromThrowable($throwable);
162
                }
163
                finally
164
                {
165
                    yield from $this->releaseMutex($indexKey);
166
                }
167
            }
168
        );
169
    }
170
171
    /**
172
     * Update value in index.
173
     *
174
     * @throws \ServiceBus\EventSourcing\Exceptions\IndexOperationFailed
175
     */
176
    public function update(IndexKey $indexKey, IndexValue $value): Promise
177
    {
178
        return call(
179
            function() use ($indexKey, $value): \Generator
180
            {
181
                yield from $this->setupMutex($indexKey);
182
183
                try
184
                {
185
                    /** @var int $affectedRows */
186
                    $affectedRows = yield $this->store->update($indexKey, $value);
187
188
                    return 0 !== $affectedRows;
189
                }
190
                catch (\Throwable $throwable)
191
                {
192
                    throw IndexOperationFailed::fromThrowable($throwable);
193
                }
194
                finally
195
                {
196
                    yield from $this->releaseMutex($indexKey);
197
                }
198
            }
199
        );
200
    }
201
202
    private function setupMutex(IndexKey $indexKey): \Generator
203
    {
204
        $mutexKey = createIndexMutex($indexKey);
205
206
        if (\array_key_exists($mutexKey, $this->lockCollection))
207
        {
208
            $mutex = $this->mutexFactory->create($mutexKey);
209
210
            /** @var Lock $lock */
211
            $lock = yield $mutex->acquire();
212
213
            $this->lockCollection[$mutexKey] = $lock;
214
        }
215
    }
216
217
    private function releaseMutex(IndexKey $indexKey): \Generator
218
    {
219
        $mutexKey = createIndexMutex($indexKey);
220
221
        if (\array_key_exists($mutexKey, $this->lockCollection))
222
        {
223
            /** @var Lock $lock */
224
            $lock = $this->lockCollection[$mutexKey];
225
226
            unset($this->lockCollection[$mutexKey]);
227
228
            yield $lock->release();
229
        }
230
    }
231
}
232