Passed
Push — master ( eb9c6b...2046e5 )
by Fabrice
02:42
created

src/Extractors/UniqueKeyExtractorAbstract.php (1 issue)

Labels
Severity
1
<?php
2
3
/*
4
 * This file is part of YaEtl.
5
 *     (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
6
 * This source file is licensed under the MIT license which you will
7
 * find in the LICENSE file or at https://opensource.org/licenses/MIT
8
 */
9
10
namespace fab2s\YaEtl\Extractors;
11
12
use fab2s\NodalFlow\NodalFlowException;
13
use fab2s\NodalFlow\YaEtlException;
14
15
/**
 * Abstract Class UniqueKeyExtractorAbstract
 *
 * Extractor for record sets identified by a unique (possibly composite) key.
 *
 * An instance works in one of two modes:
 *  - plain extraction: extract() fetches records batch by batch and builds
 *    a record map for every ON clause registered by a joiner
 *  - join mode, enabled by setJoinFrom(): extract() fetches the records
 *    matching the key values exposed by the extractor joined against, and
 *    exec() merges them into the records flowing through
 */
abstract class UniqueKeyExtractorAbstract extends DbExtractorAbstract implements JoinableInterface
{
    /**
     * The composite key representation, as a map of key name => key alias
     * once configureUniqueKey() has run
     *
     * @var array|string
     */
    protected $compositeKey;

    /**
     * The unique key name, only set when the key is made of a single field
     *
     * @var string|null
     */
    protected $uniqueKeyName;

    /**
     * The unique key alias (the key name as exposed in the record),
     * only set when the key is made of a single field
     *
     * @var string|null
     */
    protected $uniqueKeyAlias;

    /**
     * List of unique key values, used to be joinable
     *
     * @var array
     */
    protected $uniqueKeyValues = [];

    /**
     * Unique key value buffer, used to align batch sizes
     *
     * @var array
     */
    protected $uniqueKeyValueBuffer = [];

    /**
     * This Node's ON clause object if any
     *
     * @var OnClauseInterface|null
     */
    protected $onClose;

    /**
     * List of the ON clauses registered by all joiners to this extractor
     *
     * @var array of OnClauseInterface
     */
    protected $joinerOnCloses = [];

    /**
     * The record map: [fromKeyAlias => [keyValue => keyValue, ...], ...]
     *
     * @var array
     */
    protected $recordMap;

    /**
     * The Joinable we may be joining against
     *
     * @var JoinableInterface
     */
    protected $joinFrom;

    /**
     * Generic extraction from tables with unique (composite) key
     *
     * The unique key is configured before the parent constructor runs, and
     * a `num_join` counter is added to this node's increments.
     *
     * @param string|null  $extractQuery
     * @param array|string $uniqueKeySetup can be either a unique key name as
     *                                     string
     *                                     `'(table.)compositeKeyName' // ('id' by default)`
     *
     *                      or an array :
     *                      `['(table.)compositeKey1'] // single unique key`
     *                      `['(table.)compositeKey1', '(table.)compositeKey2', ] // composite unique key`
     *
     *                      or an associative array in case you are using aliases :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord',
     *                      ]`
     *
     *                      and :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord1',
     *                          '(table.)compositeKey2' => 'aliasNameAsInRecord2',
     *                          // ...
     *                      ]`
     *
     * @throws NodalFlowException
     */
    public function __construct($extractQuery = null, $uniqueKeySetup = 'id')
    {
        $this->configureUniqueKey($uniqueKeySetup);

        $this->nodeIncrements = \array_replace($this->nodeIncrements, [
            'num_join' => 0,
        ]);

        parent::__construct($extractQuery);
    }

    /**
     * Get this Joiner's ON clause. Only used in Join mode
     *
     * @return OnClauseInterface|null null until setOnClause() was called
     */
    public function getOnClause()
    {
        return $this->onClose;
    }

    /**
     * Set Joiner's ON clause. Only used in Join mode
     *
     * @param OnClauseInterface $onClause
     *
     * @return $this
     */
    public function setOnClause(OnClauseInterface $onClause)
    {
        $this->onClose = $onClause;

        return $this;
    }

    /**
     * Register ON clause field mapping. Used by an eventual joiner to this
     *
     * Each registered ON clause gets its from key alias mapped
     * by genRecordMap() after every fetch.
     *
     * @param OnClauseInterface $onClause
     *
     * @return $this
     */
    public function registerJoinerOnClause(OnClauseInterface $onClause)
    {
        $this->joinerOnCloses[] = $onClause;

        return $this;
    }

    /**
     * Register the extractor we would be joining against
     *
     * Switches this node to join mode: it stops being a traversable
     * and starts returning a value.
     *
     * @param JoinableInterface $joinFrom
     *
     * @throws YaEtlException when $joinFrom does not extend this class, or
     *                        when this extractor's query contains an ORDER BY
     *
     * @return $this
     */
    public function setJoinFrom(JoinableInterface $joinFrom)
    {
        // at least make sure this joinable extends this very class
        // to enforce getRecordMap() type
        if (!($joinFrom instanceof self)) {
            throw new YaEtlException('The extractor joined against is not compatible, expected implementation of: ' . self::class . "\ngot: " . \get_class($joinFrom));
        }

        // the query is optional at construction time, cast it as
        // preg_match() does not accept a null subject
        if (\preg_match('`^.+?order\s+by.*?$`is', (string) $this->extractQuery)) {
            throw new YaEtlException("A Joiner must not order its query got: $this->extractQuery");
        }

        // since we are joining, we are not a traversable anymore
        $this->isATraversable = false;
        // and we return a value
        $this->isAReturningVal = true;
        $this->joinFrom        = $joinFrom;

        return $this;
    }

    /**
     * Get record map, used to allow joiners to join
     *
     * @param string|null $fromKeyAlias The from unique key to get the map against
     *                                  as exposed in the record
     *
     * @return array [keyValue1, keyValue2, ...] for the requested alias (empty
     *               when nothing is mapped for this alias), or the whole record
     *               map when no alias is provided
     */
    public function getRecordMap($fromKeyAlias = null)
    {
        if ($fromKeyAlias === null) {
            return $this->recordMap;
        }

        // an alias that was never mapped yields no key rather than an
        // undefined index, the joiner will then report the mismatch
        return isset($this->recordMap[$fromKeyAlias]) ? $this->recordMap[$fromKeyAlias] : [];
    }

    /**
     * Trigger extract
     *
     * Delegates to joinExtract() in join mode. Otherwise fetches the next
     * batch, unless the limit is reached, then moves the offset and
     * regenerates the record map.
     *
     * @param mixed $param
     *
     * @throws YaEtlException
     *
     * @return bool true when records were fetched
     */
    public function extract($param = null)
    {
        if (isset($this->joinFrom)) {
            return $this->joinExtract();
        }

        // enforce limit if any is set
        if ($this->isLimitReached()) {
            return false;
        }

        $this->enforceBatchSize();
        if ($this->fetchRecords()) {
            $this->incrementOffset()
                ->genRecordMap();

            return true;
        }

        return false;
    }

    /**
     * Enforce batch size consistency
     *
     * In join mode, the pending key values are buffered and only the first
     * batchSize of them are kept for the next fetch. Otherwise the parent
     * implementation applies.
     *
     * @return $this
     */
    public function enforceBatchSize()
    {
        if (isset($this->joinFrom)) {
            // obey batch size to allow the extractor joined against to fetch
            // a huge amount of records while keeping the "where in" query
            // size under control by splitting it into several chunks.
            // append uniqueKeyValues to uniqueKeyValueBuffer
            $this->uniqueKeyValueBuffer = \array_replace($this->uniqueKeyValueBuffer, $this->uniqueKeyValues);
            // only keep batchSize
            $this->uniqueKeyValues      = \array_slice($this->uniqueKeyValueBuffer, 0, $this->batchSize, true);
            // drop consumed keys
            $this->uniqueKeyValueBuffer = \array_slice($this->uniqueKeyValueBuffer, $this->batchSize, null, true);

            return $this;
        }

        parent::enforceBatchSize();

        return $this;
    }

    /**
     * Execute the Join
     *
     * Looks up the extracted record matching the record's unique key value,
     * triggering a new extract when it is not available yet.
     *
     * @param mixed $record
     *
     * @throws YaEtlException when no extracted record can be found for the key
     *
     * @return mixed The result of the join, or the untouched record when it
     *               is skipped
     */
    public function exec($record)
    {
        $uniqueKeyValue = $record[$this->uniqueKeyAlias];

        if (isset($this->extracted[$uniqueKeyValue])) {
            $joinRecord = $this->extracted[$uniqueKeyValue];
            // each extracted record is consumed once
            unset($this->extracted[$uniqueKeyValue]);
            if ($joinRecord === false) {
                // skip record
                $this->carrier->continueFlow();

                return $record;
            }

            ++$this->numRecords;

            return $this->onClose-> /* @scrutinizer ignore-call */ merge($record, $joinRecord);
        }

        // nothing extracted for this key yet, fetch the next chunk and retry
        if ($this->extract()) {
            return $this->exec($record);
        }

        // something is wrong as uniqueKeyValueBuffer should
        // never run out until the fromer stop providing records
        // which means we do not want to reach here
        throw new YaEtlException('Record map mismatch between Joiner ' . \get_class($this) . ' and Fromer ' . \get_class($this->joinFrom));
    }

    /**
     * Trigger an extract in join mode
     *
     * @throws YaEtlException when no ON clause was set on this joiner
     *
     * @return bool true when records were fetched
     */
    protected function joinExtract()
    {
        if ($this->onClose === null) {
            // fail with a catchable exception rather than
            // with a method call on null
            throw new YaEtlException('Cannot extract in join mode without an ON clause, setOnClause() must be called first on Joiner ' . \get_class($this));
        }

        // join mode, get record map
        $this->uniqueKeyValues = $this->joinFrom->getRecordMap($this->onClose->getFromKeyAlias());
        // limit does not apply in join mode
        $this->enforceBatchSize();
        if (empty($this->uniqueKeyValues)) {
            return false;
        }

        if ($this->fetchRecords()) {
            // gen record map before we set defaults
            $this->genRecordMap()
                ->setDefaultExtracted();
            $this->getCarrier()->getFlowMap()->incrementNode($this->getId(), 'num_join');

            return true;
        }

        return false;
    }

    /**
     * Configure the unique key
     *
     * Fills compositeKey with a key name => key alias map. uniqueKeyName
     * and uniqueKeyAlias are only set when the key is made of a single field.
     *
     * @param array|string $uniqueKeySetup can be either a unique key name as
     *                                     string
     *                                     `'(table.)compositeKeyName' // ('id' by default)`
     *
     *                      or an array :
     *                      `['(table.)compositeKey1'] // single unique key`
     *                      `['(table.)compositeKey1', '(table.)compositeKey2', ] // composite unique key`
     *
     *                      or an associative array in case you are using aliases :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord',
     *                      ]`
     *
     *                      and :
     *                      `[
     *                          '(table.)compositeKey1' => 'aliasNameAsInRecord1',
     *                          '(table.)compositeKey2' => 'aliasNameAsInRecord2',
     *                          // ...
     *                      ]`
     *
     * @return $this
     */
    protected function configureUniqueKey($uniqueKeySetup)
    {
        $uniqueKeySetup            = \is_array($uniqueKeySetup) ? $uniqueKeySetup : [$uniqueKeySetup];
        $this->compositeKey        = [];
        $this->uniqueKeyName       = null;
        $this->uniqueKeyAlias      = null;
        foreach ($uniqueKeySetup as $key => $value) {
            // list entries always carry integer keys, a string key is
            // a key name even when it looks numeric (eg '01')
            if (\is_int($key)) {
                $compositeKeyName  = $this->cleanUpKeyName($value);
                // the alias defaults to the key name without its table prefix
                $compositeKeyParts = \explode('.', $compositeKeyName);
                $compositeKeyAlias = \end($compositeKeyParts);
            } else {
                $compositeKeyName  = $this->cleanUpKeyName($key);
                $compositeKeyAlias = $this->cleanUpKeyName($value);
            }

            $this->compositeKey[$compositeKeyName] = $compositeKeyAlias;
        }

        if (\count($this->compositeKey) === 1) {
            $this->uniqueKeyName  = \key($this->compositeKey);
            $this->uniqueKeyAlias = \current($this->compositeKey);
        }

        return $this;
    }

    /**
     * Clean up key names by stripping surrounding backticks and spaces
     *
     * @param string $keyName
     *
     * @return string
     */
    protected function cleanUpKeyName($keyName)
    {
        return \trim($keyName, '` ');
    }

    /**
     * Prepare record set to obey join mode
     *
     * Every requested key value with no extracted record gets a default:
     * the ON clause default record in left join mode, to be merged later,
     * or false otherwise, which makes exec() skip the record.
     * Does nothing when not in join mode.
     *
     * @return $this
     */
    protected function setDefaultExtracted()
    {
        if ($this->joinFrom !== null) {
            $defaultRecord    = $this->onClose->isLeftJoin() ? $this->onClose->getDefaultRecord() : false;
            $defaultExtracted = \array_fill_keys($this->uniqueKeyValues, $defaultRecord);

            // extracted records take precedence over defaults
            $this->extracted = \array_replace($defaultExtracted, /* @scrutinizer ignore-type */ $this->extracted);
        }

        return $this;
    }

    /**
     * Generate record map
     *
     * Builds, for each from key alias used by the registered joiner
     * ON clauses, the map of the key values found in the extracted records.
     *
     * @throws YaEtlException when an extracted record does not expose
     *                        the from key alias
     *
     * @return $this
     */
    protected function genRecordMap()
    {
        // here we need to build record map ready for all joiners
        $this->recordMap = [];
        foreach ($this->joinerOnCloses as $onClose) {
            $fromKeyAlias = $onClose->getFromKeyAlias();
            if (isset($this->recordMap[$fromKeyAlias])) {
                // looks like there is more than
                // one joiner on this key
                continue;
            }

            // generate record map
            // we do not want to map defaults here as we do not want joiners
            // to this to join on null
            // we could optimize a little bit for cases where
            // $this->extracted is an indexed array on the proper key but ...
            $map = [];
            foreach ($this->extracted as $record) {
                if (!isset($record[$fromKeyAlias])) {
                    // Since we do not enforce key alias existence during init
                    // we have to do it here
                    throw new YaEtlException("From Key Alias not found in record: $fromKeyAlias");
                }

                $fromKeyValue       = $record[$fromKeyAlias];
                $map[$fromKeyValue] = $fromKeyValue;
            }

            $this->recordMap[$fromKeyAlias] = $map;
        }

        return $this;
    }
}
449