1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* @package fwolflib |
4
|
|
|
* @subpackage class |
5
|
|
|
* @copyright Copyright 2008-2009, Fwolf |
6
|
|
|
* @author Fwolf <[email protected]> |
7
|
|
|
* @since 2008-05-20 |
8
|
|
|
*/ |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
require_once(dirname(__FILE__) . '/fwolflib.php'); |
12
|
|
|
require_once(FWOLFLIB . 'class/adodb.php'); |
13
|
|
|
require_once(FWOLFLIB . 'func/ecl.php'); |
14
|
|
|
require_once(FWOLFLIB . 'func/string.php'); |
15
|
|
|
require_once(FWOLFLIB . 'func/uuid.php'); |
16
|
|
|
|
17
|
|
|
|
18
|
|
|
/** |
19
|
|
|
* Sync data between 2 database source |
20
|
|
|
* |
21
|
|
|
* At now, only from 1 to another, |
22
|
|
|
* cannot do two-side sync yet. |
23
|
|
|
* |
24
|
|
|
* @deprecated Use Fwlib\Db\SyncDbData |
25
|
|
|
* @package fwolflib |
26
|
|
|
* @subpackage class |
27
|
|
|
* @copyright Copyright 2008, Fwolf |
28
|
|
|
* @author Fwolf <[email protected]> |
29
|
|
|
* @since 2008-05-20 |
30
|
|
|
* @version $Id$ |
31
|
|
|
* @see AdoDb |
32
|
|
|
*/ |
33
|
|
|
class SyncDbData extends Fwolflib { |
|
|
|
|
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* Oneway sync config |
37
|
|
|
* |
38
|
|
|
* If 1 tbl from source need write to 2 tbl to destination |
39
|
|
|
* set destination tbl in queue an array. |
40
|
|
|
* |
41
|
|
|
* 1 source tbl can only occur once as a queue array index, |
42
|
|
|
* because timestamp is record by it. |
43
|
|
|
* |
44
|
|
|
* <code> |
45
|
|
|
* srce Source db profile(array) |
46
|
|
|
* dest Destination db profile(array) |
47
|
|
|
* queue = array( |
48
|
|
|
* tbl_srce => tbl_dest, |
49
|
|
|
* tbl_srce => array(tbl_dest1, tbl_dest2), |
50
|
|
|
* ) |
51
|
|
|
* </code> |
52
|
|
|
* |
53
|
|
|
* Will auto call data parse function, if not exist, use original data. |
54
|
|
|
* eg: DataConvertTblSrce |
55
|
|
|
* This function accept data array retrieve from source, |
56
|
|
|
* and return data array will write to destination. |
57
|
|
|
* |
58
|
|
|
* When destination write done, update timestamp in record tbl. |
59
|
|
|
* |
60
|
|
|
* Change to assign through function param, here is just a sample. |
61
|
|
|
* @var array |
62
|
|
|
*/ |
63
|
|
|
//public $aCfgOneway = array(); |
64
|
|
|
|
65
|
|
|
/** |
66
|
|
|
* Log message |
67
|
|
|
* @var array |
68
|
|
|
*/ |
69
|
|
|
public $aLog = array(); |
70
|
|
|
|
71
|
|
|
/** |
72
|
|
|
* Number of rows have processed |
73
|
|
|
* @var integer |
74
|
|
|
*/ |
75
|
|
|
protected $iBatchDone = 0; |
76
|
|
|
|
77
|
|
|
/** |
78
|
|
|
* Process N rows per run |
79
|
|
|
* @var integer |
80
|
|
|
*/ |
81
|
|
|
public $iBatchSize = 100; |
82
|
|
|
|
83
|
|
|
/** |
84
|
|
|
* Source db profile name |
85
|
|
|
* A join of db type, host, name in db config array, '-' splitted. |
86
|
|
|
* @var string |
87
|
|
|
*/ |
88
|
|
|
protected $sDbProfSrce = ''; |
89
|
|
|
|
90
|
|
|
/** |
91
|
|
|
* Lock file to avoid run duplicate. |
92
|
|
|
* |
93
|
|
|
* @var string |
94
|
|
|
*/ |
95
|
|
|
public $sLockFile = '/tmp/sync-db-data.lock'; |
96
|
|
|
|
97
|
|
|
/** |
98
|
|
|
* Name of record table |
99
|
|
|
* |
100
|
|
|
* Create in destination db. |
101
|
|
|
* @var string |
102
|
|
|
*/ |
103
|
|
|
public $sTblRecord = 'sync_db_data_record'; |
104
|
|
|
|
105
|
|
|
/** |
106
|
|
|
* Db object - source |
107
|
|
|
* @var object |
108
|
|
|
*/ |
109
|
|
|
protected $oDbSrce = null; |
110
|
|
|
|
111
|
|
|
/** |
112
|
|
|
* Db object - destination |
113
|
|
|
* @var object |
114
|
|
|
*/ |
115
|
|
|
protected $oDbDest = null; |
116
|
|
|
|
117
|
|
|
|
118
|
|
|
/** |
119
|
|
|
* construct |
120
|
|
|
*/ |
121
|
|
|
public function __construct() {
    // Open this run's log with a timestamp banner.
    $this->Log('======== ' . date('Y-m-d H:i:s') . ' ========');

    // An existing lock file means a previous run has not finished yet.
    if (file_exists($this->sLockFile)) {
        Ecl("\n======== " . date('Y-m-d H:i:s') . ' ========');

        // PHP still runs __destruct() after die(), and __destruct() calls
        // LockFileDelete().  Forget the lock path first, so this aborted
        // instance cannot remove the lock owned by the run still in progress
        // (is_writable('') is false, so nothing gets unlinked).
        $this->sLockFile = '';

        die("Lock file exists, previous run not end, quit.\n");
    }

    // No other run active: take the lock for this run.
    $this->LockFileCreate();

    // The record table is checked later by ChkDbConn(), once the
    // destination db is known.
} // end of func __construct
135
|
|
|
|
136
|
|
|
|
137
|
|
|
/** |
138
|
|
|
* destruct, output log message, only when there is some sync happen. |
139
|
|
|
*/ |
140
|
|
|
public function __destruct() {
    // Log messages are printed only when this run processed some rows.
    $has_processed_rows = (0 != $this->iBatchDone);
    if ($has_processed_rows) {
        foreach ($this->aLog as &$message) {
            Ecl($message);
        }
        unset($message);
    }

    // Always release the lock file at the end of the run.
    $this->LockFileDelete();
} // end of func destruct
147
|
|
|
|
148
|
|
|
|
149
|
|
|
/** |
150
|
|
|
* Check and create db connection |
151
|
|
|
* @param array &$config |
152
|
|
|
*/ |
153
|
|
|
protected function ChkDbConn(&$config) {
    // Source side: connect and remember a "type-host-name" profile string,
    // which is used as the db_prof value in the record table.
    if (!empty($config['srce'])) {
        $prof_srce = $config['srce'];
        $conn_srce = $this->DbConn($prof_srce);
        $this->sDbProfSrce = $prof_srce['type']
            . '-' . $prof_srce['host']
            . '-' . $prof_srce['name'];
        $this->oDbSrce = $conn_srce;
    }

    // Destination side: connect, then make sure the record table exists,
    // because it lives in the destination db.
    if (!empty($config['dest'])) {
        $conn_dest = $this->DbConn($config['dest']);
        $this->oDbDest = $conn_dest;
        $this->ChkTblRecord($conn_dest);
    }
} // end of func ChkDbConn
170
|
|
|
|
171
|
|
|
|
172
|
|
|
/** |
173
|
|
|
* Check and install record table if not exists |
174
|
|
|
* @param object $db Db connection |
175
|
|
|
* @param string $tbl Name of record tbl, if empty, use $this->sTblRecord |
176
|
|
|
*/ |
177
|
|
|
protected function ChkTblRecord($db, $tbl = '')
{
    // Fall back to the configured record table name.
    if (empty($tbl))
        $tbl = $this->sTblRecord;

    if ($db->TblExists($tbl)) {
        // Log table exist information
        $this->Log("Log table $tbl already exists.");
        return;
    }

    // Table doesn't exist, create it.
    // SQL for create table differs between db types.
    $is_sybase = $db->IsDbSybase();
    if ($is_sybase) {
        // Sybase's index is created separately, after the table (see below).
        $sql = "
            CREATE TABLE $tbl (
                uuid        CHAR(36)    NOT NULL,
                db_prof     VARCHAR(50) NOT NULL,
                tbl_title   VARCHAR(50) NOT NULL,
                -- Timestamp remembered, for next round
                last_ts     VARCHAR(50) NOT NULL,
                -- Timestamp for this table
                -- In sybase 'timestamp' must be lower cased
                ts          timestamp   NOT NULL,
                constraint PK_$tbl PRIMARY KEY (uuid)
            )
            ";
    }
    elseif ($db->IsDbMysql()) {
        // ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
        $sql = "
            CREATE TABLE $tbl (
                uuid        CHAR(36)    NOT NULL,
                db_prof     VARCHAR(50) NOT NULL,
                tbl_title   VARCHAR(50) NOT NULL,
                -- Timestamp remembered, for next round
                last_ts     VARCHAR(50) NOT NULL,
                -- Timestamp for this table
                ts          TIMESTAMP   NOT NULL
                    DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                PRIMARY KEY (uuid),
                INDEX idx_{$tbl}_1 (db_prof, tbl_title)
            );
            ";
    }
    else {
        $this->Log('Table record create syntax not implement.');
        $this->Quit();
    }

    // Execute create table sql
    $db->Execute($sql);

    // Use '!=' like the other error checks in this class, so an error code
    // that is not a positive number is not mistaken for success.
    if (0 != $db->ErrorNo()) {
        $this->Log($db->ErrorNo() . ' - ' . $db->ErrorMsg());
        $this->Log("Log table $tbl doesn't exists and create fail.");
        $this->Quit();
    }

    // Only Sybase needs the separate index statement; the MySQL CREATE TABLE
    // above already declares idx_{$tbl}_1, so running it there would try to
    // create the same index twice.
    if ($is_sybase) {
        $db->Execute("CREATE INDEX idx_{$tbl}_1 ON
            $tbl(db_prof, tbl_title)
            ");
    }

    // Log table create information
    $this->Log("Log table $tbl doesn't exist, create it, done.");
} // end of func ChkTblRecord
245
|
|
|
|
246
|
|
|
|
247
|
|
|
/** |
248
|
|
|
* Connect to db, using func defined in Adodb, check error here. |
249
|
|
|
* |
250
|
|
|
* <code> |
251
|
|
|
* $s = array(type, host, user, pass, name, lang); |
252
|
|
|
* type is mysql/sybase_ase etc, |
253
|
|
|
* name is dbname to select, |
254
|
|
|
* lang is db server charset. |
255
|
|
|
* </code> |
256
|
|
|
* |
257
|
|
|
* Using my extended ADODB class now, little difference when creating the object. |
258
|
|
|
* @param array $dbprofile Server config array |
259
|
|
|
* @return object Db connection object |
260
|
|
|
*/ |
261
|
|
|
protected function DbConn($dbprofile)
{
    // Build the connection object from the profile and open it.
    $db = new Adodb($dbprofile);
    $db->Connect();

    // Happy path: no error reported, hand the connection back.
    if (0 == $db->ErrorNo()) {
        return $db;
    }

    // Connection failed: record the error and stop the program.
    $msg = 'ErrorNo: ' . $db->ErrorNo()
        . "\nErrorMsg: " . $db->ErrorMsg();
    $this->Log($msg);
    $this->Quit();
} // end of func DbConn
279
|
|
|
|
280
|
|
|
|
281
|
|
|
/** |
282
|
|
|
* Get last timestamp remembered |
283
|
|
|
* @param $db_dest Destination db connection, find here |
284
|
|
|
* @param $tbl_srce Table name of source table. |
285
|
|
|
* @return string Return '' if no last_ts remembered. |
286
|
|
|
* @see $sDbProfSrce |
287
|
|
|
*/ |
288
|
|
View Code Duplication |
protected function GetLastTs($db_dest, $tbl_srce) {
    // Look up the timestamp remembered for this source profile + table in
    // the record table of the destination db.
    $sql = $db_dest->GenSql(array(
        'SELECT'    => 'last_ts',
        'FROM'      => $this->sTblRecord,
        'WHERE'     => array(
            "db_prof = '{$this->sDbProfSrce}'",
            "tbl_title = '$tbl_srce'",
        ),
        'LIMIT'     => 1,
    ));
    $rs = $db_dest->Execute($sql);

    // A failed query gives no usable result set.  Calling RowCount() on it
    // would be a fatal error, which also leaves the lock file behind, so
    // report and quit the same way the other db errors in this class do.
    if (empty($rs)) {
        $this->Log("Read last_ts of $tbl_srce fail: "
            . $db_dest->ErrorNo() . ' - ' . $db_dest->ErrorMsg());
        $this->Quit();
        // Only reached if a sub class overrides Quit() without exiting.
        return '';
    }

    // '' tells the caller that nothing is remembered yet.
    if (0 < $rs->RowCount()) {
        return $rs->fields['last_ts'];
    }
    return '';
} // end of func GetLastTs
306
|
|
|
|
307
|
|
|
|
308
|
|
|
/** |
309
|
|
|
* Create lock file. |
310
|
|
|
* |
311
|
|
|
* @return $this |
312
|
|
|
*/ |
313
|
|
|
protected function LockFileCreate() {
    $lock_file = $this->sLockFile;

    // is_writable() is false for a path that does not exist, so testing
    // the lock file itself would never allow a new lock to be created.
    // For a new lock file test its directory; for an existing one test the
    // file, as before.
    if (file_exists($lock_file))
        $can_write = is_writable($lock_file);
    else
        $can_write = is_writable(dirname($lock_file));

    if ($can_write)
        file_put_contents($lock_file, '');

    // Return $this so calls can be chained.
    return $this;
} // end of func LockFileCreate
318
|
|
|
|
319
|
|
|
|
320
|
|
|
/** |
321
|
|
|
* Delete lock file. |
322
|
|
|
* |
323
|
|
|
* @return $this |
324
|
|
|
*/ |
325
|
|
|
protected function LockFileDelete() {
    $lock_file = $this->sLockFile;

    // Remove the lock only when it is there and writable;
    // otherwise do nothing.
    if (is_writable($lock_file)) {
        unlink($lock_file);
    }

    // Return $this so calls can be chained.
    return $this;
} // end of func LockFileDelete
330
|
|
|
|
331
|
|
|
|
332
|
|
|
/** |
333
|
|
|
* Save or output log message, change to save now, output when destruct. |
334
|
|
|
* @param string $log |
335
|
|
|
*/ |
336
|
|
|
public function Log($log) {
    // Messages are only collected here; __destruct() decides whether
    // they get printed.
    $this->aLog[] = $log;
} // end of func Log
342
|
|
|
|
343
|
|
|
|
344
|
|
|
/** |
345
|
|
|
* Quit this program when error |
346
|
|
|
*/ |
347
|
|
|
protected function Quit() {
    // Release the lock first, so the next run is not blocked.
    $this->LockFileDelete();

    // Stop the whole script.
    exit();
} // end of func Quit
351
|
|
|
|
352
|
|
|
|
353
|
|
|
/** |
354
|
|
|
* Set last timestamp, for next round |
355
|
|
|
* @param $db_dest Destination db connection, write here |
356
|
|
|
* @param $tbl_srce Table name of source table. |
357
|
|
|
* @param $last_ts Last timestamp value. |
358
|
|
|
* @return boolean True when the write succeeded, false otherwise |
359
|
|
|
*/ |
360
|
|
|
protected function SetLastTs($db_dest, $tbl_srce, $last_ts) {
    // Nothing remembered yet means a new record row is needed,
    // otherwise the existing row is updated.
    $is_first_time = ('' == $this->GetLastTs($db_dest, $tbl_srce));

    if ($is_first_time) {
        // Insert mode
        $sql_conf = array(
            'INSERT'    => $this->sTblRecord,
            'VALUES'    => array(
                'uuid'      => $this->Uuid(),
                'db_prof'   => $this->sDbProfSrce,
                'tbl_title' => $tbl_srce,
                'last_ts'   => $last_ts
            ),
        );
    }
    else {
        // Update mode
        $sql_conf = array(
            'UPDATE'    => $this->sTblRecord,
            'SET'       => array('last_ts' => $last_ts),
            'WHERE'     => array(
                "db_prof = '{$this->sDbProfSrce}'",
                "tbl_title = '$tbl_srce'",
            ),
            'LIMIT'     => 1,
        );
    }

    // Generate and run the statement on the destination db.
    $db_dest->Execute($db_dest->GenSql($sql_conf));

    // Success is judged by the connection's error number alone.
    return (0 == $db_dest->ErrorNo());
} // end of func SetLastTs
396
|
|
|
|
397
|
|
|
|
398
|
|
|
/** |
399
|
|
|
* Check if data had been deleted from srce |
400
|
|
|
* Caution if dest table's data is ONLY from srce table, |
401
|
|
|
* if not, you shouldn't use this func. |
402
|
|
|
* @param array &$config |
403
|
|
|
*/ |
404
|
|
|
public function SyncChkDel(&$config) {
    global $i_db_query_times;

    // A full batch means the data sync itself is not finished yet,
    // so the delete check is skipped for this run.
    if ($this->iBatchDone >= $this->iBatchSize) {
        $this->Log('Data sync not complete, delete check will not run.');
        return;
    }

    $this->ChkDbConn($config);

    // Count deleted rows from zero, the earlier count is added back below.
    $i_done_before = $this->iBatchDone;
    $this->iBatchDone = 0;

    $has_queue = !empty($config['queue']) && is_array($config['queue']);
    if ($has_queue) {
        foreach ($config['queue'] as $tbl_srce => $tbl_dest) {
            // Batch is full, the remaining tables wait for the next run.
            if ($this->iBatchDone >= $this->iBatchSize) {
                break;
            }
            // Notice, $tbl_dest maybe an array
            $this->iBatchDone += $this->SyncChkDelTbl(
                $this->oDbSrce, $this->oDbDest, $tbl_srce, $tbl_dest
            );
        }
    }

    // Output message
    $this->Log("SyncChkDel done, total {$this->iBatchDone} rows deleted,"
        . " db query(s) $i_db_query_times times.\n");

    // Reset stat data for other operation
    $i_db_query_times = 0;
    $this->iBatchDone += $i_done_before;
} // end of func SyncChkDel
432
|
|
|
|
433
|
|
|
|
434
|
|
|
/** |
435
|
|
|
* Check if data had been deleted from srce on a single table |
436
|
|
|
* @param object $db_srce Source db connection |
437
|
|
|
* @param object $db_dest Destination db connection |
438
|
|
|
* @param string $tbl_srce Source table |
439
|
|
|
* @param mixed $tbl_dest Destination table(name or array of name) |
440
|
|
|
* @return integer Number of rows deleted on destination db. |
441
|
|
|
*/ |
442
|
|
|
public function SyncChkDelTbl($db_srce, $db_dest, $tbl_srce, $tbl_dest) {
    // Several destination tables: check each one and sum the results.
    if (is_array($tbl_dest)) {
        $i = 0;
        foreach ($tbl_dest as $dest)
            $i += $this->SyncChkDelTbl($db_srce, $db_dest
                , $tbl_srce, $dest);
        return $i;
    }
    // In below, $tbl_dest is STRING now

    // Get row count from each side
    $i_srce = $db_srce->GetRowCount($tbl_srce);
    $i_dest = $db_dest->GetRowCount($tbl_dest);

    // Only when dest holds more rows than srce can rows have been deleted
    // from srce.  Return an explicit 0 (not null) when there is nothing
    // to do, as the documented return type is integer.
    if ($i_srce >= $i_dest)
        return 0;

    // Log check begin
    $s_log = "Delete check: $tbl_srce($i_srce) <- ";
    $s_log .= $tbl_dest . '(' . $i_dest . ')';

    // Find unnecessary PK in dest(srce To dest)
    // DataCompare func return PK of rows need to del
    $s_func = 'DataCompare' . StrUnderline2Ucfirst($tbl_srce)
        . 'To' . StrUnderline2Ucfirst($tbl_dest);
    if (!method_exists($this, $s_func)) {
        // Need compare func
        $s_log .= ' fail .';
        $this->Log($s_log);
        $this->Log(" Compare func needed: $tbl_srce To $tbl_dest .");
        return 0;
    }

    // Got the rows
    $ar_todel = $this->$s_func();
    if (empty($ar_todel))
        return 0;

    // Do del
    $ar_conf = array(
        'DELETE'    => $tbl_dest,
        'LIMIT'     => 1,
    );

    // Apply PK to WHERE clause
    // Notice: order of pk must same with $ar_todel
    // All db work below uses the $db_dest parameter, the same connection
    // the row count above was read from, instead of $this->oDbDest.
    $pk = $db_dest->GetMetaPrimaryKey($tbl_dest);
    if (!is_array($pk))
        $pk = array(0 => $pk);
    foreach ($pk as $key) {
        $ar_conf['WHERE'][] = "$key = "
            . $db_dest->Param($key);
    }

    // Prepare sql
    $sql = $db_dest->GenSqlPrepare($ar_conf);
    $stmt = $db_dest->Prepare($sql);

    // Execute sql
    // NOTE(review): the return value is not used here, so this presumably
    // converts $ar_todel in place - confirm against Adodb::EncodingConvert.
    $db_dest->EncodingConvert($ar_todel);
    try {
        $db_dest->Execute($stmt, $ar_todel);
    }
    catch (Exception $e) {
        $db_dest->RollbackTrans();
        $s_log .= ' fail(1) .';
        $this->Log($s_log);
        $this->Log("\tError when execute del sql on $tbl_dest .");
        return -100;
    }

    // Any error ?
    if (0 != $db_dest->ErrorNo()) {
        $s_log .= ' fail(2) .';
        $this->Log($s_log);
        $this->Log(' ErrorNo: ' . $db_dest->ErrorNo()
            . "\n\tErrorMsg: " . $db_dest->ErrorMsg()
        );
        $db_dest->RollbackTrans();
        return -100;
    }

    $db_dest->CommitTrans();
    // Ok, count affected rows
    // But Affected_Rows can't use with prepare, so count the PK list
    $i = count($ar_todel);
    $s_log .= ", $i rows deleted.";
    $this->Log($s_log);
    return $i;
} // end of func SyncChkDelTbl
538
|
|
|
|
539
|
|
|
|
540
|
|
|
/** |
541
|
|
|
* Do oneway sync |
542
|
|
|
* @param array &$config |
543
|
|
|
*/ |
544
|
|
|
public function SyncOneway(&$config) {
    global $i_db_query_times;

    $this->ChkDbConn($config);

    // Count written rows from zero, the earlier count is added back below.
    $i_done_before = $this->iBatchDone;
    $this->iBatchDone = 0;

    $has_queue = !empty($config['queue']) && is_array($config['queue']);
    if ($has_queue) {
        foreach ($config['queue'] as $tbl_srce => $tbl_dest) {
            // Batch is full, the remaining tables wait for the next run.
            if ($this->iBatchDone >= $this->iBatchSize) {
                break;
            }
            // Notice, $tbl_dest maybe an array
            $this->iBatchDone += $this->SyncOnewayTbl(
                $this->oDbSrce, $this->oDbDest, $tbl_srce, $tbl_dest
            );
        }
    }

    // Output message
    $this->Log("SyncOneway done, total {$this->iBatchDone} rows wrote,"
        . " db query(s) $i_db_query_times times.\n");

    // Reset stat data for other operation
    $i_db_query_times = 0;
    $this->iBatchDone += $i_done_before;
} // end of func SyncOneway
566
|
|
|
|
567
|
|
|
|
568
|
|
|
/** |
569
|
|
|
* Do oneway sync on a single table |
570
|
|
|
* @param object $db_srce Source db connection |
571
|
|
|
* @param object $db_dest Destination db connection |
572
|
|
|
* @param string $tbl_srce Source table |
573
|
|
|
* @param mixed $tbl_dest Destination table(name or array of name) |
574
|
|
|
* Main dest tbl should define before others. |
575
|
|
|
* @return integer Number of rows write to destination db. |
576
|
|
|
*/ |
577
|
|
|
public function SyncOnewayTbl($db_srce, $db_dest, $tbl_srce, $tbl_dest) {
    // Prepare: last remembered timestamp and the source timestamp column.
    $last_ts = $this->GetLastTs($db_dest, $tbl_srce);
    $col_ts = $db_srce->FindColTs($tbl_srce);
    if (empty($col_ts)) {
        $this->Log("Table $tbl_srce in source db hasn't timestamp column.");
        $this->Quit();
    }

    // Retrieve data from source db, oldest timestamp first
    $ar_conf = array(
        'SELECT'    => '*',
        'FROM'      => $tbl_srce,
        'ORDERBY'   => "$col_ts asc",
    );
    if (!empty($last_ts)) {
        $last_ts = $db_srce->QuoteValue($tbl_srce, $col_ts, $last_ts);
        // Some db's timestamp have duplicate value, use '>=' to avoid some rows been skipped.
        // :NOTICE: If N rows have same ts, and N > $this->iBatchSize, it will endless loop.
        // So use '>' when possible.
        if ($db_srce->IsTsUnique())
            $ar_conf['WHERE'] = "$col_ts > $last_ts";
        else
            $ar_conf['WHERE'] = "$col_ts >= $last_ts";
    }
    $sql = $db_srce->GenSql($ar_conf);
    // Fetch at most what is left of this run's batch.
    $rs = $db_srce->SelectLimit($sql, $this->iBatchSize - $this->iBatchDone);

    if (empty($rs) || 0 >= $rs->RowCount())
        return 0;

    // Got data, collect the rows first (multi-rows write mode)
    $ar_rows = array();
    $last_ts = '';      // Last ts to be remembered
    while (!$rs->EOF) {
        $ar = $rs->FetchRow();

        // Php-sybase in ubuntu intrepid use mssql wrongly, so read timestamp
        // error way, need to correct, and before encoding convert.
        if (16 != strlen($ar[$col_ts]))
            $ar[$col_ts] = bin2hex($ar[$col_ts]);
        // Remember timestamp, the last one will write to record table below
        $last_ts = strval($ar[$col_ts]);

        $ar = $db_srce->EncodingConvert($ar);

        // Add data from source db to queue, will convert later
        if (!empty($ar))
            $ar_rows[] = $ar;
    }
    // Maybe any reason cause no data in $ar_rows
    if (empty($ar_rows))
        return 0;

    // If $tbl_dest is string, convert to array
    if (!is_array($tbl_dest))
        $tbl_dest = array($tbl_dest);
    $i_batch_done = 0;

    // Loop as if $tbl_dest is multi table
    foreach ($tbl_dest as $tbl_dest_single) {
        $i_batch_done_single = 0;

        // Important: call data convert function, if the sub class has one
        $s_func = 'DataConvert' . StrUnderline2Ucfirst($tbl_srce)
            . 'To' . StrUnderline2Ucfirst($tbl_dest_single);
        if (method_exists($this, $s_func)) {
            // Convert data from source db to data for destination db
            $ar_dest = array();
            foreach ($ar_rows as &$ar_row) {
                $ar = $this->$s_func($ar_row);
                if (!empty($ar))
                    $ar_dest[] = $ar;
            }
            // Drop the dangling reference to the last source row.
            unset($ar_row);
        }
        else {
            // No data convert needed.  Take a plain copy: binding $ar_dest
            // to $ar_rows by reference made the next table's
            // "$ar_dest = array()" empty the source rows as well, so every
            // later destination table received nothing.
            $ar_dest = $ar_rows;
        }

        // Must loop manually, because each row's update/insert is difference
        foreach ($ar_dest as $ar_dest_row) {
            $j = $db_dest->Write($tbl_dest_single, $ar_dest_row);
            if (0 < $j) {
                $i_batch_done += $j;
                $i_batch_done_single += $j;
            }
        }

        // Log single table sync message
        if (0 < $i_batch_done_single)
            $this->Log("SyncOnewayTbl $tbl_srce -> $tbl_dest_single, "
                . "$i_batch_done_single rows wrote.");
    }

    // Notice, if a table need to write to 2 table in dest,
    // and 1 table write successful and another fail, it will still set last ts.
    if (0 <= $i_batch_done) {
        // Do not swallow a failed write of the remembered timestamp.
        if (!$this->SetLastTs($db_dest, $tbl_srce, $last_ts))
            $this->Log("Save last_ts of $tbl_srce fail.");
    }
    return $i_batch_done;
} // end of func SyncOnewayTbl
685
|
|
|
|
686
|
|
|
|
687
|
|
|
/** |
688
|
|
|
* Trim a string, used in array_walk, so param need to be reference |
689
|
|
|
* @param string &$str |
690
|
|
|
* @return string |
691
|
|
|
*/ |
692
|
|
|
public function Trim(&$str) {
    // The argument is a reference, so the caller's variable is trimmed
    // too (needed for array_walk); the trimmed value is also returned.
    return $str = trim($str);
} // end of func Trim
697
|
|
|
|
698
|
|
|
|
699
|
|
|
/** |
700
|
|
|
* Generate an UUID, can be re-write by sub class |
701
|
|
|
* @return string |
702
|
|
|
*/ |
703
|
|
|
protected function Uuid() {
    // Delegates to the global Uuid() function; sub classes may override
    // this method to generate ids another way.
    $uuid = Uuid();
    return $uuid;
} // end of func Uuid
706
|
|
|
|
707
|
|
|
|
708
|
|
|
} // end of class SyncDbData |
709
|
|
|
?> |
|
|
|
|
710
|
|
|
|
This class, trait or interface has been deprecated. The supplier of the file has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the type will be removed from the class and what other constant to use instead.