Completed
Push — master ( 44b865...d365d6 )
by Dmitry
04:15 queued 02:12
created

Temporal::initSchema()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 66
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 66
rs 8.8291
c 0
b 0
f 0
cc 4
eloc 51
nc 3
nop 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Tarantool\Mapper\Plugin;
4
5
use Carbon\Carbon;
6
use Exception;
7
use Tarantool\Mapper\Mapper;
8
use Tarantool\Mapper\Plugin;
9
10
class Temporal extends Plugin
11
{
12
    private $actor;
13
    private $timestamps = [];
14
15
    public function getLinks($entity, $id, $date)
16
    {
17
        $links = $this->getData($entity, $id, $date, '_link_aggregate');
18
        foreach ($links as $i => $source) {
19
            $link = array_key_exists(1, $source) ? ['data' => $source[1]] : [];
20
            foreach ($source[0] as $spaceId => $entityId) {
21
                $spaceName = $this->mapper->getSchema()->getSpace($spaceId)->getName();
22
                $link[$spaceName] = $entityId;
23
            }
24
            $links[$i] = $link;
25
        }
26
        return $links;
27
    }
28
29
    public function getState($entity, $id, $date)
30
    {
31
        return $this->getData($entity, $id, $date, '_override_aggregate');
32
    }
33
34
    private function getData($entity, $id, $date, $space)
35
    {
36
        if (!$this->mapper->getSchema()->hasSpace($entity)) {
37
            throw new Exception("invalid entity: " . $entity);
38
        }
39
40
        $entity = $this->mapper->getSchema()->getSpace($entity)->getId();
41
        $date = $this->getTimestamp($date);
42
43
        $rows = $this->mapper->getClient()->getSpace($space)
44
            ->select([$entity, $id, $date], 0, 1, 0, 4) // [key, index, limit, offset, iterator = LE]
45
            ->getData();
46
47
        if (count($rows)) {
48
            $state = $this->mapper->findOne($space, [
49
                'entity' => $entity,
50
                'id' => $id,
51
                'begin' => $rows[0][2]
52
            ]);
53
            if (!$state->end || $state->end >= $date) {
54
                return $state->data;
55
            }
56
        }
57
58
        return [];
59
    }
60
61
    public function override(array $override)
62
    {
63
        $override = $this->parseConfig($override);
64
65
        foreach ($override as $k => $v) {
66
            if (!in_array($k, ['entity', 'id', 'begin', 'end', 'data'])) {
67
                $override['entity'] = $k;
68
                $override['id'] = $v;
69
                unset($override[$k]);
70
            }
71
        }
72
73
        if (!array_key_exists('entity', $override)) {
74
            throw new Exception("no entity defined");
75
        }
76
77
        if (!$this->mapper->getSchema()->hasSpace($override['entity'])) {
78
            throw new Exception("invalid entity " . $override['entity']);
79
        }
80
81
        $space = $this->mapper->getSchema()->getSpace($override['entity']);
82
83
        // set entity id
84
        $override['entity'] = $space->getId();
85
86
        $override['actor'] = $this->actor;
87
        $override['timestamp'] = Carbon::now()->timestamp;
88
89
        $this->initSchema('override');
90
        $this->mapper->create('_override', $override);
91
        $this->processOverride($override);
92
    }
93
94
    public function link(array $link)
95
    {
96
        $link = $this->parseConfig($link);
97
98
        $this->initSchema('link');
99
100
        ksort($link);
101
        $node = null;
102
        foreach ($link as $entity => $id) {
103
            if (in_array($entity, ['begin', 'end', 'data'])) {
104
                continue;
105
            }
106
            $spaceId = $this->mapper->getSchema()->getSpace($entity)->getId();
107
108
            $params = [
109
                'entity'   => $spaceId,
110
                'entityId' => $id,
111
                'parent'   => $node ? $node->id : 0,
112
            ];
113
            $node = $this->mapper->findOrCreate('_link', $params);
114
        }
115
116
        if (!$node || !$node->parent) {
117
            throw new Exception("Invalid link configuration");
118
        }
119
120
        $node->begin = $link['begin'];
121
        $node->end = $link['end'];
122
        $node->actor = $this->actor;
123
        $node->timestamp = Carbon::now()->timestamp;
124
        if (array_key_exists('data', $link)) {
125
            $node->data = $link['data'];
126
        }
127
128
        $node->save();
129
        foreach ($link as $entity => $id) {
130
            if (in_array($entity, ['begin', 'end', 'data'])) {
131
                continue;
132
            }
133
            $this->processLink($entity, $id);
134
        }
135
    }
136
137
    public function setActor($actor)
138
    {
139
        $this->actor = $actor;
140
        return $this;
141
    }
142
143
    private function getLeafs($link)
144
    {
145
        if ($link->timestamp) {
146
            return [$link];
147
        }
148
149
        $leafs = [];
150
        foreach ($this->mapper->find('_link', ['parent' => $link->id]) as $child) {
151
            foreach ($this->getLeafs($child) as $leaf) {
152
                $leafs[] = $leaf;
153
            }
154
        }
155
        return $leafs;
156
    }
157
158
    private function getTimestamp($string)
159
    {
160
        if (!array_key_exists($string, $this->timestamps)) {
161
            $this->timestamps[$string] = Carbon::parse($string)->timestamp;
162
        }
163
        return $this->timestamps[$string];
164
    }
165
166
    private function initSchema($name)
167
    {
168
        switch ($name) {
169
            case 'override':
170
                return $this->mapper->getSchema()->once(__CLASS__.'@states', function (Mapper $mapper) {
171
                    $mapper->getSchema()
172
                        ->createSpace('_override', [
173
                            'entity'     => 'unsigned',
174
                            'id'         => 'unsigned',
175
                            'begin'      => 'unsigned',
176
                            'end'        => 'unsigned',
177
                            'timestamp'  => 'unsigned',
178
                            'actor'      => 'unsigned',
179
                            'data'       => '*',
180
                        ])
181
                        ->addIndex(['entity', 'id', 'begin', 'timestamp', 'actor']);
182
183
                    $mapper->getSchema()
184
                        ->createSpace('_override_aggregate', [
185
                            'entity'     => 'unsigned',
186
                            'id'         => 'unsigned',
187
                            'begin'      => 'unsigned',
188
                            'end'        => 'unsigned',
189
                            'data'       => '*',
190
                        ])
191
                        ->addIndex(['entity', 'id', 'begin']);
192
                });
193
194
            case 'link':
195
                return $this->mapper->getSchema()->once(__CLASS__.'@link', function (Mapper $mapper) {
196
                    if (!$mapper->hasPlugin(Sequence::class)) {
197
                        $mapper->addPlugin(Sequence::class);
198
                    }
199
                    $mapper->getSchema()
200
                        ->createSpace('_link', [
201
                            'id'        => 'unsigned',
202
                            'parent'    => 'unsigned',
203
                            'entityId'  => 'unsigned',
204
                            'entity'    => 'unsigned',
205
                            'begin'     => 'unsigned',
206
                            'end'       => 'unsigned',
207
                            'timestamp' => 'unsigned',
208
                            'actor'     => 'unsigned',
209
                            'data'      => '*',
210
                        ])
211
                        ->addIndex(['id'])
212
                        ->addIndex(['entity', 'entityId', 'parent', 'begin', 'timestamp', 'actor'])
213
                        ->addIndex([
214
                            'fields' => 'parent',
215
                            'unique' => false,
216
                        ]);
217
218
                    $mapper->getSchema()
219
                        ->createSpace('_link_aggregate', [
220
                            'entity' => 'unsigned',
221
                            'id'     => 'unsigned',
222
                            'begin'  => 'unsigned',
223
                            'end'    => 'unsigned',
224
                            'data'   => '*',
225
                        ])
226
                        ->addIndex(['entity', 'id', 'begin']);
227
                });
228
        }
229
230
        throw new Exception("Invalid schema $name");
231
    }
232
233
    private function parseConfig(array $data)
234
    {
235
        if (!$this->actor) {
236
            throw new Exception("actor is undefined");
237
        }
238
239
        if (array_key_exists('actor', $data)) {
240
            throw new Exception("actor is defined");
241
        }
242
243
        if (array_key_exists('timestamp', $data)) {
244
            throw new Exception("timestamp is defined");
245
        }
246
247
        foreach (['begin', 'end'] as $field) {
248
            if (array_key_exists($field, $data)) {
249
                if (is_string($data[$field])) {
250
                    $data[$field] = $this->getTimestamp($data[$field]);
251
                }
252
            } else {
253
                $data[$field] = 0;
254
            }
255
        }
256
257
        return $data;
258
    }
259
260
    private function processLink($entity, $id)
261
    {
262
        $spaceId = $this->mapper->getSchema()->getSpace($entity)->getId();
263
        $source = $this->mapper->find('_link', [
264
            'entity'   => $spaceId,
265
            'entityId' => $id,
266
        ]);
267
268
        $leafs = [];
269
        foreach ($source as $node) {
270
            foreach ($this->getLeafs($node) as $detail) {
271
                $leafs[] = $detail;
272
            }
273
        }
274
275
        $changeaxis = [];
276
277
        foreach ($leafs as $leaf) {
278
            $current = $leaf;
279
            $ref = [];
280
281
            while ($current) {
282
                if ($current->entity != $spaceId) {
283
                    $ref[$current->entity] = $current->entityId;
284
                }
285
                if ($current->parent) {
286
                    $current = $this->mapper->findOne('_link', $current->parent);
287
                } else {
288
                    $current = null;
289
                }
290
            }
291
292
            $data = [$ref];
293
            if (property_exists($leaf, 'data')) {
294
                $data[] = $leaf->data;
295
            }
296
297
            if (!array_key_exists($leaf->timestamp, $changeaxis)) {
298
                $changeaxis[$leaf->timestamp] = [];
299
            }
300
            $changeaxis[$leaf->timestamp][] = [
301
                'begin' => $leaf->begin,
302
                'end' => $leaf->end,
303
                'data' => $data
304
            ];
305
        }
306
307
        $params = [
308
            'entity' => $spaceId,
309
            'id'     => $id,
310
        ];
311
312
        $this->updateAggregation('link', $params, $changeaxis);
313
    }
314
315
    private function processOverride($params)
316
    {
317
        $params = [
318
            'entity' => $params['entity'],
319
            'id'     => $params['id'],
320
        ];
321
322
        $changeaxis = [];
323
324
        foreach ($this->mapper->find('_override', $params) as $i => $override) {
325
            if (!array_key_exists($override->timestamp, $changeaxis)) {
326
                $changeaxis[$override->timestamp] = [];
327
            }
328
            $changeaxis[$override->timestamp][] = [
329
                'begin' => $override->begin,
330
                'end' => $override->end,
331
                'data' => $override->data,
332
            ];
333
        }
334
335
        $this->updateAggregation('override', $params, $changeaxis);
336
    }
337
338
    private function updateAggregation($type, $params, $changeaxis)
339
    {
340
        $isLink = $type === 'link';
341
        $space = $isLink ? '_link_aggregate' : '_override_aggregate';
342
343
        $timeaxis = [];
344
        foreach ($changeaxis as $timestamp => $changes) {
345
            foreach ($changes as $change) {
346
                foreach (['begin', 'end'] as $field) {
347
                    if (!array_key_exists($change[$field], $timeaxis)) {
348
                        $timeaxis[$change[$field]] = [
349
                            'begin' => $change[$field],
350
                            'end'   => $change[$field],
351
                            'data'  => [],
352
                        ];
353
                    }
354
                }
355
            }
356
        }
357
358
        ksort($changeaxis);
359
        ksort($timeaxis);
360
361
        $nextSliceId = null;
362
        foreach (array_reverse(array_keys($timeaxis)) as $timestamp) {
363
            if ($nextSliceId) {
364
                $timeaxis[$timestamp]['end'] = $nextSliceId;
365
            } else {
366
                $timeaxis[$timestamp]['end'] = 0;
367
            }
368
            $nextSliceId = $timestamp;
369
        }
370
371
        foreach ($this->mapper->find($space, $params) as $state) {
372
            $this->mapper->remove($state);
373
        }
374
375
        $states = [];
376
        foreach ($timeaxis as $state) {
377
            foreach ($changeaxis as $changes) {
378
                foreach ($changes as $change) {
379
                    if ($change['begin'] > $state['begin']) {
380
                        // future override
381
                        continue;
382
                    }
383
                    if ($change['end'] && ($change['end'] < $state['end'] || !$state['end'])) {
384
                        // complete override
385
                        continue;
386
                    }
387
                    if ($isLink) {
388
                        $state['data'][] = $change['data'];
389
                    } else {
390
                        $state['data'] = array_merge($state['data'], $change['data']);
391
                    }
392
                }
393
            }
394
            if (count($state['data'])) {
395
                $states[] = array_merge($state, $params);
396
            }
397
        }
398
399
        // merge states
400
        $clean = $isLink;
401
        while (!$clean) {
402
            $clean = true;
403
            foreach ($states as $i => $state) {
404
                if (array_key_exists($i+1, $states)) {
405
                    $next = $states[$i+1];
406
                    if (!count(array_diff_assoc($state['data'], $next['data']))) {
407
                        $states[$i]['end'] = $next['end'];
408
                        unset($states[$i+1]);
409
                        $states = array_values($states);
410
                        $clean = false;
411
                        break;
412
                    }
413
                }
414
            }
415
        }
416
417
        foreach ($states as $state) {
418
            $this->mapper->create($space, $state);
419
        }
420
    }
421
}
422