Completed
Push — master ( b64fc1...5651bd )
by Raffael
16:20 queued 08:39
created

Sync::import()   D

Complexity

Conditions 12
Paths 230

Size

Total Lines 99

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 156

Importance

Changes 0
Metric Value
dl 0
loc 99
ccs 0
cts 55
cp 0
rs 4.6284
c 0
b 0
f 0
cc 12
nc 230
nop 5
crap 156

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * tubee
7
 *
8
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
9
 * @license     GPL-3.0 https://opensource.org/licenses/GPL-3.0
10
 */
11
12
namespace Tubee\Async;
13
14
use MongoDB\BSON\UTCDateTime;
15
use MongoDB\Database;
16
use Monolog\Handler\MongoDBHandler;
17
use Monolog\Logger;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\AbstractJob;
20
use TaskScheduler\Scheduler;
21
use Tubee\Collection\CollectionInterface;
22
use Tubee\Endpoint\EndpointInterface;
23
use Tubee\Helper;
24
use Tubee\Log\MongoDBFormatter;
25
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory;
26
use Tubee\ResourceNamespace\ResourceNamespaceInterface;
27
use Zend\Mail\Message;
28
29
class Sync extends AbstractJob
{
    /**
     * Log levels, mapping the job's `log_level` option to Monolog levels.
     */
    public const LOG_LEVELS = [
        'debug' => Logger::DEBUG,
        'info' => Logger::INFO,
        'notice' => Logger::NOTICE,
        'warning' => Logger::WARNING,
        'error' => Logger::ERROR,
        'critical' => Logger::CRITICAL,
        'alert' => Logger::ALERT,
        'emergency' => Logger::EMERGENCY,
    ];

    /**
     * ResourceNamespace factory.
     *
     * @var ResourceNamespaceFactory
     */
    protected $namespace_factory;

    /**
     * Scheduler used to queue child sync jobs and notification mails.
     *
     * @var Scheduler
     */
    protected $scheduler;

    /**
     * Logger.
     *
     * NOTE(review): setupLogger() calls getHandlers()/getProcessors()/pushProcessor()/popProcessor(),
     * which are not part of LoggerInterface, so this must be a Monolog\Logger in practice.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Number of objects that failed during import, export or garbage collection.
     * Reported by notify().
     *
     * @var int
     */
    protected $error_count = 0;

    /**
     * Job start timestamp. It is passed to every workflow and used as the garbage cut-off.
     *
     * @var UTCDateTime
     */
    protected $timestamp;

    /**
     * Database.
     *
     * @var Database
     */
    protected $db;

    /**
     * Child jobs queued by loopEndpoints() that loopCollections() waits for.
     *
     * @var array
     */
    protected $stack = [];

    /**
     * Resource namespace resolved in start().
     *
     * @var ResourceNamespaceInterface
     */
    protected $namespace;

    /**
     * Sync.
     */
    public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger)
    {
        $this->namespace_factory = $namespace_factory;
        $this->scheduler = $scheduler;
        $this->logger = $logger;
        $this->db = $db;
        $this->timestamp = new UTCDateTime();
    }

    /**
     * Start job.
     *
     * Resolves the configured namespace and processes every configured collection
     * selector against the configured endpoint selectors. Finally it sends the
     * notification (if enabled).
     */
    public function start(): bool
    {
        $this->namespace = $this->namespace_factory->getOne($this->data['namespace']);

        foreach ($this->data['collections'] as $collections) {
            $collections = (array) $collections;
            // Strict comparison: a loose in_array('*', [0]) is true on PHP < 8.
            $filter = in_array('*', $collections, true) ? [] : ['name' => ['$in' => $collections]];
            $collections = iterator_to_array($this->namespace->getCollections($filter));

            $this->loopCollections($collections, $this->data['endpoints']);
        }

        $this->notify();

        return true;
    }

    /**
     * Loop collections.
     *
     * For every endpoint selector, processes all collections and then waits for
     * the child jobs queued while doing so.
     */
    protected function loopCollections(array $collections, array $endpoints)
    {
        foreach ($endpoints as $ep) {
            foreach ($collections as $collection) {
                $this->loopEndpoints($collection, $collections, (array) $ep, $endpoints);
            }

            $this->logger->debug('wait for child stack ['.count($this->stack).'] to be finished', [
                'category' => get_class($this),
            ]);

            foreach ($this->stack as $proc) {
                $proc->wait();
            }

            // All queued children have been waited for. Forget them so the next
            // endpoint selector does not wait on finished processes again.
            $this->stack = [];
        }
    }

    /**
     * Loop endpoints.
     *
     * Resolves the endpoint selector against $collection. The work is either
     * executed directly or, when more than one collection or endpoint selector is
     * configured, fanned out as one child job per collection/endpoint pair.
     */
    protected function loopEndpoints(CollectionInterface $collection, array $all_collections, array $endpoints, array $all_endpoints)
    {
        $filter = in_array('*', $endpoints, true) ? [] : ['name' => ['$in' => $endpoints]];
        $endpoints = iterator_to_array($collection->getEndpoints($filter));

        $fan_out = count($all_endpoints) > 1 || count($all_collections) > 1;

        foreach ($endpoints as $endpoint) {
            if ($fan_out === false) {
                $this->execute($collection, $endpoint);

                continue;
            }

            $data = array_merge($this->data, [
                'collections' => [$collection->getName()],
                'endpoints' => [$endpoint->getName()],
                'parent' => $this->getId(),
            ]);

            $this->stack[] = $this->scheduler->addJob(self::class, $data);
        }
    }

    /**
     * Execute.
     *
     * Configures the logger context for this collection/endpoint pair. Then it runs
     * an import (source endpoint) or an export (destination endpoint), and skips any
     * other endpoint type with a warning.
     */
    protected function execute(CollectionInterface $collection, EndpointInterface $endpoint)
    {
        $this->setupLogger(self::LOG_LEVELS[$this->data['log_level']], [
            'process' => (string) $this->getId(),
            'parent' => isset($this->data['parent']) ? (string) $this->data['parent'] : null,
            'start' => $this->timestamp,
            'namespace' => $this->namespace->getName(),
            'collection' => $collection->getName(),
            'endpoint' => $endpoint->getName(),
        ]);

        try {
            $type = $endpoint->getType();
            $endpoint_filter = ['name' => $endpoint->getName()];

            if ($type === EndpointInterface::TYPE_SOURCE) {
                $this->import($collection, $this->getFilter(), $endpoint_filter, $this->data['simulate'], $this->data['ignore']);
            } elseif ($type === EndpointInterface::TYPE_DESTINATION) {
                $this->export($collection, $this->getFilter(), $endpoint_filter, $this->data['simulate'], $this->data['ignore']);
            } else {
                $this->logger->warning('skip endpoint ['.$endpoint->getIdentifier().'], endpoint type is neither source nor destination', [
                    'category' => get_class($this),
                ]);
            }
        } finally {
            // Remove the context processor pushed by setupLogger(), even if the sync throws.
            $this->logger->popProcessor();
        }
    }

    /**
     * Decode filter.
     *
     * @return array The JSON-decoded `filter` job option, or [] when it is absent/null
     */
    protected function getFilter(): array
    {
        $filter = $this->data['filter'] ?? null;

        if ($filter === null) {
            return [];
        }

        return (array) Helper::jsonDecode($filter);
    }

    /**
     * Set logger level.
     *
     * Applies $level and the MongoDB formatter to every MongoDB handler. Then it
     * replaces previously pushed context processors with one that merges $context
     * (plus the `job` option, if set) into every log record's context.
     */
    protected function setupLogger(int $level, array $context): bool
    {
        if (isset($this->data['job'])) {
            $context['job'] = (string) $this->data['job'];
        }

        // NOTE(review): static analysis reported Monolog\Handler\MongoDBHandler as an unknown class;
        // confirm the Monolog version that provides it is listed in composer.json.
        foreach ($this->logger->getHandlers() as $handler) {
            if ($handler instanceof MongoDBHandler) {
                $handler->setLevel($level);
                $handler->setFormatter(new MongoDBFormatter());
            }
        }

        // Pop processors until a single one remains (presumably the one registered at bootstrap),
        // dropping context processors left over from earlier executions.
        while (count($this->logger->getProcessors()) > 1) {
            $this->logger->popProcessor();
        }

        $this->logger->pushProcessor(function ($record) use ($context) {
            $record['context'] = array_merge($record['context'], $context);

            return $record;
        });

        return true;
    }

    /**
     * Export the collection's objects to its destination endpoints.
     *
     * @param array $filter    Object filter passed to $collection->getObjects()
     * @param array $endpoints Endpoint filter passed to $collection->getDestinationEndpoints()
     * @param bool  $simulate  Passed through to endpoints and workflows
     * @param bool  $ignore    Continue after a failed object instead of aborting
     *
     * @return bool False if an object failed and $ignore is false, true otherwise
     */
    protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        $endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints));

        // Nothing to export to: bail out before reading any object from the collection.
        if (count($endpoints) === 0) {
            $this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [
                'category' => get_class($this),
            ]);

            return true;
        }

        foreach ($endpoints as $ep) {
            if ($ep->flushRequired()) {
                $ep->flush($simulate);
            }

            $ep->setup($simulate);
        }

        // Workflows per endpoint identifier, fetched lazily on first use.
        $workflows = [];
        $i = 0;

        foreach ($collection->getObjects($filter) as $id => $object) {
            ++$i;
            $this->logger->debug('process ['.$i.'] export for object ['.(string) $id.'] - [{fields}] from data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
                'fields' => array_keys($object->toArray()),
            ]);

            foreach ($endpoints as $ep) {
                $identifier = $ep->getIdentifier();
                $this->logger->info('start export to destination endpoint ['.$identifier.']', [
                    'category' => get_class($this),
                ]);

                if (!isset($workflows[$identifier])) {
                    $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));

                    if (count($workflows[$identifier]) === 0) {
                        $this->logger->warning('no workflows available in destination endpoint ['.$identifier.'], skip export', [
                            'category' => get_class($this),
                        ]);
                    }
                }

                // Skip endpoints without workflows for every object, not only the first one.
                if (count($workflows[$identifier]) === 0) {
                    continue;
                }

                try {
                    $this->runWorkflows($workflows[$identifier], $identifier, (string) $id, function ($workflow) use ($object, $simulate) {
                        return $workflow->export($object, $this->timestamp, $simulate);
                    });
                } catch (\Throwable $e) {
                    ++$this->error_count;

                    $this->logger->error('failed export object to destination endpoint ['.$identifier.']', [
                        'category' => get_class($this),
                        'object' => $object->getId(),
                        'exception' => $e,
                    ]);

                    if ($ignore === false) {
                        return false;
                    }
                }
            }
        }

        foreach ($endpoints as $ep) {
            $ep->shutdown($simulate);
        }

        return true;
    }

    /**
     * Import objects from the collection's source endpoints into the collection.
     *
     * @param array $filter    Object filter passed to each endpoint's getAll(); a non-empty
     *                         filter disables garbage collection
     * @param array $endpoints Endpoint filter passed to $collection->getSourceEndpoints()
     * @param bool  $simulate  Passed through to the collection, endpoints and workflows
     * @param bool  $ignore    Continue after a failed object instead of aborting
     *
     * @return bool False if an object failed and $ignore is false, true otherwise
     */
    protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        // Consumed as a generator; its return value is read after it has been exhausted.
        $endpoints = $collection->getSourceEndpoints($endpoints);

        foreach ($endpoints as $ep) {
            if ($this->importEndpoint($collection, $ep, $filter, $simulate, $ignore) === false) {
                return false;
            }
        }

        // NOTE(review): the generator's return value is presumably the number of yielded endpoints.
        if ($endpoints->getReturn() === 0) {
            $this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [
                'category' => get_class($this),
            ]);
        }

        return true;
    }

    /**
     * Import every object of one source endpoint, then garbage-collect and shut the endpoint down.
     *
     * An endpoint without workflows is skipped before setup(), so it is never left
     * set up without a matching shutdown().
     *
     * @return bool False if an object failed and $ignore is false, true otherwise
     */
    protected function importEndpoint(CollectionInterface $collection, EndpointInterface $ep, array $filter, bool $simulate, bool $ignore): bool
    {
        $identifier = $ep->getIdentifier();
        $this->logger->info('start import from source endpoint ['.$identifier.']', [
            'category' => get_class($this),
        ]);

        if ($ep->flushRequired()) {
            $collection->flush($simulate);
        }

        $workflows = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));

        if (count($workflows) === 0) {
            $this->logger->warning('no workflows available in source endpoint ['.$identifier.'], skip import', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $ep->setup($simulate);

        $i = 0;
        foreach ($ep->getAll($filter) as $object) {
            ++$i;
            $this->logger->debug('process object ['.$i.'] import for object ['.$object->getId().'] into data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
                'attributes' => $object,
            ]);

            try {
                $this->runWorkflows($workflows, $identifier, (string) $object->getId(), function ($workflow) use ($collection, $object, $simulate) {
                    return $workflow->import($collection, $object, $this->timestamp, $simulate);
                });
            } catch (\Throwable $e) {
                ++$this->error_count;

                $this->logger->error('failed import data object from source endpoint ['.$identifier.']', [
                    'category' => get_class($this),
                    'namespace' => $collection->getResourceNamespace()->getName(),
                    'collection' => $collection->getName(),
                    'endpoint' => $ep->getName(),
                    'exception' => $e,
                ]);

                if ($ignore === false) {
                    return false;
                }
            }
        }

        if (empty($filter)) {
            $this->garbageCollector($collection, $ep, $simulate, $ignore);
        } else {
            $this->logger->info('skip garbage collection, a query has been issued for import', [
                'category' => get_class($this),
            ]);
        }

        $ep->shutdown($simulate);

        return true;
    }

    /**
     * Try workflows in order against one object until the first one reports it was executed.
     *
     * Shared by import() and export(). Exceptions thrown by $run propagate to the
     * caller, which handles error counting and the ignore flag.
     *
     * @param iterable $workflows  Endpoint workflows, tried in order
     * @param string   $identifier Endpoint identifier (logging only)
     * @param string   $object_id  Object identifier (logging only)
     * @param callable $run        Receives a workflow; returns true when that workflow was executed
     *
     * @return bool True if a workflow was executed, false if none applied
     */
    protected function runWorkflows(iterable $workflows, string $identifier, string $object_id, callable $run): bool
    {
        foreach ($workflows as $workflow) {
            $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
                'category' => get_class($this),
            ]);

            if ($run($workflow) === true) {
                $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.$object_id.'], skip any further workflows for the current data object', [
                    'category' => get_class($this),
                ]);

                return true;
            }

            $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.$object_id.'], condition does not match or unusable ensure', [
                'category' => get_class($this),
            ]);
        }

        $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
            'category' => get_class($this),
        ]);

        return false;
    }

    /**
     * Garbage collector.
     *
     * Flags objects whose last_sync for $endpoint predates this job's start as
     * garbage. It then runs the endpoint's GarbageWorkflows on them and finally
     * processes garbage relations.
     *
     * @return bool False if no garbage workflows exist, or a cleanup failed and $ignore is false
     */
    protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        $filter = [
            'endpoints.'.$endpoint->getName().'.last_sync' => [
                '$lt' => $this->timestamp,
            ],
        ];

        $this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [
            'endpoints.'.$endpoint->getName().'.garbage' => true,
        ]]);

        $workflows = iterator_to_array($endpoint->getWorkflows(['kind' => 'GarbageWorkflow']));
        if (count($workflows) === 0) {
            $this->logger->info('no garbage workflows available in ['.$endpoint->getIdentifier().'], skip garbage collection', [
                'category' => get_class($this),
            ]);

            return false;
        }

        $i = 0;
        foreach ($collection->getObjects($filter, false) as $id => $object) {
            ++$i;
            $this->logger->debug('process ['.$i.'] garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
            ]);

            try {
                foreach ($workflows as $workflow) {
                    $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [
                        'category' => get_class($this),
                    ]);

                    if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) {
                        $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [
                            'category' => get_class($this),
                        ]);

                        break;
                    }
                }
            } catch (\Throwable $e) {
                // Count garbage failures like import/export failures so notify() reports them.
                ++$this->error_count;

                $this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [
                    'category' => get_class($this),
                    'exception' => $e,
                ]);

                if ($ignore === false) {
                    return false;
                }
            }
        }

        $this->relationGarbageCollector($collection, $endpoint, $workflows);

        return true;
    }

    /**
     * Relation garbage collector.
     *
     * Flags relations whose last_sync for this endpoint predates this job's start as
     * garbage. If any garbage workflow maps an attribute with ensure=absent, the
     * flagged relations are deleted.
     *
     * @param CollectionInterface $collection Unused; the relation key is derived from $endpoint->getCollection()
     * @param iterable            $workflows  Garbage workflows of $endpoint
     */
    protected function relationGarbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, $workflows)
    {
        $endpoint_collection = $endpoint->getCollection();
        $key = implode('/', [
            $endpoint_collection->getResourceNamespace()->getName(),
            $endpoint_collection->getName(),
            $endpoint->getName(),
        ]);

        $filter = [
            'endpoints.'.$key.'.last_sync' => [
                '$lt' => $this->timestamp,
            ],
        ];

        $this->db->relations->updateMany($filter, ['$set' => [
            'endpoints.'.$key.'.garbage' => true,
        ]]);

        foreach ($workflows as $workflow) {
            foreach ($workflow->getAttributeMap()->getMap() as $attr) {
                if (isset($attr['map']) && ($attr['map']['ensure'] ?? null) === 'absent') {
                    // A single delete removes every flagged relation; repeating it is redundant.
                    $this->db->relations->deleteMany(['endpoints.'.$key.'.garbage' => true]);

                    return;
                }
            }
        }
    }

    /**
     * Notify.
     *
     * Queues one notification mail per configured receiver. The mail reports
     * whether the job finished with or without errors.
     *
     * @return bool False if notifications are disabled or no receiver is configured
     */
    protected function notify(): bool
    {
        if ($this->data['notification']['enabled'] === false) {
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [
                'category' => get_class($this),
            ]);

            return false;
        }

        if (count($this->data['notification']['receiver']) === 0) {
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [
                'category' => get_class($this),
            ]);

            return false;
        }

        $iso = $this->timestamp->toDateTime()->format('c');

        if ($this->error_count === 0) {
            $subject = 'Good job! The job finished with no errors';
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors.";
        } else {
            $subject = "Job ended with $this->error_count errors";
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors.";
        }

        $mail = (new Message())
          ->setSubject($subject)
          ->setBody($body)
          ->setEncoding('UTF-8');

        foreach ($this->data['notification']['receiver'] as $receiver) {
            $mail->setTo($receiver);

            $this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [
                'category' => get_class($this),
            ]);

            $this->scheduler->addJob(Mail::class, $mail->toString(), [
                Scheduler::OPTION_RETRY => 1,
            ]);
        }

        return true;
    }
}
591