Passed
Push — master ( c86efc...70422d )
by Raffael
07:33 queued 12s
created

Sync::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 7
cts 7
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 4
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * tubee
7
 *
8
 * @copyright   Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
9
 * @license     GPL-3.0 https://opensource.org/licenses/GPL-3.0
10
 */
11
12
namespace Tubee\Async;
13
14
use MongoDB\BSON\UTCDateTime;
15
use MongoDB\Database;
16
use Monolog\Handler\MongoDBHandler;
17
use Monolog\Logger;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\AbstractJob;
20
use TaskScheduler\Scheduler;
21
use Tubee\Collection\CollectionInterface;
22
use Tubee\Endpoint\EndpointInterface;
23
use Tubee\Helper;
24
use Tubee\Log\MongoDBFormatter;
25
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory;
26
use Tubee\ResourceNamespace\ResourceNamespaceInterface;
27
use Zend\Mail\Message;
28
29
class Sync extends AbstractJob
30
{
31
    /**
32
     * Log levels.
33
     */
34
    public const LOG_LEVELS = [
35
        'debug' => Logger::DEBUG,
36
        'info' => Logger::INFO,
37
        'notice' => Logger::NOTICE,
38
        'warning' => Logger::WARNING,
39
        'error' => Logger::ERROR,
40
        'critical' => Logger::CRITICAL,
41
        'alert' => Logger::ALERT,
42
        'emergency' => Logger::EMERGENCY,
43
    ];
44
45
    /**
46
     * ResourceNamespace factory.
47
     *
48
     * @var ResourceNamespaceFactory
49
     */
50
    protected $namespace_factory;
51
52
    /**
53
     * Scheduler.
54
     *
55
     * @var Scheduler
56
     */
57
    protected $scheduler;
58
59
    /**
60
     * Logger.
61
     *
62
     * @var LoggerInterface
63
     */
64
    protected $logger;
65
66
    /**
67
     * Error count.
68
     *
69
     * @var int
70
     */
71
    protected $error_count = 0;
72
73
    /**
74
     * Start timestamp.
75
     *
76
     * @var UTCDateTime
77
     */
78
    protected $timestamp;
79
80
    /**
81
     * Database.
82
     *
83
     * @var Database
84
     */
85
    protected $db;
86
87
    /**
88
     * Process stack.
89
     *
90
     * @var array
91
     */
92
    protected $stack = [];
93
94
    /**
95
     * Resource namespace.
96
     *
97
     * @var ResourceNamespaceInterface
98
     */
99
    protected $namespace;
100
101
    /**
102
     * Sync.
103
     */
104 5
    public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger)
105
    {
106 5
        $this->namespace_factory = $namespace_factory;
107 5
        $this->scheduler = $scheduler;
108 5
        $this->logger = $logger;
109 5
        $this->db = $db;
110 5
        $this->timestamp = new UTCDateTime();
111 5
    }
112
113
    /**
114
     * Start job.
115
     */
116 5
    public function start(): bool
117
    {
118 5
        $this->namespace = $this->namespace_factory->getOne($this->data['namespace']);
119
120 5
        foreach ($this->data['collections'] as $collections) {
121 5
            $collections = (array) $collections;
122 5
            $filter = in_array('*', $collections) ? [] : ['name' => ['$in' => $collections]];
123 5
            $collections = iterator_to_array($this->namespace->getCollections($filter));
124
125 5
            $endpoints = $this->data['endpoints'];
126 5
            $this->loopCollections($collections, $endpoints);
127
        }
128
129 2
        $this->notify();
130
131 2
        return true;
132
    }
133
134
    /**
135
     * Loop collections.
136
     */
137 5
    protected function loopCollections(array $collections, array $endpoints)
138
    {
139 5
        foreach ($endpoints as $ep) {
140 5
            foreach ($collections as $collection) {
141 5
                $this->loopEndpoints($collection, $collections, (array) $ep, $endpoints);
142
            }
143
144 5
            $this->logger->debug('wait for child stack ['.count($this->stack).'] to be finished', [
145 5
                'category' => get_class($this),
146
            ]);
147
148 5
            $i = 0;
149 5
            foreach ($this->stack as $proc) {
150 3
                ++$i;
151 3
                $proc->wait();
152 3
                $this->updateProgress($i / count($this->stack) * 100);
153
154 3
                $record = $this->db->{$this->scheduler->getJobQueue()}->findOne([
155 3
                    '_id' => $proc->getId(),
156
                ]);
157
158
                $this->error_count += $record['data']['error_count'] ?? 0;
159
                $this->increaseErrorCount();
160
            }
161
162 2
            $this->stack = [];
163
        }
164 2
    }
165
166
    /**
167
     * Update error count.
168
     */
169 2
    protected function increaseErrorCount(): self
170
    {
171 2
        $this->db->{$this->scheduler->getJobQueue()}->updateOne([
172 2
            '_id' => $this->getId(),
173
            'data.error_count' => ['$exists' => true],
174
        ], [
175 2
            '$set' => ['data.error_count' => $this->error_count],
176
        ]);
177
178 2
        return $this;
179
    }
180
181
    /**
182
     * Loop endpoints.
183
     */
184 5
    protected function loopEndpoints(CollectionInterface $collection, array $all_collections, array $endpoints, array $all_endpoints)
185
    {
186 5
        $filter = in_array('*', $endpoints) ? [] : ['name' => ['$in' => $endpoints]];
187 5
        $endpoints = iterator_to_array($collection->getEndpoints($filter));
188
189 5
        foreach ($endpoints as $endpoint) {
190 5
            if (count($all_endpoints) > 1 || count($all_collections) > 1) {
191 3
                $data = $this->data;
192 3
                $data = array_merge($data, [
193 3
                    'collections' => [$collection->getName()],
194 3
                    'endpoints' => [$endpoint->getName()],
195 3
                    'parent' => $this->getId(),
196
                ]);
197
198 3
                $data['notification'] = ['enabled' => false, 'receiver' => []];
199 3
                $this->stack[] = $this->scheduler->addJob(self::class, $data);
200
            } else {
201 2
                $this->execute($collection, $endpoint);
202 2
                $this->increaseErrorCount();
203
            }
204
        }
205 5
    }
206
207
    /**
208
     * Execute.
209
     */
210 2
    protected function execute(CollectionInterface $collection, EndpointInterface $endpoint)
211
    {
212 2
        $this->setupLogger(self::LOG_LEVELS[$this->data['log_level']], [
213 2
            'process' => (string) $this->getId(),
214 2
            'parent' => isset($this->data['parent']) ? (string) $this->data['parent'] : null,
215 2
            'start' => $this->timestamp,
216 2
            'namespace' => $this->namespace->getName(),
217 2
            'collection' => $collection->getName(),
218 2
            'endpoint' => $endpoint->getName(),
219
        ]);
220
221 2
        if ($endpoint->getType() === EndpointInterface::TYPE_SOURCE) {
222
            $this->import($collection, $this->getFilter(), ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']);
223 2
        } elseif ($endpoint->getType() === EndpointInterface::TYPE_DESTINATION) {
224
            $this->export($collection, $this->getFilter(), ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']);
225
        } else {
226 2
            $this->logger->warning('skip endpoint ['.$endpoint->getIdentifier().'], endpoint type is neither source nor destination', [
227 2
                'category' => get_class($this),
228
            ]);
229
        }
230
231 2
        $this->logger->popProcessor();
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method popProcessor() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
232 2
    }
233
234
    /**
235
     * Decode filter.
236
     */
237
    protected function getFilter(): array
238
    {
239
        if ($this->data['filter'] === null) {
240
            return [];
241
        }
242
243
        return (array) Helper::jsonDecode($this->data['filter']);
244
    }
245
246
    /**
247
     * Set logger level.
248
     */
249 2
    protected function setupLogger(int $level, array $context): bool
250
    {
251 2
        if (isset($this->data['job'])) {
252
            $context['job'] = (string) $this->data['job'];
253
        }
254
255 2
        foreach ($this->logger->getHandlers() as $handler) {
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method getHandlers() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
256
            if ($handler instanceof MongoDBHandler) {
257
                $handler->setLevel($level);
258
                $handler->setFormatter(new MongoDBFormatter());
259
            }
260
        }
261
262 2
        while (count($this->logger->getProcessors()) > 1) {
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method getProcessors() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
263
            $this->logger->popProcessor();
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method popProcessor() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
264
        }
265
266 2
        $this->logger->pushProcessor(function ($record) use ($context) {
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method pushProcessor() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
267
            $record['context'] = array_merge($record['context'], $context);
268
269
            return $record;
270 2
        });
271
272 2
        return true;
273
    }
274
275
    /**
276
     * {@inheritdoc}
277
     */
278
    protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
279
    {
280
        $this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [
281
            'category' => get_class($this),
282
        ]);
283
284
        $endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints));
285
        $workflows = [];
286
287
        foreach ($endpoints as $ep) {
288
            if ($ep->flushRequired()) {
289
                $ep->flush($simulate);
290
            }
291
292
            $ep->setup($simulate);
293
        }
294
295
        $total = $collection->countObjects($filter);
296
        $i = 0;
297
298
        foreach ($collection->getObjects($filter) as $id => $object) {
299
            if ($total !== 0) {
300
                $this->updateProgress($i / $total * 100);
301
            }
302
303
            ++$i;
304
            $this->logger->debug('process ['.$i.'] export for object ['.(string) $id.'] - [{fields}] from data type ['.$collection->getIdentifier().']', [
305
                'category' => get_class($this),
306
                'fields' => array_keys($object->toArray()),
307
            ]);
308
309
            foreach ($endpoints as $ep) {
310
                $identifier = $ep->getIdentifier();
311
                $this->logger->info('start export to destination endpoint ['.$identifier.']', [
312
                    'category' => get_class($this),
313
                ]);
314
315
                if (!isset($workflows[$identifier])) {
316
                    $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));
317
318
                    if (count($workflows[$identifier]) === 0) {
319
                        $this->logger->warning('no workflows available in destination endpoint ['.$ep->getIdentifier().'], skip export', [
320
                            'category' => get_class($this),
321
                        ]);
322
323
                        continue;
324
                    }
325
                }
326
327
                try {
328
                    foreach ($workflows[$identifier] as $workflow) {
329
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
330
                            'category' => get_class($this),
331
                        ]);
332
333
                        if ($workflow->export($object, $this->timestamp, $simulate) === true) {
334
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [
335
                                'category' => get_class($this),
336
                            ]);
337
338
                            continue 2;
339
                        }
340
                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [
341
                                'category' => get_class($this),
342
                            ]);
343
                    }
344
345
                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
346
                        'category' => get_class($this),
347
                    ]);
348
                } catch (\Throwable $e) {
349
                    ++$this->error_count;
350
351
                    $this->logger->error('failed export object to destination endpoint ['.$identifier.']', [
352
                        'category' => get_class($this),
353
                        'object' => $object->getId(),
354
                        'exception' => $e,
355
                    ]);
356
357
                    if ($ignore === false) {
358
                        return false;
359
                    }
360
                }
361
            }
362
        }
363
364
        if (count($endpoints) === 0) {
365
            $this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [
366
                'category' => get_class($this),
367
            ]);
368
369
            return true;
370
        }
371
372
        foreach ($endpoints as $n => $ep) {
373
            $ep->shutdown($simulate);
374
        }
375
376
        return true;
377
    }
378
379
    /**
380
     * {@inheritdoc}
381
     */
382
    protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
383
    {
384
        $this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [
385
            'category' => get_class($this),
386
        ]);
387
388
        $endpoints = $collection->getSourceEndpoints($endpoints);
389
        $workflows = [];
390
391
        foreach ($endpoints as $ep) {
392
            $identifier = $ep->getIdentifier();
393
            $this->logger->info('start import from source endpoint ['.$identifier.']', [
394
                'category' => get_class($this),
395
            ]);
396
397
            if ($ep->flushRequired()) {
398
                $collection->flush($simulate);
399
            }
400
401
            $ep->setup($simulate);
402
            if (!isset($workflows[$identifier])) {
403
                $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));
404
405
                if (count($workflows[$identifier]) === 0) {
406
                    $this->logger->warning('no workflows available in source endpoint ['.$ep->getIdentifier().'], skip import', [
407
                        'category' => get_class($this),
408
                    ]);
409
410
                    continue;
411
                }
412
            }
413
414
            $i = 0;
415
            $total = $ep->count($filter);
416
417
            foreach ($ep->getAll($filter) as $id => $object) {
418
                if ($total !== 0) {
419
                    $this->updateProgress($i / $total * 100);
420
                }
421
422
                ++$i;
423
                $this->logger->debug('process object ['.$i.'] import for object ['.$object->getId().'] into data type ['.$collection->getIdentifier().']', [
424
                    'category' => get_class($this),
425
                    'attributes' => $object,
426
                ]);
427
428
                try {
429
                    foreach ($workflows[$identifier] as $workflow) {
430
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
431
                            'category' => get_class($this),
432
                        ]);
433
434
                        if ($workflow->import($collection, $object, $this->timestamp, $simulate) === true) {
435
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $object->getId().'], skip any further workflows for the current data object', [
436
                                'category' => get_class($this),
437
                            ]);
438
439
                            continue 2;
440
                        }
441
                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $object->getId().'], condition does not match or unusable ensure', [
442
                                'category' => get_class($this),
443
                            ]);
444
                    }
445
446
                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
447
                        'category' => get_class($this),
448
                    ]);
449
                } catch (\Throwable $e) {
450
                    ++$this->error_count;
451
452
                    $this->logger->error('failed import data object from source endpoint ['.$identifier.']', [
453
                        'category' => get_class($this),
454
                        'namespace' => $collection->getResourceNamespace()->getName(),
455
                        'collection' => $collection->getName(),
456
                        'endpoint' => $ep->getName(),
457
                        'exception' => $e,
458
                    ]);
459
460
                    if ($ignore === false) {
461
                        return false;
462
                    }
463
                }
464
            }
465
466
            if (empty($filter)) {
467
                $this->garbageCollector($collection, $ep, $simulate, $ignore);
468
            } else {
469
                $this->logger->info('skip garbage collection, a query has been issued for import', [
470
                    'category' => get_class($this),
471
                ]);
472
            }
473
474
            $ep->shutdown($simulate);
475
        }
476
477
        if ($endpoints->getReturn() === 0) {
478
            $this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [
479
                'category' => get_class($this),
480
            ]);
481
482
            return true;
483
        }
484
485
        return true;
486
    }
487
488
    /**
489
     * Garbage.
490
     */
491
    protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool
492
    {
493
        $this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [
494
            'category' => get_class($this),
495
        ]);
496
497
        $filter = [
498
            'endpoints.'.$endpoint->getName().'.last_sync' => [
499
                '$lt' => $this->timestamp,
500
            ],
501
        ];
502
503
        $this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [
504
            'endpoints.'.$endpoint->getName().'.garbage' => true,
505
        ]]);
506
507
        $workflows = iterator_to_array($endpoint->getWorkflows(['kind' => 'GarbageWorkflow']));
508
        if (count($workflows) === 0) {
509
            $this->logger->info('no garbage workflows available in ['.$endpoint->getIdentifier().'], skip garbage collection', [
510
                'category' => get_class($this),
511
            ]);
512
513
            return false;
514
        }
515
516
        $i = 0;
517
        foreach ($collection->getObjects($filter, false) as $id => $object) {
518
            ++$i;
519
            $this->logger->debug('process ['.$i.'] garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [
520
                'category' => get_class($this),
521
            ]);
522
523
            try {
524
                foreach ($workflows as $workflow) {
525
                    $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [
526
                        'category' => get_class($this),
527
                    ]);
528
529
                    if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) {
530
                        $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [
531
                            'category' => get_class($this),
532
                        ]);
533
534
                        break;
535
                    }
536
                }
537
            } catch (\Exception $e) {
538
                $this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [
539
                    'category' => get_class($this),
540
                    'exception' => $e,
541
                ]);
542
543
                if ($ignore === false) {
544
                    return false;
545
                }
546
            }
547
        }
548
549
        $this->relationGarbageCollector($collection, $endpoint, $workflows);
550
551
        return true;
552
    }
553
554
    /**
555
     * Relation garbage collector.
556
     */
557
    protected function relationGarbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, $workflows)
0 ignored issues
show
Unused Code introduced by
The parameter $collection is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
558
    {
559
        $namespace = $endpoint->getCollection()->getResourceNamespace()->getName();
560
        $collection = $endpoint->getCollection()->getName();
561
        $ep = $endpoint->getName();
562
        $key = join('/', [$namespace, $collection, $ep]);
563
564
        $filter = [
565
            'endpoints.'.$key.'.last_sync' => [
566
                '$lt' => $this->timestamp,
567
            ],
568
        ];
569
570
        $this->db->relations->updateMany($filter, ['$set' => [
571
            'endpoints.'.$key.'.garbage' => true,
572
        ]]);
573
574
        foreach ($workflows as $workflow) {
575
            foreach ($workflow->getAttributeMap()->getMap() as $attr) {
576
                if (isset($attr['map']) && $attr['map']['ensure'] === 'absent') {
577
                    $this->db->relations->deleteMany(['endpoints.'.$key.'.garbage' => true]);
578
                }
579
            }
580
        }
581
    }
582
583
    /**
584
     * Notify.
585
     */
586 2
    protected function notify(): bool
587
    {
588 2
        if ($this->data['notification']['enabled'] === false) {
589 2
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [
590 2
                'category' => get_class($this),
591
            ]);
592
593 2
            return false;
594
        }
595
596
        if (count($this->data['notification']['receiver']) === 0) {
597
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [
598
                'category' => get_class($this),
599
            ]);
600
        }
601
602
        $iso = $this->timestamp->toDateTime()->format('c');
603
604
        if ($this->error_count === 0) {
605
            $subject = 'Good job! The job finished with no errors';
606
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors.";
607
        } else {
608
            $subject = "Job ended with $this->error_count errors";
609
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors.";
610
        }
611
612
        $mail = (new Message())
613
          ->setSubject($subject)
614
          ->setBody($body)
615
          ->setEncoding('UTF-8');
616
617
        foreach ($this->data['notification']['receiver'] as $receiver) {
618
            $mail->setTo($receiver);
619
620
            $this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [
621
                'category' => get_class($this),
622
            ]);
623
624
            $this->scheduler->addJob(Mail::class, $mail->toString(), [
625
                Scheduler::OPTION_RETRY => 1,
626
            ]);
627
        }
628
629
        return true;
630
    }
631
}
632