Completed
Push — master ( 316baf...2178d1 )
by Raffael
67:25 queued 62:39
created

Sync::start()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 10
cts 10
cp 1
rs 9.7
c 0
b 0
f 0
cc 3
nc 3
nop 0
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * tubee.io
7
 *
8
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
9
 * @license     GPL-3.0 https://opensource.org/licenses/GPL-3.0
10
 */
11
12
namespace Tubee\Async;
13
14
use MongoDB\BSON\UTCDateTime;
15
use MongoDB\Database;
16
use Monolog\Handler\MongoDBHandler;
17
use Monolog\Logger;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\AbstractJob;
20
use TaskScheduler\Scheduler;
21
use Tubee\Collection\CollectionInterface;
22
use Tubee\Endpoint\EndpointInterface;
23
use Tubee\Job\Validator as JobValidator;
24
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory;
25
use Tubee\ResourceNamespace\ResourceNamespaceInterface;
26
use Zend\Mail\Message;
27
28
class Sync extends AbstractJob
29
{
30
    /**
31
     * ResourceNamespace factory.
32
     *
33
     * @var ResourceNamespaceFactory
34
     */
35
    protected $namespace_factory;
36
37
    /**
38
     * Scheduler.
39
     *
40
     * @var Scheduler
41
     */
42
    protected $scheduler;
43
44
    /**
45
     * Logger.
46
     *
47
     * @var LoggerInterface
48
     */
49
    protected $logger;
50
51
    /**
52
     * Error count.
53
     *
54
     * @var int
55
     */
56
    protected $error_count = 0;
57
58
    /**
59
     * Start timestamp.
60
     *
61
     * @var UTCDateTime
62
     */
63
    protected $timestamp;
64
65
    /**
66
     * Database.
67
     *
68
     * @var Database
69
     */
70
    protected $db;
71
72
    /**
73
     * Process stack.
74
     *
75
     * @var array
76
     */
77
    protected $stack = [];
78
79
    /**
80
     * Resource namespace.
81
     *
82
     * @var ResourceNamespaceInterface
83
     */
84
    protected $namespace;
85
86
    /**
     * Create the sync job.
     *
     * Stores the injected collaborators and records the moment of
     * construction; that timestamp is later passed to the workflows and used
     * by garbageCollector() as the `last_sync` cut-off.
     */
    public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger)
    {
        // Injected collaborators.
        $this->db = $db;
        $this->logger = $logger;
        $this->scheduler = $scheduler;
        $this->namespace_factory = $namespace_factory;

        // Reference point of this run.
        $this->timestamp = new UTCDateTime();
    }
97
98
    /**
     * Start job.
     *
     * Resolves the namespace named in the job data, turns every entry of
     * `collections` into the matching collection resources and passes them,
     * together with the configured `endpoints`, to loopCollections().
     * notify() is called once all entries have been processed.
     *
     * @return bool always true
     */
    public function start(): bool
    {
        $this->namespace = $this->namespace_factory->getOne($this->data['namespace']);

        foreach ($this->data['collections'] as $names) {
            $names = (array) $names;

            // '*' selects every collection. The comparison is strict so that a
            // loosely-equal value (e.g. true, or 0 before PHP 8) is not
            // mistaken for the wildcard.
            $filter = in_array('*', $names, true) ? [] : ['name' => ['$in' => $names]];
            $collections = iterator_to_array($this->namespace->getCollections($filter));

            $endpoints = $this->data['endpoints'];
            $this->loopCollections($collections, $endpoints);
        }

        $this->notify();

        return true;
    }
118
119
    /**
     * Loop collections.
     *
     * For each endpoint group, every collection is handed to loopEndpoints();
     * afterwards all processes currently on the stack are waited for before
     * the next group is started. The stack is not emptied here, so entries
     * added for an earlier group are waited on again.
     */
    protected function loopCollections(array $collections, array $endpoints)
    {
        foreach ($endpoints as $group) {
            $group = (array) $group;

            foreach ($collections as $collection) {
                $this->loopEndpoints($collection, $collections, $group, $endpoints);
            }

            $pending = count($this->stack);
            $this->logger->debug('wait for child stack ['.$pending.'] to be finished', [
                'category' => get_class($this),
            ]);

            foreach ($this->stack as $child) {
                $child->wait();
            }
        }
    }
138
139
    /**
     * Loop endpoints.
     *
     * Resolves the endpoints of $collection selected by $endpoints ('*'
     * selects all of them). When the job covers more than one collection or
     * more than one endpoint group, a child job of this class is scheduled
     * for each collection/endpoint pair and pushed onto the stack; otherwise
     * the single pair is executed directly in this process.
     *
     * @param CollectionInterface $collection      collection being processed
     * @param array               $all_collections all collections of the current run
     * @param array               $endpoints       endpoint names of the current group
     * @param array               $all_endpoints   all endpoint groups of the job
     */
    protected function loopEndpoints(CollectionInterface $collection, array $all_collections, array $endpoints, array $all_endpoints)
    {
        // Strict comparison: only the literal string '*' is the wildcard.
        $filter = in_array('*', $endpoints, true) ? [] : ['name' => ['$in' => $endpoints]];
        $endpoints = iterator_to_array($collection->getEndpoints($filter));

        // Loop invariant: does the work have to be split into child jobs?
        $fan_out = count($all_endpoints) > 1 || count($all_collections) > 1;

        foreach ($endpoints as $endpoint) {
            if ($fan_out) {
                // Child job data: same as ours, narrowed to exactly one pair.
                $data = array_merge($this->data, [
                    'collections' => [$collection->getName()],
                    'endpoints' => [$endpoint->getName()],
                    'parent' => $this->getId(),
                ]);

                $this->stack[] = $this->scheduler->addJob(self::class, $data);
            } else {
                $this->execute($collection, $endpoint);
            }
        }
    }
162
163
    /**
     * Execute.
     *
     * Runs the sync for a single collection/endpoint pair: the logger is
     * prepared with the run's context, then the endpoint is imported from
     * (source) or exported to (destination). Endpoints of any other type are
     * skipped with a warning. Finally one processor is popped from the logger
     * (setupLogger() is expected to have registered it).
     */
    protected function execute(CollectionInterface $collection, EndpointInterface $endpoint)
    {
        $level = JobValidator::LOG_LEVELS[$this->data['log_level']];

        $context = [
            'process' => (string) $this->getId(),
            'parent' => isset($this->data['parent']) ? (string) $this->data['parent'] : null,
            'start' => $this->timestamp,
            'namespace' => $this->namespace->getName(),
            'collection' => $collection->getName(),
            'endpoint' => $endpoint->getName(),
        ];

        $this->setupLogger($level, $context);

        $is_source = $endpoint->getType() === EndpointInterface::TYPE_SOURCE;

        if ($is_source) {
            $this->import(
                $collection,
                $this->data['filter'],
                ['name' => $endpoint->getName()],
                $this->data['simulate'],
                $this->data['ignore']
            );
        } elseif ($endpoint->getType() === EndpointInterface::TYPE_DESTINATION) {
            $this->export(
                $collection,
                $this->data['filter'],
                ['name' => $endpoint->getName()],
                $this->data['simulate'],
                $this->data['ignore']
            );
        } else {
            $this->logger->warning('skip endpoint ['.$endpoint->getIdentifier().'], endpoint type is neither source nor destination', [
                'category' => get_class($this),
            ]);
        }

        $this->logger->popProcessor();
    }
189
190
    /**
     * Set logger level and run context.
     *
     * Applies $level to every attached MongoDBHandler and registers exactly
     * one processor which merges $context (plus the job id, when present in
     * the job data) into the context of each log record.
     *
     * @param int   $level   log level applied to the MongoDB handlers
     * @param array $context key/value pairs merged into every record's context
     *
     * @return bool always true
     */
    protected function setupLogger(int $level, array $context): bool
    {
        if (isset($this->data['job'])) {
            $context['job'] = (string) $this->data['job'];
        }

        foreach ($this->logger->getHandlers() as $handler) {
            if ($handler instanceof MongoDBHandler) {
                $handler->setLevel($level);
            }
        }

        // Register the processor exactly once per call. execute() pops exactly
        // one processor afterwards; pushing per MongoDBHandler (zero or several
        // times) would leave the processor stack unbalanced.
        $this->logger->pushProcessor(function ($record) use ($context) {
            $record['context'] = array_merge($record['context'], $context);

            return $record;
        });

        return true;
    }
213
214
    /**
     * Export the objects of a collection to its destination endpoints.
     *
     * Every selected destination endpoint is flushed when it reports
     * flushRequired() and then set up. For each object matching $filter and
     * each endpoint, the endpoint's workflows are tried in order; the first
     * workflow whose export() returns true ends the processing of that object
     * for that endpoint. After all objects were processed the endpoints are
     * shut down.
     *
     * @param CollectionInterface $collection collection whose objects are exported
     * @param array               $filter     object filter passed to getObjects()
     * @param array               $endpoints  endpoint filter passed to getDestinationEndpoints()
     * @param bool                $simulate   passed through to flush/setup/export/shutdown
     * @param bool                $ignore     continue after a failed object instead of aborting
     *
     * @return bool false when a workflow threw and $ignore is false (the
     *              endpoints are not shut down on that path), true otherwise
     */
    protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        $endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints));

        // Without a destination there is nothing to do; bail out before the
        // collection's objects are fetched and iterated.
        if (count($endpoints) === 0) {
            $this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [
                'category' => get_class($this),
            ]);

            return true;
        }

        // Workflows per endpoint identifier, loaded on first use.
        $workflows = [];

        foreach ($endpoints as $ep) {
            if ($ep->flushRequired()) {
                $ep->flush($simulate);
            }

            $ep->setup($simulate);
        }

        $i = 0;
        foreach ($collection->getObjects($filter) as $id => $object) {
            ++$i;
            $this->logger->debug('process ['.$i.'] export for object ['.(string) $id.'] - [{fields}] from data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
                'fields' => array_keys($object->toArray()),
            ]);

            foreach ($endpoints as $ep) {
                $identifier = $ep->getIdentifier();
                $this->logger->info('start expot to destination endpoint ['.$identifier.']', [
                    'category' => get_class($this),
                ]);

                if (!isset($workflows[$identifier])) {
                    $workflows[$identifier] = iterator_to_array($ep->getWorkflows());
                }

                try {
                    foreach ($workflows[$identifier] as $workflow) {
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
                            'category' => get_class($this),
                        ]);

                        if ($workflow->export($object, $this->timestamp, $simulate) === true) {
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [
                                'category' => get_class($this),
                            ]);

                            // Leave the workflow loop and go on with the next endpoint.
                            continue 2;
                        }

                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [
                            'category' => get_class($this),
                        ]);
                    }

                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
                        'category' => get_class($this),
                    ]);
                } catch (\Exception $e) {
                    ++$this->error_count;

                    $this->logger->error('failed export object to destination endpoint ['.$identifier.']', [
                        'category' => get_class($this),
                        'object' => $object->getId(),
                        'exception' => $e,
                    ]);

                    if ($ignore === false) {
                        return false;
                    }
                }
            }
        }

        foreach ($endpoints as $ep) {
            $ep->shutdown($simulate);
        }

        return true;
    }
303
304
    /**
     * Import objects from the source endpoints of a collection.
     *
     * For every selected source endpoint: the collection is flushed when the
     * endpoint reports flushRequired(), the endpoint is set up, and each
     * object it yields is run through the endpoint's workflows until one of
     * them reports success. Afterwards the garbage collector is run for the
     * endpoint and the endpoint is shut down.
     *
     * @return bool false when a workflow threw and $ignore is false, true otherwise
     */
    protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
    {
        $category = get_class($this);

        $this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [
            'category' => $category,
        ]);

        // Iterated lazily; its return value is inspected after the loop.
        $sources = $collection->getSourceEndpoints($endpoints);
        $workflows = [];

        foreach ($sources as $source) {
            $identifier = $source->getIdentifier();
            $this->logger->info('start import from source endpoint ['.$identifier.']', [
                'category' => $category,
            ]);

            if ($source->flushRequired()) {
                $collection->flush($simulate);
            }

            $source->setup($simulate);

            if (!isset($workflows[$identifier])) {
                $workflows[$identifier] = iterator_to_array($source->getWorkflows());
            }

            $position = 0;
            foreach ($source->getAll($filter) as $id => $object) {
                ++$position;
                $this->logger->debug('process ['.$position.'] import for object ['.$id.'] into data type ['.$collection->getIdentifier().']', [
                    'category' => $category,
                    'attributes' => $object,
                ]);

                try {
                    foreach ($workflows[$identifier] as $workflow) {
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
                            'category' => $category,
                        ]);

                        $done = $workflow->import($collection, $object, $this->timestamp, $simulate);

                        if ($done === true) {
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [
                                'category' => $category,
                            ]);

                            // Leave the workflow loop and go on with the next object.
                            continue 2;
                        }

                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [
                            'category' => $category,
                        ]);
                    }

                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
                        'category' => $category,
                    ]);
                } catch (\Exception $e) {
                    ++$this->error_count;

                    $this->logger->error('failed import data object from source endpoint ['.$identifier.']', [
                        'category' => $category,
                        'namespace' => $collection->getResourceNamespace()->getName(),
                        'collection' => $collection->getName(),
                        'endpoint' => $source->getName(),
                        'exception' => $e,
                    ]);

                    if ($ignore === false) {
                        return false;
                    }
                }
            }

            $this->garbageCollector($collection, $source, $simulate, $ignore);
            $source->shutdown($simulate);
        }

        if ($sources->getReturn() === 0) {
            $this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [
                'category' => $category,
            ]);
        }

        return true;
    }
391
392
    /**
     * Garbage collector.
     *
     * Flags every object of the collection whose `last_sync` for the given
     * endpoint is older than this job's start timestamp as garbage, then runs
     * the endpoint's workflows in cleanup mode for each of those objects. The
     * first workflow whose cleanup() returns true ends the processing of that
     * object.
     *
     * NOTE(review): the updateMany() below is executed regardless of
     * $simulate — confirm that flagging garbage during a simulation is intended.
     *
     * @param CollectionInterface $collection collection to clean up
     * @param EndpointInterface   $endpoint   endpoint whose last sync is compared
     * @param bool                $simulate   passed through to the workflows
     * @param bool                $ignore     continue after a failed object instead of aborting
     *
     * @return bool false when a workflow threw and $ignore is false, true otherwise
     */
    protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool
    {
        $this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [
            'category' => get_class($this),
        ]);

        // Objects not touched by this endpoint since the job started.
        $filter = [
            '$or' => [
                [
                    'endpoints.'.$endpoint->getName().'.last_sync' => [
                        '$lt' => $this->timestamp,
                    ],
                ],
            ],
        ];

        $this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [
            'endpoints.'.$endpoint->getName().'.garbage' => true,
        ]]);

        $workflows = iterator_to_array($endpoint->getWorkflows());

        foreach ($collection->getObjects($filter, false) as $id => $object) {
            $this->logger->debug('process garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [
                'category' => get_class($this),
            ]);

            try {
                foreach ($workflows as $workflow) {
                    $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [
                        'category' => get_class($this),
                    ]);

                    if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) {
                        $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [
                            'category' => get_class($this),
                        ]);

                        break;
                    }
                }
            } catch (\Exception $e) {
                // Count the failure like import()/export() do, so that the
                // notification sent by notify() reflects it.
                ++$this->error_count;

                $this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [
                    'category' => get_class($this),
                    'exception' => $e,
                ]);

                if ($ignore === false) {
                    return false;
                }
            }
        }

        return true;
    }
450
451
    /**
     * Notify.
     *
     * Queues one mail job per configured receiver, reporting whether the sync
     * finished with or without errors. Nothing is queued when notifications
     * are disabled or no receiver is configured.
     *
     * @return bool true when the notification jobs were queued, false when the
     *              notification was skipped
     */
    protected function notify(): bool
    {
        if ($this->data['notification']['enabled'] === false) {
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [
                'category' => get_class($this),
            ]);

            return false;
        }

        if (count($this->data['notification']['receiver']) === 0) {
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [
                'category' => get_class($this),
            ]);

            // Actually skip, as the log message says; there is nobody to notify.
            return false;
        }

        $iso = $this->timestamp->toDateTime()->format('c');

        // Report the error count only when there were errors; a clean run gets
        // the success message.
        if ($this->error_count > 0) {
            $subject = "Job ended with $this->error_count errors";
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors.";
        } else {
            $subject = 'Good job! The job finished with no errors';
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors.";
        }

        $mail = (new Message())
          ->setSubject($subject)
          ->setBody($body)
          ->setEncoding('UTF-8');

        foreach ($this->data['notification']['receiver'] as $receiver) {
            // setTo() is called again for each receiver before the message is
            // serialised for that receiver's mail job.
            $mail->setTo($receiver);

            $this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [
                'category' => get_class($this),
            ]);

            $this->scheduler->addJob(Mail::class, $mail->toString(), [
                Scheduler::OPTION_RETRY => 1,
            ]);
        }

        return true;
    }
499
}
500