1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Wabel\Zoho\CRM\Copy; |
4
|
|
|
|
5
|
|
|
use Doctrine\DBAL\Connection; |
6
|
|
|
use Psr\Log\LoggerInterface; |
7
|
|
|
use Psr\Log\NullLogger; |
8
|
|
|
use Wabel\Zoho\CRM\AbstractZohoDao; |
9
|
|
|
use Wabel\Zoho\CRM\Request\Response; |
10
|
|
|
use zcrmsdk\crm\crud\ZCRMRecord; |
11
|
|
|
use zcrmsdk\crm\exception\ZCRMException; |
12
|
|
|
|
13
|
|
|
/** |
14
|
|
|
* This class is in charge of synchronizing one table of your database with Zoho records. |
15
|
|
|
*/ |
16
|
|
|
class ZohoDatabaseCopier |
17
|
|
|
{ |
18
|
|
|
/** |
19
|
|
|
* @var Connection |
20
|
|
|
*/ |
21
|
|
|
private $connection; |
22
|
|
|
|
23
|
|
|
private $prefix; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* @var ZohoChangeListener[] |
27
|
|
|
*/ |
28
|
|
|
private $listeners; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @var LoggerInterface |
32
|
|
|
*/ |
33
|
|
|
private $logger; |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* @var LocalChangesTracker |
37
|
|
|
*/ |
38
|
|
|
private $localChangesTracker; |
39
|
|
|
/** |
40
|
|
|
* @var ZohoUserService |
41
|
|
|
*/ |
42
|
|
|
private $zohoUserService; |
43
|
|
|
|
44
|
|
|
/** |
45
|
|
|
* ZohoDatabaseCopier constructor. |
46
|
|
|
* |
47
|
|
|
* @param Connection $connection |
48
|
|
|
* @param string $prefix Prefix for the table name in DB |
49
|
|
|
* @param ZohoChangeListener[] $listeners The list of listeners called when a record is inserted or updated. |
50
|
|
|
*/ |
51
|
|
View Code Duplication |
public function __construct(Connection $connection, ZohoUserService $zohoUserService, $prefix = 'zoho_', array $listeners = [], LoggerInterface $logger = null) |
|
|
|
|
52
|
|
|
{ |
53
|
|
|
$this->connection = $connection; |
54
|
|
|
$this->prefix = $prefix; |
55
|
|
|
$this->listeners = $listeners; |
56
|
|
|
if ($logger === null) { |
57
|
|
|
$this->logger = new NullLogger(); |
58
|
|
|
} else { |
59
|
|
|
$this->logger = $logger; |
60
|
|
|
} |
61
|
|
|
$this->localChangesTracker = new LocalChangesTracker($connection, $this->logger); |
62
|
|
|
$this->zohoUserService = $zohoUserService; |
63
|
|
|
} |
64
|
|
|
|
65
|
|
|
/** |
66
|
|
|
* @throws \Doctrine\DBAL\DBALException |
67
|
|
|
* @throws \Doctrine\DBAL\Schema\SchemaException |
68
|
|
|
* @throws \Wabel\Zoho\CRM\Exception\ZohoCRMResponseException |
69
|
|
|
*/ |
70
|
|
|
public function fetchUserFromZoho() |
71
|
|
|
{ |
72
|
|
|
$users = $this->zohoUserService->getUsers(); |
73
|
|
|
$tableName = 'users'; |
74
|
|
|
$this->logger->info('Fetched ' . count($users) . ' records'); |
75
|
|
|
|
76
|
|
|
$table = $this->connection->getSchemaManager()->createSchema()->getTable($tableName); |
77
|
|
|
|
78
|
|
|
$select = $this->connection->prepare('SELECT * FROM ' . $tableName . ' WHERE id = :id'); |
79
|
|
|
|
80
|
|
|
$this->connection->beginTransaction(); |
81
|
|
|
foreach ($users as $user) { |
|
|
|
|
82
|
|
|
$data = []; |
83
|
|
|
$types = []; |
84
|
|
|
foreach ($table->getColumns() as $column) { |
85
|
|
|
if ($column->getName() === 'id') { |
86
|
|
|
continue; |
87
|
|
|
} else { |
88
|
|
|
$fieldMethod = ZohoDatabaseHelper::getUserMethodNameFromField($column->getName()); |
89
|
|
|
if (method_exists($user, $fieldMethod) |
90
|
|
|
&& (!is_array($user->{$fieldMethod}()) && !is_object($user->{$fieldMethod}())) |
91
|
|
|
) { |
92
|
|
|
$data[$column->getName()] = $user->{$fieldMethod}(); |
93
|
|
|
} elseif (method_exists($user, $fieldMethod) |
94
|
|
|
&& is_array($user->{$fieldMethod}()) |
95
|
|
|
&& array_key_exists('name', $user->{$fieldMethod}()) |
96
|
|
|
&& array_key_exists('id', $user->{$fieldMethod}()) |
97
|
|
|
) { |
98
|
|
|
$data[$column->getName()] = $user->{$fieldMethod}()['name']; |
99
|
|
|
} elseif (method_exists($user, $fieldMethod) |
100
|
|
|
&& is_object($user->{$fieldMethod}()) && method_exists($user->{$fieldMethod}(), 'getName') |
101
|
|
|
) { |
102
|
|
|
$object = $user->{$fieldMethod}(); |
103
|
|
|
$data[$column->getName()] = $object->getName(); |
104
|
|
|
} elseif ($column->getName() === 'Currency') { |
105
|
|
|
//Todo: Do a pull request about \ZCRMUser::geCurrency() to \ZCRMUser::getCurrency() |
106
|
|
|
$data[$column->getName()] = $user->geCurrency(); |
107
|
|
|
} else { |
108
|
|
|
continue; |
109
|
|
|
} |
110
|
|
|
} |
111
|
|
|
} |
112
|
|
|
$select->execute(['id' => $user->getId()]); |
113
|
|
|
$result = $select->fetch(\PDO::FETCH_ASSOC); |
114
|
|
|
if ($result === false && $data) { |
|
|
|
|
115
|
|
|
$this->logger->debug("Inserting record with ID '" . $user->getId() . "'."); |
116
|
|
|
|
117
|
|
|
$data['id'] = $user->getId(); |
118
|
|
|
$types['id'] = 'string'; |
119
|
|
|
|
120
|
|
|
$this->connection->insert($tableName, $data, $types); |
121
|
|
|
} elseif ($data) { |
|
|
|
|
122
|
|
|
$this->logger->debug("Updating record with ID '" . $user->getId() . "'."); |
123
|
|
|
$identifier = ['id' => $user->getId()]; |
124
|
|
|
$types['id'] = 'string'; |
125
|
|
|
$this->connection->update($tableName, $data, $identifier, $types); |
126
|
|
|
} |
127
|
|
|
|
128
|
|
|
} |
129
|
|
|
$this->connection->commit(); |
130
|
|
|
} |
131
|
|
|
|
132
|
|
|
/** |
133
|
|
|
* @param AbstractZohoDao $dao |
134
|
|
|
* @param bool $incrementalSync Whether we synchronize only the modified files or everything. |
135
|
|
|
* @param bool $twoWaysSync |
136
|
|
|
* @param bool $throwErrors |
137
|
|
|
* |
138
|
|
|
* @throws \Doctrine\DBAL\DBALException |
139
|
|
|
* @throws \Doctrine\DBAL\Schema\SchemaException |
140
|
|
|
* @throws \Wabel\Zoho\CRM\Exception\ZohoCRMResponseException |
141
|
|
|
*/ |
142
|
|
|
public function fetchFromZoho(AbstractZohoDao $dao, $incrementalSync = true, $twoWaysSync = true, $throwErrors = true) |
143
|
|
|
{ |
144
|
|
|
$tableName = ZohoDatabaseHelper::getTableName($dao, $this->prefix); |
145
|
|
|
|
146
|
|
|
$totalRecords = 0; |
147
|
|
|
$totalRecordsDeleted = 0; |
148
|
|
|
try { |
149
|
|
|
if ($incrementalSync) { |
150
|
|
|
// Let's get the last modification date: |
151
|
|
|
$tableDetail = $this->connection->getSchemaManager()->listTableDetails($tableName); |
152
|
|
|
$lastActivityTime = null; |
153
|
|
|
if ($tableDetail->hasColumn('modifiedTime')) { |
154
|
|
|
$lastActivityTime = $this->connection->fetchColumn('SELECT MAX(modifiedTime) FROM ' . $tableName); |
155
|
|
|
} |
156
|
|
|
if (!$lastActivityTime && $tableDetail->hasColumn('createdTime')) { |
157
|
|
|
$lastActivityTime = $this->connection->fetchColumn('SELECT MAX(createdTime) FROM ' . $tableName); |
158
|
|
|
} |
159
|
|
|
|
160
|
|
|
if ($lastActivityTime !== null) { |
161
|
|
|
$lastActivityTime = new \DateTime($lastActivityTime, new \DateTimeZone($dao->getZohoClient()->getTimezone())); |
162
|
|
|
// Let's add one second to the last activity time (otherwise, we are fetching again the last record in DB). |
163
|
|
|
$lastActivityTime->add(new \DateInterval('PT1S')); |
164
|
|
|
} |
165
|
|
|
|
166
|
|
|
if ($lastActivityTime) { |
167
|
|
|
$this->logger->info('Incremental copy from ' . $lastActivityTime->format(\DateTime::ATOM) . ' started'); |
168
|
|
|
} else { |
169
|
|
|
$this->logger->info('Incremental copy started'); |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
$this->logger->info('Fetching the records to insert/update...'); |
173
|
|
|
$records = $dao->getRecords(null, null, null, $lastActivityTime); |
174
|
|
|
$totalRecords = count($records); |
175
|
|
|
$this->logger->debug($totalRecords . ' records fetched.'); |
176
|
|
|
$this->logger->info('Fetching the records to delete...'); |
177
|
|
|
$deletedRecords = $dao->getDeletedRecordIds($lastActivityTime); |
178
|
|
|
$totalRecordsDeleted = count($deletedRecords); |
179
|
|
|
$this->logger->debug($totalRecordsDeleted . ' records fetched.'); |
180
|
|
|
} else { |
181
|
|
|
$this->logger->info('Full copy started'); |
182
|
|
|
$this->logger->info('Fetching the records to insert/update...'); |
183
|
|
|
$records = $dao->getRecords(); |
184
|
|
|
$totalRecords = count($records); |
185
|
|
|
$this->logger->debug($totalRecords . ' records fetched.'); |
186
|
|
|
$deletedRecords = []; |
187
|
|
|
} |
188
|
|
|
} catch (ZCRMException $exception) { |
|
|
|
|
189
|
|
|
$this->logger->error('Error when getting records for module ' . $tableName . ': ' . $exception->getMessage(), [ |
190
|
|
|
'exception' => $exception |
191
|
|
|
]); |
192
|
|
|
if ($throwErrors) { |
193
|
|
|
throw $exception; |
194
|
|
|
} |
195
|
|
|
return; |
196
|
|
|
} |
197
|
|
|
$this->logger->info('Inserting/updating ' . count($records) . ' records into table ' . $tableName . '...'); |
198
|
|
|
|
199
|
|
|
$table = $this->connection->getSchemaManager()->createSchema()->getTable($tableName); |
200
|
|
|
|
201
|
|
|
$select = $this->connection->prepare('SELECT * FROM ' . $tableName . ' WHERE id = :id'); |
202
|
|
|
|
203
|
|
|
$this->connection->beginTransaction(); |
204
|
|
|
|
205
|
|
|
$recordsModificationCounts = [ |
206
|
|
|
'insert' => 0, |
207
|
|
|
'update' => 0, |
208
|
|
|
'delete' => 0, |
209
|
|
|
]; |
210
|
|
|
|
211
|
|
|
$logOffset = $totalRecords >= 500 ? 100 : 50; |
212
|
|
|
$processedRecords = 0; |
213
|
|
|
foreach ($records as $record) { |
214
|
|
View Code Duplication |
if (($processedRecords % $logOffset) === 0) { |
|
|
|
|
215
|
|
|
$this->logger->info($processedRecords . '/' . $totalRecords . ' records processed'); |
216
|
|
|
} |
217
|
|
|
++$processedRecords; |
218
|
|
|
$data = []; |
219
|
|
|
$types = []; |
220
|
|
|
foreach ($table->getColumns() as $column) { |
221
|
|
|
if (in_array($column->getName(), ['id', 'uid'])) { |
222
|
|
|
continue; |
223
|
|
|
} else { |
224
|
|
|
$field = $dao->getFieldFromFieldName($column->getName()); |
|
|
|
|
225
|
|
|
if (!$field) { |
226
|
|
|
continue; |
227
|
|
|
} |
228
|
|
|
$getterName = $field->getGetter(); |
229
|
|
|
$dataValue = $record->$getterName(); |
230
|
|
|
$finalFieldData = null; |
|
|
|
|
231
|
|
|
if ($dataValue instanceof ZCRMRecord) { |
|
|
|
|
232
|
|
|
$finalFieldData = $dataValue->getEntityId(); |
233
|
|
|
} elseif (is_array($dataValue)) { |
234
|
|
|
$finalFieldData = implode(';', $dataValue); |
235
|
|
|
} else { |
236
|
|
|
$finalFieldData = $dataValue; |
237
|
|
|
} |
238
|
|
|
$data[$column->getName()] = $finalFieldData; |
239
|
|
|
$types[$column->getName()] = $column->getType()->getName(); |
240
|
|
|
} |
241
|
|
|
} |
242
|
|
|
|
243
|
|
|
$select->execute(['id' => $record->getZohoId()]); |
244
|
|
|
$result = $select->fetch(\PDO::FETCH_ASSOC); |
245
|
|
|
if ($result === false) { |
246
|
|
|
$this->logger->debug("Inserting record with ID '" . $record->getZohoId() . "'..."); |
247
|
|
|
|
248
|
|
|
$data['id'] = $record->getZohoId(); |
249
|
|
|
$types['id'] = 'string'; |
250
|
|
|
|
251
|
|
|
$recordsModificationCounts['insert'] += $this->connection->insert($tableName, $data, $types); |
252
|
|
|
|
253
|
|
|
foreach ($this->listeners as $listener) { |
254
|
|
|
$listener->onInsert($data, $dao); |
255
|
|
|
} |
256
|
|
|
} else { |
257
|
|
|
$this->logger->debug("Updating record with ID '" . $record->getZohoId() . "'..."); |
258
|
|
|
$identifier = ['id' => $record->getZohoId()]; |
259
|
|
|
$types['id'] = 'string'; |
260
|
|
|
|
261
|
|
|
$recordsModificationCounts['update'] += $this->connection->update($tableName, $data, $identifier, $types); |
262
|
|
|
|
263
|
|
|
// Let's add the id for the update trigger |
264
|
|
|
$data['id'] = $record->getZohoId(); |
265
|
|
|
foreach ($this->listeners as $listener) { |
266
|
|
|
$listener->onUpdate($data, $result, $dao); |
267
|
|
|
} |
268
|
|
|
} |
269
|
|
|
} |
270
|
|
|
|
271
|
|
|
$this->logger->info('Deleting ' . count($deletedRecords) . ' records into table ' . $tableName . '...'); |
272
|
|
|
$sqlStatementUid = 'select uid from ' . $this->connection->quoteIdentifier($tableName) . ' where id = :id'; |
273
|
|
|
$processedRecords = 0; |
274
|
|
|
foreach ($deletedRecords as $deletedRecord) { |
275
|
|
View Code Duplication |
if (($processedRecords % $logOffset) === 0) { |
|
|
|
|
276
|
|
|
$this->logger->info($processedRecords . '/' . $totalRecords . ' records processed'); |
277
|
|
|
} |
278
|
|
|
++$processedRecords; |
279
|
|
|
$this->logger->debug("Deleting record with ID '" . $deletedRecord->getEntityId() . "'..."); |
280
|
|
|
$uid = $this->connection->fetchColumn($sqlStatementUid, ['id' => $deletedRecord->getEntityId()]); |
281
|
|
|
$recordsModificationCounts['delete'] += $this->connection->delete($tableName, ['id' => $deletedRecord->getEntityId()]); |
282
|
|
|
if ($twoWaysSync) { |
283
|
|
|
// TODO: we could detect if there are changes to be updated to the server and try to warn with a log message |
284
|
|
|
// Also, let's remove the newly created field (because of the trigger) to avoid looping back to Zoho |
285
|
|
|
$this->connection->delete('local_delete', ['table_name' => $tableName, 'id' => $deletedRecord->getEntityId()]); |
286
|
|
|
$this->connection->delete('local_update', ['table_name' => $tableName, 'uid' => $uid]); |
287
|
|
|
} |
288
|
|
|
} |
289
|
|
|
|
290
|
|
|
$this->logger->notice(sprintf('Copy finished with %d item(s) inserted, %d item(s) updated and %d item(s) deleted.', |
291
|
|
|
$recordsModificationCounts['insert'], |
292
|
|
|
$recordsModificationCounts['update'], |
293
|
|
|
$recordsModificationCounts['delete'] |
294
|
|
|
)); |
295
|
|
|
|
296
|
|
|
$this->connection->commit(); |
297
|
|
|
} |
298
|
|
|
} |
299
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.