Completed
Push — master ( e50616...a5acb0 )
by Raffael
28:40 queued 24:37
created

Sync   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 515
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 27.18%

Importance

Changes 0
Metric Value
wmc 59
lcom 1
cbo 4
dl 0
loc 515
ccs 62
cts 228
cp 0.2718
rs 4.08
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 1
A start() 0 17 3
A loopCollections() 0 16 4
A loopEndpoints() 0 20 5
A execute() 0 23 4
A setupLogger() 0 20 4
D export() 0 86 12
C import() 0 84 10
B garbageCollector() 0 55 6
A relationGarbageCollector() 0 25 5
B notify() 0 45 5

How to fix   Complexity   

Complex Class

Complex classes like Sync often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Sync, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * tubee
7
 *
8
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
9
 * @license     GPL-3.0 https://opensource.org/licenses/GPL-3.0
10
 */
11
12
namespace Tubee\Async;
13
14
use MongoDB\BSON\UTCDateTime;
15
use MongoDB\Database;
16
use Monolog\Handler\MongoDBHandler;
17
use Monolog\Logger;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\AbstractJob;
20
use TaskScheduler\Scheduler;
21
use Tubee\Collection\CollectionInterface;
22
use Tubee\Endpoint\EndpointInterface;
23
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory;
24
use Tubee\ResourceNamespace\ResourceNamespaceInterface;
25
use Zend\Mail\Message;
26
27
class Sync extends AbstractJob
28
{
29
    /**
30
     * Log levels.
31
     */
32
    public const LOG_LEVELS = [
33
        'debug' => Logger::DEBUG,
34
        'info' => Logger::INFO,
35
        'notice' => Logger::NOTICE,
36
        'warning' => Logger::WARNING,
37
        'error' => Logger::ERROR,
38
        'critical' => Logger::CRITICAL,
39
        'alert' => Logger::ALERT,
40
        'emergency' => Logger::EMERGENCY,
41
    ];
42
43
    /**
44
     * ResourceNamespace factory.
45
     *
46
     * @var ResourceNamespaceFactory
47
     */
48
    protected $namespace_factory;
49
50
    /**
51
     * Scheduler.
52
     *
53
     * @var Scheduler
54
     */
55
    protected $scheduler;
56
57
    /**
58
     * Logger.
59
     *
60
     * @var LoggerInterface
61
     */
62
    protected $logger;
63
64
    /**
65
     * Error count.
66
     *
67
     * @var int
68
     */
69
    protected $error_count = 0;
70
71
    /**
72
     * Start timestamp.
73
     *
74
     * @var UTCDateTime
75
     */
76
    protected $timestamp;
77
78
    /**
79
     * Database.
80
     *
81
     * @var Database
82
     */
83
    protected $db;
84
85
    /**
86
     * Process stack.
87
     *
88
     * @var array
89
     */
90
    protected $stack = [];
91
92
    /**
93
     * Resource namespace.
94
     *
95
     * @var ResourceNamespaceInterface
96
     */
97
    protected $namespace;
98
99
    /**
     * Wire up the job's collaborators and record the moment it was created.
     *
     * The timestamp taken here is later handed to the workflows and used as
     * the "$lt" cut-off in the garbage collector filters.
     */
    public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger)
    {
        $this->timestamp = new UTCDateTime();
        $this->db = $db;
        $this->logger = $logger;
        $this->scheduler = $scheduler;
        $this->namespace_factory = $namespace_factory;
    }
110
111
    /**
     * Job entry point.
     *
     * Resolves the namespace named in the job data, then walks each configured
     * collection group: the group is turned into a name filter ('*' means no
     * filter), the matching collections are loaded and handed to
     * loopCollections() together with the configured endpoints. A notification
     * is attempted once all groups are done.
     *
     * @return bool always true
     */
    public function start(): bool
    {
        $this->namespace = $this->namespace_factory->getOne($this->data['namespace']);

        foreach ($this->data['collections'] as $group) {
            $names = (array) $group;

            if (in_array('*', $names)) {
                $query = [];
            } else {
                $query = ['name' => ['$in' => $names]];
            }

            $this->loopCollections(
                iterator_to_array($this->namespace->getCollections($query)),
                $this->data['endpoints']
            );
        }

        $this->notify();

        return true;
    }
131
132
    /**
     * Process every collection for each endpoint group, one group at a time.
     *
     * For each entry of $endpoints all collections are passed to
     * loopEndpoints(), which either executes them inline or queues child jobs
     * on the stack. The method then waits for every queued child before moving
     * on to the next endpoint group.
     *
     * @param array $collections collections to process (each is passed to loopEndpoints())
     * @param array $endpoints   endpoint groups as configured on the job
     */
    protected function loopCollections(array $collections, array $endpoints)
    {
        foreach ($endpoints as $ep) {
            foreach ($collections as $collection) {
                $this->loopEndpoints($collection, $collections, (array) $ep, $endpoints);
            }

            $this->logger->debug('wait for child stack ['.count($this->stack).'] to be finished', [
                'category' => get_class($this),
            ]);

            foreach ($this->stack as $proc) {
                $proc->wait();
            }

            // Every child on the stack has been waited for; drop the handles so
            // the next group does not wait on them again or count them in its
            // log message.
            $this->stack = [];
        }
    }
151
152
    /**
     * Resolve the requested endpoints of one collection and run or delegate each.
     *
     * If the job covers more than one collection or more than one endpoint
     * group, each resolved endpoint is queued as a child job of this class
     * (narrowed to that single collection/endpoint, with this job as parent)
     * and the returned handle is kept on the stack. Otherwise the endpoint is
     * executed inline.
     *
     * @param CollectionInterface $collection      collection whose endpoints are resolved
     * @param array               $all_collections all collections of the current run
     * @param array               $endpoints       endpoint names to resolve, '*' selects all
     * @param array               $all_endpoints   all endpoint groups of the current run
     */
    protected function loopEndpoints(CollectionInterface $collection, array $all_collections, array $endpoints, array $all_endpoints)
    {
        if (in_array('*', $endpoints)) {
            $query = [];
        } else {
            $query = ['name' => ['$in' => $endpoints]];
        }

        $delegate = count($all_endpoints) > 1 || count($all_collections) > 1;

        foreach (iterator_to_array($collection->getEndpoints($query)) as $endpoint) {
            if (!$delegate) {
                $this->execute($collection, $endpoint);

                continue;
            }

            $child = array_merge($this->data, [
                'collections' => [$collection->getName()],
                'endpoints' => [$endpoint->getName()],
                'parent' => $this->getId(),
            ]);

            $this->stack[] = $this->scheduler->addJob(self::class, $child);
        }
    }
175
176
    /**
     * Run the sync for a single collection/endpoint pair.
     *
     * Configures the logger with the process context, then imports from a
     * source endpoint or exports to a destination endpoint. Any other endpoint
     * type is skipped with a warning. The logger processor is popped again
     * before returning.
     */
    protected function execute(CollectionInterface $collection, EndpointInterface $endpoint)
    {
        $level = self::LOG_LEVELS[$this->data['log_level']];

        $context = [
            'process' => (string) $this->getId(),
            'parent' => null,
            'start' => $this->timestamp,
            'namespace' => $this->namespace->getName(),
            'collection' => $collection->getName(),
            'endpoint' => $endpoint->getName(),
        ];

        if (isset($this->data['parent'])) {
            $context['parent'] = (string) $this->data['parent'];
        }

        $this->setupLogger($level, $context);

        if ($endpoint->getType() === EndpointInterface::TYPE_SOURCE) {
            $this->import(
                $collection,
                $this->data['filter'],
                ['name' => $endpoint->getName()],
                $this->data['simulate'],
                $this->data['ignore']
            );
        } elseif ($endpoint->getType() === EndpointInterface::TYPE_DESTINATION) {
            $this->export(
                $collection,
                $this->data['filter'],
                ['name' => $endpoint->getName()],
                $this->data['simulate'],
                $this->data['ignore']
            );
        } else {
            $message = 'skip endpoint ['.$endpoint->getIdentifier().'], endpoint type is neither source nor destination';
            $this->logger->warning($message, [
                'category' => get_class($this),
            ]);
        }

        $this->logger->popProcessor();
    }
202
203
    /**
     * Prepare the logger for one collection/endpoint run.
     *
     * Applies $level to every MongoDBHandler attached to the logger and pushes
     * one processor that merges $context (plus the job id, if the job data
     * carries one) into the context of each log record.
     *
     * Exactly one processor is pushed regardless of how many MongoDBHandler
     * instances are attached, because execute() pops exactly one processor
     * after the run. Pushing per handler would leave the processor stack
     * unbalanced whenever the number of such handlers is not one.
     *
     * @param int   $level   Monolog level to apply to the MongoDB handlers
     * @param array $context key/value pairs merged into every record's context
     *
     * @return bool always true
     */
    protected function setupLogger(int $level, array $context): bool
    {
        if (isset($this->data['job'])) {
            $context['job'] = (string) $this->data['job'];
        }

        foreach ($this->logger->getHandlers() as $handler) {
            if ($handler instanceof MongoDBHandler) {
                $handler->setLevel($level);
            }
        }

        $this->logger->pushProcessor(static function ($record) use ($context) {
            $record['context'] = array_merge($record['context'], $context);

            return $record;
        });

        return true;
    }
226
227
    /**
     * Export the objects of a collection to its destination endpoints.
     *
     * Each endpoint is flushed if it requires it and set up. Every object
     * matching $filter is then offered to the endpoint's workflows of kind
     * "Workflow", in order, until one of them reports that it handled the
     * object. Finally all endpoints are shut down.
     *
     * If the collection has no matching destination endpoint the method logs a
     * warning and returns before any object is loaded.
     *
     * @param CollectionInterface $collection collection whose objects are exported
     * @param array               $filter     filter passed to getObjects()
     * @param array               $endpoints  filter passed to getDestinationEndpoints()
     * @param bool                $simulate   forwarded to the endpoint and workflow calls
     * @param bool                $ignore     keep going after a failed object instead of aborting
     *
     * @return bool false when an object failed and $ignore is false, true otherwise
     */
    protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        $endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints));

        // Without a destination there is nothing to do; return before loading
        // and serialising every object just to discard it.
        if (count($endpoints) === 0) {
            $this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [
                'category' => get_class($this),
            ]);

            return true;
        }

        foreach ($endpoints as $ep) {
            if ($ep->flushRequired()) {
                $ep->flush($simulate);
            }

            $ep->setup($simulate);
        }

        // Workflows are loaded once per endpoint and reused for every object.
        $workflows = [];

        $i = 0;
        foreach ($collection->getObjects($filter) as $id => $object) {
            ++$i;
            $this->logger->debug('process ['.$i.'] export for object ['.(string) $id.'] - [{fields}] from data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
                'fields' => array_keys($object->toArray()),
            ]);

            foreach ($endpoints as $ep) {
                $identifier = $ep->getIdentifier();
                $this->logger->info('start expot to destination endpoint ['.$identifier.']', [
                    'category' => get_class($this),
                ]);

                if (!isset($workflows[$identifier])) {
                    $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));
                }

                try {
                    foreach ($workflows[$identifier] as $workflow) {
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
                            'category' => get_class($this),
                        ]);

                        if ($workflow->export($object, $this->timestamp, $simulate) === true) {
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [
                                'category' => get_class($this),
                            ]);

                            // First matching workflow wins: move on to the next endpoint.
                            continue 2;
                        }

                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [
                            'category' => get_class($this),
                        ]);
                    }

                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
                        'category' => get_class($this),
                    ]);
                } catch (\Exception $e) {
                    ++$this->error_count;

                    $this->logger->error('failed export object to destination endpoint ['.$identifier.']', [
                        'category' => get_class($this),
                        'object' => $object->getId(),
                        'exception' => $e,
                    ]);

                    if ($ignore === false) {
                        return false;
                    }
                }
            }
        }

        foreach ($endpoints as $ep) {
            $ep->shutdown($simulate);
        }

        return true;
    }
316
317
    /**
     * Import objects from the source endpoints of a collection.
     *
     * For every source endpoint: the collection is flushed if the endpoint
     * requires it, the endpoint is set up, and each object it returns is
     * offered to the endpoint's workflows of kind "Workflow", in order, until
     * one reports that it handled the object. The garbage collector then runs
     * for that endpoint and the endpoint is shut down.
     *
     * @param CollectionInterface $collection collection receiving the imported objects
     * @param array               $filter     filter passed to the endpoint's getAll()
     * @param array               $endpoints  filter passed to getSourceEndpoints()
     * @param bool                $simulate   forwarded to the endpoint, collection and workflow calls
     * @param bool                $ignore     keep going after a failure instead of aborting
     *
     * @return bool false when an object or the garbage collector failed and
     *              $ignore is false, true otherwise
     */
    protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        // Iterated lazily; getReturn() is read after the loop below.
        $endpoints = $collection->getSourceEndpoints($endpoints);
        $workflows = [];

        foreach ($endpoints as $ep) {
            $identifier = $ep->getIdentifier();
            $this->logger->info('start import from source endpoint ['.$identifier.']', [
                'category' => get_class($this),
            ]);

            if ($ep->flushRequired()) {
                $collection->flush($simulate);
            }

            $ep->setup($simulate);
            if (!isset($workflows[$identifier])) {
                $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));
            }

            $i = 0;
            foreach ($ep->getAll($filter) as $id => $object) {
                ++$i;
                $this->logger->debug('process ['.$i.'] import for object ['.$id.'] into data type ['.$collection->getIdentifier().']', [
                    'category' => get_class($this),
                    'attributes' => $object,
                ]);

                try {
                    foreach ($workflows[$identifier] as $workflow) {
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
                            'category' => get_class($this),
                        ]);

                        if ($workflow->import($collection, $object, $this->timestamp, $simulate) === true) {
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [
                                'category' => get_class($this),
                            ]);

                            // First matching workflow wins: move on to the next object.
                            continue 2;
                        }

                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [
                            'category' => get_class($this),
                        ]);
                    }

                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
                        'category' => get_class($this),
                    ]);
                } catch (\Exception $e) {
                    ++$this->error_count;

                    $this->logger->error('failed import data object from source endpoint ['.$identifier.']', [
                        'category' => get_class($this),
                        'namespace' => $collection->getResourceNamespace()->getName(),
                        'collection' => $collection->getName(),
                        'endpoint' => $ep->getName(),
                        'exception' => $e,
                    ]);

                    if ($ignore === false) {
                        return false;
                    }
                }
            }

            // The endpoint is shut down after the garbage run either way; a
            // failed run (only reported when $ignore is false) then aborts the
            // import instead of being silently discarded.
            $collected = $this->garbageCollector($collection, $ep, $simulate, $ignore);
            $ep->shutdown($simulate);

            if ($collected === false) {
                return false;
            }
        }

        // NOTE(review): the generator's return value is presumably the number
        // of endpoints it yielded — confirm against getSourceEndpoints().
        if ($endpoints->getReturn() === 0) {
            $this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [
                'category' => get_class($this),
            ]);
        }

        return true;
    }
404
405
    /**
     * Flag and clean up objects that the current run did not touch.
     *
     * Objects of the collection whose "last_sync" for the given endpoint is
     * older than this job's start timestamp are marked as garbage. Each of them
     * is then offered to the endpoint's workflows of kind "GarbageWorkflow"
     * until one reports that it handled the object. Afterwards the relation
     * garbage collector runs with the same workflows.
     *
     * Failures are added to the job's error count, like import and export
     * failures, so that notify() reports them.
     *
     * @param CollectionInterface $collection collection to clean up
     * @param EndpointInterface   $endpoint   endpoint whose sync state is inspected
     * @param bool                $simulate   forwarded to the workflow cleanup call
     * @param bool                $ignore     keep going after a failed object instead of aborting
     *
     * @return bool false when an object failed and $ignore is false, true otherwise
     */
    protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        $filter = [
            'endpoints.'.$endpoint->getName().'.last_sync' => [
                '$lt' => $this->timestamp,
            ],
        ];

        $this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [
            'endpoints.'.$endpoint->getName().'.garbage' => true,
        ]]);

        $workflows = iterator_to_array($endpoint->getWorkflows(['kind' => 'GarbageWorkflow']));

        $i = 0;
        foreach ($collection->getObjects($filter, false) as $id => $object) {
            ++$i;
            $this->logger->debug('process ['.$i.'] garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
            ]);

            try {
                foreach ($workflows as $workflow) {
                    $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [
                        'category' => get_class($this),
                    ]);

                    if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) {
                        $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [
                            'category' => get_class($this),
                        ]);

                        break;
                    }
                }
            } catch (\Exception $e) {
                ++$this->error_count;

                $this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [
                    'category' => get_class($this),
                    'exception' => $e,
                ]);

                if ($ignore === false) {
                    return false;
                }
            }
        }

        $this->relationGarbageCollector($collection, $endpoint, $workflows);

        return true;
    }
463
464
    /**
     * Flag stale relations of an endpoint as garbage and optionally delete them.
     *
     * Relations whose "last_sync" under the key "<namespace>/<collection>/<endpoint>"
     * is older than this job's start timestamp are marked as garbage. If any
     * attribute of any given workflow carries a map with ensure "absent", the
     * flagged relations are deleted.
     *
     * @param CollectionInterface $collection not read; the collection is taken from $endpoint
     * @param EndpointInterface   $endpoint   endpoint whose relations are inspected
     * @param iterable            $workflows  workflows whose attribute maps are checked
     */
    protected function relationGarbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, $workflows)
    {
        $owner = $endpoint->getCollection();
        $key = join('/', [
            $owner->getResourceNamespace()->getName(),
            $owner->getName(),
            $endpoint->getName(),
        ]);

        $garbage_field = 'endpoints.'.$key.'.garbage';

        $filter = [
            'endpoints.'.$key.'.last_sync' => [
                '$lt' => $this->timestamp,
            ],
        ];

        $this->db->relations->updateMany($filter, ['$set' => [
            $garbage_field => true,
        ]]);

        foreach ($workflows as $workflow) {
            foreach ($workflow->getAttributeMap()->getMap() as $attr) {
                // "??" covers both a missing map and a map without an ensure key.
                if (($attr['map']['ensure'] ?? null) === 'absent') {
                    // One delete removes every flagged relation; repeating it
                    // for further matching attributes would be a no-op.
                    $this->db->relations->deleteMany([$garbage_field => true]);

                    return;
                }
            }
        }
    }
492
493
    /**
     * Queue a mail notification about the outcome of this job.
     *
     * Nothing is sent when notifications are disabled or no receiver is
     * configured. Otherwise one Mail job is queued per receiver. The subject
     * and body report success when the error count is zero and the number of
     * errors otherwise.
     *
     * @return bool true when notifications were queued, false when skipped
     */
    protected function notify(): bool
    {
        if ($this->data['notification']['enabled'] === false) {
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [
                'category' => get_class($this),
            ]);

            return false;
        }

        if (count($this->data['notification']['receiver']) === 0) {
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [
                'category' => get_class($this),
            ]);

            // Nobody to notify: do not build a message that is never sent.
            return false;
        }

        $iso = $this->timestamp->toDateTime()->format('c');

        if ($this->error_count === 0) {
            $subject = 'Good job! The job finished with no errors';
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors.";
        } else {
            $subject = "Job ended with $this->error_count errors";
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors.";
        }

        $mail = (new Message())
            ->setSubject($subject)
            ->setBody($body)
            ->setEncoding('UTF-8');

        foreach ($this->data['notification']['receiver'] as $receiver) {
            // setTo() is applied to the shared message before each serialisation.
            $mail->setTo($receiver);

            $this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [
                'category' => get_class($this),
            ]);

            $this->scheduler->addJob(Mail::class, $mail->toString(), [
                Scheduler::OPTION_RETRY => 1,
            ]);
        }

        return true;
    }
541
}
542