1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
/** |
6
|
|
|
* tubee.io |
7
|
|
|
* |
8
|
|
|
* @copyright Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
9
|
|
|
* @license GPL-3.0 https://opensource.org/licenses/GPL-3.0 |
10
|
|
|
*/ |
11
|
|
|
|
12
|
|
|
namespace Tubee\Async; |
13
|
|
|
|
14
|
|
|
use MongoDB\BSON\UTCDateTime; |
15
|
|
|
use MongoDB\Database; |
16
|
|
|
use Monolog\Handler\MongoDBHandler; |
17
|
|
|
use Monolog\Logger; |
18
|
|
|
use Psr\Log\LoggerInterface; |
19
|
|
|
use TaskScheduler\AbstractJob; |
20
|
|
|
use TaskScheduler\Scheduler; |
21
|
|
|
use Tubee\Collection\CollectionInterface; |
22
|
|
|
use Tubee\Endpoint\EndpointInterface; |
23
|
|
|
use Tubee\Job\Validator as JobValidator; |
24
|
|
|
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory; |
25
|
|
|
use Zend\Mail\Message; |
26
|
|
|
|
27
|
|
|
class Sync extends AbstractJob |
28
|
|
|
{ |
29
|
|
|
/** |
30
|
|
|
* ResourceNamespace factory. |
31
|
|
|
* |
32
|
|
|
* @var ResourceNamespaceFactory |
33
|
|
|
*/ |
34
|
|
|
protected $namespace_factory; |
35
|
|
|
|
36
|
|
|
/** |
37
|
|
|
* Scheduler. |
38
|
|
|
* |
39
|
|
|
* @var Scheduler |
40
|
|
|
*/ |
41
|
|
|
protected $scheduler; |
42
|
|
|
|
43
|
|
|
/** |
44
|
|
|
* Logger. |
45
|
|
|
* |
46
|
|
|
* @var LoggerInterface |
47
|
|
|
*/ |
48
|
|
|
protected $logger; |
49
|
|
|
|
50
|
|
|
/** |
51
|
|
|
* Error count. |
52
|
|
|
* |
53
|
|
|
* @var int |
54
|
|
|
*/ |
55
|
|
|
protected $error_count = 0; |
56
|
|
|
|
57
|
|
|
/** |
58
|
|
|
* Start timestamp. |
59
|
|
|
* |
60
|
|
|
* @var UTCDateTime |
61
|
|
|
*/ |
62
|
|
|
protected $timestamp; |
63
|
|
|
|
64
|
|
|
/** |
65
|
|
|
* Database. |
66
|
|
|
* |
67
|
|
|
* @var Database |
68
|
|
|
*/ |
69
|
|
|
protected $db; |
70
|
|
|
|
71
|
|
|
/** |
72
|
|
|
* Sync. |
73
|
|
|
*/ |
74
|
|
|
public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger) |
75
|
|
|
{ |
76
|
|
|
$this->namespace_factory = $namespace_factory; |
77
|
|
|
$this->scheduler = $scheduler; |
78
|
|
|
$this->logger = $logger; |
79
|
|
|
$this->db = $db; |
80
|
|
|
$this->timestamp = new UTCDateTime(); |
81
|
|
|
} |
82
|
|
|
|
83
|
|
|
/** |
84
|
|
|
* Start job. |
85
|
|
|
*/ |
86
|
|
|
public function start(): bool |
87
|
|
|
{ |
88
|
|
|
$procs = []; |
89
|
|
|
$namespace = $this->namespace_factory->getOne($this->data['namespace']); |
90
|
|
|
|
91
|
|
|
$filter = in_array('*', $this->data['collections']) ? [] : ['name' => ['$in' => $this->data['collections']]]; |
92
|
|
|
foreach ($namespace->getCollections($filter) as $collection) { |
93
|
|
|
$filter = in_array('*', $this->data['endpoints']) ? [] : ['name' => ['$in' => $this->data['endpoints']]]; |
94
|
|
|
foreach ($collection->getEndpoints($filter) as $endpoint) { |
95
|
|
|
if ($this->data['loadbalance'] === true) { |
96
|
|
|
$data = $this->data; |
97
|
|
|
$data = array_merge($data, [ |
98
|
|
|
'endpoints' => [$endpoint->getName()], |
99
|
|
|
'parent' => $this->getId(), |
100
|
|
|
'loadbalance' => false, |
101
|
|
|
]); |
102
|
|
|
|
103
|
|
|
$procs[] = $this->scheduler->addJob(self::class, $data); |
104
|
|
|
} else { |
105
|
|
|
$this->setupLogger(JobValidator::LOG_LEVELS[$this->data['log_level']], [ |
106
|
|
|
'process' => (string) $this->getId(), |
107
|
|
|
'parent' => isset($this->data['parent']) ? (string) $this->data['parent'] : null, |
108
|
|
|
'start' => $this->timestamp, |
109
|
|
|
'namespace' => $namespace->getName(), |
110
|
|
|
'collection' => $collection->getName(), |
111
|
|
|
'endpoint' => $endpoint->getName(), |
112
|
|
|
]); |
113
|
|
|
|
114
|
|
|
if ($endpoint->getType() === EndpointInterface::TYPE_SOURCE) { |
115
|
|
|
$this->import($collection, $this->data['filter'], ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']); |
116
|
|
|
} elseif ($endpoint->getType() === EndpointInterface::TYPE_DESTINATION) { |
117
|
|
|
$this->export($collection, $this->data['filter'], ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']); |
118
|
|
|
} |
119
|
|
|
|
120
|
|
|
$this->logger->popProcessor(); |
121
|
|
|
$this->notify(); |
122
|
|
|
} |
123
|
|
|
} |
124
|
|
|
} |
125
|
|
|
|
126
|
|
|
foreach ($procs as $process) { |
127
|
|
|
$process->wait(); |
128
|
|
|
} |
129
|
|
|
|
130
|
|
|
return true; |
131
|
|
|
} |
132
|
|
|
|
133
|
|
|
/** |
134
|
|
|
* Set logger level. |
135
|
|
|
*/ |
136
|
|
|
protected function setupLogger(int $level, array $context): bool |
137
|
|
|
{ |
138
|
|
|
if (isset($this->data['job'])) { |
139
|
|
|
$context['job'] = (string) $this->data['job']; |
140
|
|
|
} |
141
|
|
|
|
142
|
|
|
foreach ($this->logger->getHandlers() as $handler) { |
143
|
|
|
if ($handler instanceof MongoDBHandler) { |
|
|
|
|
144
|
|
|
$handler->setLevel($level); |
145
|
|
|
|
146
|
|
|
$this->logger->pushProcessor(function ($record) use ($context) { |
147
|
|
|
$record['context'] = array_merge($record['context'], $context); |
148
|
|
|
|
149
|
|
|
return $record; |
150
|
|
|
}); |
151
|
|
|
} |
152
|
|
|
} |
153
|
|
|
|
154
|
|
|
return true; |
155
|
|
|
} |
156
|
|
|
|
157
|
|
|
/** |
158
|
|
|
* {@inheritdoc} |
159
|
|
|
*/ |
160
|
|
|
protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool |
161
|
|
|
{ |
162
|
|
|
$this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [ |
163
|
|
|
'category' => get_class($this), |
164
|
|
|
]); |
165
|
|
|
|
166
|
|
|
$endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints)); |
167
|
|
|
|
168
|
|
|
foreach ($endpoints as $ep) { |
169
|
|
|
if ($ep->flushRequired()) { |
170
|
|
|
$ep->flush($simulate); |
171
|
|
|
} |
172
|
|
|
|
173
|
|
|
$ep->setup($simulate); |
174
|
|
|
} |
175
|
|
|
|
176
|
|
|
foreach ($collection->getObjects($filter) as $id => $object) { |
177
|
|
|
$this->logger->debug('process write for object ['.(string) $id.'] from data type ['.$collection->getIdentifier().']', [ |
178
|
|
|
'category' => get_class($this), |
179
|
|
|
]); |
180
|
|
|
|
181
|
|
|
foreach ($endpoints as $ep) { |
182
|
|
|
$this->logger->info('start write onto destination endpoint ['.$ep->getIdentifier().']', [ |
183
|
|
|
'category' => get_class($this), |
184
|
|
|
]); |
185
|
|
|
|
186
|
|
|
try { |
187
|
|
View Code Duplication |
foreach ($ep->getWorkflows() as $workflow) { |
|
|
|
|
188
|
|
|
$this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current object', [ |
189
|
|
|
'category' => get_class($this), |
190
|
|
|
]); |
191
|
|
|
|
192
|
|
|
if ($workflow->export($object, $this->timestamp, $simulate) === true) { |
193
|
|
|
$this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current object, skip any further workflows for the current data object', [ |
194
|
|
|
'category' => get_class($this), |
195
|
|
|
]); |
196
|
|
|
|
197
|
|
|
continue 2; |
198
|
|
|
} |
199
|
|
|
} |
200
|
|
|
|
201
|
|
|
$this->logger->debug('no workflow were executed within endpoint ['.$ep->getIdentifier().'] for the current object', [ |
202
|
|
|
'category' => get_class($this), |
203
|
|
|
]); |
204
|
|
|
} catch (\Exception $e) { |
205
|
|
|
++$this->error_count; |
206
|
|
|
|
207
|
|
|
$this->logger->error('failed write object to destination endpoint ['.$ep->getIdentifier().']', [ |
208
|
|
|
'category' => get_class($this), |
209
|
|
|
'object' => $object->getId(), |
210
|
|
|
'exception' => $e, |
211
|
|
|
]); |
212
|
|
|
|
213
|
|
|
if ($ignore === false) { |
214
|
|
|
return false; |
215
|
|
|
} |
216
|
|
|
} |
217
|
|
|
} |
218
|
|
|
} |
219
|
|
|
|
220
|
|
View Code Duplication |
if (count($endpoints) === 0) { |
|
|
|
|
221
|
|
|
$this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [ |
222
|
|
|
'category' => get_class($this), |
223
|
|
|
]); |
224
|
|
|
|
225
|
|
|
return true; |
226
|
|
|
} |
227
|
|
|
|
228
|
|
|
foreach ($endpoints as $n => $ep) { |
229
|
|
|
$ep->shutdown($simulate); |
230
|
|
|
} |
231
|
|
|
|
232
|
|
|
return true; |
233
|
|
|
} |
234
|
|
|
|
235
|
|
|
/** |
236
|
|
|
* {@inheritdoc} |
237
|
|
|
*/ |
238
|
|
|
protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool |
239
|
|
|
{ |
240
|
|
|
$this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [ |
241
|
|
|
'category' => get_class($this), |
242
|
|
|
]); |
243
|
|
|
|
244
|
|
|
$endpoints = $collection->getSourceEndpoints($endpoints); |
245
|
|
|
|
246
|
|
|
foreach ($endpoints as $ep) { |
247
|
|
|
$this->logger->info('start import from source endpoint ['.$ep->getIdentifier().']', [ |
248
|
|
|
'category' => get_class($this), |
249
|
|
|
]); |
250
|
|
|
|
251
|
|
|
if ($ep->flushRequired()) { |
252
|
|
|
$collection->flush($simulate); |
253
|
|
|
} |
254
|
|
|
|
255
|
|
|
$ep->setup($simulate); |
256
|
|
|
|
257
|
|
|
foreach ($ep->getAll($filter) as $id => $object) { |
258
|
|
|
$this->logger->debug('process import for object ['.$id.'] into data type ['.$collection->getIdentifier().']', [ |
259
|
|
|
'category' => get_class($this), |
260
|
|
|
'attributes' => $object, |
261
|
|
|
]); |
262
|
|
|
|
263
|
|
|
try { |
264
|
|
View Code Duplication |
foreach ($ep->getWorkflows() as $workflow) { |
|
|
|
|
265
|
|
|
$this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current object', [ |
266
|
|
|
'category' => get_class($this), |
267
|
|
|
]); |
268
|
|
|
|
269
|
|
|
if ($workflow->import($collection, $object, $this->timestamp, $simulate) === true) { |
270
|
|
|
$this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current object, skip any further workflows for the current data object', [ |
271
|
|
|
'category' => get_class($this), |
272
|
|
|
]); |
273
|
|
|
|
274
|
|
|
continue 2; |
275
|
|
|
} |
276
|
|
|
} |
277
|
|
|
|
278
|
|
|
$this->logger->debug('no workflow were executed within endpoint ['.$ep->getIdentifier().'] for the current object', [ |
279
|
|
|
'category' => get_class($this), |
280
|
|
|
]); |
281
|
|
|
} catch (\Exception $e) { |
282
|
|
|
++$this->error_count; |
283
|
|
|
|
284
|
|
|
$this->logger->error('failed import data object from source endpoint ['.$ep->getIdentifier().']', [ |
285
|
|
|
'category' => get_class($this), |
286
|
|
|
'namespace' => $collection->getResourceNamespace()->getName(), |
287
|
|
|
'collection' => $collection->getName(), |
288
|
|
|
'endpoint' => $ep->getName(), |
289
|
|
|
'exception' => $e, |
290
|
|
|
]); |
291
|
|
|
|
292
|
|
|
if ($ignore === false) { |
293
|
|
|
return false; |
294
|
|
|
} |
295
|
|
|
} |
296
|
|
|
} |
297
|
|
|
|
298
|
|
|
$this->garbageCollector($collection, $ep, $simulate, $ignore); |
299
|
|
|
$ep->shutdown($simulate); |
300
|
|
|
} |
301
|
|
|
|
302
|
|
View Code Duplication |
if ($endpoints->getReturn() === 0) { |
|
|
|
|
303
|
|
|
$this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [ |
304
|
|
|
'category' => get_class($this), |
305
|
|
|
]); |
306
|
|
|
|
307
|
|
|
return true; |
308
|
|
|
} |
309
|
|
|
|
310
|
|
|
return true; |
311
|
|
|
} |
312
|
|
|
|
313
|
|
|
/** |
314
|
|
|
* Garbage. |
315
|
|
|
*/ |
316
|
|
|
protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool |
317
|
|
|
{ |
318
|
|
|
$this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [ |
319
|
|
|
'category' => get_class($this), |
320
|
|
|
]); |
321
|
|
|
|
322
|
|
|
$filter = [ |
323
|
|
|
'$or' => [ |
324
|
|
|
[ |
325
|
|
|
'endpoints.'.$endpoint->getName().'.last_sync' => [ |
326
|
|
|
'$lte' => $this->timestamp, |
327
|
|
|
], |
328
|
|
|
], |
329
|
|
|
], |
330
|
|
|
]; |
331
|
|
|
|
332
|
|
|
$this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [ |
333
|
|
|
'endpoints.'.$endpoint->getName().'.garbage' => true, |
334
|
|
|
]]); |
335
|
|
|
|
336
|
|
|
foreach ($collection->getObjects($filter, false) as $id => $object) { |
337
|
|
|
$this->logger->debug('process garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [ |
338
|
|
|
'category' => get_class($this), |
339
|
|
|
]); |
340
|
|
|
|
341
|
|
|
try { |
342
|
|
View Code Duplication |
foreach ($endpoint->getWorkflows() as $workflow) { |
|
|
|
|
343
|
|
|
$this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [ |
344
|
|
|
'category' => get_class($this), |
345
|
|
|
]); |
346
|
|
|
|
347
|
|
|
if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) { |
348
|
|
|
$this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [ |
349
|
|
|
'category' => get_class($this), |
350
|
|
|
]); |
351
|
|
|
|
352
|
|
|
break; |
353
|
|
|
} |
354
|
|
|
} |
355
|
|
|
} catch (\Exception $e) { |
356
|
|
|
$this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [ |
357
|
|
|
'category' => get_class($this), |
358
|
|
|
'exception' => $e, |
359
|
|
|
]); |
360
|
|
|
|
361
|
|
|
if ($ignore === false) { |
362
|
|
|
return false; |
363
|
|
|
} |
364
|
|
|
} |
365
|
|
|
} |
366
|
|
|
|
367
|
|
|
return true; |
368
|
|
|
} |
369
|
|
|
|
370
|
|
|
/** |
371
|
|
|
* Notify. |
372
|
|
|
*/ |
373
|
|
|
protected function notify(): bool |
374
|
|
|
{ |
375
|
|
View Code Duplication |
if ($this->data['notification']['enabled'] === false) { |
|
|
|
|
376
|
|
|
$this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [ |
377
|
|
|
'category' => get_class($this), |
378
|
|
|
]); |
379
|
|
|
|
380
|
|
|
return false; |
381
|
|
|
} |
382
|
|
|
|
383
|
|
View Code Duplication |
if (count($this->data['notification']['receiver']) === 0) { |
|
|
|
|
384
|
|
|
$this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [ |
385
|
|
|
'category' => get_class($this), |
386
|
|
|
]); |
387
|
|
|
} |
388
|
|
|
|
389
|
|
|
$iso = $this->timestamp->toDateTime()->format('c'); |
390
|
|
|
|
391
|
|
|
if ($this->error_count === 0) { |
392
|
|
|
$subject = "Job ended with $this->error_count errors"; |
393
|
|
|
$body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors."; |
394
|
|
|
} else { |
395
|
|
|
$subject = 'Good job! The job finished with no errors'; |
396
|
|
|
$body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors."; |
397
|
|
|
} |
398
|
|
|
|
399
|
|
|
$mail = (new Message()) |
400
|
|
|
->setSubject($subject) |
401
|
|
|
->setBody($body) |
402
|
|
|
->setEncoding('UTF-8'); |
403
|
|
|
|
404
|
|
|
foreach ($this->data['notification']['receiver'] as $receiver) { |
405
|
|
|
$mail->setTo($receiver); |
406
|
|
|
|
407
|
|
|
$this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [ |
408
|
|
|
'category' => get_class($this), |
409
|
|
|
]); |
410
|
|
|
|
411
|
|
|
$this->scheduler->addJob(Mail::class, $mail->toString(), [ |
412
|
|
|
Scheduler::OPTION_RETRY => 1, |
413
|
|
|
]); |
414
|
|
|
} |
415
|
|
|
|
416
|
|
|
return true; |
417
|
|
|
} |
418
|
|
|
} |
419
|
|
|
|
This error could be the result of:
1. Missing dependencies
PHP Analyzer uses your
composer.json
file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects thecomposer.json
to be in the root folder of your repository.Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the
require
orrequire-dev
section?2. Missing use statement
PHP does not complain about undefined classes in
ìnstanceof
checks. For example, the following PHP code will work perfectly fine:If you have not tested against this specific condition, such errors might go unnoticed.