Passed
Push — master ( fe9856...bbf0da )
by Emmanuel
02:00
created

DB::loadDataForSqlsrv()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 21
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 21
ccs 0
cts 9
cp 0
rs 9.0534
c 0
b 0
f 0
cc 4
eloc 11
nc 4
nop 2
crap 20
1
<?php
2
/**
3
 * neuralyzer : Data Anonymization Library and CLI Tool
4
 *
5
 * PHP Version 7.1
6
 *
7
 * @author Emmanuel Dyan
8
 * @author Rémi Sauvat
9
 * @copyright 2018 Emmanuel Dyan
10
 *
11
 * @package edyan/neuralyzer
12
 *
13
 * @license GNU General Public License v2.0
14
 *
15
 * @link https://github.com/edyan/neuralyzer
16
 */
17
18
namespace Edyan\Neuralyzer\Anonymizer;
19
20
use Doctrine\DBAL\Connection;
21
use Doctrine\DBAL\Configuration as DbalConfiguration;
22
use Doctrine\DBAL\DriverManager as DbalDriverManager;
23
use Doctrine\DBAL\Query\QueryBuilder;
24
use Edyan\Neuralyzer\Exception\NeuralizerException;
25
use Edyan\Neuralyzer\Utils\CSVWriter;
26
use Edyan\Neuralyzer\Utils\DBUtils;
27
28
/**
29
 * Implement AbstractAnonymizer for DB, to read and write data via Doctrine DBAL
30
 */
31
class DB extends AbstractAnonymizer
32
{
33
    /**
34
     * Doctrine DB Adapter
35
     * @var Connection
36
     */
37
    private $conn;
38
39
    /**
40
     * Various Options for drivers
41
     * @var array
42
     */
43
    private $driverOptions = [
44
        'pdo_mysql' => [1001 => true], // 1001 = \PDO::MYSQL_ATTR_LOCAL_INFILE
45
    ];
46
47
    /**
48
     * Various generic utils
49
     * @var DBUtils
50
     */
51
    private $dbUtils;
52
53
    /**
54
     * Primary Key
55
     * @var string
56
     */
57
    private $priKey;
58
59
    /**
60
     * Define the way we update / insert data
61
     * @var string
62
     */
63
    private $mode = 'queries';
64
65
    /**
66
     * Contains queries if returnRes is true
67
     * @var array
68
     */
69
    private $queries = [];
70
71
    /**
72
     * File resource for the csv (batch mode)
73
     * @var CSVWriter
74
     */
75
    private $csv;
76
77
    /**
78
     * Define available update modes
79
     * @var array
80
     */
81
    private $updateMode = [
82
        'queries' => 'doUpdateByQueries',
83
        'batch' => 'doBatchUpdate'
84
    ];
85
86
    /**
87
     * Define available insert modes
88
     * @var array
89
     */
90
    private $insertMode = [
91
        'queries' => 'doInsertByQueries',
92
        'batch' => 'doBatchInsert'
93
    ];
94
95
    /**
     * Open the Doctrine DBAL connection and prepare the DB helpers.
     *
     * Any "driverOptions" entry supplied by the caller is replaced by the
     * options this class declares for the selected driver, or by an empty
     * array when none are declared for it.
     *
     * @param array $params Parameters to send to Doctrine DB
     */
    public function __construct(array $params)
    {
        // Do not assume the "driver" key is present: reading it blindly raises
        // an undefined index notice before Doctrine can validate the parameters
        $driver = $params['driver'] ?? '';
        $params['driverOptions'] = $this->driverOptions[$driver] ?? [];

        $this->conn = DbalDriverManager::getConnection($params, new DbalConfiguration());
        $this->conn->setFetchMode(\Doctrine\DBAL\FetchMode::ASSOCIATIVE);

        $this->dbUtils = new DBUtils($this->conn);
    }
112
113
114
    /**
     * Expose the Doctrine connection created by the constructor.
     *
     * @return Connection
     */
    public function getConn(): Connection
    {
        return $this->conn;
    }
123
124
125
    /**
     * Set the mode for update / insert.
     *
     * "queries" writes one query per row. "batch" additionally creates the
     * CSV writer, with "|" as delimiter and an enclosure chosen from the
     * driver name.
     *
     * @param string $mode "queries" or "batch"
     *
     * @return DB
     *
     * @throws NeuralizerException When $mode is neither "queries" nor "batch"
     */
    public function setMode(string $mode): DB
    {
        if (!in_array($mode, ['queries', 'batch'], true)) {
            throw new NeuralizerException('Mode could be only queries or batch');
        }

        if ($mode === 'batch') {
            $driver = $this->conn->getDriver()->getName();
            // strpos() returns 0 (falsy) when the match starts the string,
            // so the result has to be compared with false explicitly
            $nulEnclosure = strpos($driver, 'pgsql') !== false
                || strpos($driver, 'sqlsrv') !== false;

            $this->csv = new CSVWriter();
            $this->csv->setCsvControl('|', $nulEnclosure ? chr(0) : '"');
        }

        $this->mode = $mode;

        return $this;
    }
149
150
151
    /**
     * Process an entity by reading / writing to the DB.
     *
     * The actions returned by whatToDoWithEntity() (delete, update, insert)
     * run inside a single transaction. On any failure the transaction is
     * rolled back, the connection is closed and the error is rethrown.
     *
     * @param string        $entity   Table to process
     * @param callable|null $callback Forwarded to the update / insert steps,
     *                                which call it with a row counter
     *
     * @return array Queries collected during the run (filled only when
     *               returnRes is true)
     *
     * @throws \Throwable Whatever was raised while the transaction was open
     */
    public function processEntity(string $entity, ?callable $callback = null): array
    {
        $this->dbUtils->assertTableExists($entity);

        $this->priKey = $this->dbUtils->getPrimaryKey($entity);
        $this->entityCols = $this->dbUtils->getTableCols($entity);
        $this->entity = $entity;

        $actionsOnThatEntity = $this->whatToDoWithEntity();
        $this->queries = [];

        // Wrap everything in a transaction
        try {
            $this->conn->beginTransaction();

            if ($actionsOnThatEntity & self::TRUNCATE_TABLE) {
                $where = $this->getWhereConditionInConfig();
                $query = $this->runDelete($where);
                if ($this->returnRes === true) {
                    $this->queries[] = $query;
                }
            }

            if ($actionsOnThatEntity & self::UPDATE_TABLE) {
                $this->updateData($callback);
            }

            if ($actionsOnThatEntity & self::INSERT_TABLE) {
                $this->insertData($callback);
            }

            $this->conn->commit();
        } catch (\Throwable $e) {
            // Catch \Throwable, not only \Exception: an \Error (a TypeError
            // for instance) must not leave the transaction open either
            $this->conn->rollback();
            $this->conn->close(); // To avoid locks

            throw $e;
        }

        return $this->queries;
    }
198
199
200
    /**
     * Build the DELETE for the current entity and run it unless pretending.
     *
     * @param string $where Optional condition; skipped when empty
     *
     * @return string The generated SQL, whether it was executed or not
     */
    private function runDelete(string $where): string
    {
        $deleteQuery = $this->conn->createQueryBuilder()->delete($this->entity);
        if (!empty($where)) {
            $deleteQuery->where($where);
        }

        // Render the statement before a possible execution
        $sql = $deleteQuery->getSQL();

        if ($this->pretend !== true) {
            $deleteQuery->execute();
        }

        return $sql;
    }
224
225
226
    /**
     * Update data of table.
     *
     * Rows are read page by page (batchSize rows at a time, ordered by the
     * primary key) and handed one by one to the method registered for the
     * current mode. When no limit is set, the table row count is used.
     *
     * @param callable|null $callback Called with the running row number after
     *                                each processed row
     */
    private function updateData(?callable $callback = null): void
    {
        if ($this->limit === 0) {
            $this->setLimit($this->dbUtils->countResults($this->entity));
        }

        // Only the offset changes from one page to the next: build the rest once
        $queryBuilder = $this->conn->createQueryBuilder()
            ->select('*')
            ->from($this->entity)
            ->orderBy($this->priKey)
            ->setMaxResults($this->batchSize);

        $startAt = 0; // The first part of the limit (offset)
        $num = 0; // The number of rows updated
        while ($num < $this->limit) {
            // Static analysis types execute() as "statement or integer";
            // presumably a SELECT always gives the traversable statement.
            /** @var \Traversable $rows */
            $rows = $queryBuilder->setFirstResult($startAt)->execute();

            // I need to read line by line if I have to update the table
            // to make sure I do update by update (slower but no other choice for now)
            foreach ($rows as $row) {
                // Call the right method according to the mode
                $this->{$this->updateMode[$this->mode]}($row);

                // Count every row, with or without a callback: the limit
                // check just below depends on that counter
                ++$num;
                if ($callback !== null) {
                    $callback($num);
                }

                // Have to exit now as we have reached the max
                if ($num >= $this->limit) {
                    break 2;
                }
            }

            // Move the offset
            // Make sure the loop ends if we have nothing to process
            $num = $startAt += $this->batchSize;
        }

        // Run a final method if defined
        if ($this->mode === 'batch') {
            $this->loadDataInBatch('update');
        }
    }
270
271
272
    /**
     * Anonymize one row with a single UPDATE built by the Doctrine QueryBuilder.
     *
     * The query is recorded when returnRes is true and executed unless
     * pretend is enabled.
     *
     * @SuppressWarnings("unused") - Used dynamically
     * @param  array $row  Full row
     */
    private function doUpdateByQueries(array $row): void
    {
        $fakeData = $this->generateFakeData();

        $update = $this->conn->createQueryBuilder()->update($this->entity);
        foreach ($fakeData as $field => $fakeValue) {
            $update->set(
                $field,
                $this->dbUtils->getCondition($field, $this->entityCols[$field])
            );
            // A field that is empty in the current row gets an empty string
            // instead of the generated value
            $update->setParameter(':' . $field, empty($row[$field]) ? '' : $fakeValue);
        }

        // Target the row through its primary key
        $keyPlaceholder = ':' . $this->priKey;
        $update->where($this->priKey . ' = ' . $keyPlaceholder);
        $update->setParameter($keyPlaceholder, $row[$this->priKey]);

        if ($this->returnRes === true) {
            $this->queries[] = $this->dbUtils->getRawSQL($update);
        }

        if ($this->pretend === false) {
            $update->execute();
        }
    }
302
303
304
    /**
     * Append one CSV line for a later bulk load (update in batch mode).
     *
     * Every column of the entity is written. The generated value is used
     * only when one exists for the column and the current value is not
     * empty; otherwise the current value is kept.
     *
     * @SuppressWarnings("unused") - Used dynamically
     * @param  array $row  Full row
     */
    private function doBatchUpdate(array $row): void
    {
        $fakeData = $this->generateFakeData();

        $line = [];
        foreach ($this->entityCols as $field => $unusedDefinition) {
            $useFake = !empty($row[$field]) && array_key_exists($field, $fakeData);
            $line[$field] = $useFake ? $fakeData[$field] : $row[$field];
        }

        $this->csv->write($line);
    }
324
325
326
    /**
     * Generate as many rows as the limit asks for, using the method
     * registered for the current mode, then bulk load them in batch mode.
     *
     * @param  callable $callback Called with the row number after each row
     */
    private function insertData($callback = null): void
    {
        $rowNum = 0;
        while (++$rowNum <= $this->limit) {
            // Pick the right method according to the mode
            $writer = $this->insertMode[$this->mode];
            $this->$writer($rowNum);

            if ($callback !== null) {
                $callback($rowNum);
            }
        }

        // Batch mode only wrote a file so far: load it now
        if ($this->mode === 'batch') {
            $this->loadDataInBatch('insert');
        }
    }
346
347
348
    /**
     * Insert one generated row with the Doctrine QueryBuilder.
     *
     * The query is recorded when returnRes is true and executed unless
     * pretend is enabled.
     *
     * @SuppressWarnings("unused") - Used dynamically
     */
    private function doInsertByQueries(): void
    {
        $fakeData = $this->generateFakeData();

        $insert = $this->conn->createQueryBuilder()->insert($this->entity);
        foreach ($fakeData as $field => $value) {
            // One named placeholder per generated field
            $placeholder = ':' . $field;
            $insert->setValue($field, $placeholder);
            $insert->setParameter($placeholder, $value);
        }

        if ($this->returnRes === true) {
            $this->queries[] = $this->dbUtils->getRawSQL($insert);
        }

        if ($this->pretend === false) {
            $insert->execute();
        }
    }
371
372
373
    /**
     * Append one generated row to the CSV file used by the later bulk load.
     *
     * @SuppressWarnings("unused") - Used dynamically
     */
    private function doBatchInsert(): void
    {
        $this->csv->write($this->generateFakeData());
    }
382
383
384
    /**
     * Load the CSV file written in batch mode, then destroy it.
     *
     * The loader is chosen from the driver name with its first four
     * characters removed (e.g. "pdo_mysql" gives loadDataForMysql). The file
     * is removed even when the load fails.
     *
     * @SuppressWarnings("unused") - Used dynamically
     * @param string $mode "update" or "insert"
     *
     * @throws NeuralizerException When no loader exists for the driver
     */
    private function loadDataInBatch(string $mode): void
    {
        try {
            $dbType = substr($this->conn->getDriver()->getName(), 4);
            $method = 'loadDataFor' . ucfirst($dbType);
            if (!method_exists(self::class, $method)) {
                // Fail with a readable message instead of an undefined method error
                throw new NeuralizerException("Batch mode is not supported for the driver '{$dbType}'");
            }

            // An update reloads complete rows; an insert only has the configured columns
            $fields = $mode === 'update'
                ? array_keys($this->entityCols)
                : array_keys($this->configEntites[$this->entity]['cols']);

            $sql = $this->{$method}($fields, $mode);

            if ($this->returnRes === true) {
                $this->queries[] = $sql;
            }
        } finally {
            // Destroy the file, whatever happened above
            unlink($this->csv->getRealPath());
        }
    }
407
408
409
    /**
     * Build the MySQL "LOAD DATA LOCAL INFILE" statement for the CSV file
     * and run it unless pretend is enabled.
     *
     * @SuppressWarnings("unused") - Used dynamically
     * @param  array   $fields Columns present in the file, in file order
     * @param  string  $mode   Not used here (static analysis reports it as an
     *                         unused parameter); the loaders share one signature
     * @return string  The statement, whether it was executed or not
     */
    private function loadDataForMysql(array $fields, string $mode): string
    {
        $csvPath = $this->csv->getRealPath();
        $columns = '`' . implode('`, `', $fields) . '`';

        $sql = "LOAD DATA LOCAL INFILE '{$csvPath}'
     REPLACE INTO TABLE {$this->entity}
     FIELDS TERMINATED BY '|' ENCLOSED BY '\"' LINES TERMINATED BY '" . PHP_EOL . "'
     ({$columns})";

        // Run the query if asked
        if ($this->pretend === false) {
            $this->conn->query($sql);
        }

        return $sql;
    }
429
430
431
    /**
     * Load the CSV file into Postgres through pgsqlCopyFromFile().
     *
     * Unless pretend is enabled, the table is truncated first in "update"
     * mode, then the file is copied in. The returned text only describes
     * the operation; it is not the statement that was executed.
     *
     * @SuppressWarnings("unused") - Used dynamically
     * @param  array   $fields Columns present in the file, in file order
     * @param  string  $mode   "update" or "insert" to know if we truncate or not
     * @return string
     */
    private function loadDataForPgsql(array $fields, string $mode): string
    {
        $columnList = implode(', ', $fields);
        $filename = $this->csv->getRealPath();

        if ($this->pretend === false) {
            if ($mode === 'update') {
                $this->conn->query("TRUNCATE {$this->entity}");
            }

            // NOTE(review): static analysis reports that pgsqlCopyFromFile() does
            // not exist on Doctrine's driver connection type; presumably the
            // wrapped connection is the pgsql PDO object here - confirm.
            $this->conn
                ->getWrappedConnection()
                ->pgsqlCopyFromFile($this->entity, $filename, '|', '\\\\N', $columnList);
        }

        return "COPY {$this->entity} ($columnList) FROM '{$filename}' "
            . '... Managed by pgsqlCopyFromFile';
    }
456
457
458
    /**
     * Build the SQL Server "BULK INSERT" statement for the CSV file and run
     * it unless pretend is enabled (after a TRUNCATE in "update" mode).
     *
     * The connection host has to resolve to an address starting with "127",
     * whether pretending or not.
     *
     * @SuppressWarnings("unused") - Used dynamically
     * @param  array   $fields Not used here (static analysis reports it as an
     *                         unused parameter); the loaders share one signature
     * @param  string  $mode   "update" or "insert" to know if we truncate or not
     * @return string  The statement, whether it was executed or not
     *
     * @throws NeuralizerException When the host does not resolve to 127*
     */
    private function loadDataForSqlsrv(array $fields, string $mode): string
    {
        $hostIp = gethostbyname($this->conn->getHost());
        if (strpos($hostIp, '127') !== 0) {
            throw new NeuralizerException('SQL Server must be on the same host than PHP');
        }

        $csvPath = $this->csv->getRealPath();
        $sql = "BULK INSERT {$this->entity}
     FROM '{$csvPath}' WITH (
         FIELDTERMINATOR = '|', DATAFILETYPE = 'widechar', ROWTERMINATOR = '" . PHP_EOL . "'
     )";

        if ($this->pretend !== false) {
            return $sql;
        }

        // An update reloads the whole table: empty it first
        if ($mode === 'update') {
            $this->conn->query("TRUNCATE TABLE {$this->entity}");
        }
        $this->conn->query($sql);

        return $sql;
    }
486
}
487