Completed
Push — master ( 519f76...318dd5 )
by Raffael
24:13 queued 15:29
created

Sync::import()   F

Complexity

Conditions 13
Paths 438

Size

Total Lines 105

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 182

Importance

Changes 0
Metric Value
dl 0
loc 105
ccs 0
cts 58
cp 0
rs 2.5844
c 0
b 0
f 0
cc 13
nc 438
nop 5
crap 182

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * tubee
7
 *
8
 * @copyright   Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
9
 * @license     GPL-3.0 https://opensource.org/licenses/GPL-3.0
10
 */
11
12
namespace Tubee\Async;
13
14
use MongoDB\BSON\UTCDateTime;
15
use MongoDB\Database;
16
use Monolog\Handler\MongoDBHandler;
17
use Monolog\Logger;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\AbstractJob;
20
use TaskScheduler\Scheduler;
21
use Tubee\Collection\CollectionInterface;
22
use Tubee\Endpoint\EndpointInterface;
23
use Tubee\Helper;
24
use Tubee\Log\MongoDBFormatter;
25
use Tubee\ResourceNamespace\Factory as ResourceNamespaceFactory;
26
use Tubee\ResourceNamespace\ResourceNamespaceInterface;
27
use Zend\Mail\Message;
28
29
class Sync extends AbstractJob
30
{
31
    /**
32
     * Log levels.
33
     */
34
    public const LOG_LEVELS = [
35
        'debug' => Logger::DEBUG,
36
        'info' => Logger::INFO,
37
        'notice' => Logger::NOTICE,
38
        'warning' => Logger::WARNING,
39
        'error' => Logger::ERROR,
40
        'critical' => Logger::CRITICAL,
41
        'alert' => Logger::ALERT,
42
        'emergency' => Logger::EMERGENCY,
43
    ];
44
45
    /**
46
     * ResourceNamespace factory.
47
     *
48
     * @var ResourceNamespaceFactory
49
     */
50
    protected $namespace_factory;
51
52
    /**
53
     * Scheduler.
54
     *
55
     * @var Scheduler
56
     */
57
    protected $scheduler;
58
59
    /**
60
     * Logger.
61
     *
62
     * @var LoggerInterface
63
     */
64
    protected $logger;
65
66
    /**
67
     * Error count.
68
     *
69
     * @var int
70
     */
71
    protected $error_count = 0;
72
73
    /**
74
     * Start timestamp.
75
     *
76
     * @var UTCDateTime
77
     */
78
    protected $timestamp;
79
80
    /**
81
     * Database.
82
     *
83
     * @var Database
84
     */
85
    protected $db;
86
87
    /**
88
     * Process stack.
89
     *
90
     * @var array
91
     */
92
    protected $stack = [];
93
94
    /**
95
     * Resource namespace.
96
     *
97
     * @var ResourceNamespaceInterface
98
     */
99
    protected $namespace;
100
101
    /**
102
     * Sync.
103
     */
104 5
    public function __construct(ResourceNamespaceFactory $namespace_factory, Database $db, Scheduler $scheduler, LoggerInterface $logger)
105
    {
106 5
        $this->namespace_factory = $namespace_factory;
107 5
        $this->scheduler = $scheduler;
108 5
        $this->logger = $logger;
109 5
        $this->db = $db;
110 5
        $this->timestamp = new UTCDateTime();
111 5
    }
112
113
    /**
114
     * Start job.
115
     */
116 5
    public function start(): bool
117
    {
118 5
        $this->namespace = $this->namespace_factory->getOne($this->data['namespace']);
119
120 5
        foreach ($this->data['collections'] as $collections) {
121 5
            $collections = (array) $collections;
122 5
            $filter = in_array('*', $collections) ? [] : ['name' => ['$in' => $collections]];
123 5
            $collections = iterator_to_array($this->namespace->getCollections($filter));
124
125 5
            $endpoints = $this->data['endpoints'];
126 5
            $this->loopCollections($collections, $endpoints);
127
        }
128
129 2
        $this->notify();
130
131 2
        return true;
132
    }
133
134
    /**
135
     * Loop collections.
136
     */
137 5
    protected function loopCollections(array $collections, array $endpoints)
138
    {
139 5
        foreach ($endpoints as $ep) {
140 5
            foreach ($collections as $collection) {
141 5
                $this->loopEndpoints($collection, $collections, (array) $ep, $endpoints);
142
            }
143
144 5
            $this->logger->debug('wait for child stack ['.count($this->stack).'] to be finished', [
145 5
                'category' => get_class($this),
146
            ]);
147
148 5
            $i = 0;
149 5
            foreach ($this->stack as $proc) {
150 3
                ++$i;
151 3
                $proc->wait();
152 3
                $this->updateProgress($i / count($this->stack) * 100);
0 ignored issues
show
Bug introduced by
The method updateProgress() does not seem to exist on object<Tubee\Async\Sync>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
153
154
                $record = $this->db->{$this->scheduler->getJobQueue()}->findOne([
155
                    '_id' => $proc->getId(),
156
                ]);
157
158
                $this->error_count += $record['data']['error_count'] ?? 0;
159
                $this->increaseErrorCount();
160
            }
161
162 2
            $this->stack = [];
163
        }
164 2
    }
165
166
    /**
167
     * Update error count.
168
     */
169 2
    protected function increaseErrorCount(): self
170
    {
171 2
        $this->db->{$this->scheduler->getJobQueue()}->updateOne([
172 2
            '_id' => $this->getId(),
173
            'data.error_count' => ['$exists' => true],
174
        ], [
175 2
            '$set' => ['data.error_count' => $this->error_count],
176
        ]);
177
178 2
        return $this;
179
    }
180
181
    /**
182
     * Loop endpoints.
183
     */
184 5
    protected function loopEndpoints(CollectionInterface $collection, array $all_collections, array $endpoints, array $all_endpoints)
185
    {
186 5
        $filter = in_array('*', $endpoints) ? [] : ['name' => ['$in' => $endpoints]];
187 5
        $endpoints = iterator_to_array($collection->getEndpoints($filter));
188
189 5
        foreach ($endpoints as $endpoint) {
190 5
            if (count($all_endpoints) > 1 || count($all_collections) > 1) {
191 3
                $data = $this->data;
192 3
                $data = array_merge($data, [
193 3
                    'collections' => [$collection->getName()],
194 3
                    'endpoints' => [$endpoint->getName()],
195 3
                    'parent' => $this->getId(),
196
                ]);
197
198 3
                $this->stack[] = $this->scheduler->addJob(self::class, $data);
199
            } else {
200 2
                $this->execute($collection, $endpoint);
201 2
                $this->increaseErrorCount();
202
            }
203
        }
204 5
    }
205
206
    /**
207
     * Execute.
208
     */
209 2
    protected function execute(CollectionInterface $collection, EndpointInterface $endpoint)
210
    {
211 2
        $this->setupLogger(self::LOG_LEVELS[$this->data['log_level']], [
212 2
            'process' => (string) $this->getId(),
213 2
            'parent' => isset($this->data['parent']) ? (string) $this->data['parent'] : null,
214 2
            'start' => $this->timestamp,
215 2
            'namespace' => $this->namespace->getName(),
216 2
            'collection' => $collection->getName(),
217 2
            'endpoint' => $endpoint->getName(),
218
        ]);
219
220 2
        if ($endpoint->getType() === EndpointInterface::TYPE_SOURCE) {
221
            $this->import($collection, $this->getFilter(), ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']);
222 2
        } elseif ($endpoint->getType() === EndpointInterface::TYPE_DESTINATION) {
223
            $this->export($collection, $this->getFilter(), ['name' => $endpoint->getName()], $this->data['simulate'], $this->data['ignore']);
224
        } else {
225 2
            $this->logger->warning('skip endpoint ['.$endpoint->getIdentifier().'], endpoint type is neither source nor destination', [
226 2
                'category' => get_class($this),
227
            ]);
228
        }
229
230 2
        $this->logger->popProcessor();
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method popProcessor() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
231 2
    }
232
233
    /**
234
     * Decode filter.
235
     */
236
    protected function getFilter(): array
237
    {
238
        if ($this->data['filter'] === null) {
239
            return [];
240
        }
241
242
        return (array) Helper::jsonDecode($this->data['filter']);
243
    }
244
245
    /**
246
     * Set logger level.
247
     */
248 2
    protected function setupLogger(int $level, array $context): bool
249
    {
250 2
        if (isset($this->data['job'])) {
251
            $context['job'] = (string) $this->data['job'];
252
        }
253
254 2
        foreach ($this->logger->getHandlers() as $handler) {
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method getHandlers() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
255
            if ($handler instanceof MongoDBHandler) {
256
                $handler->setLevel($level);
257
                $handler->setFormatter(new MongoDBFormatter());
258
            }
259
        }
260
261 2
        while (count($this->logger->getProcessors()) > 1) {
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method getProcessors() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
262
            $this->logger->popProcessor();
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method popProcessor() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
263
        }
264
265 2
        $this->logger->pushProcessor(function ($record) use ($context) {
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Psr\Log\LoggerInterface as the method pushProcessor() does only exist in the following implementations of said interface: Monolog\Logger.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
266
            $record['context'] = array_merge($record['context'], $context);
267
268
            return $record;
269 2
        });
270
271 2
        return true;
272
    }
273
274
    /**
275
     * {@inheritdoc}
276
     */
277
    protected function export(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
278
    {
279
        $this->logger->info('start export to destination endpoints from data type ['.$collection->getIdentifier().']', [
280
            'category' => get_class($this),
281
        ]);
282
283
        $endpoints = iterator_to_array($collection->getDestinationEndpoints($endpoints));
284
        $workflows = [];
285
286
        foreach ($endpoints as $ep) {
287
            if ($ep->flushRequired()) {
288
                $ep->flush($simulate);
289
            }
290
291
            $ep->setup($simulate);
292
        }
293
294
        $total = $collection->countObjects($filter);
295
        $i = 0;
296
297
        foreach ($collection->getObjects($filter) as $id => $object) {
298
            if ($total !== 0) {
299
                $this->updateProgress($i / $total * 100);
0 ignored issues
show
Bug introduced by
The method updateProgress() does not seem to exist on object<Tubee\Async\Sync>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
300
            }
301
302
            ++$i;
303
            $this->logger->debug('process ['.$i.'] export for object ['.(string) $id.'] - [{fields}] from data type ['.$collection->getIdentifier().']', [
304
                'category' => get_class($this),
305
                'fields' => array_keys($object->toArray()),
306
            ]);
307
308
            foreach ($endpoints as $ep) {
309
                $identifier = $ep->getIdentifier();
310
                $this->logger->info('start export to destination endpoint ['.$identifier.']', [
311
                    'category' => get_class($this),
312
                ]);
313
314
                if (!isset($workflows[$identifier])) {
315
                    $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));
316
317
                    if (count($workflows[$identifier]) === 0) {
318
                        $this->logger->warning('no workflows available in destination endpoint ['.$ep->getIdentifier().'], skip export', [
319
                            'category' => get_class($this),
320
                        ]);
321
322
                        continue;
323
                    }
324
                }
325
326
                try {
327
                    foreach ($workflows[$identifier] as $workflow) {
328
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
329
                            'category' => get_class($this),
330
                        ]);
331
332
                        if ($workflow->export($object, $this->timestamp, $simulate) === true) {
333
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $id.'], skip any further workflows for the current data object', [
334
                                'category' => get_class($this),
335
                            ]);
336
337
                            continue 2;
338
                        }
339
                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $id.'], condition does not match or unusable ensure', [
340
                                'category' => get_class($this),
341
                            ]);
342
                    }
343
344
                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
345
                        'category' => get_class($this),
346
                    ]);
347
                } catch (\Throwable $e) {
348
                    ++$this->error_count;
349
350
                    $this->logger->error('failed export object to destination endpoint ['.$identifier.']', [
351
                        'category' => get_class($this),
352
                        'object' => $object->getId(),
353
                        'exception' => $e,
354
                    ]);
355
356
                    if ($ignore === false) {
357
                        return false;
358
                    }
359
                }
360
            }
361
        }
362
363
        if (count($endpoints) === 0) {
364
            $this->logger->warning('no destination endpoint available for collection ['.$collection->getIdentifier().'], skip export', [
365
                'category' => get_class($this),
366
            ]);
367
368
            return true;
369
        }
370
371
        foreach ($endpoints as $n => $ep) {
372
            $ep->shutdown($simulate);
373
        }
374
375
        return true;
376
    }
377
378
    /**
379
     * {@inheritdoc}
380
     */
381
    protected function import(CollectionInterface $collection, array $filter = [], array $endpoints = [], bool $simulate = false, bool $ignore = false): bool
382
    {
383
        $this->logger->info('start import from source endpoints into data type ['.$collection->getIdentifier().']', [
384
            'category' => get_class($this),
385
        ]);
386
387
        $endpoints = $collection->getSourceEndpoints($endpoints);
388
        $workflows = [];
389
390
        foreach ($endpoints as $ep) {
391
            $identifier = $ep->getIdentifier();
392
            $this->logger->info('start import from source endpoint ['.$identifier.']', [
393
                'category' => get_class($this),
394
            ]);
395
396
            if ($ep->flushRequired()) {
397
                $collection->flush($simulate);
398
            }
399
400
            $ep->setup($simulate);
401
            if (!isset($workflows[$identifier])) {
402
                $workflows[$identifier] = iterator_to_array($ep->getWorkflows(['kind' => 'Workflow']));
403
404
                if (count($workflows[$identifier]) === 0) {
405
                    $this->logger->warning('no workflows available in source endpoint ['.$ep->getIdentifier().'], skip import', [
406
                        'category' => get_class($this),
407
                    ]);
408
409
                    continue;
410
                }
411
            }
412
413
            $i = 0;
414
            $total = $ep->count($filter);
415
416
            foreach ($ep->getAll($filter) as $id => $object) {
417
                if ($total !== 0) {
418
                    $this->updateProgress($i / $total * 100);
0 ignored issues
show
Bug introduced by
The method updateProgress() does not seem to exist on object<Tubee\Async\Sync>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
419
                }
420
421
                ++$i;
422
                $this->logger->debug('process object ['.$i.'] import for object ['.$object->getId().'] into data type ['.$collection->getIdentifier().']', [
423
                    'category' => get_class($this),
424
                    'attributes' => $object,
425
                ]);
426
427
                try {
428
                    foreach ($workflows[$identifier] as $workflow) {
429
                        $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] [ensure='.$workflow->getEnsure().'] for the current object', [
430
                            'category' => get_class($this),
431
                        ]);
432
433
                        if ($workflow->import($collection, $object, $this->timestamp, $simulate) === true) {
434
                            $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the object ['.(string) $object->getId().'], skip any further workflows for the current data object', [
435
                                'category' => get_class($this),
436
                            ]);
437
438
                            continue 2;
439
                        }
440
                        $this->logger->debug('skip workflow ['.$workflow->getIdentifier().'] for object ['.(string) $object->getId().'], condition does not match or unusable ensure', [
441
                                'category' => get_class($this),
442
                            ]);
443
                    }
444
445
                    $this->logger->debug('no workflow were executed within endpoint ['.$identifier.'] for the current object', [
446
                        'category' => get_class($this),
447
                    ]);
448
                } catch (\Throwable $e) {
449
                    ++$this->error_count;
450
451
                    $this->logger->error('failed import data object from source endpoint ['.$identifier.']', [
452
                        'category' => get_class($this),
453
                        'namespace' => $collection->getResourceNamespace()->getName(),
454
                        'collection' => $collection->getName(),
455
                        'endpoint' => $ep->getName(),
456
                        'exception' => $e,
457
                    ]);
458
459
                    if ($ignore === false) {
460
                        return false;
461
                    }
462
                }
463
            }
464
465
            if (empty($filter)) {
466
                $this->garbageCollector($collection, $ep, $simulate, $ignore);
467
            } else {
468
                $this->logger->info('skip garbage collection, a query has been issued for import', [
469
                    'category' => get_class($this),
470
                ]);
471
            }
472
473
            $ep->shutdown($simulate);
474
        }
475
476
        if ($endpoints->getReturn() === 0) {
477
            $this->logger->warning('no source endpoint available for collection ['.$collection->getIdentifier().'], skip import', [
478
                'category' => get_class($this),
479
            ]);
480
481
            return true;
482
        }
483
484
        return true;
485
    }
486
487
    /**
488
     * Garbage.
489
     */
490
    protected function garbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, bool $simulate = false, bool $ignore = false): bool
491
    {
492
        $this->logger->info('start garbage collector workflows from data type ['.$collection->getIdentifier().']', [
493
            'category' => get_class($this),
494
        ]);
495
496
        $filter = [
497
            'endpoints.'.$endpoint->getName().'.last_sync' => [
498
                '$lt' => $this->timestamp,
499
            ],
500
        ];
501
502
        $this->db->{$collection->getCollection()}->updateMany($filter, ['$set' => [
503
            'endpoints.'.$endpoint->getName().'.garbage' => true,
504
        ]]);
505
506
        $workflows = iterator_to_array($endpoint->getWorkflows(['kind' => 'GarbageWorkflow']));
507
        if (count($workflows) === 0) {
508
            $this->logger->info('no garbage workflows available in ['.$endpoint->getIdentifier().'], skip garbage collection', [
509
                'category' => get_class($this),
510
            ]);
511
512
            return false;
513
        }
514
515
        $i = 0;
516
        foreach ($collection->getObjects($filter, false) as $id => $object) {
517
            ++$i;
518
            $this->logger->debug('process ['.$i.'] garbage workflows for garbage object ['.$id.'] from data type ['.$collection->getIdentifier().']', [
519
                'category' => get_class($this),
520
            ]);
521
522
            try {
523
                foreach ($workflows as $workflow) {
524
                    $this->logger->debug('start workflow ['.$workflow->getIdentifier().'] for the current garbage object', [
525
                        'category' => get_class($this),
526
                    ]);
527
528
                    if ($workflow->cleanup($object, $this->timestamp, $simulate) === true) {
529
                        $this->logger->debug('workflow ['.$workflow->getIdentifier().'] executed for the current garbage object, skip any further workflows for the current garbage object', [
530
                            'category' => get_class($this),
531
                        ]);
532
533
                        break;
534
                    }
535
                }
536
            } catch (\Exception $e) {
537
                $this->logger->error('failed execute garbage collector for object ['.$id.'] from collection ['.$collection->getIdentifier().']', [
538
                    'category' => get_class($this),
539
                    'exception' => $e,
540
                ]);
541
542
                if ($ignore === false) {
543
                    return false;
544
                }
545
            }
546
        }
547
548
        $this->relationGarbageCollector($collection, $endpoint, $workflows);
549
550
        return true;
551
    }
552
553
    /**
554
     * Relation garbage collector.
555
     */
556
    protected function relationGarbageCollector(CollectionInterface $collection, EndpointInterface $endpoint, $workflows)
0 ignored issues
show
Unused Code introduced by
The parameter $collection is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
557
    {
558
        $namespace = $endpoint->getCollection()->getResourceNamespace()->getName();
559
        $collection = $endpoint->getCollection()->getName();
560
        $ep = $endpoint->getName();
561
        $key = join('/', [$namespace, $collection, $ep]);
562
563
        $filter = [
564
            'endpoints.'.$key.'.last_sync' => [
565
                '$lt' => $this->timestamp,
566
            ],
567
        ];
568
569
        $this->db->relations->updateMany($filter, ['$set' => [
570
            'endpoints.'.$key.'.garbage' => true,
571
        ]]);
572
573
        foreach ($workflows as $workflow) {
574
            foreach ($workflow->getAttributeMap()->getMap() as $attr) {
575
                if (isset($attr['map']) && $attr['map']['ensure'] === 'absent') {
576
                    $this->db->relations->deleteMany(['endpoints.'.$key.'.garbage' => true]);
577
                }
578
            }
579
        }
580
    }
581
582
    /**
583
     * Notify.
584
     */
585 2
    protected function notify(): bool
586
    {
587 2
        if ($this->data['notification']['enabled'] === false) {
588 2
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], notification is disabled', [
589 2
                'category' => get_class($this),
590
            ]);
591
592 2
            return false;
593
        }
594
595
        if (count($this->data['notification']['receiver']) === 0) {
596
            $this->logger->debug('skip notifiaction for process ['.$this->getId().'], no receiver configured', [
597
                'category' => get_class($this),
598
            ]);
599
        }
600
601
        $iso = $this->timestamp->toDateTime()->format('c');
602
603
        if ($this->error_count === 0) {
604
            $subject = 'Good job! The job finished with no errors';
605
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso finished with no errors.";
606
        } else {
607
            $subject = "Job ended with $this->error_count errors";
608
            $body = "Hi there\n\nThe sync process ".(string) $this->getId()." started at $iso ended with $this->error_count errors.";
609
        }
610
611
        $mail = (new Message())
612
          ->setSubject($subject)
613
          ->setBody($body)
614
          ->setEncoding('UTF-8');
615
616
        foreach ($this->data['notification']['receiver'] as $receiver) {
617
            $mail->setTo($receiver);
618
619
            $this->logger->debug('send process notification ['.$this->getId().'] to ['.$receiver.']', [
620
                'category' => get_class($this),
621
            ]);
622
623
            $this->scheduler->addJob(Mail::class, $mail->toString(), [
624
                Scheduler::OPTION_RETRY => 1,
625
            ]);
626
        }
627
628
        return true;
629
    }
630
}
631