1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
/** |
6
|
|
|
* tubee |
7
|
|
|
* |
8
|
|
|
* @copyright Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
9
|
|
|
* @license GPL-3.0 https://opensource.org/licenses/GPL-3.0 |
10
|
|
|
*/ |
11
|
|
|
|
12
|
|
|
namespace Tubee\Async; |
13
|
|
|
|
14
|
|
|
use MongoDB\BSON\UTCDateTime; |
15
|
|
|
use MongoDB\Database; |
16
|
|
|
use Monolog\Handler\MongoDBHandler; |
17
|
|
|
use Monolog\Logger; |
18
|
|
|
use Psr\Log\LoggerInterface; |
19
|
|
|
use TaskScheduler\AbstractJob; |
20
|
|
|
use TaskScheduler\Scheduler; |
21
|
|
|
use Tubee\Collection\CollectionInterface; |
22
|
|
|
use Tubee\Endpoint\EndpointInterface; |
23
|
|
|
use Tubee\Helper; |
24
|
|
|
use Tubee\Log\MongoDBFormatter; |
25
|
|
|
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory; |
26
|
|
|
use Tubee\ResourceNamespace\ResourceNamespaceInterface; |
27
|
|
|
use Zend\Mail\Message; |
28
|
|
|
|
29
|
|
|
class Sync extends AbstractJob |
30
|
|
|
{ |
31
|
|
|
/** |
32
|
|
|
* Log levels. |
33
|
|
|
*/ |
34
|
|
|
public const LOG_LEVELS = [ |
35
|
|
|
'debug' => Logger::DEBUG, |
36
|
|
|
'info' => Logger::INFO, |
37
|
|
|
'notice' => Logger::NOTICE, |
38
|
|
|
'warning' => Logger::WARNING, |
39
|
|
|
'error' => Logger::ERROR, |
40
|
|
|
'critical' => Logger::CRITICAL, |
41
|
|
|
'alert' => Logger::ALERT, |
42
|
|
|
'emergency' => Logger::EMERGENCY, |
43
|
|
|
]; |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* ResourceNamespace factory. |
47
|
|
|
* |
48
|
|
|
* @var ResourceNamespaceFactory |
49
|
|
|
*/ |
50
|
|
|
protected $namespace_factory; |
51
|
|
|
|
52
|
|
|
/** |
53
|
|
|
* Scheduler. |
54
|
|
|
* |
55
|
|
|
* @var Scheduler |
56
|
|
|
*/ |
57
|
|
|
protected $scheduler; |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* Logger. |
61
|
|
|
* |
62
|
|
|
* @var LoggerInterface |
63
|
|
|
*/ |
64
|
|
|
protected $logger; |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* Error count. |
68
|
|
|
* |
69
|
|
|
* @var int |
70
|
|
|
*/ |
71
|
|
|
protected $error_count = 0; |
72
|
|
|
|
73
|
|
|
/** |
74
|
|
|
* Start timestamp. |
75
|
|
|
* |
76
|
|
|
* @var UTCDateTime |
77
|
|
|
*/ |
78
|
|
|
protected $timestamp; |
79
|
|
|
|
80
|
|
|
/** |
81
|
|
|
* Database. |
82
|
|
|
* |
83
|
|
|
* @var Database |
84
|
|
|
*/ |
85
|
|
|
protected $db; |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* Process stack. |
89
|
|
|
* |
90
|
|
|
* @var array |
91
|
|
|
*/ |
92
|
|
|
protected $stack = []; |
93
|
|
|
|
94
|
|
|
/** |
95
|
|
|
* Resource namespace. |
96
|
|
|
* |
97
|
|
|
* @var ResourceNamespaceInterface |
98
|
|
|
*/ |
99
|
|
|
protected $namespace; |
100
|
|
|
|
101
|
|
|
/** |
102
|
|
|
* Sync. |
103
|
|
|
*/ |
104
|
5 |
|
public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger) |
105
|
|
|
{ |
106
|
5 |
|
$this->namespace_factory = $namespace_factory; |
107
|
5 |
|
$this->scheduler = $scheduler; |
108
|
5 |
|
$this->logger = $logger; |
109
|
5 |
|
$this->db = $db; |
110
|
5 |
|
$this->timestamp = new UTCDateTime(); |
111
|
5 |
|
} |
112
|
|
|
|
113
|
|
|
/** |
114
|
|
|
* Start job. |
115
|
|
|
*/ |
116
|
5 |
|
public function start(): bool |
117
|
|
|
{ |
118
|
5 |
|
$this->namespace = $this->namespace_factory->getOne($this->data['namespace']); |
119
|
|
|
|
120
|
5 |
|
foreach ($this->data['collections'] as $collections) { |
121
|
5 |
|
$collections = (array) $collections; |
122
|
5 |
|
$filter = in_array('*', $collections) ? [] : ['name' => ['$in' => $collections]]; |
123
|
5 |
|
$collections = iterator_to_array($this->namespace->getCollections($filter)); |
124
|
|
|
|
125
|
5 |
|
$endpoints = $this->data['endpoints']; |
126
|
5 |
|
$this->loopCollections($collections, $endpoints); |
127
|
|
|
} |
128
|
|
|
|
129
|
5 |
|
$this->notify(); |
130
|
|
|
|
131
|
5 |
|
return true; |
132
|
|
|
} |
133
|
|
|
|
134
|
|
|
/** |
135
|
|
|
* Loop collections. |
136
|
|
|
*/ |
137
|
5 |
|
protected function loopCollections(array $collections, array $endpoints) |
138
|
|
|
{ |
139
|
5 |
|
foreach ($endpoints as $ep) { |
140
|
5 |
|
foreach ($collections as $collection) { |
141
|
5 |
|
$this->loopEndpoints($collection, $collections, (array) $ep, $endpoints); |
142
|
|
|
} |
143
|
|
|
|
144
|
5 |
|
$this->logger->debug('wait for child stack ['.count($this->stack).'] to be finished', [ |
145
|
5 |
|
'category' => get_class($this), |
146
|
|
|
]); |
147
|
|
|
|
148
|
5 |
|
foreach ($this->stack as $proc) { |
149
|
5 |
|
$proc->wait(); |
150
|
|
|
} |
151
|
|
|
} |
152
|
5 |
|
} |
153
|
|
|
|
154
|
|
|
/** |
155
|
|
|
* Loop endpoints. |
156
|
|
|
*/ |
157
|
5 |
|
protected function loopEndpoints(CollectionInterface $collection, array $all_collections, array $endpoints, array $all_endpoints) |
158
|
|
|
{ |
159
|
5 |
|
$filter = in_array('*', $endpoints) ? [] : ['name' => ['$in' => $endpoints]]; |
160
|
5 |
|
$endpoints = iterator_to_array($collection->getEndpoints($filter)); |
161
|
|
|
|
162
|
5 |
|
foreach ($endpoints as $endpoint) { |
163
|
5 |
|
if (count($all_endpoints) > 1 || count($all_collections) > 1) { |
164
|
3 |
|
$data = $this->data; |
165
|
3 |
|
$data = array_merge($data, [ |
166
|
3 |
|
'collections' => [$collection->getName()], |
167
|
3 |
|
'endpoints' => [$endpoint->getName()], |
168
|
3 |
|
'parent' => $this->getId(), |
169
|
|
|
]); |
170
|
|
|
|
171
|
3 |
|
$this->stack[] = $this->scheduler->addJob(self::class, $data); |
172
|
|
|
} else { |
173
|
5 |
|
$this->execute($collection, $endpoint); |
174
|
|
|
} |
175
|
|
|
} |
176
|
5 |
|
} |
177
|
|
|
|
178
|
|
|
/** |
179
|
|
|
* Execute. |
180
|
|
|
*/ |
181
|
2 |
|
protected function execute(CollectionInterface $collection, EndpointInterface $endpoint) |
182
|
|
|
{ |
183
|
2 |
|
$this->setupLogger(self::LOG_LEVELS[$this->data['log_level']], [ |
184
|
2 |
|
'process' => (string) $this->getId(), |
185
|
2 |
|
'parent' => isset($this->data['parent']) ? (string) $this->data['parent'] : null, |
186
|
2 |
|
'start' => $this->timestamp, |
187
|
2 |
|
'namespace' => $this->namespace->getName(), |
188
|
2 |
|
'collection' => $collection->getName(), |
189
|
2 |
|
'endpoint' => $endpoint->getName(), |
190
|
|
|
]); |
191
|
|
|
|
192
|
2 |
|
if ($endpoint->getType() === EndpointInterface::TYPE_SOURCE) { |
193
|
|
|
$this->import($collection, $this->getFilter(), ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']); |
194
|
2 |
|
} elseif ($endpoint->getType() === EndpointInterface::TYPE_DESTINATION) { |
195
|
|
|
$this->export($collection, $this->getFilter(), ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']); |
196
|
|
|
} else { |
197
|
2 |
|
$this->logger->warning('skip endpoint ['.$endpoint->getIdentifier().'], endpoint type is neither source nor destination', [ |
198
|
2 |
|
'category' => get_class($this), |
199
|
|
|
]); |
200
|
|
|
} |
201
|
|
|
|
202
|
2 |
|
$this->logger->popProcessor(); |
203
|
2 |
|
} |
204
|
|
|
|
205
|
|
|
/** |
206
|
|
|
* Decode filter. |
207
|
|
|
*/ |
208
|
|
|
protected function getFilter(): array |
209
|
|
|
{ |
210
|
|
|
if ($this->data['filter'] === null) { |
211
|
|
|
return []; |
212
|
|
|
} |
213
|
|
|
|
214
|
|
|
return (array) Helper::jsonDecode($this->data['filter']); |
215
|
|
|
} |
216
|
|
|
|
217
|
|
|
/** |
218
|
|
|
* Set logger level. |
219
|
|
|
*/ |
220
|
2 |
|
protected function setupLogger(int $level, array $context): bool |
221
|
|
|
{ |
222
|
2 |
|
if (isset($this->data['job'])) { |
223
|
|
|
$context['job'] = (string) $this->data['job']; |
224
|
|
|
} |
225
|
|
|
|
226
|
2 |
|
foreach ($this->logger->getHandlers() as $handler) { |
227
|
|
|
if ($handler instanceof MongoDBHandler) { |
|
|
|
|
228
|
|
|
$handler->setLevel($level); |
229
|
|
|
$handler->setFormatter(new MongoDBFormatter()); |
230
|
|
|
} |
231
|
|
|
} |
232
|
|
|
|
233
|
2 |
|
while (count($this->logger->getProcessors()) > 1) { |
234
|
|
|
$this->logger->popProcessor(); |
235
|
|
|
} |
236
|
|
|
|
237
|
2 |
|
$this->logger->pushProcessor(function ($record) use ($context) { |
238
|
|
|
$record['context'] = array_merge($record['context'], $context); |
239
|
|
|
|
240
|
|
|
return $record; |
241
|
2 |
|
}); |
242
|
|
|
|
243
|
2 |
|
return true; |
244
|
|
|
} |
245
|
|
|
|
246
|
|
|
/** |
247
|
|
|
* {@inheritdoc} |
248
|
|
|
*/ |
249
|
|
|
protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool |
250
|
|
|
{ |
251
|
|
|
$this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [ |
252
|
|
|
'category' => get_class($this), |
253
|
|
|
]); |
254
|
|
|
|
255
|
|
|
$endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints)); |
256
|
|
|
$workflows = []; |
257
|
|
|
|
258
|
|
|
foreach ($endpoints as $ep) { |
259
|
|
|
if ($ep->flushRequired()) { |
260
|
|
|
$ep->flush($simulate); |
261
|
|
|
} |
262
|
|
|
|
263
|
|
|
$ep->setup($simulate); |
264
|
|
|
} |
265
|
|
|
|
266
|
|
|
$i = 0; |
267
|
|
|
foreach ($collection->getObjects($filter) as $id => $object) { |
268
|
|
|
++$i; |
269
|
|
|
$this->logger->debug('process ['.$i.'] export for object ['.(string) $id.'] - [{fields}] from data type ['.$collection->getIdentifier().']', [ |
270
|
|
|
'category' => get_class($this), |
271
|
|
|
'fields' => array_keys($object->toArray()), |
272
|
|
|
]); |
273
|
|
|
|
274
|
|
|
foreach ($endpoints as $ep) { |
275
|
|
|
$identifier = $ep->getIdentifier(); |
276
|
|
|
$this->logger->info('start export to destination endpoint ['.$identifier.']', [ |
277
|
|
|
'category' => get_class($this), |
278
|
|
|
]); |
279
|
|
|
|
280
|
|
|
if (!isset($workflows[$identifier])) { |
281
|
|
|
$workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow'])); |
282
|
|
|
|
283
|
|
|
if (count($workflows[$identifier]) === 0) { |
284
|
|
|
$this->logger->warning('no workflows available in destination endpoint ['.$ep->getIdentifier().'], skip export', [ |
285
|
|
|
'category' => get_class($this), |
286
|
|
|
]); |
287
|
|
|
|
288
|
|
|
continue; |
289
|
|
|
} |
290
|
|
|
} |
291
|
|
|
|
292
|
|
|
try { |
293
|
|
|
foreach ($workflows[$identifier] as $workflow) { |
294
|
|
|
$this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [ |
295
|
|
|
'category' => get_class($this), |
296
|
|
|
]); |
297
|
|
|
|
298
|
|
|
if ($workflow->export($object, $this->timestamp, $simulate) === true) { |
299
|
|
|
$this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [ |
300
|
|
|
'category' => get_class($this), |
301
|
|
|
]); |
302
|
|
|
|
303
|
|
|
continue 2; |
304
|
|
|
} |
305
|
|
|
$this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [ |
306
|
|
|
'category' => get_class($this), |
307
|
|
|
]); |
308
|
|
|
} |
309
|
|
|
|
310
|
|
|
$this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [ |
311
|
|
|
'category' => get_class($this), |
312
|
|
|
]); |
313
|
|
|
} catch (\Throwable $e) { |
314
|
|
|
++$this->error_count; |
315
|
|
|
|
316
|
|
|
$this->logger->error('failed export object to destination endpoint ['.$identifier.']', [ |
317
|
|
|
'category' => get_class($this), |
318
|
|
|
'object' => $object->getId(), |
319
|
|
|
'exception' => $e, |
320
|
|
|
]); |
321
|
|
|
|
322
|
|
|
if ($ignore === false) { |
323
|
|
|
return false; |
324
|
|
|
} |
325
|
|
|
} |
326
|
|
|
} |
327
|
|
|
} |
328
|
|
|
|
329
|
|
|
if (count($endpoints) === 0) { |
330
|
|
|
$this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [ |
331
|
|
|
'category' => get_class($this), |
332
|
|
|
]); |
333
|
|
|
|
334
|
|
|
return true; |
335
|
|
|
} |
336
|
|
|
|
337
|
|
|
foreach ($endpoints as $n => $ep) { |
338
|
|
|
$ep->shutdown($simulate); |
339
|
|
|
} |
340
|
|
|
|
341
|
|
|
return true; |
342
|
|
|
} |
343
|
|
|
|
344
|
|
|
/** |
345
|
|
|
* {@inheritdoc} |
346
|
|
|
*/ |
347
|
|
|
protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool |
348
|
|
|
{ |
349
|
|
|
$this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [ |
350
|
|
|
'category' => get_class($this), |
351
|
|
|
]); |
352
|
|
|
|
353
|
|
|
$endpoints = $collection->getSourceEndpoints($endpoints); |
354
|
|
|
$workflows = []; |
355
|
|
|
|
356
|
|
|
foreach ($endpoints as $ep) { |
357
|
|
|
$identifier = $ep->getIdentifier(); |
358
|
|
|
$this->logger->info('start import from source endpoint ['.$identifier.']', [ |
359
|
|
|
'category' => get_class($this), |
360
|
|
|
]); |
361
|
|
|
|
362
|
|
|
if ($ep->flushRequired()) { |
363
|
|
|
$collection->flush($simulate); |
364
|
|
|
} |
365
|
|
|
|
366
|
|
|
$ep->setup($simulate); |
367
|
|
|
if (!isset($workflows[$identifier])) { |
368
|
|
|
$workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow'])); |
369
|
|
|
|
370
|
|
|
if (count($workflows[$identifier]) === 0) { |
371
|
|
|
$this->logger->warning('no workflows available in source endpoint ['.$ep->getIdentifier().'], skip import', [ |
372
|
|
|
'category' => get_class($this), |
373
|
|
|
]); |
374
|
|
|
|
375
|
|
|
continue; |
376
|
|
|
} |
377
|
|
|
} |
378
|
|
|
|
379
|
|
|
$i = 0; |
380
|
|
|
foreach ($ep->getAll($filter) as $id => $object) { |
381
|
|
|
++$i; |
382
|
|
|
$this->logger->debug('process object ['.$i.'] import for object ['.$object->getId().'] into data type ['.$collection->getIdentifier().']', [ |
383
|
|
|
'category' => get_class($this), |
384
|
|
|
'attributes' => $object, |
385
|
|
|
]); |
386
|
|
|
|
387
|
|
|
try { |
388
|
|
|
foreach ($workflows[$identifier] as $workflow) { |
389
|
|
|
$this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [ |
390
|
|
|
'category' => get_class($this), |
391
|
|
|
]); |
392
|
|
|
|
393
|
|
|
if ($workflow->import($collection, $object, $this->timestamp, $simulate) === true) { |
394
|
|
|
$this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $object->getId().'], skip any further workflows for the current data object', [ |
395
|
|
|
'category' => get_class($this), |
396
|
|
|
]); |
397
|
|
|
|
398
|
|
|
continue 2; |
399
|
|
|
} |
400
|
|
|
$this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $object->getId().'], condition does not match or unusable ensure', [ |
401
|
|
|
'category' => get_class($this), |
402
|
|
|
]); |
403
|
|
|
} |
404
|
|
|
|
405
|
|
|
$this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [ |
406
|
|
|
'category' => get_class($this), |
407
|
|
|
]); |
408
|
|
|
} catch (\Throwable $e) { |
409
|
|
|
++$this->error_count; |
410
|
|
|
|
411
|
|
|
$this->logger->error('failed import data object from source endpoint ['.$identifier.']', [ |
412
|
|
|
'category' => get_class($this), |
413
|
|
|
'namespace' => $collection->getResourceNamespace()->getName(), |
414
|
|
|
'collection' => $collection->getName(), |
415
|
|
|
'endpoint' => $ep->getName(), |
416
|
|
|
'exception' => $e, |
417
|
|
|
]); |
418
|
|
|
|
419
|
|
|
if ($ignore === false) { |
420
|
|
|
return false; |
421
|
|
|
} |
422
|
|
|
} |
423
|
|
|
} |
424
|
|
|
|
425
|
|
|
if (empty($filter)) { |
426
|
|
|
$this->garbageCollector($collection, $ep, $simulate, $ignore); |
427
|
|
|
} else { |
428
|
|
|
$this->logger->info('skip garbage collection, a query has been issued for import', [ |
429
|
|
|
'category' => get_class($this), |
430
|
|
|
]); |
431
|
|
|
} |
432
|
|
|
|
433
|
|
|
$ep->shutdown($simulate); |
434
|
|
|
} |
435
|
|
|
|
436
|
|
|
if ($endpoints->getReturn() === 0) { |
437
|
|
|
$this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [ |
438
|
|
|
'category' => get_class($this), |
439
|
|
|
]); |
440
|
|
|
|
441
|
|
|
return true; |
442
|
|
|
} |
443
|
|
|
|
444
|
|
|
return true; |
445
|
|
|
} |
446
|
|
|
|
447
|
|
|
/** |
448
|
|
|
* Garbage. |
449
|
|
|
*/ |
450
|
|
|
protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool |
451
|
|
|
{ |
452
|
|
|
$this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [ |
453
|
|
|
'category' => get_class($this), |
454
|
|
|
]); |
455
|
|
|
|
456
|
|
|
$filter = [ |
457
|
|
|
'endpoints.'.$endpoint->getName().'.last_sync' => [ |
458
|
|
|
'$lt' => $this->timestamp, |
459
|
|
|
], |
460
|
|
|
]; |
461
|
|
|
|
462
|
|
|
$this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [ |
463
|
|
|
'endpoints.'.$endpoint->getName().'.garbage' => true, |
464
|
|
|
]]); |
465
|
|
|
|
466
|
|
|
$workflows = iterator_to_array($endpoint->getWorkflows(['kind' => 'GarbageWorkflow'])); |
467
|
|
|
if (count($workflows) === 0) { |
468
|
|
|
$this->logger->info('no garbage workflows available in ['.$endpoint->getIdentifier().'], skip garbage collection', [ |
469
|
|
|
'category' => get_class($this), |
470
|
|
|
]); |
471
|
|
|
|
472
|
|
|
return false; |
473
|
|
|
} |
474
|
|
|
|
475
|
|
|
$i = 0; |
476
|
|
|
foreach ($collection->getObjects($filter, false) as $id => $object) { |
477
|
|
|
++$i; |
478
|
|
|
$this->logger->debug('process ['.$i.'] garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [ |
479
|
|
|
'category' => get_class($this), |
480
|
|
|
]); |
481
|
|
|
|
482
|
|
|
try { |
483
|
|
|
foreach ($workflows as $workflow) { |
484
|
|
|
$this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [ |
485
|
|
|
'category' => get_class($this), |
486
|
|
|
]); |
487
|
|
|
|
488
|
|
|
if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) { |
489
|
|
|
$this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [ |
490
|
|
|
'category' => get_class($this), |
491
|
|
|
]); |
492
|
|
|
|
493
|
|
|
break; |
494
|
|
|
} |
495
|
|
|
} |
496
|
|
|
} catch (\Exception $e) { |
497
|
|
|
$this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [ |
498
|
|
|
'category' => get_class($this), |
499
|
|
|
'exception' => $e, |
500
|
|
|
]); |
501
|
|
|
|
502
|
|
|
if ($ignore === false) { |
503
|
|
|
return false; |
504
|
|
|
} |
505
|
|
|
} |
506
|
|
|
} |
507
|
|
|
|
508
|
|
|
$this->relationGarbageCollector($collection, $endpoint, $workflows); |
509
|
|
|
|
510
|
|
|
return true; |
511
|
|
|
} |
512
|
|
|
|
513
|
|
|
/** |
514
|
|
|
* Relation garbage collector. |
515
|
|
|
*/ |
516
|
|
|
protected function relationGarbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, $workflows) |
517
|
|
|
{ |
518
|
|
|
$namespace = $endpoint->getCollection()->getResourceNamespace()->getName(); |
519
|
|
|
$collection = $endpoint->getCollection()->getName(); |
520
|
|
|
$ep = $endpoint->getName(); |
521
|
|
|
$key = join('/', [$namespace, $collection, $ep]); |
522
|
|
|
|
523
|
|
|
$filter = [ |
524
|
|
|
'endpoints.'.$key.'.last_sync' => [ |
525
|
|
|
'$lt' => $this->timestamp, |
526
|
|
|
], |
527
|
|
|
]; |
528
|
|
|
|
529
|
|
|
$this->db->relations->updateMany($filter, ['$set' => [ |
530
|
|
|
'endpoints.'.$key.'.garbage' => true, |
531
|
|
|
]]); |
532
|
|
|
|
533
|
|
|
foreach ($workflows as $workflow) { |
534
|
|
|
foreach ($workflow->getAttributeMap()->getMap() as $attr) { |
535
|
|
|
if (isset($attr['map']) && $attr['map']['ensure'] === 'absent') { |
536
|
|
|
$this->db->relations->deleteMany(['endpoints.'.$key.'.garbage' => true]); |
537
|
|
|
} |
538
|
|
|
} |
539
|
|
|
} |
540
|
|
|
} |
541
|
|
|
|
542
|
|
|
/** |
543
|
|
|
* Notify. |
544
|
|
|
*/ |
545
|
5 |
|
protected function notify(): bool |
546
|
|
|
{ |
547
|
5 |
|
if ($this->data['notification']['enabled'] === false) { |
548
|
5 |
|
$this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [ |
549
|
5 |
|
'category' => get_class($this), |
550
|
|
|
]); |
551
|
|
|
|
552
|
5 |
|
return false; |
553
|
|
|
} |
554
|
|
|
|
555
|
|
|
if (count($this->data['notification']['receiver']) === 0) { |
556
|
|
|
$this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [ |
557
|
|
|
'category' => get_class($this), |
558
|
|
|
]); |
559
|
|
|
} |
560
|
|
|
|
561
|
|
|
$iso = $this->timestamp->toDateTime()->format('c'); |
562
|
|
|
|
563
|
|
|
if ($this->error_count === 0) { |
564
|
|
|
$subject = "Job ended with $this->error_count errors"; |
565
|
|
|
$body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors."; |
566
|
|
|
} else { |
567
|
|
|
$subject = 'Good job! The job finished with no errors'; |
568
|
|
|
$body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors."; |
569
|
|
|
} |
570
|
|
|
|
571
|
|
|
$mail = (new Message()) |
572
|
|
|
->setSubject($subject) |
573
|
|
|
->setBody($body) |
574
|
|
|
->setEncoding('UTF-8'); |
575
|
|
|
|
576
|
|
|
foreach ($this->data['notification']['receiver'] as $receiver) { |
577
|
|
|
$mail->setTo($receiver); |
578
|
|
|
|
579
|
|
|
$this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [ |
580
|
|
|
'category' => get_class($this), |
581
|
|
|
]); |
582
|
|
|
|
583
|
|
|
$this->scheduler->addJob(Mail::class, $mail->toString(), [ |
584
|
|
|
Scheduler::OPTION_RETRY => 1, |
585
|
|
|
]); |
586
|
|
|
} |
587
|
|
|
|
588
|
|
|
return true; |
589
|
|
|
} |
590
|
|
|
} |
591
|
|
|
|
This error could be the result of:
1. Missing dependencies
PHP Analyzer uses your
composer.json
file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects thecomposer.json
to be in the root folder of your repository.Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the
require
orrequire-dev
section?2. Missing use statement
PHP does not complain about undefined classes in
ìnstanceof
checks. For example, the following PHP code will work perfectly fine:If you have not tested against this specific condition, such errors might go unnoticed.