Completed
Push — v4.0 ( d88c1e...d1a754 )
by Masiukevich
03:36
created

IndexProvider::releaseMutex()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 5
c 1
b 0
f 0
nc 2
nop 1
dl 0
loc 12
ccs 6
cts 6
cp 1
crap 2
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use ServiceBus\Mutex\InMemory\InMemoryMutexFactory;
16
use function Amp\call;
17
use Amp\Promise;
18
use ServiceBus\EventSourcing\Indexes\IndexKey;
19
use ServiceBus\EventSourcing\Indexes\IndexValue;
20
use ServiceBus\EventSourcing\Indexes\Store\IndexStore;
21
use ServiceBus\EventSourcingModule\Exceptions\IndexOperationFailed;
22
use ServiceBus\Mutex\Lock;
23
use ServiceBus\Mutex\MutexFactory;
24
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
25
26
/**
27
 *
28
 */
29
final class IndexProvider
30
{
31
    /** @var IndexStore */
32
    private $store;
33
34
    /**
35
     * Mutex creator.
36
     *
37
     * @var MutexFactory
38
     */
39
    private $mutexFactory;
40
41
    /** @var Lock[] */
42
    private $lockCollection = [];
43
44 6
    public function __construct(IndexStore $store, ?MutexFactory $mutexFactory = null)
45
    {
46 6
        $this->store        = $store;
47 6
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
48 6
    }
49
50
    /**
51
     * Receive index value.
52
     *
53
     * Returns \ServiceBus\EventSourcing\Indexes\IndexValue|null
54
     *
55
     * @throws \ServiceBus\EventSourcingModule\Exceptions\IndexOperationFailed
56
     */
57 3
    public function get(IndexKey $indexKey): Promise
58
    {
59 3
        return call(
60
            function () use ($indexKey): \Generator
61
            {
62
                try
63
                {
64 3
                    yield from $this->setupMutex($indexKey);
65
66
                    /** @var IndexValue|null $value */
67 3
                    $value = yield $this->store->find($indexKey);
68
69 3
                    return $value;
70
                }
71
                catch (\Throwable $throwable)
72
                {
73
                    throw IndexOperationFailed::fromThrowable($throwable);
74
                }
75
                finally
76
                {
77 3
                    yield from $this->releaseMutex($indexKey);
78
                }
79 3
            }
80
        );
81
    }
82
83
    /**
84
     * Is there a value in the index.
85
     *
86
     * @throws \ServiceBus\EventSourcingModule\Exceptions\IndexOperationFailed
87
     */
88 1
    public function has(IndexKey $indexKey): Promise
89
    {
90 1
        return call(
91
            function () use ($indexKey): \Generator
92
            {
93
                try
94
                {
95
                    /** @var IndexValue|null $value */
96 1
                    $value = yield $this->store->find($indexKey);
97
98 1
                    return $value !== null;
99
                }
100
                catch (\Throwable $throwable)
101
                {
102
                    throw IndexOperationFailed::fromThrowable($throwable);
103
                }
104 1
            }
105
        );
106
    }
107
108
    /**
109
     * Add a value to the index. If false, then the value already exists.
110
     *
111
     * @throws \ServiceBus\EventSourcingModule\Exceptions\IndexOperationFailed
112
     */
113 5
    public function add(IndexKey $indexKey, IndexValue $value): Promise
114
    {
115 5
        return call(
116
            function () use ($indexKey, $value): \Generator
117
            {
118
                try
119
                {
120 5
                    yield from $this->setupMutex($indexKey);
121
122
                    /** @var int $affectedRows */
123 5
                    $affectedRows = yield $this->store->add($indexKey, $value);
124
125 5
                    return $affectedRows !== 0;
126
                }
127 1
                catch (UniqueConstraintViolationCheckFailed $exception)
128
                {
129 1
                    return false;
130
                }
131
                catch (\Throwable $throwable)
132
                {
133
                    throw IndexOperationFailed::fromThrowable($throwable);
134
                }
135
                finally
136
                {
137 5
                    yield from $this->releaseMutex($indexKey);
138
                }
139 5
            }
140
        );
141
    }
142
143
    /**
144
     * Remove value from index.
145
     *
146
     * @throws \ServiceBus\EventSourcingModule\Exceptions\IndexOperationFailed
147
     */
148 1
    public function remove(IndexKey $indexKey): Promise
149
    {
150 1
        return call(
151
            function () use ($indexKey): \Generator
152
            {
153
                try
154
                {
155 1
                    yield from $this->setupMutex($indexKey);
156 1
                    yield $this->store->delete($indexKey);
157
                }
158
                catch (\Throwable $throwable)
159
                {
160
                    throw IndexOperationFailed::fromThrowable($throwable);
161
                }
162
                finally
163 1
                {
164 1
                    yield from $this->releaseMutex($indexKey);
165
                }
166 1
            }
167
        );
168
    }
169
170
    /**
171
     * Update value in index.
172
     *
173
     * @throws \ServiceBus\EventSourcingModule\Exceptions\IndexOperationFailed
174
     */
175 1
    public function update(IndexKey $indexKey, IndexValue $value): Promise
176
    {
177 1
        return call(
178
            function () use ($indexKey, $value): \Generator
179
            {
180
                try
181
                {
182 1
                    yield from $this->setupMutex($indexKey);
183
184
                    /** @var int $affectedRows */
185 1
                    $affectedRows = yield $this->store->update($indexKey, $value);
186
187 1
                    return $affectedRows !== 0;
188
                }
189
                catch (\Throwable $throwable)
190
                {
191
                    throw IndexOperationFailed::fromThrowable($throwable);
192
                }
193
                finally
194
                {
195 1
                    yield from $this->releaseMutex($indexKey);
196
                }
197 1
            }
198
        );
199
    }
200
201 5
    private function setupMutex(IndexKey $indexKey): \Generator
202
    {
203 5
        $mutexKey = createIndexMutex($indexKey);
204
205 5
        if (\array_key_exists($mutexKey, $this->lockCollection) === false)
206
        {
207 5
            $mutex = $this->mutexFactory->create($mutexKey);
208
209
            /** @var Lock $lock */
210 5
            $lock = yield $mutex->acquire();
211
212 5
            $this->lockCollection[$mutexKey] = $lock;
213
        }
214 5
    }
215
216 5
    private function releaseMutex(IndexKey $indexKey): \Generator
217
    {
218 5
        $mutexKey = createIndexMutex($indexKey);
219
220 5
        if (\array_key_exists($mutexKey, $this->lockCollection) === true)
221
        {
222
            /** @var Lock $lock */
223 5
            $lock = $this->lockCollection[$mutexKey];
224
225 5
            unset($this->lockCollection[$mutexKey]);
226
227 5
            yield $lock->release();
228
        }
229 5
    }
230
}
231