1
|
|
|
<?php |
2
|
|
|
namespace Fwlib\Db; |
3
|
|
|
|
4
|
|
|
use Fwlib\Bridge\Adodb; |
5
|
|
|
use Fwlib\Util\UtilContainerAwareTrait; |
6
|
|
|
|
7
|
|
|
/** |
8
|
|
|
* Sync data between 2 db source with same schema |
9
|
|
|
* |
10
|
|
|
* Support one-way sync only. |
11
|
|
|
* |
12
|
|
|
* Sync define is an array, whose key is the source table and whose value is
* the dest table or an array of dest tables. Sync is based on the timestamp
* column in the source table; one db table should have at most 1 timestamp
* column, so using the source table name as key of the define array is
* fine. Eg:
16
|
|
|
* |
17
|
|
|
* { |
18
|
|
|
* tableSource1: tableDest1, |
19
|
|
|
* tableSource2: [tableDest2a, tableDest2b], |
20
|
|
|
* } |
21
|
|
|
* |
22
|
|
|
* By default data from source is written directly to dest, but you can do
* some conversion by defining method convertData[TableSource]To[TableDest]().
* Its parameter is the data array retrieved from source, and its return value
* should be the data array to write to dest. These convert methods will be
* called automatically if they exist and match the source/dest table name.
27
|
|
|
* |
28
|
|
|
* When sync job is done for a table, the latest timestamp will save in record |
29
|
|
|
* table in dest db, next time sync job will start from this timestamp. |
30
|
|
|
* |
31
|
|
|
* Avoid concurrent runs by file lock.
32
|
|
|
* @link http://stackoverflow.com/questions/16048648 |
33
|
|
|
* |
34
|
|
|
* @copyright Copyright 2008-2015 Fwolf |
35
|
|
|
* @license http://www.gnu.org/licenses/lgpl.html LGPL-3.0+ |
36
|
|
|
*/ |
37
|
|
|
class SyncDbData |
38
|
|
|
{ |
39
|
|
|
use UtilContainerAwareTrait; |
40
|
|
|
|
41
|
|
|
|
42
|
|
|
/** |
43
|
|
|
* Number of rows that have been processed
44
|
|
|
* |
45
|
|
|
* Shared by syncDelete() and syncOneWay(). |
46
|
|
|
* |
47
|
|
|
* @var integer |
48
|
|
|
*/ |
49
|
|
|
protected $batchDone = 0; |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* Maximum rows to process per run |
53
|
|
|
* |
54
|
|
|
* If dest table is array, the actual rows synced may exceed this limit. |
55
|
|
|
* |
56
|
|
|
* @var integer |
57
|
|
|
*/ |
58
|
|
|
public $batchSize = 1000; |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* Source db connection |
62
|
|
|
* |
63
|
|
|
* @var Adodb |
64
|
|
|
*/ |
65
|
|
|
protected $dbSource = null; |
66
|
|
|
|
67
|
|
|
/** |
68
|
|
|
* Destination db connection |
69
|
|
|
* |
70
|
|
|
* @var Adodb |
71
|
|
|
*/ |
72
|
|
|
protected $dbDest = null; |
73
|
|
|
|
74
|
|
|
/** |
75
|
|
|
* Lock file handle |
76
|
|
|
* |
77
|
|
|
* @var resource |
78
|
|
|
*/ |
79
|
|
|
protected $lock = null; |
80
|
|
|
|
81
|
|
|
/** |
82
|
|
|
* Lock file to avoid concurrence run |
83
|
|
|
* |
84
|
|
|
* @var string |
85
|
|
|
*/ |
86
|
|
|
public $lockFile = 'sync-db-data.lock'; |
87
|
|
|
|
88
|
|
|
/** |
89
|
|
|
* Log message array |
90
|
|
|
* |
91
|
|
|
* @var array |
92
|
|
|
*/ |
93
|
|
|
public $logMessage = []; |
94
|
|
|
|
95
|
|
|
/** |
96
|
|
|
* Name of record table |
97
|
|
|
* |
98
|
|
|
* @var string |
99
|
|
|
*/ |
100
|
|
|
public $tableRecord = 'sync_db_data_record'; |
101
|
|
|
|
102
|
|
|
/** |
103
|
|
|
* Output all log message directly |
104
|
|
|
* |
105
|
|
|
* @var boolean |
106
|
|
|
*/ |
107
|
|
|
public $verbose = false; |
108
|
|
|
|
109
|
|
|
|
110
|
|
|
/**
 * Constructor
 *
 * Writes a dated header line to the log, then tries to obtain the lock by
 * calling createLock() with the current value of $lockFile.
 *
 * @throws \Exception  With message prefixed by "Aborted: " when the lock
 *                     can not be created. The exception raised by
 *                     createLock() is attached as previous exception.
 */
public function __construct()
{
    $this->log('======== ' . date('Y-m-d H:i:s') . ' ========');

    try {
        $this->createLock($this->lockFile);

    } catch (\Exception $e) {
        $message = "Aborted: {$e->getMessage()}";

        $this->log($message);

        // Keep the original exception in the chain so its trace is not lost
        throw new \Exception($message, 0, $e);
    }
}
128
|
|
|
|
129
|
|
|
|
130
|
|
|
/**
 * Destructor
 *
 * Gives up the lock by delegating to releaseLock() with its default
 * argument.
 */
public function __destruct()
{
    $this->releaseLock();
}
137
|
|
|
|
138
|
|
|
|
139
|
|
|
/**
 * Check and create record table if not exists
 *
 * If $db reports the table as existing, only a log line is written.
 * Otherwise a CREATE TABLE statement is executed, with one variant for
 * Sybase (index created by a separate statement) and one for MySQL.
 *
 * @param   Adodb   $db
 * @param   string  $table
 * @throws  \Exception  When $db is neither Sybase nor MySQL, or when an
 *                      exception is raised while creating the table. The
 *                      causing exception is attached as previous exception.
 */
protected function checkTableRecord($db, $table)
{
    if ($db->isTableExist($table)) {
        $this->log("Record table $table already exists.");

        return;
    }

    // @codeCoverageIgnoreStart
    try {
        // Table doesn't exist, create it
        // SQL for create table differs between db types

        if ($db->isDbSybase()) {
            // Sybase index is created by a separate statement
            $db->Execute(
                "CREATE TABLE $table (
                    uuid CHAR(25) NOT NULL,
                    db_prof VARCHAR(50) NOT NULL,
                    tbl_title VARCHAR(50) NOT NULL,
                    /* Timestamp remembered, for next round */
                    last_ts VARCHAR(50) NOT NULL,
                    /* Timestamp for this table */
                    /* In sybase 'timestamp' must be lower cased */
                    ts timestamp NOT NULL,
                    constraint PK_$table PRIMARY KEY (uuid)
                )"
            );
            $db->Execute(
                "CREATE INDEX idx_{$table}_1 ON
                    $table (db_prof, tbl_title)
                "
            );

        } elseif ($db->isDbMysql()) {
            // ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
            $db->Execute(
                "CREATE TABLE $table (
                    uuid CHAR(36) NOT NULL,
                    db_prof VARCHAR(50) NOT NULL,
                    tbl_title VARCHAR(50) NOT NULL,
                    /* Timestamp remembered, for next round */
                    last_ts VARCHAR(50) NOT NULL,
                    /* Timestamp for this table */
                    ts TIMESTAMP NOT NULL
                        DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    PRIMARY KEY (uuid),
                    INDEX idx_{$table}_1 (db_prof, tbl_title)
                );"
            );

        } else {
            throw new \Exception('Create table SQL not implemented.');
        }

        $this->log("Record table $table doesn't exist, create it, done.");

    } catch (\Exception $e) {
        $message = "Record table $table doesn't exists and create fail: " .
            $e->getMessage();

        $this->log($message);

        // Keep the original exception in the chain so its trace is not lost
        throw new \Exception($message, 0, $e);
    }
    // @codeCoverageIgnoreEnd
}
212
|
|
|
|
213
|
|
|
|
214
|
|
|
/**
 * Create lock using lockFile
 *
 * The lock file is placed in the system temp dir. An exclusive,
 * non-blocking flock() is taken on it; on success the handle and the full
 * path are stored in $lock and $lockFile for releaseLock().
 *
 * @param   string  $lockFile   File name, relative to system temp dir
 * @throws  \Exception  If the file can not be opened or the lock can not
 *                      be obtained.
 */
protected function createLock($lockFile)
{
    $lockFile = sys_get_temp_dir() . "/$lockFile";
    $lock = fopen($lockFile, 'w+');

    // fopen() returns false on failure, which must not be passed to flock()
    if (false === $lock) {
        throw new \Exception('Lock file check failed.');
    }

    // LOCK_NB make flock not blocking when obtain LOCK_EX fail
    if (!flock($lock, LOCK_EX | LOCK_NB)) {
        // Do not leak the handle of a file we could not lock
        fclose($lock);

        throw new \Exception('Lock file check failed.');
    }

    // Keep lockFile info for release later
    $this->lock = $lock;
    $this->lockFile = $lockFile;
}
233
|
|
|
|
234
|
|
|
|
235
|
|
|
/**
 * Generate an UUID
 *
 * The value is used as PK of rows in the record table, and is produced by
 * the UuidBase36 generator obtained from the util container.
 *
 * @return  string
 */
protected function generateUuid()
{
    $generator = $this->getUtilContainer()->getUuidBase36();

    return $generator->generate();
}
246
|
|
|
|
247
|
|
|
|
248
|
|
|
/**
 * Get last timestamp remembered
 *
 * Reads column last_ts from the record table, for the row matching the
 * source db profile string and the given table name.
 *
 * @param   Adodb   $dbDest
 * @param   string  $table      Table name in source db
 * @return  string              Return null if no last_ts remembered
 */
protected function getLastTimestamp($dbDest, $table)
{
    $sourceProfile = $this->getDbSourceProfileString();

    $query = [
        'SELECT'    => 'last_ts',
        'FROM'      => $this->tableRecord,
        'WHERE'     => [
            "db_prof = '{$sourceProfile}'",
            "tbl_title = '$table'",
        ],
        'LIMIT'     => 1,
    ];
    $recordSet = $dbDest->execute($query);

    return (0 < $recordSet->RowCount())
        ? $recordSet->fields['last_ts']
        : null;
}
278
|
|
|
|
279
|
|
|
|
280
|
|
|
/**
 * Get profile string of db source
 *
 * The string is stored in column db_prof of the record table by
 * setLastTimestamp() and matched against it by getLastTimestamp().
 *
 * @return  string
 */
protected function getDbSourceProfileString()
{
    $source = $this->dbSource;

    return $source->getProfileString();
}
289
|
|
|
|
290
|
|
|
|
291
|
|
|
/**
 * Save or output log message
 *
 * The message is always appended to $logMessage. When $verbose is on it
 * is also passed to ecl() of the Env util.
 *
 * @param   string  $msg
 * @see $verbose
 */
protected function log($msg)
{
    $this->logMessage[] = $msg;

    // @codeCoverageIgnoreStart
    if (!$this->verbose) {
        return;
    }

    $env = $this->getUtilContainer()->getEnv();
    $env->ecl($msg);
    // @codeCoverageIgnoreEnd
}
307
|
|
|
|
308
|
|
|
|
309
|
|
|
/**
 * Release lock used lock file
 *
 * Does nothing when no lock is held, e.g. createLock() failed or the lock
 * was released before. This also guarantees the lock file is only deleted
 * by the instance that locked it.
 *
 * @param   boolean $deleteLockFile     Delete the lock file after unlock
 */
protected function releaseLock($deleteLockFile = true)
{
    // Destructor still runs after a failed constructor, $lock is null then
    if (!is_resource($this->lock)) {
        return;
    }

    flock($this->lock, LOCK_UN);

    fclose($this->lock);

    // Mark as released, so a second call is a no-op
    $this->lock = null;

    if ($deleteLockFile && is_file($this->lockFile)) {
        unlink($this->lockFile);
    }
}
324
|
|
|
|
325
|
|
|
|
326
|
|
|
/**
 * Set source and dest db connection
 *
 * Each parameter is either a profile array, from which a new Adodb
 * instance is created and connected, or an object which is stored as is.
 * Afterwards the record table is checked (and created when missing) in
 * dest db.
 *
 * @param   array|Adodb $source
 * @param   array|Adodb $dest
 */
public function setDb($source, $dest)
{
    $connections = ['dbSource' => $source, 'dbDest' => $dest];

    foreach ($connections as $property => $dbOrProfile) {
        if (!is_array($dbOrProfile)) {
            // Param is connected object
            $this->$property = $dbOrProfile;

            continue;
        }

        // Param is profile, new db and connect
        $this->$property = new Adodb($dbOrProfile);
        $this->$property->connect();
    }

    $this->checkTableRecord($this->dbDest, $this->tableRecord);
}
348
|
|
|
|
349
|
|
|
|
350
|
|
|
/**
 * Record last timestamp in dest db, for next round
 *
 * A row in the record table is identified by the source db profile string
 * and the table name. It is inserted when getLastTimestamp() finds none,
 * and updated otherwise.
 *
 * @param   Adodb   $dbDest
 * @param   string  $table      Table name in source db
 * @param   string  $timestamp
 * @throws  \Exception  When reading or writing the record fails. The
 *                      causing exception is attached as previous exception.
 */
protected function setLastTimestamp($dbDest, $table, $timestamp)
{
    $dbProf = $this->getDbSourceProfileString();

    try {
        $timestampOld = $this->getLastTimestamp($dbDest, $table);

        // UPDATE if previous recorded, or INSERT.
        // getLastTimestamp() returns null only when there is no row. Using
        // empty() here would treat a recorded '' or '0' as missing too,
        // and insert a duplicate row.
        if (null === $timestampOld) {
            $dbDest->execute(
                [
                    'INSERT' => $this->tableRecord,
                    'VALUES' => [
                        'uuid'      => $this->generateUuid(),
                        'db_prof'   => $dbProf,
                        'tbl_title' => $table,
                        'last_ts'   => $timestamp
                    ],
                ]
            );
        } else {
            $dbDest->execute(
                [
                    'UPDATE' => $this->tableRecord,
                    'SET'    => ['last_ts' => $timestamp],
                    'WHERE'  => [
                        "db_prof = '$dbProf'",
                        "tbl_title = '$table'",
                    ],
                    'LIMIT'  => 1,
                ]
            );
        }

    } catch (\Exception $e) {
        // @codeCoverageIgnoreStart
        $message = "Record timestamp of $table fail: {$e->getMessage()}";

        $this->log($message);

        // Keep the original exception in the chain so its trace is not lost
        throw new \Exception($message, 0, $e);
        // @codeCoverageIgnoreEnd
    }
}
401
|
|
|
|
402
|
|
|
|
403
|
|
|
/**
 * Sync for DELETE
 *
 * If data had been deleted from source, delete them from dest too.
 *
 * CAUTION: This may delete data in dest which did not come from source by
 * sync.
 *
 * The batch counter is shared with syncOneWay(): when it already reached
 * $batchSize nothing is done and 0 is returned.
 *
 * @param   array   &$config    Source table => dest table or array of them
 * @return  integer             Rows deleted
 */
public function syncDelete(&$config)
{
    // syncOneWay() should run before syncDelete(), and if it's not fully
    // complete in this round, syncDelete() should wait for next round.
    if ($this->batchSize <= $this->batchDone) {
        $this->log('Wait for syncOneWay() fully complete, try next round.');

        return 0;
    }

    $queriesBefore = $this->dbSource->getQueryCount() +
        $this->dbDest->getQueryCount();
    $rowsDeleted = 0;

    foreach ($config as $tableSource => $tableDest) {
        if ($this->batchSize <= $this->batchDone) {
            $this->log("Reach batchSize limit {$this->batchSize}.");
            break;
        }

        $deleted = $this->syncDeleteTable($tableSource, $tableDest);

        $this->batchDone += $deleted;
        $rowsDeleted += $deleted;
    }

    $queriesAfter = $this->dbSource->getQueryCount() +
        $this->dbDest->getQueryCount();
    $queryCount = $queriesAfter - $queriesBefore;

    $this->log(
        "syncDelete() done, total {$rowsDeleted} rows deleted," .
        " db query $queryCount times.\n"
    );

    return $rowsDeleted;
}
448
|
|
|
|
449
|
|
|
|
450
|
|
|
/**
 * Sync for delete, single source table
 *
 * $tableDest can be array of dest table, each of them is processed by a
 * recursive call and the results are summed.
 *
 * Rows to delete are decided by method compareData[Source]To[Dest]() of
 * this class, which must exist when dest has more rows than source.
 *
 * @param   string          $tableSource
 * @param   string|array    $tableDest
 * @return  integer     Number of rows deleted on destination db.
 * @throws  \Exception  When compare method is missing, or dest table has
 *                      no primary key.
 */
protected function syncDeleteTable($tableSource, $tableDest)
{
    if (is_array($tableDest)) {
        $total = 0;
        foreach ($tableDest as $singleDest) {
            $total += $this->syncDeleteTable($tableSource, $singleDest);
        }

        return $total;
    }

    // If fewer rows in dest, need not do sync
    $iSource = $this->dbSource->getRowCount($tableSource);
    $iDest = $this->dbDest->getRowCount($tableDest);
    if ($iDest <= $iSource) {
        return 0;
    }

    $log = "syncDelete() check: $tableSource($iSource) <- $tableDest($iDest)";

    // Find unnecessary PK in dest using compareData[Source]To[Dest](), it
    // should return array of PK for rows to delete in dest db. If PK in
    // dest table has multiple column, the PK value is array of these
    // columns, and the order of these column should same as db schema.
    $stringUtil = $this->getUtilContainer()->getString();
    $compareFunc = 'compareData' . $stringUtil->toStudlyCaps($tableSource)
        . 'To' . $stringUtil->toStudlyCaps($tableDest);

    if (!method_exists($this, $compareFunc)) {
        $message = "Compare method needed: $tableSource to $tableDest.";

        $this->log($message);

        throw new \Exception($message);
    }

    $pkToDel = $this->$compareFunc();
    if (empty($pkToDel)) {
        return 0;
    }

    // Do not exceed what is left of this batch
    $pkToDel = array_slice(
        $pkToDel,
        0,
        $this->batchSize - $this->batchDone
    );
    // Return value is not used here; presumably the array is converted in
    // place - TODO confirm against Adodb::convertEncodingResult()
    $this->dbDest->convertEncodingResult($pkToDel);

    // Read PK from dest db
    $pk = $this->dbDest->getMetaPrimaryKey($tableDest);
    // @codeCoverageIgnoreStart
    if (empty($pk)) {
        throw new \Exception(
            "syncDelete() need table $tableDest have PK."
        );
    }
    // @codeCoverageIgnoreEnd
    if (!is_array($pk)) {
        $pk = [$pk];
    }

    // Generate SQL config, one WHERE condition per PK column
    $sqlConfig = [
        'DELETE' => $tableDest,
        'LIMIT'  => 1,
    ];
    foreach ($pk as $key) {
        $sqlConfig['WHERE'][] = "$key = "
            . $this->dbDest->Param($key);
    }

    // Execute SQL
    $rs = $this->dbDest->executePrepare($sqlConfig, $pkToDel);

    if (!$rs) {
        // DELETE SQL should not error
        // @codeCoverageIgnoreStart
        $message = "Error when execute DELETE SQL on $tableDest.";
        $this->log($message);

        return 0;
        // @codeCoverageIgnoreEnd
    }

    $deleted = count($pkToDel);
    $log .= ", $deleted rows deleted.";
    $this->log($log);

    return $deleted;
}
552
|
|
|
|
553
|
|
|
|
554
|
|
|
/**
 * One-way sync for INSERT/UPDATE
 *
 * Value in $config can be array of table name, means data of the source
 * table will sync to more than 1 table in dest db.
 *
 * Tables are processed in order of $config until the shared batch counter
 * reaches $batchSize.
 *
 * @param   array   &$config    Source table => dest table or array of them
 * @return  integer             Rows synced, count from dest db
 */
public function syncOneWay(&$config)
{
    $queriesBefore = $this->dbSource->getQueryCount() +
        $this->dbDest->getQueryCount();
    $rowsSynced = 0;

    foreach ($config as $tblSource => $tblDest) {
        if ($this->batchSize <= $this->batchDone) {
            $this->log("Reach batchSize limit {$this->batchSize}.");
            break;
        }

        $synced = $this->syncOneWayTable($tblSource, $tblDest);

        $this->batchDone += $synced;
        $rowsSynced += $synced;
    }

    $queriesAfter = $this->dbSource->getQueryCount() +
        $this->dbDest->getQueryCount();
    $queryCount = $queriesAfter - $queriesBefore;

    $this->log(
        "syncOneWay() done, total {$rowsSynced} rows synced," .
        " db query $queryCount times.\n"
    );

    return $rowsSynced;
}
590
|
|
|
|
591
|
|
|
|
592
|
|
|
/**
 * One-way sync for INSERT/UPDATE, single source table
 *
 * When $tableDest is array, each dest table is processed by a recursive
 * call and the results are summed.
 *
 * Rows are read from source ordered by its timestamp column, starting from
 * the timestamp remembered in record table (if any), optionally converted
 * by convertData[Source]To[Dest](), written to dest, and the timestamp of
 * the last row read is remembered for next round.
 *
 * @param   string  $tableSource
 * @param   mixed   $tableDest
 * @return  integer     Number of rows passed to write on dest db.
 * @throws  \Exception  When source table has no timestamp column.
 */
protected function syncOneWayTable($tableSource, $tableDest)
{
    if (is_array($tableDest)) {
        $total = 0;
        foreach ($tableDest as $singleDest) {
            $total += $this->syncOneWayTable($tableSource, $singleDest);
        }

        return $total;
    }

    $timestamp = $this->getLastTimestamp($this->dbDest, $tableSource);
    $timestampColumn = $this->dbSource->getMetaTimestamp($tableSource);
    if (empty($timestampColumn)) {
        $message = "Table $tableSource in source db hasn't timestamp column.";
        $this->log($message);
        throw new \Exception($message);
    }

    // Retrieve data from source db
    $sqlConfig = [
        'SELECT'  => '*',
        'FROM'    => $tableSource,
        'ORDERBY' => "$timestampColumn ASC",
    ];
    if (!empty($timestamp)) {
        $timestamp = $this->dbSource->quoteValue(
            $tableSource,
            $timestampColumn,
            $timestamp
        );

        // Some db's timestamp have duplicate value, need to use '>=' to
        // avoid some rows been skipped. But if N rows have same ts, and
        // N > $this->batchSize, it will be endless loop, so use '>' when
        // possible by db type.
        // @codeCoverageIgnoreStart
        $operator = $this->dbSource->isTimestampUnique() ? '>' : '>=';
        $sqlConfig['WHERE'] = "$timestampColumn $operator $timestamp";
        // @codeCoverageIgnoreEnd
    }
    $sql = $this->dbSource->generateSql($sqlConfig);
    $rs = $this->dbSource->SelectLimit($sql, $this->batchSize - $this->batchDone);

    if (empty($rs) || 0 >= $rs->RowCount()) {
        // @codeCoverageIgnoreStart
        return 0;
        // @codeCoverageIgnoreEnd
    }

    // Got data, prepare
    $dataSource = [];
    $lastTimestamp = '';

    while (!$rs->EOF) {
        $record = $rs->FetchRow();

        // Sybase timestamp is binary format, need convert to string
        // @codeCoverageIgnoreStart
        if ($this->dbSource->isDbSybase()) {
            $record[$timestampColumn] = bin2hex($record[$timestampColumn]);
        }
        // @codeCoverageIgnoreEnd

        // Remember timestamp, the last one will write to record table later
        $lastTimestamp = strval($record[$timestampColumn]);

        $dataSource[] = $record;
    }
    $dataSource = $this->dbSource->convertEncodingResult($dataSource);

    $rowsSynced = 0;
    $stringUtil = $this->getUtilContainer()->getString();
    foreach ((array)$tableDest as $table) {
        // Name of the optional data convert method
        $convertFunc = 'convertData' . $stringUtil->toStudlyCaps($tableSource)
            . 'To' . $stringUtil->toStudlyCaps($table);

        $dataDest = [];
        if (method_exists($this, $convertFunc)) {
            // Convert data from source db to data for destination db.
            // If convert method return empty, will skip this row.
            foreach ($dataSource as &$row) {
                $converted = $this->$convertFunc($row);
                if (!empty($converted)) {
                    $dataDest[] = $converted;
                }
            }
            unset($row);

        } else {
            // No convert method, write source rows as they are
            $dataDest = &$dataSource;
        }

        if (empty($dataDest)) {
            continue;
        }

        // Write data to dest db
        $rowCount = count($dataDest);
        $rowsSynced += $rowCount;

        // Row maybe UPDATE or INSERT, so can't use fast prepare
        foreach ($dataDest as &$row) {
            $this->dbDest->write($table, $row);
        }
        unset($row);

        $this->log(
            "syncOneWayTable() $tableSource -> $table, " .
            $rowCount . " rows wrote."
        );
    }

    // Notice: If a table need to write to 2 table in dest, and one
    // table write successful and another fail, the last timestamp
    // will still set.
    $this->setLastTimestamp($this->dbDest, $tableSource, $lastTimestamp);

    return $rowsSynced;
}
726
|
|
|
} |
727
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.