UniqueKeyExtractorAbstract::genRecordMap()   A
last analyzed

Complexity

Conditions 5
Paths 5

Size

Total Lines 32
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 13
c 1
b 0
f 0
nc 5
nop 0
dl 0
loc 32
rs 9.5222
1
<?php
2
3
/*
 * This file is part of YaEtl
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
 * This source file is licensed under the MIT license which you will
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
 */
9
10
namespace fab2s\YaEtl\Extractors;
11
12
use fab2s\NodalFlow\NodalFlowException;
13
use fab2s\YaEtl\YaEtlException;
14
15
/**
 * Abstract Class UniqueKeyExtractorAbstract
 *
 * Extractor base for sources exposing a unique (possibly composite) key.
 * An instance either extracts on its own (see extract()) or, once
 * setJoinFrom() has been called, acts as a joiner: it then fetches records
 * matching the key values exposed by the extractor it joins against and
 * merges them into incoming records (see exec()).
 */
abstract class UniqueKeyExtractorAbstract extends DbExtractorAbstract implements JoinableInterface
{
    /**
     * The joined record collection, indexed by join key value.
     * Filled by fetchJoinedRecords() and completed with defaults
     * by setDefaultExtracted()
     *
     * @var array
     */
    protected $joinedRecords;

    /**
     * The composite key representation, as built by configureUniqueKey():
     * `[keyName => keyAlias, ...]`
     *
     * @var array|string
     */
    protected $compositeKey;

    /**
     * The unique key name, only set when the key is made of a single field
     *
     * @var string|null
     */
    protected $uniqueKeyName;

    /**
     * The unique key alias, only set when the key is made of a single field
     *
     * @var string|null
     */
    protected $uniqueKeyAlias;

    /**
     * List of unique key values, used to be joinable
     *
     * @var array
     */
    protected $uniqueKeyValues = [];

    /**
     * unique key value buffer, used to align batch sizes
     *
     * @var array
     */
    protected $uniqueKeyValueBuffer = [];

    /**
     * This Node's OnClause object if any
     *
     * @var OnClauseInterface|null
     */
    protected $onClose;

    /**
     * List of the OnClause constraints of all joiners to this extractor
     *
     * @var array of OnClauseInterface
     */
    protected $joinerOnCloses = [];

    /**
     * The record map: `[fromKeyAlias => [keyValue => keyValue, ...], ...]`
     *
     * @var array
     */
    protected $recordMap;

    /**
     * The Joinable we may be joining against
     *
     * @var JoinableInterface
     */
    protected $joinFrom;

    /**
     * Generic extraction from tables with unique (composite) key
     *
     * @param string|null  $extractQuery
     * @param array|string $uniqueKeySetup can be either a unique key name as
     *                                     string
     *                                     `'(table.)compositeKeyName' // ('id' by default)`
     *
     *                      or an array :
     *                      `['(table.)compositeKey1'] // single unique key`
     *                      `['(table.)compositeKey1', '(table.)compositeKey2', ] // composite unique key`
     *
     *                      or an associative array in case you are using aliases :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord',
     *                      ]`
     *
     *                      and :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord1',
     *                          '(table.)compositeKey2' => 'aliasNameAsInRecord2',
     *                          // ...
     *                      ]`
     *
     * @throws NodalFlowException
     */
    public function __construct(?string $extractQuery = null, $uniqueKeySetup = 'id')
    {
        $this->configureUniqueKey($uniqueKeySetup);

        // declare the join counter on top of the increments already declared
        $this->nodeIncrements = \array_replace($this->nodeIncrements, [
            'num_join' => 0,
        ]);

        parent::__construct($extractQuery);
    }

    /**
     * Get this Joiner's ON clause. Only used in Join mode
     *
     * @return OnClauseInterface|null
     */
    public function getOnClause(): ?OnClauseInterface
    {
        return $this->onClose;
    }

    /**
     * Set Joiner's ON clause. Only used in Join mode
     *
     * @param OnClauseInterface $onClause
     *
     * @return $this
     */
    public function setOnClause(OnClauseInterface $onClause): JoinableInterface
    {
        $this->onClose = $onClause;

        return $this;
    }

    /**
     * Register ON clause field mapping. Used by an eventual joiner to this
     *
     * @param OnClauseInterface $onClause
     *
     * @return $this
     */
    public function registerJoinerOnClause(OnClauseInterface $onClause): JoinableInterface
    {
        $this->joinerOnCloses[] = $onClause;

        return $this;
    }

    /**
     * Register the extractor we would be joining against
     *
     * @param JoinableInterface $joinFrom
     *
     * @throws YaEtlException when $joinFrom does not extend this class
     *
     * @return $this
     */
    public function setJoinFrom(JoinableInterface $joinFrom): JoinableInterface
    {
        // at least make sure this joinable extends this very class
        // to enforce getRecordMap() type
        if (!($joinFrom instanceof self)) {
            throw new YaEtlException('The extractor joined against is not compatible, expected implementation of: ' . self::class . "\ngot: " . \get_class($joinFrom));
        }

        // since we are joining, we are not a traversable anymore
        $this->isATraversable = false;
        // and we return a value
        $this->isAReturningVal = true;
        $this->joinFrom        = $joinFrom;

        return $this;
    }

    /**
     * Get record map, used to allow joiners to join
     *
     * @param string|null $fromKeyAlias The from unique key to get the map against
     *                                  as exposed in the record
     *
     * @return array `[keyValue1 => keyValue1, ...]` for the requested alias,
     *               or the whole map `[fromKeyAlias => [...], ...]` when no alias
     *               is given. An empty array is returned when the map has not
     *               been generated yet or holds nothing for the alias
     */
    public function getRecordMap(?string $fromKeyAlias = null)
    {
        if ($fromKeyAlias === null) {
            return $this->recordMap ?? [];
        }

        // an alias no joiner registered for, or a map not generated yet, must
        // not leak null: joinExtract() feeds this result to array functions
        return $this->recordMap[$fromKeyAlias] ?? [];
    }

    /**
     * Trigger extract
     *
     * @param mixed $param
     *
     * @throws YaEtlException
     *
     * @return bool true when records were fetched
     */
    public function extract($param = null): bool
    {
        if (isset($this->joinFrom)) {
            return $this->joinExtract();
        }

        // enforce limit if any is set
        if ($this->isLimitReached()) {
            return false;
        }

        $this->enforceBatchSize();
        if ($this->fetchRecords()) {
            $this->incrementOffset()
                ->genRecordMap();

            return true;
        }

        return false;
    }

    /**
     * Enforce batch size consistency
     *
     * @return static
     */
    public function enforceBatchSize(): ExtractorBatchLimitInterface
    {
        if (isset($this->joinFrom)) {
            // obey batch size to allow fromer to fetch a huge amount of records
            // while keeping the "where in" query size under control by splitting
            // it into several chunks.
            // append uniqueKeyValues to uniqueKeyValueBuffer; array_replace()
            // merges by key, so a value already pending is not added twice as
            // long as the map is keyed by key value (as genRecordMap() builds it)
            $this->uniqueKeyValueBuffer = \array_replace($this->uniqueKeyValueBuffer, $this->uniqueKeyValues);
            // only keep batchSize
            $this->uniqueKeyValues      = \array_slice($this->uniqueKeyValueBuffer, 0, $this->batchSize, true);
            // drop consumed keys
            $this->uniqueKeyValueBuffer = \array_slice($this->uniqueKeyValueBuffer, $this->batchSize, null, true);

            return $this;
        }

        parent::enforceBatchSize();

        return $this;
    }

    /**
     * Execute the Join
     *
     * Looks up the joined record matching the incoming record's unique key
     * value, fetching a new batch of joined records when none is buffered.
     *
     * NOTE(review): the lookup uses $this->uniqueKeyAlias, which
     * configureUniqueKey() leaves null for composite keys - confirm that
     * join mode is only meant to be used with single field keys.
     *
     * @param mixed $record
     *
     * @throws YaEtlException when no joined record can be found for $record
     *
     * @return mixed The result of the join, or $record itself when it is skipped
     */
    public function exec($record = null)
    {
        $uniqueKeyValue = $record[$this->uniqueKeyAlias];

        if (isset($this->joinedRecords[$uniqueKeyValue])) {
            $joinRecord = $this->joinedRecords[$uniqueKeyValue];
            // each joined record is consumed once
            unset($this->joinedRecords[$uniqueKeyValue]);
            if ($joinRecord === false) {
                // skip record
                $this->getCarrier()->continueFlow();

                return $record;
            }

            return $this->onClose-> /* @scrutinizer ignore-call */ merge($record, $joinRecord);
        }

        // nothing buffered for this key: fetch the next batch and retry
        if ($this->joinExtract()) {
            return $this->exec($record);
        }

        // something is wrong as uniqueKeyValueBuffer should
        // never run out until the fromer stop providing records
        // which means we do not want to reach here
        throw new YaEtlException('Record map mismatch between Joiner ' . \get_class($this) . ' and Fromer ' . \get_class($this->joinFrom));
    }

    /**
     * Trigger an extract in join mode
     *
     * @throws YaEtlException
     *
     * @return bool true when joined records were fetched
     */
    protected function joinExtract(): bool
    {
        // join mode, get record map
        $this->uniqueKeyValues = $this->joinFrom->getRecordMap($this->onClose-> /* @scrutinizer ignore-call */ getFromKeyAlias());
        // limit does not apply in join mode
        $this->enforceBatchSize();
        if (empty($this->uniqueKeyValues)) {
            return false;
        }

        if ($this->fetchJoinedRecords()) {
            $this->getCarrier()->getFlowMap()->incrementNode($this->getId(), 'num_join');
            // gen record map before we set defaults
            $this->genRecordMap()
                ->setDefaultExtracted();

            return true;
        }

        return false;
    }

    /**
     * Configure the unique key
     *
     * Fills compositeKey as `[keyName => keyAlias]`. uniqueKeyName and
     * uniqueKeyAlias are only set when the key is made of a single field,
     * they are null otherwise.
     *
     * @param array|string $uniqueKeySetup can be either a unique key name as
     *                                     string
     *                                     `'(table.)compositeKeyName' // ('id' by default)`
     *
     *                      or an array :
     *                      `['(table.)compositeKey1'] // single unique key`
     *                      `['(table.)compositeKey1', '(table.)compositeKey2', ] // composite unique key`
     *
     *                      or an associative array in case you are using aliases :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord',
     *                      ]`
     *
     *                      and :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord1',
     *                          '(table.)compositeKey2' => 'aliasNameAsInRecord2',
     *                          // ...
     *                      ]`
     *
     * @return static
     */
    protected function configureUniqueKey($uniqueKeySetup): self
    {
        $uniqueKeySetup            = \is_array($uniqueKeySetup) ? $uniqueKeySetup : [$uniqueKeySetup];
        $this->compositeKey        = [];
        $this->uniqueKeyName       = null;
        $this->uniqueKeyAlias      = null;
        foreach ($uniqueKeySetup as $key => $value) {
            if (\is_numeric($key)) {
                // positional entry: the value is the key name and the alias
                // is its last dot separated segment ("table.id" => "id")
                $compositeKeyName  = $this->cleanUpKeyName($value);
                $compositeKeyParts = \explode('.', $compositeKeyName);
                $compositeKeyAlias = \end($compositeKeyParts);
            } else {
                // associative entry: keyName => alias
                $compositeKeyName  = $this->cleanUpKeyName($key);
                $compositeKeyAlias = $this->cleanUpKeyName($value);
            }

            $this->compositeKey[$compositeKeyName] = $compositeKeyAlias;
        }

        if (\count($this->compositeKey) === 1) {
            $this->uniqueKeyName  = \key($this->compositeKey);
            $this->uniqueKeyAlias = \current($this->compositeKey);
        }

        return $this;
    }

    /**
     * Clean up key names by stripping surrounding backticks and spaces
     *
     * @param string $keyName
     *
     * @return string
     */
    protected function cleanUpKeyName($keyName): string
    {
        return \trim($keyName, '` ');
    }

    /**
     * Prepare the joined record set to obey join mode: every key value of the
     * current batch that has no joined record gets a default one, which is
     * either the ON clause default record in left join mode, to be merged
     * later on, or false, which makes exec() skip the record
     *
     * @return static
     */
    protected function setDefaultExtracted(): self
    {
        $defaultRecord    = $this->onClose-> /* @scrutinizer ignore-call */ isLeftJoin() ? $this->onClose-> /* @scrutinizer ignore-call */ getDefaultRecord() : false;
        $defaultExtracted = \array_fill_keys($this->uniqueKeyValues, $defaultRecord);

        // fetched records take precedence over defaults; joinedRecords has no
        // declared default, so guard against null before handing it over
        $this->joinedRecords = \array_replace($defaultExtracted, $this->joinedRecords ?? []);

        return $this;
    }

    /**
     * Generate record map: for each distinct from key alias requested by the
     * registered joiners, collect the values found under that alias in the
     * extracted records as `[keyValue => keyValue]`
     *
     * @throws YaEtlException when an extracted record has no value set for a
     *                        requested alias (a null value counts as not set)
     *
     * @return static
     */
    protected function genRecordMap(): self
    {
        // here we need to build record map ready for all joiners
        $this->recordMap = [];
        foreach ($this->joinerOnCloses as $onClose) {
            $fromKeyAlias = $onClose->getFromKeyAlias();
            if (isset($this->recordMap[$fromKeyAlias])) {
                // looks like there is more than
                // one joiner on this key
                continue;
            }

            // generate record map
            $this->recordMap[$fromKeyAlias] = [];
            // we do not want to map defaults here as we do not want joiners
            // to this to join on null
            // we could optimize a little bit for cases where
            // $this->joinedRecords is an indexed array on the proper key but ...
            foreach ($this->getExtracted() as $record) {
                if (!isset($record[$fromKeyAlias])) {
                    // Since we do not enforce key alias existence during init
                    // we have to do it here
                    throw new YaEtlException("From Key Alias not found in record: $fromKeyAlias");
                }

                $fromKeyValue                                  = $record[$fromKeyAlias];
                $this->recordMap[$fromKeyAlias][$fromKeyValue] = $fromKeyValue;
            }
        }

        return $this;
    }

    /**
     * fetch records when joining against another extractor
     * They should still be sent to setExtracted to be made
     * available in map generation for eventual joiners to
     * this joiner and also fill up joinedRecords as an
     * associative array indexed by the proper join key
     *
     * @return bool
     */
    abstract protected function fetchJoinedRecords(): bool;
}
459