Passed
Push — master ( 3ccc44...cef086 )
by Emmanuel
02:16 queued 15s
created

DB::doBatchInsert()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
/**
3
 * neuralyzer : Data Anonymization Library and CLI Tool
4
 *
5
 * PHP Version 7.1
6
 *
7
 * @author Emmanuel Dyan
8
 * @author Rémi Sauvat
9
 * @copyright 2018 Emmanuel Dyan
10
 *
11
 * @package edyan/neuralyzer
12
 *
13
 * @license GNU General Public License v2.0
14
 *
15
 * @link https://github.com/edyan/neuralyzer
16
 */
17
18
namespace Edyan\Neuralyzer\Anonymizer;
19
20
use Doctrine\DBAL\Connection;
21
use Doctrine\DBAL\Configuration as DbalConfiguration;
22
use Doctrine\DBAL\DriverManager as DbalDriverManager;
23
use Doctrine\DBAL\Query\QueryBuilder;
24
use Edyan\Neuralyzer\Helper\DB as DBHelper;
25
use Edyan\Neuralyzer\Exception\NeuralizerException;
26
use Edyan\Neuralyzer\Utils\CSVWriter;
27
use Edyan\Neuralyzer\Utils\DBUtils;
28
29
/**
 * Implement AbstractAnonymizer for DB, to read and write data via Doctrine DBAL.
 *
 * Two write strategies are available (see setMode()):
 *  - "queries": one UPDATE / INSERT statement per row, built with the QueryBuilder
 *  - "batch":   rows are written to a CSV file which is then handed to the
 *               driver helper's loadData() in a single operation
 */
class DB extends AbstractAnonymizer
{
    /**
     * Doctrine DB Adapter
     * @var Connection
     */
    private $conn;

    /**
     * A helper for the current driver
     * @var DBHelper\AbstractDBHelper
     */
    private $dbHelper;

    /**
     * Various generic utils
     * @var DBUtils
     */
    private $dbUtils;

    /**
     * Primary Key of the entity being processed
     * @var string
     */
    private $priKey;

    /**
     * Define the way we update / insert data ("queries" or "batch")
     * @var string
     */
    private $mode = 'queries';

    /**
     * Contains queries if returnRes is true
     * @var array
     */
    private $queries = [];

    /**
     * CSV writer used to buffer the rows (batch mode)
     * @var CSVWriter
     */
    private $csv;

    /**
     * Define available update modes (mode => name of the private method to call)
     * @var array
     */
    private $updateMode = [
        'queries' => 'doUpdateByQueries',
        'batch' => 'doBatchUpdate'
    ];

    /**
     * Define available insert modes (mode => name of the private method to call)
     * @var array
     */
    private $insertMode = [
        'queries' => 'doInsertByQueries',
        'batch' => 'doBatchInsert'
    ];

    /**
     * Init connection
     *
     * @param array $params   Parameters to send to Doctrine DB.
     *                        The "driver" key is read to pick the DB helper and
     *                        "driverOptions" is overwritten with the helper's options.
     */
    public function __construct(array $params)
    {
        $dbHelperClass = DBHelper\DriverGuesser::getDBHelper($params['driver']);

        // Set specific options
        $params['driverOptions'] = $dbHelperClass::getDriverOptions();
        $this->conn = DbalDriverManager::getConnection($params, new DbalConfiguration());
        $this->conn->setFetchMode(\Doctrine\DBAL\FetchMode::ASSOCIATIVE);

        $this->dbUtils = new DBUtils($this->conn);
        $this->dbHelper = new $dbHelperClass($this->conn);
    }


    /**
     * Get Doctrine Connection
     *
     * @return Connection
     */
    public function getConn(): Connection
    {
        return $this->conn;
    }


    /**
     * Set the mode for update / insert
     *
     * Switching to "batch" also creates a new CSV writer configured with "|"
     * as delimiter and the enclosure given by the driver helper.
     *
     * @param string $mode "queries" or "batch"
     *
     * @throws NeuralizerException If the mode is not one of the two supported values
     *
     * @return DB
     */
    public function setMode(string $mode): DB
    {
        if (!in_array($mode, ['queries', 'batch'], true)) {
            throw new NeuralizerException('Mode could be only queries or batch');
        }

        if ($mode === 'batch') {
            $this->csv = new CSVWriter();
            $this->csv->setCsvControl('|', $this->dbHelper->getEnclosureForCSV());
        }

        $this->mode = $mode;

        return $this;
    }


    /**
     * Process an entity by reading / writing to the DB
     *
     * Depending on what whatToDoWithEntity() returns, the table is emptied,
     * updated and / or filled, all inside a single transaction. On any
     * exception the transaction is rolled back, the connection is closed and
     * the exception is re-thrown.
     *
     * @param string        $entity   Name of the table to process
     * @param callable|null $callback Called with the current row number after each processed row
     *
     * @throws \Exception Whatever was raised while processing
     *
     * @return array The queries collected during the run (filled only when returnRes is true)
     */
    public function processEntity(string $entity, ?callable $callback = null): array
    {
        $this->dbUtils->assertTableExists($entity);

        $this->priKey = $this->dbUtils->getPrimaryKey($entity);
        $this->entityCols = $this->dbUtils->getTableCols($entity);
        $this->entity = $entity;

        $actionsOnThatEntity = $this->whatToDoWithEntity();
        $this->queries = [];

        // Wrap everything in a transaction
        try {
            $this->conn->beginTransaction();

            if ($actionsOnThatEntity & self::TRUNCATE_TABLE) {
                $where = $this->getWhereConditionInConfig();
                $query = $this->runDelete($where);
                if ($this->returnRes === true) {
                    $this->queries[] = $query;
                }
            }

            if ($actionsOnThatEntity & self::UPDATE_TABLE) {
                $this->updateData($callback);
            }

            if ($actionsOnThatEntity & self::INSERT_TABLE) {
                $this->insertData($callback);
            }

            $this->conn->commit();
        } catch (\Exception $e) {
            // Only roll back when a transaction is really open, otherwise the
            // rollback itself would throw and hide the original exception
            if ($this->conn->isTransactionActive()) {
                $this->conn->rollBack();
            }
            $this->conn->close(); // To avoid locks

            throw $e;
        }

        return $this->queries;
    }


    /**
     * Build the DELETE with Doctrine Query Builder and execute it unless
     * we are in "pretend" mode
     *
     * @param string $where Optional WHERE condition (empty to delete everything)
     *
     * @return string The SQL of the DELETE statement
     */
    private function runDelete(string $where): string
    {
        $queryBuilder = $this->conn->createQueryBuilder();
        $queryBuilder = $queryBuilder->delete($this->entity);
        if (!empty($where)) {
            $queryBuilder = $queryBuilder->where($where);
        }
        $sql = $queryBuilder->getSQL();

        if ($this->pretend === false) {
            $queryBuilder->execute();
        }

        return $sql;
    }


    /**
     * Update data of table
     *
     * Rows are read by chunks of batchSize, ordered by primary key, and
     * handed one by one to the method matching the current mode. When no
     * limit has been set (0), the number of rows of the table is used.
     *
     * @param callable|null $callback Called with the number of rows processed so far
     */
    private function updateData(?callable $callback = null): void
    {
        if ($this->limit === 0) {
            $this->setLimit($this->dbUtils->countResults($this->entity));
        }

        // Everything but the offset is the same for every chunk
        $queryBuilder = $this->conn->createQueryBuilder()
            ->select('*')
            ->from($this->entity)
            ->orderBy($this->priKey)
            ->setMaxResults($this->batchSize);

        $updateMethod = $this->updateMode[$this->mode];

        $startAt = 0; // The first part of the limit (offset)
        $num = 0; // The number of rows updated
        while ($num < $this->limit) {
            $rows = $queryBuilder->setFirstResult($startAt)->execute();

            // I need to read line by line if I have to update the table
            // to make sure I do update by update (slower but no other choice for now)
            $rowsInBatch = 0;
            foreach ($rows as $row) {
                $rowsInBatch++;

                // Call the right method according to the mode
                $this->{$updateMethod}($row);

                // Count every row, callback or not, else the limit is never enforced
                $num++;
                if ($callback !== null) {
                    $callback($num);
                }

                // Have to exit now as we have reached the max
                if ($num >= $this->limit) {
                    break 2;
                }
            }

            // Nothing (or not a full chunk) came back: the table is exhausted
            if ($rowsInBatch === 0 || $rowsInBatch < $this->batchSize) {
                break;
            }

            // Move the offset
            $startAt += $this->batchSize;
        }

        // Run a final method if defined
        if ($this->mode === 'batch') {
            $this->loadDataInBatch('update');
        }
    }


    /**
     * Execute the Update with Doctrine QueryBuilder
     * @SuppressWarnings("unused") - Used dynamically
     *
     * @param  array $row  Full row as read from the table
     */
    private function doUpdateByQueries(array $row): void
    {
        $data = $this->generateFakeData();

        $queryBuilder = $this->conn->createQueryBuilder();
        $queryBuilder = $queryBuilder->update($this->entity);
        foreach ($data as $field => $value) {
            // A field that is currently empty receives '' instead of fake data
            // NOTE(review): doBatchUpdate() keeps the original value in that case - confirm which one is intended
            $value = empty($row[$field]) ? '' : $value;
            $condition = $this->dbUtils->getCondition($field, $this->entityCols[$field]);
            $queryBuilder = $queryBuilder->set($field, $condition);
            $queryBuilder = $queryBuilder->setParameter(":$field", $value);
        }
        $queryBuilder = $queryBuilder->where("{$this->priKey} = :{$this->priKey}");
        $queryBuilder = $queryBuilder->setParameter(":{$this->priKey}", $row[$this->priKey]);

        if ($this->returnRes === true) {
            $this->queries[] = $this->dbUtils->getRawSQL($queryBuilder);
        }

        if ($this->pretend === false) {
            $queryBuilder->execute();
        }
    }


    /**
     * Write the line required for a later LOAD DATA (or \copy)
     * @SuppressWarnings("unused") - Used dynamically
     *
     * @param  array $row  Full row as read from the table
     */
    private function doBatchUpdate(array $row): void
    {
        $fakeData = $this->generateFakeData();
        $data = [];
        // Go through all the columns of the table, and take a value by priority
        foreach (array_keys($this->entityCols) as $field) {
            // By default keep the current value ...
            $data[$field] = $row[$field];
            // ... and replace it only if it is not empty and fake data exists for that field
            if (!empty($row[$field]) && array_key_exists($field, $fakeData)) {
                $data[$field] = $fakeData[$field];
            }
        }

        $this->csv->write($data);
    }


    /**
     * Insert data into table
     *
     * Generates as many rows as defined by the limit, through the method
     * matching the current mode.
     *
     * @param callable|null $callback Called with the current row number after each row
     */
    private function insertData(?callable $callback = null): void
    {
        $insertMethod = $this->insertMode[$this->mode];

        for ($rowNum = 1; $rowNum <= $this->limit; $rowNum++) {
            // Call the right method according to the mode
            $this->{$insertMethod}($rowNum);

            if ($callback !== null) {
                $callback($rowNum);
            }
        }

        // Run a final method if defined
        if ($this->mode === 'batch') {
            $this->loadDataInBatch('insert');
        }
    }


    /**
     * Execute an INSERT with Doctrine QueryBuilder
     * @SuppressWarnings("unused") - Used dynamically
     */
    private function doInsertByQueries(): void
    {
        $data = $this->generateFakeData();

        $queryBuilder = $this->conn->createQueryBuilder();
        $queryBuilder = $queryBuilder->insert($this->entity);
        foreach ($data as $field => $value) {
            $queryBuilder = $queryBuilder->setValue($field, ":$field");
            $queryBuilder = $queryBuilder->setParameter(":$field", $value);
        }

        if ($this->returnRes === true) {
            $this->queries[] = $this->dbUtils->getRawSQL($queryBuilder);
        }

        if ($this->pretend === false) {
            $queryBuilder->execute();
        }
    }


    /**
     * Write the line required for a later LOAD DATA (or \copy)
     * @SuppressWarnings("unused") - Used dynamically
     */
    private function doBatchInsert(): void
    {
        $data = $this->generateFakeData();
        $this->csv->write($data);
    }


    /**
     * Hand the CSV file written in batch mode to the driver helper, then destroy it
     *
     * The "pretend" flag is forwarded to the helper before loadData() is called.
     * The file is removed even if the load fails.
     *
     * @param string $mode "update" or "insert"
     */
    private function loadDataInBatch(string $mode): void
    {
        $fields = array_keys($this->configEntites[$this->entity]['cols']);
        // Replace by all fields if update as we have to load everything
        if ($mode === 'update') {
            $fields = array_keys($this->entityCols);
        }

        $filename = $this->csv->getRealPath();

        try {
            // Load the data from the helper
            $this->dbHelper->setPretend($this->pretend);
            $sql = $this->dbHelper->loadData($this->entity, $filename, $fields, $mode);

            if ($this->returnRes === true) {
                $this->queries[] = $sql;
            }
        } finally {
            // Destroy the file, whatever happened
            unlink($filename);
        }
    }
}
401