DatabaseSynchronizer::run()   A
last analyzed

Complexity

Conditions 5
Paths 6

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 19
rs 9.6111
c 0
b 0
f 0
cc 5
nc 6
nop 0
1
<?php
2
3
namespace mtolhuijs\LDS;
4
5
use PDOException;
6
use Illuminate\Support\Facades\DB;
7
use Illuminate\Support\Facades\Schema;
8
use Illuminate\Support\Facades\Artisan;
9
use Illuminate\Database\Schema\Blueprint;
10
use Illuminate\Database\ConnectionInterface;
11
use mtolhuijs\LDS\Exceptions\DatabaseConnectionException;
12
13
/**
 * Copies table structure and rows from one database connection ("from")
 * to another ("to").
 *
 * For every selected table, missing tables/columns are created on the target
 * connection and each source row is inserted into, or updated on, the target.
 */
class DatabaseSynchronizer
{
    /** Default value of {@see DatabaseSynchronizer::$limit}. */
    public const DEFAULT_LIMIT = 5000;

    /**
     * Output handler, or a falsy value when there is none.
     *
     * When set, it must expose the message methods used by feedback()
     * (e.g. line/info/warn/error/comment), getOutput()->createProgressBar()
     * and a writable `progressBar` property. When falsy, messages are echoed.
     */
    public $cli;

    /** Row limit option. Stored only; no query in this class applies it yet. */
    public $limit = self::DEFAULT_LIMIT;

    /** Tables to synchronize; when empty, all tables of the source connection are used. */
    public $tables;

    /** Table names that getTables() filters out (null is treated as "none"). */
    public $skipTables;

    /** When truthy, run() migrates the target connection before synchronizing. */
    public $migrate;

    /** Name of the source connection. */
    public $from;

    /** Name of the target connection. */
    public $to;

    /** When truthy, target tables are truncated before rows are copied. */
    public $truncate;

    /** Resolved source connection (see getFromDb()). */
    private $fromDB;

    /** Resolved target connection (see getToDb()). */
    private $toDB;

    /**
     * @param string $from name of the source connection
     * @param string $to   name of the target connection
     * @param mixed  $cli  output handler, or false to echo messages
     *
     * @throws DatabaseConnectionException when either connection cannot be resolved
     */
    public function __construct(string $from, string $to, $cli = false)
    {
        $this->from = $from;
        $this->to = $to;
        $this->cli = $cli;

        try {
            $this->fromDB = DB::connection($this->from);
            $this->toDB = DB::connection($this->to);
        } catch (\Exception $e) {
            // Chain the original exception so the root cause is not lost.
            // Assumes DatabaseConnectionException keeps the standard
            // Exception constructor (message, code, previous).
            throw new DatabaseConnectionException($e->getMessage(), 0, $e);
        }
    }

    /**
     * Optionally migrate the target connection, then synchronize the structure
     * and rows of every table returned by getTables().
     *
     * Tables that do not exist on the source connection are reported and skipped.
     */
    public function run(): void
    {
        if ($this->migrate) {
            // With truncate enabled the target is rebuilt via migrate:refresh.
            Artisan::call('migrate'.($this->truncate ? ':refresh' : ''), [
                '--database' => $this->to,
            ]);
        }

        foreach ($this->getTables() as $table) {
            $this->feedback(PHP_EOL.PHP_EOL."Table: $table", 'line');

            if (! Schema::connection($this->from)->hasTable($table)) {
                $this->feedback("Table '$table' does not exist in $this->from", 'error');

                continue;
            }

            $this->syncTable($table);
            $this->syncRows($table);
        }
    }

    /**
     * Create $table on the target connection with the given columns, each one
     * nullable and typed after the matching source column.
     *
     * @param string   $table
     * @param string[] $columns
     */
    private function createTable(string $table, array $columns): void
    {
        $this->feedback("Creating '$this->to.$table' table", 'warn');

        Schema::connection($this->to)->create($table, function (Blueprint $table_bp) use ($table, $columns) {
            foreach ($columns as $column) {
                $this->addNullableColumn($table_bp, $table, $column);
            }
        });
    }

    /**
     * Add a single missing column to an existing table on the target connection.
     *
     * @param string $table
     * @param string $column
     */
    private function updateTable(string $table, string $column): void
    {
        Schema::connection($this->to)->table($table, function (Blueprint $table_bp) use ($table, $column) {
            $this->addNullableColumn($table_bp, $table, $column);
        });
    }

    /**
     * Declare $column on the blueprint as a nullable column whose type is the
     * type name reported by the source schema, and report it.
     *
     * NOTE(review): the reported type name is used directly as the Blueprint
     * method name; type names without a same-named Blueprint method will fail
     * here - confirm against the drivers in use.
     */
    private function addNullableColumn(Blueprint $blueprint, string $table, string $column): void
    {
        $type = Schema::connection($this->from)->getColumnType($table, $column);

        $blueprint->{$type}($column)->nullable();

        $this->feedback("Added {$type}('$column')->nullable()");
    }

    /**
     * @param string[] $skipTables table names to exclude from synchronization
     *
     * @return $this
     */
    public function setSkipTables(array $skipTables)
    {
        $this->skipTables = $skipTables;

        return $this;
    }

    /**
     * @param string[] $tables table names to synchronize (empty means "all")
     *
     * @return $this
     */
    public function setTables(array $tables)
    {
        $this->tables = $tables;

        return $this;
    }

    /**
     * @param int $limit
     *
     * @return $this
     */
    public function setLimit(int $limit)
    {
        $this->limit = $limit;

        return $this;
    }

    /**
     * Fill options that have not been configured yet.
     *
     * Only properties declared on this class are considered, and a property
     * that already holds a non-null value is never overwritten. Unknown keys
     * are ignored instead of being created as dynamic properties.
     *
     * @param array $options map of property name => value
     *
     * @return $this
     */
    public function setOptions(array $options)
    {
        foreach ($options as $option => $value) {
            if (! property_exists($this, (string) $option)) {
                continue;
            }

            if (! isset($this->{$option})) {
                $this->{$option} = $value;
            }
        }

        return $this;
    }

    /**
     * Source connection, resolved on first use if it is not available yet.
     */
    protected function getFromDb(): ConnectionInterface
    {
        if ($this->fromDB === null) {
            $this->fromDB = DB::connection($this->from);
        }

        return $this->fromDB;
    }

    /**
     * Target connection, resolved on first use if it is not available yet.
     */
    protected function getToDb(): ConnectionInterface
    {
        if ($this->toDB === null) {
            $this->toDB = DB::connection($this->to);
        }

        return $this->toDB;
    }

    /**
     * Tables to synchronize, minus the skipped ones.
     *
     * When no tables were configured, the list is read from the source
     * connection and remembered. Array keys of the configured list are
     * preserved in the result.
     *
     * @return string[]
     */
    public function getTables(): array
    {
        if (empty($this->tables)) {
            $this->tables = $this->getFromDb()->getDoctrineSchemaManager()->listTableNames();
        }

        // skipTables stays null until setSkipTables() is called; in_array()
        // requires an array, so treat null as "skip nothing".
        $skipped = $this->skipTables ?? [];

        return array_filter($this->tables, function ($table) use ($skipped) {
            return ! in_array($table, $skipped, true);
        });
    }

    /**
     * Check if tables and columns are present
     * Create or update them if not.
     *
     * @param string $table
     */
    public function syncTable(string $table): void
    {
        $schema = Schema::connection($this->to);
        $columns = Schema::connection($this->from)->getColumnListing($table);

        if (! $schema->hasTable($table)) {
            $this->createTable($table, $columns);

            return;
        }

        foreach ($columns as $column) {
            if (! $schema->hasColumn($table, $column)) {
                $this->updateTable($table, $column);
            }
        }
    }

    /**
     * Fetch all rows in $this->from and insert or update $this->to.
     *
     * Rows are matched on the first column of the source table.
     *
     * @todo need to get the real primary key
     * @todo add limit offset setup
     * @todo investigate: insert into on duplicate key update
     *
     * @param string $table
     */
    public function syncRows(string $table): void
    {
        $queryColumn = Schema::connection($this->from)->getColumnListing($table)[0];
        $statement = $this->prepareForInserts($table);
        $target = $this->getToDb();

        // May be null: no CLI, or no bar was created for this table.
        $progressBar = $this->currentProgressBar();

        while ($row = $statement->fetch(\PDO::FETCH_OBJ)) {
            $values = (array) $row;
            $key = $row->{$queryColumn};

            if ($target->table($table)->where($queryColumn, $key)->exists()) {
                $target->table($table)->where($queryColumn, $key)->update($values);
            } else {
                $target->table($table)->insert($values);
            }

            if ($progressBar) {
                $progressBar->advance();
            }
        }

        if ($progressBar) {
            $progressBar->finish();
        }
    }

    /**
     * Progress bar prepared for the table currently being synchronized, or
     * null when there is no CLI or no bar was created.
     *
     * @return mixed
     */
    private function currentProgressBar()
    {
        if (! $this->cli) {
            return null;
        }

        return $this->cli->progressBar ?? null;
    }

    /**
     * Run a "select everything" statement for $table on the source connection,
     * set up progress reporting and, when truncate is enabled, empty the
     * target table.
     *
     * NOTE(review): the row total comes from PDOStatement::rowCount(), which
     * is not reliable for SELECT on every driver; when it reports 0 no
     * progress bar is shown even if rows are fetched afterwards.
     *
     * @param string $table
     *
     * @return \PDOStatement executed statement, ready to be fetched from
     *
     * @throws PDOException when the statement cannot be prepared
     */
    private function prepareForInserts(string $table): \PDOStatement
    {
        $source = $this->getFromDb();
        $builder = $source->table($table);
        $statement = $source->getPdo()->prepare($builder->toSql());

        if (! $statement instanceof \PDOStatement) {
            throw new PDOException("Could not prepare PDOStatement for $table");
        }

        $statement->execute($builder->getBindings());
        $amount = $statement->rowCount();

        if ($this->cli) {
            if ($amount > 0) {
                $this->feedback("Synchronizing '$this->to.$table' rows", 'comment');
                $this->cli->progressBar = $this->cli->getOutput()->createProgressBar($amount);
            } else {
                $this->feedback('No rows...', 'comment');

                // Drop any bar left over from a previous table so syncRows()
                // does not advance or re-finish it.
                $this->cli->progressBar = null;
            }
        }

        if ($this->truncate) {
            $this->getToDb()->table($table)->truncate();
        }

        return $statement;
    }

    /**
     * Report a message through the CLI handler (using the method named by
     * $type) or, without a CLI, echo it surrounded by line breaks.
     *
     * @param string $msg
     * @param string $type name of the CLI output method to call
     */
    private function feedback(string $msg, $type = 'info'): void
    {
        if ($this->cli) {
            $this->cli->{$type}($msg);
        } else {
            echo PHP_EOL.$msg.PHP_EOL;
        }
    }
}
252