1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Wabel\Zoho\CRM\Copy; |
4
|
|
|
|
5
|
|
|
use Doctrine\DBAL\Connection; |
6
|
|
|
use Psr\Log\LoggerInterface; |
7
|
|
|
use Psr\Log\NullLogger; |
8
|
|
|
use Wabel\Zoho\CRM\AbstractZohoDao; |
9
|
|
|
use Wabel\Zoho\CRM\ZohoClient; |
10
|
|
|
use zcrmsdk\crm\crud\ZCRMRecord; |
11
|
|
|
use zcrmsdk\crm\exception\ZCRMException; |
12
|
|
|
use ZipArchive; |
13
|
|
|
|
14
|
|
|
/** |
15
|
|
|
* This class is in charge of synchronizing one table of your database with Zoho records. |
16
|
|
|
*/ |
17
|
|
|
class ZohoDatabaseCopier |
18
|
|
|
{ |
19
|
|
|
/** |
20
|
|
|
* @var Connection |
21
|
|
|
*/ |
22
|
|
|
private $connection; |
23
|
|
|
|
24
|
|
|
private $prefix; |
25
|
|
|
|
26
|
|
|
/** |
27
|
|
|
* @var ZohoChangeListener[] |
28
|
|
|
*/ |
29
|
|
|
private $listeners; |
30
|
|
|
|
31
|
|
|
/** |
32
|
|
|
* @var LoggerInterface |
33
|
|
|
*/ |
34
|
|
|
private $logger; |
35
|
|
|
|
36
|
|
|
/** |
37
|
|
|
* @var LocalChangesTracker |
38
|
|
|
*/ |
39
|
|
|
private $localChangesTracker; |
40
|
|
|
/** |
41
|
|
|
* @var ZohoUserService |
42
|
|
|
*/ |
43
|
|
|
private $zohoUserService; |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* ZohoDatabaseCopier constructor. |
47
|
|
|
* |
48
|
|
|
* @param Connection $connection |
49
|
|
|
* @param string $prefix Prefix for the table name in DB |
50
|
|
|
* @param ZohoChangeListener[] $listeners The list of listeners called when a record is inserted or updated. |
51
|
|
|
*/ |
52
|
|
View Code Duplication |
public function __construct(Connection $connection, ZohoUserService $zohoUserService, $prefix = 'zoho_', array $listeners = [], LoggerInterface $logger = null) |
|
|
|
|
53
|
|
|
{ |
54
|
|
|
$this->connection = $connection; |
55
|
|
|
$this->prefix = $prefix; |
56
|
|
|
$this->listeners = $listeners; |
57
|
|
|
if ($logger === null) { |
58
|
|
|
$this->logger = new NullLogger(); |
59
|
|
|
} else { |
60
|
|
|
$this->logger = $logger; |
61
|
|
|
} |
62
|
|
|
$this->localChangesTracker = new LocalChangesTracker($connection, $this->logger); |
63
|
|
|
$this->zohoUserService = $zohoUserService; |
64
|
|
|
} |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* @throws \Doctrine\DBAL\DBALException |
68
|
|
|
* @throws \Doctrine\DBAL\Schema\SchemaException |
69
|
|
|
* @throws \Wabel\Zoho\CRM\Exception\ZohoCRMResponseException |
70
|
|
|
*/ |
71
|
|
|
public function fetchUserFromZoho() |
72
|
|
|
{ |
73
|
|
|
$users = $this->zohoUserService->getUsers(); |
74
|
|
|
$tableName = 'users'; |
75
|
|
|
$this->logger->info('Fetched ' . count($users) . ' records for table ' . $tableName); |
76
|
|
|
|
77
|
|
|
$table = $this->connection->getSchemaManager()->createSchema()->getTable($tableName); |
78
|
|
|
|
79
|
|
|
$select = $this->connection->prepare('SELECT * FROM ' . $tableName . ' WHERE id = :id'); |
80
|
|
|
|
81
|
|
|
$this->connection->beginTransaction(); |
82
|
|
|
foreach ($users as $user) { |
|
|
|
|
83
|
|
|
$data = []; |
84
|
|
|
$types = []; |
85
|
|
|
foreach ($table->getColumns() as $column) { |
86
|
|
|
if ($column->getName() === 'id') { |
87
|
|
|
continue; |
88
|
|
|
} else { |
89
|
|
|
$fieldMethod = ZohoDatabaseHelper::getUserMethodNameFromField($column->getName()); |
90
|
|
|
if (method_exists($user, $fieldMethod) |
91
|
|
|
&& (!is_array($user->{$fieldMethod}()) && !is_object($user->{$fieldMethod}())) |
92
|
|
|
) { |
93
|
|
|
$data[$column->getName()] = $user->{$fieldMethod}(); |
94
|
|
|
} elseif (method_exists($user, $fieldMethod) |
95
|
|
|
&& is_array($user->{$fieldMethod}()) |
96
|
|
|
&& array_key_exists('name', $user->{$fieldMethod}()) |
97
|
|
|
&& array_key_exists('id', $user->{$fieldMethod}()) |
98
|
|
|
) { |
99
|
|
|
$data[$column->getName()] = $user->{$fieldMethod}()['name']; |
100
|
|
|
} elseif (method_exists($user, $fieldMethod) |
101
|
|
|
&& is_object($user->{$fieldMethod}()) && method_exists($user->{$fieldMethod}(), 'getName') |
102
|
|
|
) { |
103
|
|
|
$object = $user->{$fieldMethod}(); |
104
|
|
|
$data[$column->getName()] = $object->getName(); |
105
|
|
|
} elseif ($column->getName() === 'Currency') { |
106
|
|
|
//Todo: Do a pull request about \ZCRMUser::geCurrency() to \ZCRMUser::getCurrency() |
107
|
|
|
$data[$column->getName()] = $user->geCurrency(); |
108
|
|
|
} else { |
109
|
|
|
continue; |
110
|
|
|
} |
111
|
|
|
} |
112
|
|
|
} |
113
|
|
|
$select->execute(['id' => $user->getId()]); |
114
|
|
|
$result = $select->fetch(\PDO::FETCH_ASSOC); |
115
|
|
|
if ($result === false && $data) { |
|
|
|
|
116
|
|
|
$this->logger->debug(sprintf('Inserting record with ID \'%s\' in table %s...', $user->getId(), $tableName)); |
117
|
|
|
|
118
|
|
|
$data['id'] = $user->getId(); |
119
|
|
|
$types['id'] = 'string'; |
120
|
|
|
|
121
|
|
|
$this->connection->insert($tableName, $data, $types); |
122
|
|
|
} elseif ($data) { |
|
|
|
|
123
|
|
|
$this->logger->debug(sprintf('Updating record with ID \'%s\' in table %s...', $user->getId(), $tableName)); |
124
|
|
|
$identifier = ['id' => $user->getId()]; |
125
|
|
|
$types['id'] = 'string'; |
126
|
|
|
$this->connection->update($tableName, $data, $identifier, $types); |
127
|
|
|
} |
128
|
|
|
|
129
|
|
|
} |
130
|
|
|
$this->connection->commit(); |
131
|
|
|
} |
132
|
|
|
|
133
|
|
|
/** |
134
|
|
|
* @param AbstractZohoDao $dao |
135
|
|
|
* @param bool $incrementalSync Whether we synchronize only the modified files or everything. |
136
|
|
|
* @param bool $twoWaysSync |
137
|
|
|
* @param bool $throwErrors |
138
|
|
|
* @param string $modifiedSince |
139
|
|
|
* |
140
|
|
|
* @throws \Doctrine\DBAL\DBALException |
141
|
|
|
* @throws \Doctrine\DBAL\Schema\SchemaException |
142
|
|
|
* @throws \Wabel\Zoho\CRM\Exception\ZohoCRMResponseException |
143
|
|
|
*/ |
144
|
|
|
public function fetchFromZoho(AbstractZohoDao $dao, $incrementalSync = true, $twoWaysSync = true, $throwErrors = true, $modifiedSince = null) |
145
|
|
|
{ |
146
|
|
|
$tableName = ZohoDatabaseHelper::getTableName($dao, $this->prefix); |
147
|
|
|
|
148
|
|
|
$zohoSyncConfigTableExists = $this->connection->getSchemaManager()->tablesExist(['zoho_sync_config']); |
149
|
|
|
|
150
|
|
|
$currentDateTime = new \DateTime(); |
151
|
|
|
|
152
|
|
|
$totalRecords = 0; |
153
|
|
|
$totalRecordsDeleted = 0; |
154
|
|
|
|
155
|
|
|
$recordsPage = 1; |
156
|
|
|
$stopAndhasMoreResults = true; |
157
|
|
|
$recordsPaginationLastTime = null; |
158
|
|
|
|
159
|
|
|
while ($stopAndhasMoreResults) { |
160
|
|
|
try { |
161
|
|
|
if ($incrementalSync) { |
162
|
|
|
if ($recordsPage === 1) { |
163
|
|
|
// Let's get the last modification date: |
164
|
|
|
$tableDetail = $this->connection->getSchemaManager()->listTableDetails($tableName); |
165
|
|
|
/** @var \DateTime|null $lastActivityTime */ |
166
|
|
|
$lastActivityTime = null; |
167
|
|
|
|
168
|
|
|
$findDateByModifiedTime = false; |
169
|
|
|
|
170
|
|
|
if ($zohoSyncConfigTableExists && $modifiedSince === null) { |
171
|
|
|
$lastDateInConfig = $this->connection->fetchColumn('SELECT config_value FROM zoho_sync_config WHERE config_key = ? AND table_name = ?', [ |
172
|
|
|
'FETCH_RECORDS_MODIFIED_SINCE__DATE', |
173
|
|
|
$tableName |
174
|
|
|
]); |
175
|
|
|
if ($lastDateInConfig !== false) { |
176
|
|
|
$lastPageInConfig = $this->connection->fetchColumn('SELECT config_value FROM zoho_sync_config WHERE config_key = ? AND table_name = ?', [ |
177
|
|
|
'FETCH_RECORDS_MODIFIED_SINCE__PAGE', |
178
|
|
|
$tableName |
179
|
|
|
]); |
180
|
|
|
if ($lastPageInConfig === false) { |
181
|
|
|
$lastPageInConfig = '1'; |
182
|
|
|
} |
183
|
|
|
$lastActivityTime = new \DateTime($lastDateInConfig); |
184
|
|
|
$recordsPage = (int)$lastPageInConfig; |
185
|
|
|
} else { |
186
|
|
|
$findDateByModifiedTime = true; |
187
|
|
|
} |
188
|
|
|
} else { |
189
|
|
|
$findDateByModifiedTime = true; |
190
|
|
|
} |
191
|
|
|
|
192
|
|
|
if ($findDateByModifiedTime) { |
193
|
|
|
if ($modifiedSince) { |
|
|
|
|
194
|
|
|
$lastActivityTime = new \DateTime($modifiedSince); |
195
|
|
|
} else { |
196
|
|
|
if ($tableDetail->hasColumn('modifiedTime')) { |
197
|
|
|
$lastActivityTime = $this->connection->fetchColumn('SELECT MAX(modifiedTime) FROM ' . $tableName); |
198
|
|
|
} |
199
|
|
|
if (!$lastActivityTime && $tableDetail->hasColumn('createdTime')) { |
200
|
|
|
$lastActivityTime = $this->connection->fetchColumn('SELECT MAX(createdTime) FROM ' . $tableName); |
201
|
|
|
} |
202
|
|
|
|
203
|
|
|
if ($lastActivityTime !== null) { |
204
|
|
|
$lastActivityTime = new \DateTime($lastActivityTime, new \DateTimeZone($dao->getZohoClient()->getTimezone())); |
205
|
|
|
// Let's add one second to the last activity time (otherwise, we are fetching again the last record in DB). |
206
|
|
|
$lastActivityTime->add(new \DateInterval('PT1S')); |
207
|
|
|
} |
208
|
|
|
} |
209
|
|
|
} |
210
|
|
|
|
211
|
|
|
if ($lastActivityTime) { |
212
|
|
|
$this->logger->info(sprintf('Incremental copy from %s started for module %s', $lastActivityTime->format(\DateTime::ATOM), $dao->getPluralModuleName())); |
213
|
|
|
} else { |
214
|
|
|
$this->logger->info(sprintf('Incremental copy started for module %s', $dao->getPluralModuleName())); |
215
|
|
|
} |
216
|
|
|
} else { |
217
|
|
|
$lastActivityTime = $recordsPaginationLastTime; |
218
|
|
|
} |
219
|
|
|
|
220
|
|
|
$this->logger->notice(sprintf('Fetching the records to insert/update for module %s...', $dao->getPluralModuleName())); |
221
|
|
|
$records = $dao->getRecords(null, null, null, $lastActivityTime, $recordsPage, 200, $stopAndhasMoreResults); |
222
|
|
|
if ($stopAndhasMoreResults) { |
223
|
|
|
|
224
|
|
|
if ($zohoSyncConfigTableExists) { |
225
|
|
|
$lastDate = $lastActivityTime ? $lastActivityTime->format('Y-m-d H:i:s') : date('Y-m-d H:i:s', 0); |
226
|
|
|
$this->upsertZohoConfig('FETCH_RECORDS_MODIFIED_SINCE__DATE', $tableName, $lastDate); |
227
|
|
|
$this->upsertZohoConfig('FETCH_RECORDS_MODIFIED_SINCE__PAGE', $tableName, (string)$recordsPage); |
228
|
|
|
} |
229
|
|
|
|
230
|
|
|
$recordsPaginationLastTime = $lastActivityTime; |
231
|
|
|
$recordsPage++; |
232
|
|
|
} else { |
233
|
|
|
if ($zohoSyncConfigTableExists) { |
234
|
|
|
$latestDateToSave = $currentDateTime->format('Y-m-d H:i:s'); |
235
|
|
|
$tableDetail = $this->connection->getSchemaManager()->listTableDetails($tableName); |
236
|
|
|
if ($tableDetail->hasColumn('modifiedTime')) { |
237
|
|
|
$latestDateToSave = $this->connection->fetchColumn('SELECT MAX(modifiedTime) FROM ' . $tableName); |
238
|
|
|
} |
239
|
|
|
if (!$latestDateToSave && $tableDetail->hasColumn('createdTime')) { |
240
|
|
|
$latestDateToSave = $this->connection->fetchColumn('SELECT MAX(createdTime) FROM ' . $tableName); |
241
|
|
|
} |
242
|
|
|
if (!$latestDateToSave) { |
243
|
|
|
$latestDateToSave = $currentDateTime->format('Y-m-d H:i:s'); |
244
|
|
|
} |
245
|
|
|
$this->upsertZohoConfig('FETCH_RECORDS_MODIFIED_SINCE__DATE', $tableName, $latestDateToSave); |
246
|
|
|
$this->upsertZohoConfig('FETCH_RECORDS_MODIFIED_SINCE__PAGE', $tableName, '1'); |
247
|
|
|
} |
248
|
|
|
} |
249
|
|
|
$totalRecords = count($records); |
250
|
|
|
$this->logger->debug($totalRecords . ' records fetched.'); |
251
|
|
|
$deletedRecords = []; |
252
|
|
|
$totalRecordsDeleted = 0; |
253
|
|
|
if (($recordsPage - 1) === 1) { |
254
|
|
|
$this->logger->notice(sprintf('Fetching the records to delete for module %s...', $dao->getPluralModuleName())); |
255
|
|
|
$deletedRecords = $dao->getDeletedRecordIds($lastActivityTime); |
256
|
|
|
$totalRecordsDeleted = count($deletedRecords); |
257
|
|
|
$this->logger->debug($totalRecordsDeleted . ' records fetched.'); |
258
|
|
|
} |
259
|
|
|
} else { |
260
|
|
|
$this->logger->info(sprintf('Full copy started for module %s', $dao->getPluralModuleName())); |
261
|
|
|
$this->logger->notice(sprintf('Fetching the records to insert/update for module ...%s', $dao->getPluralModuleName())); |
262
|
|
|
$records = $dao->getRecords(); |
263
|
|
|
$totalRecords = count($records); |
264
|
|
|
$this->logger->debug($totalRecords . ' records fetched.'); |
265
|
|
|
$deletedRecords = []; |
266
|
|
|
$stopAndhasMoreResults = false; |
267
|
|
|
} |
268
|
|
|
} catch (ZCRMException $exception) { |
269
|
|
|
$this->logger->error('Error when getting records for module ' . $dao->getPluralModuleName() . ': ' . $exception->getMessage(), [ |
270
|
|
|
'exception' => $exception |
271
|
|
|
]); |
272
|
|
|
if ($throwErrors) { |
273
|
|
|
throw $exception; |
274
|
|
|
} |
275
|
|
|
return; |
276
|
|
|
} |
277
|
|
|
$this->logger->info(sprintf('Inserting/updating %s records into table %s...', $totalRecords, $tableName)); |
278
|
|
|
|
279
|
|
|
$table = $this->connection->getSchemaManager()->createSchema()->getTable($tableName); |
280
|
|
|
|
281
|
|
|
$select = $this->connection->prepare('SELECT * FROM ' . $tableName . ' WHERE id = :id'); |
282
|
|
|
|
283
|
|
|
$this->connection->beginTransaction(); |
284
|
|
|
|
285
|
|
|
$recordsModificationCounts = [ |
286
|
|
|
'insert' => 0, |
287
|
|
|
'update' => 0, |
288
|
|
|
'delete' => 0, |
289
|
|
|
]; |
290
|
|
|
|
291
|
|
|
$logOffset = $totalRecords >= 500 ? 100 : 50; |
292
|
|
|
$processedRecords = 0; |
293
|
|
|
foreach ($records as $record) { |
294
|
|
View Code Duplication |
if (($processedRecords % $logOffset) === 0) { |
|
|
|
|
295
|
|
|
$this->logger->info(sprintf('%d/%s records processed for module %s', $processedRecords, $totalRecords, $dao->getPluralModuleName())); |
296
|
|
|
} |
297
|
|
|
++$processedRecords; |
298
|
|
|
$data = []; |
299
|
|
|
$types = []; |
300
|
|
|
foreach ($table->getColumns() as $column) { |
301
|
|
|
if (in_array($column->getName(), ['id', 'uid'])) { |
302
|
|
|
continue; |
303
|
|
|
} |
304
|
|
|
$field = $dao->getFieldFromFieldName($column->getName()); |
|
|
|
|
305
|
|
|
if (!$field) { |
306
|
|
|
continue; |
307
|
|
|
} |
308
|
|
|
$getterName = $field->getGetter(); |
309
|
|
|
$dataValue = $record->$getterName(); |
310
|
|
|
$finalFieldData = null; |
|
|
|
|
311
|
|
|
if ($dataValue instanceof ZCRMRecord) { |
312
|
|
|
$finalFieldData = $dataValue->getEntityId(); |
313
|
|
|
} elseif (is_array($dataValue)) { |
314
|
|
|
$finalFieldData = implode(';', $dataValue); |
315
|
|
|
} else { |
316
|
|
|
$finalFieldData = $dataValue; |
317
|
|
|
} |
318
|
|
|
$data[$column->getName()] = $finalFieldData; |
319
|
|
|
$types[$column->getName()] = $column->getType()->getName(); |
320
|
|
|
} |
321
|
|
|
|
322
|
|
|
$select->execute(['id' => $record->getZohoId()]); |
323
|
|
|
$result = $select->fetch(\PDO::FETCH_ASSOC); |
324
|
|
|
if ($result === false) { |
325
|
|
|
$this->logger->debug(sprintf('Inserting record with ID \'%s\' in table %s...', $record->getZohoId(), $tableName)); |
326
|
|
|
|
327
|
|
|
$data['id'] = $record->getZohoId(); |
328
|
|
|
$types['id'] = 'string'; |
329
|
|
|
|
330
|
|
|
$recordsModificationCounts['insert'] += $this->connection->insert($tableName, $data, $types); |
331
|
|
|
|
332
|
|
|
foreach ($this->listeners as $listener) { |
333
|
|
|
$listener->onInsert($data, $dao); |
334
|
|
|
} |
335
|
|
|
} else { |
336
|
|
|
$this->logger->debug(sprintf('Updating record with ID \'%s\' in table %s...', $record->getZohoId(), $tableName)); |
337
|
|
|
$identifier = ['id' => $record->getZohoId()]; |
338
|
|
|
$types['id'] = 'string'; |
339
|
|
|
|
340
|
|
|
$recordsModificationCounts['update'] += $this->connection->update($tableName, $data, $identifier, $types); |
341
|
|
|
|
342
|
|
|
// Let's add the id for the update trigger |
343
|
|
|
$data['id'] = $record->getZohoId(); |
344
|
|
|
foreach ($this->listeners as $listener) { |
345
|
|
|
$listener->onUpdate($data, $result, $dao); |
346
|
|
|
} |
347
|
|
|
} |
348
|
|
|
} |
349
|
|
|
|
350
|
|
|
$this->logger->info(sprintf('Deleting %d records from table %s...', $totalRecordsDeleted, $tableName)); |
351
|
|
|
$sqlStatementUid = 'select uid from ' . $this->connection->quoteIdentifier($tableName) . ' where id = :id'; |
352
|
|
|
$processedRecords = 0; |
353
|
|
|
$logOffset = $totalRecordsDeleted >= 500 ? 100 : 50; |
354
|
|
|
foreach ($deletedRecords as $deletedRecord) { |
355
|
|
View Code Duplication |
if (($processedRecords % $logOffset) === 0) { |
|
|
|
|
356
|
|
|
$this->logger->info(sprintf('%d/%d records processed for module %s', $processedRecords, $totalRecordsDeleted, $dao->getPluralModuleName())); |
357
|
|
|
} |
358
|
|
|
++$processedRecords; |
359
|
|
|
$this->logger->debug(sprintf('Deleting record with ID \'%s\' in table %s...', $deletedRecord->getEntityId(), $tableName)); |
360
|
|
|
$uid = $this->connection->fetchColumn($sqlStatementUid, ['id' => $deletedRecord->getEntityId()]); |
361
|
|
|
$recordsModificationCounts['delete'] += $this->connection->delete($tableName, ['id' => $deletedRecord->getEntityId()]); |
362
|
|
|
if ($twoWaysSync) { |
363
|
|
|
// TODO: we could detect if there are changes to be updated to the server and try to warn with a log message |
364
|
|
|
// Also, let's remove the newly created field (because of the trigger) to avoid looping back to Zoho |
365
|
|
|
$this->connection->delete('local_delete', ['table_name' => $tableName, 'id' => $deletedRecord->getEntityId()]); |
366
|
|
|
$this->connection->delete('local_update', ['table_name' => $tableName, 'uid' => $uid]); |
367
|
|
|
} |
368
|
|
|
} |
369
|
|
|
|
370
|
|
|
$this->logger->notice(sprintf('Copy finished with %d item(s) inserted, %d item(s) updated and %d item(s) deleted.', |
371
|
|
|
$recordsModificationCounts['insert'], |
372
|
|
|
$recordsModificationCounts['update'], |
373
|
|
|
$recordsModificationCounts['delete'] |
374
|
|
|
)); |
375
|
|
|
|
376
|
|
|
if ($recordsModificationCounts['insert'] === 0 && $recordsModificationCounts['update'] === 0 && $recordsModificationCounts['delete'] === 0) { |
377
|
|
|
$stopAndhasMoreResults = false; |
378
|
|
|
if ($zohoSyncConfigTableExists) { |
379
|
|
|
$this->upsertZohoConfig('FETCH_RECORDS_MODIFIED_SINCE__DATE', $tableName, $currentDateTime->format('Y-m-d H:i:s')); |
380
|
|
|
$this->upsertZohoConfig('FETCH_RECORDS_MODIFIED_SINCE__PAGE', $tableName, '1'); |
381
|
|
|
} |
382
|
|
|
} |
383
|
|
|
|
384
|
|
|
$this->connection->commit(); |
385
|
|
|
} |
386
|
|
|
} |
387
|
|
|
|
388
|
|
|
public function fetchFromZohoInBulk(AbstractZohoDao $dao) |
389
|
|
|
{ |
390
|
|
|
/* |
391
|
|
|
* This method is really dirty, and do not use the php sdk because late development for the zoho v1 EOL in december. |
392
|
|
|
* Should be re-written to make it clean. |
393
|
|
|
*/ |
394
|
|
|
// Doc: https://www.zoho.com/crm/developer/docs/api/bulk-read/create-job.html |
395
|
|
|
|
396
|
|
|
$tableName = ZohoDatabaseHelper::getTableName($dao, $this->prefix); |
397
|
|
|
$table = $this->connection->getSchemaManager()->createSchema()->getTable($tableName); |
398
|
|
|
$apiModuleName = $dao->getPluralModuleName(); |
399
|
|
|
|
400
|
|
|
$this->logger->notice('Starting bulk fetch for module ' . $apiModuleName . '...'); |
401
|
|
|
|
402
|
|
|
$zohoClient = new ZohoClient([ |
403
|
|
|
'client_id' => ZOHO_CRM_CLIENT_ID, |
404
|
|
|
'client_secret' => ZOHO_CRM_CLIENT_SECRET, |
405
|
|
|
'redirect_uri' => ZOHO_CRM_CLIENT_REDIRECT_URI, |
406
|
|
|
'currentUserEmail' => ZOHO_CRM_CLIENT_CURRENT_USER_EMAIL, |
407
|
|
|
'applicationLogFilePath' => ZOHO_CRM_CLIENT_APPLICATION_LOGFILEPATH, |
408
|
|
|
'persistence_handler_class' => ZOHO_CRM_CLIENT_PERSISTENCE_HANDLER_CLASS, |
409
|
|
|
'token_persistence_path' => ZOHO_CRM_CLIENT_PERSITENCE_PATH, |
410
|
|
|
'sandbox' => ZOHO_CRM_SANDBOX |
411
|
|
|
], 'Europe/Paris'); |
412
|
|
|
|
413
|
|
|
$client = new \GuzzleHttp\Client(); |
414
|
|
|
$page = 1; |
415
|
|
|
while (true) { |
416
|
|
|
$oauthToken = $zohoClient->getZohoOAuthClient()->getAccessToken(ZOHO_CRM_CLIENT_CURRENT_USER_EMAIL); |
417
|
|
|
|
418
|
|
|
// Step 1: Create a bulk read job |
419
|
|
|
$this->logger->info('Creating read job for module ' . $apiModuleName . ' and page ' . $page . '...'); |
420
|
|
|
$response = $client->request('POST', 'https://' . (ZOHO_CRM_SANDBOX === 'true' ? 'sandbox' : 'www') . '.zohoapis.com/crm/bulk/v2/read', [ |
421
|
|
|
'http_errors' => false, |
422
|
|
|
'headers' => [ |
423
|
|
|
'Authorization' => 'Zoho-oauthtoken ' . $oauthToken |
424
|
|
|
], |
425
|
|
|
'json' => [ |
426
|
|
|
'query' => [ |
427
|
|
|
'module' => $apiModuleName, |
428
|
|
|
'page' => $page |
429
|
|
|
] |
430
|
|
|
] |
431
|
|
|
]); |
432
|
|
|
$jobId = null; |
|
|
|
|
433
|
|
|
if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { |
434
|
|
|
$resultStr = $response->getBody()->getContents(); |
435
|
|
|
$json = json_decode($resultStr, true); |
436
|
|
|
|
437
|
|
|
$jobId = $json['data'][0]['details']['id']; |
438
|
|
|
|
439
|
|
|
// We don't care about the job status right now, it will be checked later |
440
|
|
View Code Duplication |
} else { |
|
|
|
|
441
|
|
|
$this->logger->error('Cannot create bulk read query for module ' . $apiModuleName . ': status: ' . $response->getStatusCode() . '. Status: ' . $response->getBody()->getContents()); |
442
|
|
|
break; |
443
|
|
|
} |
444
|
|
|
|
445
|
|
|
if ($jobId === null) { |
446
|
|
|
$this->logger->error('JobID cannot be null. json:' . $resultStr); |
447
|
|
|
break; |
448
|
|
|
} |
449
|
|
|
|
450
|
|
|
// Step 2: Check job status |
451
|
|
|
$jobDetails = null; |
452
|
|
|
while (true) { |
453
|
|
|
$this->logger->info('Checking job ' . $jobId . ' status for module ' . $apiModuleName . ' and page ' . $page . '...'); |
454
|
|
|
$response = $client->request('GET', 'https://' . (ZOHO_CRM_SANDBOX === 'true' ? 'sandbox' : 'www') . '.zohoapis.com/crm/bulk/v2/read/' . $jobId, [ |
455
|
|
|
'http_errors' => false, |
456
|
|
|
'headers' => [ |
457
|
|
|
'Authorization' => 'Zoho-oauthtoken ' . $oauthToken |
458
|
|
|
] |
459
|
|
|
]); |
460
|
|
|
if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { |
461
|
|
|
$resultStr = $response->getBody()->getContents(); |
462
|
|
|
$json = json_decode($resultStr, true); |
463
|
|
|
|
464
|
|
|
if (isset($json['data'][0]['state'])) { |
465
|
|
|
$status = $json['data'][0]['state']; |
466
|
|
|
if ($status === 'ADDED' || $status === 'QUEUED') { |
467
|
|
|
$this->logger->info('Job still waiting for process'); |
468
|
|
|
} else if ($status === 'IN PROGRESS') { |
469
|
|
|
$this->logger->info('Job in progress'); |
470
|
|
|
} else if ($status === 'COMPLETED') { |
471
|
|
|
$this->logger->info('Job completed'); |
472
|
|
|
$jobDetails = $json; |
473
|
|
|
break; |
474
|
|
|
} else { |
475
|
|
|
$this->logger->info('Unsupported job status: ' . $resultStr); |
476
|
|
|
break; |
477
|
|
|
} |
478
|
|
|
} else { |
479
|
|
|
$this->logger->error('Unsupported response: ' . $resultStr); |
480
|
|
|
break; |
481
|
|
|
} |
482
|
|
View Code Duplication |
} else { |
|
|
|
|
483
|
|
|
$this->logger->error('Cannot get bulk job status query for module ' . $apiModuleName . ': status: ' . $response->getStatusCode() . '. Status: ' . $response->getBody()->getContents()); |
484
|
|
|
break; |
485
|
|
|
} |
486
|
|
|
sleep(15); |
487
|
|
|
} |
488
|
|
|
|
489
|
|
|
// Step 3: Download the result |
490
|
|
|
if ($jobDetails === null) { |
491
|
|
|
$this->logger->error('JobDetails cannot be empty. json:' . $resultStr); |
492
|
|
|
break; |
493
|
|
|
} |
494
|
|
|
$this->logger->debug(json_encode($jobDetails)); |
495
|
|
|
$this->logger->info('Downloading zip file for module ' . $apiModuleName . ' and page ' . $page . '...'); |
496
|
|
|
$jobZipFile = '/tmp/job_' . $dao->getZCRMModule()->getAPIName() . '_' . $jobDetails['data'][0]['id'] . '.zip'; |
497
|
|
|
$jobCsvPath = '/tmp/job_extract'; |
498
|
|
|
$jobCsvFile = '/tmp/job_extract/' . $jobDetails['data'][0]['id'] . '.csv'; |
499
|
|
|
$canProcessCsv = false; |
|
|
|
|
500
|
|
|
|
501
|
|
|
$response = $client->request('GET', 'https://' . (ZOHO_CRM_SANDBOX === 'true' ? 'sandbox' : 'www') . '.zohoapis.com/crm/bulk/v2/read/' . $jobId . '/result', [ |
502
|
|
|
'http_errors' => false, |
503
|
|
|
'headers' => [ |
504
|
|
|
'Authorization' => 'Zoho-oauthtoken ' . $oauthToken |
505
|
|
|
], |
506
|
|
|
'sink' => $jobZipFile |
507
|
|
|
]); |
508
|
|
|
if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { |
509
|
|
|
$this->logger->info('Extracting ' . $jobZipFile . ' file for module ' . $apiModuleName . ' and page ' . $page . '...'); |
510
|
|
|
$zip = new ZipArchive(); |
511
|
|
|
$res = $zip->open($jobZipFile); |
512
|
|
|
if ($res === TRUE) { |
513
|
|
|
$zip->extractTo($jobCsvPath); |
514
|
|
|
$zip->close(); |
515
|
|
|
$this->logger->info('File extracted in ' . $jobCsvFile); |
516
|
|
|
$canProcessCsv = true; |
517
|
|
|
} else { |
518
|
|
|
switch ($res) { |
519
|
|
|
case ZipArchive::ER_EXISTS: |
520
|
|
|
$zipErrorMessage = 'File already exists.'; |
521
|
|
|
break; |
522
|
|
|
case ZipArchive::ER_INCONS: |
523
|
|
|
$zipErrorMessage = 'Zip archive inconsistent.'; |
524
|
|
|
break; |
525
|
|
|
case ZipArchive::ER_MEMORY: |
526
|
|
|
$zipErrorMessage = 'Malloc failure.'; |
527
|
|
|
break; |
528
|
|
|
case ZipArchive::ER_NOENT: |
529
|
|
|
$zipErrorMessage = 'No such file.'; |
530
|
|
|
break; |
531
|
|
|
case ZipArchive::ER_NOZIP: |
532
|
|
|
$zipErrorMessage = 'Not a zip archive.'; |
533
|
|
|
break; |
534
|
|
|
case ZipArchive::ER_OPEN: |
535
|
|
|
$zipErrorMessage = "Can't open file."; |
536
|
|
|
break; |
537
|
|
|
case ZipArchive::ER_READ: |
538
|
|
|
$zipErrorMessage = 'Read error.'; |
539
|
|
|
break; |
540
|
|
|
case ZipArchive::ER_SEEK: |
541
|
|
|
$zipErrorMessage = 'Seek error.'; |
542
|
|
|
break; |
543
|
|
|
default: |
544
|
|
|
$zipErrorMessage = "Unknow (Code $res)"; |
545
|
|
|
break; |
546
|
|
|
} |
547
|
|
|
$this->logger->error('Error when extracting zip file: ' . $zipErrorMessage); |
548
|
|
|
break; |
549
|
|
|
} |
550
|
|
View Code Duplication |
} else { |
|
|
|
|
551
|
|
|
$this->logger->error('Cannot download results for module ' . $apiModuleName . ': status: ' . $response->getStatusCode() . '. Status: ' . $response->getBody()->getContents()); |
552
|
|
|
break; |
553
|
|
|
} |
554
|
|
|
|
555
|
|
|
// Step 4: Save data |
556
|
|
|
if (!$canProcessCsv) { |
557
|
|
|
$this->logger->error('Cannot process CSV'); |
558
|
|
|
break; |
559
|
|
|
} |
560
|
|
|
|
561
|
|
|
$this->logger->info('Building list of users...'); |
562
|
|
|
$usersQuery = $this->connection->executeQuery('SELECT id, full_name FROM users'); |
563
|
|
|
$usersResults = $usersQuery->fetchAll(); |
564
|
|
|
$users = []; |
565
|
|
|
foreach ($usersResults as $user) { |
566
|
|
|
$users[$user['id']] = $user['full_name']; |
567
|
|
|
} |
568
|
|
|
|
569
|
|
|
$this->logger->info('Saving records to db...'); |
570
|
|
|
$nbRecords = $jobDetails['data'][0]['result']['count']; |
571
|
|
|
$whenToLog = ceil($nbRecords / 100); |
572
|
|
|
$this->logger->info($nbRecords . ' records to save'); |
573
|
|
|
$nbSaved = 0; |
574
|
|
|
$handle = fopen($jobCsvFile, 'r'); |
575
|
|
|
$fields = []; |
576
|
|
|
if ($handle) { |
577
|
|
|
while (($row = fgetcsv($handle)) !== false) { |
578
|
|
|
if (empty($fields)) { |
579
|
|
|
$fields = $row; |
580
|
|
|
continue; |
581
|
|
|
} |
582
|
|
|
$recordDataToInsert = []; |
583
|
|
|
foreach ($row as $k => $value) { |
584
|
|
|
$columnName = $fields[$k]; |
585
|
|
|
$decodedColumnName = str_replace('_', '', $columnName); |
586
|
|
|
if ($table->hasColumn($decodedColumnName)) { |
587
|
|
|
$recordDataToInsert[$decodedColumnName] = $value === '' ? null : $value; |
588
|
|
|
} else { |
589
|
|
|
if ($columnName === 'Owner' || $columnName === 'Created_By' || $columnName === 'Modified_By') { |
590
|
|
|
$recordDataToInsert[$decodedColumnName . '_OwnerID'] = $value === '' ? null : $value; |
591
|
|
|
$recordDataToInsert[$decodedColumnName . '_OwnerName'] = $users[$value] ?? null; |
592
|
|
|
} else if ($table->hasColumn($decodedColumnName . '_ID')) { |
593
|
|
|
$recordDataToInsert[$decodedColumnName . '_ID'] = $value === '' ? null : $value; |
594
|
|
|
} |
595
|
|
|
} |
596
|
|
|
} |
597
|
|
|
$this->connection->insert($tableName, $recordDataToInsert); |
598
|
|
|
++$nbSaved; |
599
|
|
|
if (($nbSaved % $whenToLog) === 0) { |
600
|
|
|
$this->logger->info($nbSaved . '/' . $nbRecords . ' records processed'); |
601
|
|
|
} |
602
|
|
|
} |
603
|
|
|
$this->logger->info($nbSaved . ' records saved for module ' . $apiModuleName . ' and page ' . $page); |
604
|
|
|
fclose($handle); |
605
|
|
|
} |
606
|
|
|
|
607
|
|
|
// Step 5: Check if there is more results |
608
|
|
|
$hasMoreRecords = $jobDetails['data'][0]['result']['more_records']; |
609
|
|
|
if (!$hasMoreRecords) { |
610
|
|
|
$this->logger->info('No more records for the module ' . $apiModuleName); |
611
|
|
|
break; |
612
|
|
|
} |
613
|
|
|
$this->logger->info('More records to fetch for the module ' . $apiModuleName); |
614
|
|
|
++$page; |
615
|
|
|
} |
616
|
|
|
} |
617
|
|
|
|
618
|
|
|
private function upsertZohoConfig(string $configKey, string $tableName, string $configValue) |
619
|
|
|
{ |
620
|
|
|
$configExists = $this->connection->fetchColumn('SELECT config_value FROM zoho_sync_config WHERE config_key = ? AND table_name = ?', [ |
621
|
|
|
$configKey, |
622
|
|
|
$tableName |
623
|
|
|
]); |
624
|
|
|
if ($configExists === false) { |
625
|
|
|
$this->connection->insert('zoho_sync_config', [ |
626
|
|
|
'config_key' => $configKey, |
627
|
|
|
'table_name' => $tableName, |
628
|
|
|
'config_value' => $configValue |
629
|
|
|
]); |
630
|
|
|
} else { |
631
|
|
|
$this->connection->update('zoho_sync_config', [ |
632
|
|
|
'config_value' => $configValue |
633
|
|
|
], [ |
634
|
|
|
'config_key' => $configKey, |
635
|
|
|
'table_name' => $tableName, |
636
|
|
|
]); |
637
|
|
|
} |
638
|
|
|
} |
639
|
|
|
} |
640
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.