Passed
Pull Request — master (#24)
by Maarten
06:40 queued 03:16
created

DatabaseSynchronizer::feedback()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 2
1
<?php
2
3
namespace mtolhuijs\LDS;
4
5
use PDOException;
6
use Illuminate\Support\Facades\DB;
7
use Illuminate\Support\Facades\Schema;
8
use Illuminate\Database\Schema\Blueprint;
9
use Illuminate\Database\ConnectionInterface;
10
use mtolhuijs\LDS\Exceptions\DatabaseConnectionException;
11
12
class DatabaseSynchronizer
13
{
14
    public const DEFAULT_LIMIT = 5000;
15
16
    public $cli;
17
    public $limit = self::DEFAULT_LIMIT;
18
    public $tables;
19
    public $skipTables;
20
    public $from;
21
    public $to;
22
    public $truncate;
23
24
    private $fromDB;
25
    private $toDB;
26
27
    public function __construct(string $from, string $to, $cli = false)
28
    {
29
        $this->from = $from;
30
        $this->to = $to;
31
        $this->cli = $cli;
32
33
        try {
34
            $this->fromDB = DB::connection($this->from);
35
            $this->toDB = DB::connection($this->to);
36
        } catch (\Exception $e) {
37
            throw new DatabaseConnectionException($e->getMessage());
38
        }
39
    }
40
41
    public function setSkipTables(array $skipTables)
42
    {
43
        $this->skipTables = $skipTables;
44
45
        return $this;
46
    }
47
48
    public function setTables(array $tables)
49
    {
50
        $this->tables = $tables;
51
52
        return $this;
53
    }
54
55
    public function setLimit(int $limit)
56
    {
57
        $this->limit = $limit;
58
59
        return $this;
60
    }
61
62
    public function setOptions(array $options)
63
    {
64
        foreach ($options as $option => $value) {
65
            if (! isset($this->{$option})) {
66
                $this->{$option} = $value;
67
            }
68
        }
69
70
        return $this;
71
    }
72
73
    protected function getFromDb(): ConnectionInterface
74
    {
75
        if ($this->fromDB === null) {
76
            $this->fromDB = DB::connection($this->from);
77
        }
78
79
        return $this->fromDB;
80
    }
81
82
    protected function getToDb(): ConnectionInterface
83
    {
84
        if ($this->toDB === null) {
85
            $this->toDB = DB::connection($this->to);
86
        }
87
88
        return $this->toDB;
89
    }
90
91
    public function getTables(): array
92
    {
93
        if (empty($this->tables)) {
94
            $this->tables = $this->getFromDb()->getDoctrineSchemaManager()->listTableNames();
95
        }
96
97
        return array_filter($this->tables, function ($table) {
98
            return ! in_array($table, $this->skipTables, true);
99
        });
100
    }
101
102
    public function run(): void
103
    {
104
        foreach ($this->getTables() as $table) {
105
            $this->feedback(PHP_EOL.PHP_EOL."Table: $table", 'line');
106
107
            if (! Schema::connection($this->from)->hasTable($table)) {
108
                $this->feedback("Table '$table' does not exist in $this->from", 'error');
109
110
                continue;
111
            }
112
113
            $this->syncTable($table);
114
            $this->syncRows($table);
115
        }
116
    }
117
118
    /**
119
     * Check if tables and columns are present
120
     * Create or update them if not.
121
     *
122
     * @param string $table
123
     */
124
    public function syncTable(string $table): void
125
    {
126
        $schema = Schema::connection($this->to);
127
        $columns = Schema::connection($this->from)->getColumnListing($table);
128
129
        if ($schema->hasTable($table)) {
130
            foreach ($columns as $column) {
131
                if ($schema->hasColumn($table, $column)) {
132
                    continue;
133
                }
134
135
                $this->updateTable($table, $column);
136
            }
137
138
            return;
139
        }
140
141
        $this->createTable($table, $columns);
142
    }
143
144
    /**
145
     * Fetch all rows in $this->from and insert or update $this->to.
146
     * @todo need to get the real primary key
147
     * @todo add limit offset setup
148
     * @todo investigate: insert into on duplicate key update
149
     *
150
     * @param string $table
151
     */
152
    public function syncRows(string $table): void
153
    {
154
        $queryColumn = Schema::connection($this->from)->getColumnListing($table)[0];
155
        $statement = $this->prepareForInserts($table);
156
157
        while ($row = $statement->fetch(\PDO::FETCH_OBJ)) {
158
            $exists = $this->getToDb()->table($table)->where($queryColumn, $row->{$queryColumn})->first();
159
160
            if (! $exists) {
161
                $this->getToDb()->table($table)->insert((array) $row);
162
            } else {
163
                $this->getToDb()->table($table)->where($queryColumn, $row->{$queryColumn})->update((array) $row);
164
            }
165
166
            if ($this->cli) {
167
                $this->cli->progressBar->advance();
168
            }
169
        }
170
171
        if ($this->cli) {
172
            $this->cli->progressBar->finish();
173
        }
174
    }
175
176
    /**
177
     * @param string $table
178
     * @return \PDOStatement
179
     */
180
    private function prepareForInserts(string $table)
181
    {
182
        $pdo = $this->getFromDb()->getPdo();
183
        $builder = $this->fromDB->table($table);
184
        $statement = $pdo->prepare($builder->toSql());
185
186
        if (! $statement instanceof \PDOStatement) {
187
            throw new PDOException("Could not prepare PDOStatement for $table");
188
        }
189
190
        $statement->execute($builder->getBindings());
191
        $amount = $statement->rowCount();
192
193
        if ($this->cli) {
194
            if ($amount > 0) {
195
                $this->feedback("Synchronizing '$this->to.$table' rows", 'comment');
196
                $this->cli->progressBar = $this->cli->getOutput()->createProgressBar($amount);
197
            } else {
198
                $this->feedback('No rows...', 'comment');
199
            }
200
        }
201
202
        if ($this->truncate) {
203
            $this->getToDb()->table($table)->truncate();
204
        }
205
206
        return $statement;
207
    }
208
209
    private function createTable(string $table, array $columns): void
210
    {
211
        $this->feedback("Creating '$this->to.$table' table", 'warn');
212
213
        Schema::connection($this->to)->create($table, function (Blueprint $table_bp) use ($table, $columns) {
214
            foreach ($columns as $column) {
215
                $type = Schema::connection($this->from)->getColumnType($table, $column);
216
217
                $table_bp->{$type}($column)->nullable();
218
219
                $this->feedback("Added {$type}('$column')->nullable()");
220
            }
221
        });
222
    }
223
224
    private function updateTable(string $table, string $column): void
225
    {
226
        Schema::connection($this->to)->table($table, function (Blueprint $table_bp) use ($table, $column) {
227
            $type = Schema::connection($this->from)->getColumnType($table, $column);
228
229
            $table_bp->{$type}($column)->nullable();
230
231
            $this->feedback("Added {$type}('$column')->nullable()");
232
        });
233
    }
234
235
    private function feedback(string $msg, $type = 'info'): void
236
    {
237
        if ($this->cli) {
238
            $this->cli->{$type}($msg);
239
        } else {
240
            echo PHP_EOL.$msg.PHP_EOL;
241
        }
242
    }
243
}
244