MongoDB::saveStatus()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 3
rs 10
1
<?php
2
3
namespace Ackintosh\Ganesha\Storage\Adapter;
4
5
use Ackintosh\Ganesha;
6
use Ackintosh\Ganesha\Configuration;
7
use Ackintosh\Ganesha\Exception\StorageException;
8
use Ackintosh\Ganesha\Storage\AdapterInterface;
9
use MongoDB\Driver\Cursor;
10
11
/**
 * Storage adapter that keeps per-service state in a MongoDB collection.
 *
 * Each service key is stored as one document identified by its "service" field.
 * The methods below read and write the fields "count", "lastFailureTime" and
 * "status"; every `$set` update additionally stamps a "date" field.
 *
 * The database and collection names are only populated by setConfiguration(),
 * so it has to be called before any other read or write method.
 */
class MongoDB implements AdapterInterface, TumblingTimeWindowInterface, SlidingTimeWindowInterface
{
    /**
     * Driver manager used for every query and bulk write.
     *
     * @var \MongoDB\Driver\Manager
     */
    private $manager;

    /**
     * Configuration received through setConfiguration().
     *
     * @var Configuration
     */
    private $configuration;

    /**
     * Collection name, read from the configuration key "collectionName".
     *
     * @var string
     */
    private $collectionName;

    /**
     * Database name, read from the configuration key "dbName".
     *
     * @var string
     */
    private $dbName;

    /**
     * MongoDB constructor.
     *
     * @param \MongoDB\Driver\Manager $manager
     */
    public function __construct(\MongoDB\Driver\Manager $manager)
    {
        $this->manager = $manager;
    }

    /**
     * @return bool always true
     */
    public function supportCountStrategy(): bool
    {
        return true;
    }

    /**
     * @return bool always true
     */
    public function supportRateStrategy(): bool
    {
        return true;
    }

    /**
     * Stores the configuration and extracts the target database and collection names.
     *
     * @param Configuration $configuration
     * @return void
     * @throws \Exception
     */
    public function setConfiguration(Configuration $configuration): void
    {
        $this->configuration = $configuration;
        $this->dbName = $configuration->offsetGet('dbName');
        $this->collectionName = $configuration->offsetGet('collectionName');
    }

    /**
     * Loads the counter stored for the given service.
     *
     * When no document exists for the service, one is created with a count of 0.
     *
     * @param string $service
     * @return int
     * @throws StorageException when the document lacks a "count" field or the driver fails
     */
    public function load(string $service): int
    {
        $documents = $this->read(['service' => $service])->toArray();
        if (empty($documents)) {
            $this->update(['service' => $service], ['$set' => ['count' => 0]]);
            return 0;
        }
        if (!isset($documents[0]['count'])) {
            throw new StorageException('failed to load service : field "count" not found.');
        }

        return $documents[0]['count'];
    }

    /**
     * Overwrites the counter of the given service.
     *
     * @param string $service
     * @param int $count
     * @return void
     * @throws StorageException
     */
    public function save(string $service, int $count): void
    {
        $this->update(['service' => $service], ['$set' => ['count' => $count]]);
    }

    /**
     * Adds 1 to the counter of the given service.
     *
     * NOTE(review): the explicit third argument replaces update()'s default
     * options, so this write is issued without 'upsert' => true. 'safe' does not
     * look like a documented BulkWrite::update() option - confirm this is intended.
     *
     * @param string $service
     * @return void
     * @throws StorageException
     */
    public function increment(string $service): void
    {
        $this->update(['service' => $service], ['$inc' => ['count' => 1]], ['safe' => true]);
    }

    /**
     * Subtracts 1 from the counter of the given service.
     *
     * NOTE(review): the explicit third argument replaces update()'s default
     * options, so this write is issued without 'upsert' => true. 'safe' does not
     * look like a documented BulkWrite::update() option - confirm this is intended.
     *
     * @param string $service
     * @return void
     * @throws StorageException
     */
    public function decrement(string $service): void
    {
        $this->update(['service' => $service], ['$inc' => ['count' => -1]], ['safe' => true]);
    }

    /**
     * Stores the last failure time of the given service.
     *
     * @param string $service
     * @param int $lastFailureTime
     * @return void
     * @throws StorageException
     */
    public function saveLastFailureTime(string $service, int $lastFailureTime): void
    {
        $this->update(['service' => $service], ['$set' => ['lastFailureTime' => $lastFailureTime]]);
    }

    /**
     * Loads the last failure time of the given service.
     *
     * @param  string $service
     * @return int
     * @throws StorageException when no document or no "lastFailureTime" field exists, or the driver fails
     */
    public function loadLastFailureTime(string $service): int
    {
        $documents = $this->read(['service' => $service])->toArray();
        if (empty($documents)) {
            throw new StorageException('failed to last failure time : entry not found.');
        }
        if (!isset($documents[0]['lastFailureTime'])) {
            throw new StorageException('failed to last failure time : field "lastFailureTime" not found.');
        }

        return $documents[0]['lastFailureTime'];
    }

    /**
     * Stores the status of the given service.
     *
     * @param string $service
     * @param int $status
     * @return void
     * @throws StorageException
     */
    public function saveStatus(string $service, int $status): void
    {
        $this->update(['service' => $service], ['$set' => ['status' => $status]]);
    }

    /**
     * Loads the status of the given service.
     *
     * When no document or no "status" field exists, Ganesha::STATUS_CALMED_DOWN
     * is persisted for the service and returned.
     *
     * @param  string $service
     * @return int
     * @throws StorageException
     */
    public function loadStatus(string $service): int
    {
        $documents = $this->read(['service' => $service])->toArray();

        if (empty($documents) || !isset($documents[0]['status'])) {
            $this->saveStatus($service, Ganesha::STATUS_CALMED_DOWN);
            return Ganesha::STATUS_CALMED_DOWN;
        }

        return $documents[0]['status'];
    }

    /**
     * Issues a delete with an empty filter against the configured collection.
     *
     * @return void
     * @throws StorageException
     */
    public function reset(): void
    {
        $this->delete([], []);
    }

    /**
     * @return string "db.collectionName"
     */
    private function getNamespace(): string
    {
        return $this->dbName . '.' . $this->collectionName;
    }

    /**
     * Runs a query against the configured namespace.
     *
     * The cursor's type map is set so that documents and nested values come
     * back as plain PHP arrays.
     *
     * @param array $filter
     * @param array $queryOptions
     * @return \MongoDB\Driver\Cursor
     * @throws StorageException wrapping any driver exception (kept as "previous")
     */
    private function read(array $filter, array $queryOptions = []): Cursor
    {
        try {
            $query = new \MongoDB\Driver\Query($filter, $queryOptions);
            $cursor = $this->manager->executeQuery($this->getNamespace(), $query);
            $cursor->setTypeMap(['root' => 'array', 'document' => 'array', 'array' => 'array']);
            return $cursor;
        } catch (\MongoDB\Driver\Exception\Exception $ex) {
            // Code stays 0 as before; the driver exception is kept for debugging.
            throw new StorageException('adapter error : ' . $ex->getMessage(), 0, $ex);
        }
    }

    /**
     * Deletes the documents matching the filter.
     *
     * @param array $filter
     * @param array $deleteOptions
     * @return void
     * @throws StorageException
     */
    private function delete(array $filter, array $deleteOptions = []): void
    {
        $this->bulkWrite($filter, ['deleteOptions' => $deleteOptions], 'delete');
    }

    /**
     * Applies an update document to the documents matching the filter.
     *
     * By default a single document is updated and it is created when missing.
     * Passing $updateOptions replaces these defaults entirely.
     *
     * @param array $filter
     * @param array $newObj
     * @param array $updateOptions
     * @return void
     * @throws StorageException
     */
    private function update(array $filter, array $newObj, array $updateOptions = ['multi' => false, 'upsert' => true]): void
    {
        $this->bulkWrite($filter, ['newObj' => $newObj, 'updateOptions' => $updateOptions], 'update');
    }

    /**
     * Executes a single update or delete as a bulk write on the configured namespace.
     *
     * @param array $filter
     * @param array $options  for 'update': keys "newObj" and "updateOptions"; for 'delete': key "deleteOptions"
     * @param string $command either 'update' or 'delete'
     * @return void
     * @throws StorageException on an unsupported command, on reported write errors,
     *                          or wrapping any driver exception (kept as "previous")
     */
    private function bulkWrite(array $filter, array $options, string $command): void
    {
        try {
            $bulk = new \MongoDB\Driver\BulkWrite();
            switch ($command) {
                case 'update':
                    // Every $set update also stamps the document with a fresh "date" value.
                    if (isset($options['newObj']['$set'])) {
                        $options['newObj']['$set']['date'] = new \MongoDB\BSON\UTCDateTime();
                    }
                    $bulk->update($filter, $options['newObj'], $options['updateOptions']);
                    break;
                case 'delete':
                    $bulk->delete($filter, $options['deleteOptions']);
                    break;
                default:
                    // Fail explicitly instead of submitting an empty bulk to the driver.
                    throw new StorageException('adapter error : unsupported command "' . $command . '"');
            }

            $writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 100);
            $result = $this->manager->executeBulkWrite($this->getNamespace(), $bulk, $writeConcern);

            $writeErrors = $result->getWriteErrors();
            if (!empty($writeErrors)) {
                $errorMessage = '';
                foreach ($writeErrors as $writeError) {
                    $errorMessage .= 'Operation#' . $writeError->getIndex() . ': ' . $writeError->getMessage() . ' (' . $writeError->getCode() . ')' . "\n";
                }
                throw new StorageException('failed '.$command.' the value : ' . $errorMessage);
            }
        } catch (\MongoDB\Driver\Exception\Exception $ex) {
            // Code stays 0 as before; the driver exception is kept for debugging.
            throw new StorageException('adapter error : ' . $ex->getMessage(), 0, $ex);
        }
    }
}
262