Passed
Push — dev ( eeaa0f...91a85a )
by Janko
26:10
created

EventScheduler::scheduleEvent()   A

Complexity

Conditions 3
Paths 10

Size

Total Lines 21
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 16
nc 10
nop 1
dl 0
loc 21
ccs 15
cts 15
cp 1
crap 3
rs 9.7333
c 0
b 0
f 0
1
<?php
2
3
namespace Stu\Component\Event;
4
5
use Exception;
6
use Stu\Component\Concurrency\EntityLockManagerInterface;
7
use Stu\Component\Event\Strategy\EventStrategyFactoryInterface;
8
use Stu\Module\Logging\LoggerTypeEnum;
9
use Stu\Module\Logging\LoggerUtilFactoryInterface;
10
use Stu\Module\Logging\RotatingLoggerInterface;
11
use Stu\Orm\Entity\EventInterface;
12
use Stu\Orm\Repository\EventRepositoryInterface;
13
use Stu\Orm\Transaction\EntityManagerTypeEnum;
14
use Stu\Orm\Transaction\SwitchableEntityManagerInterface;
15
16
/**
 * Drains the event queue and hands each queued event to its strategy-specific
 * processor, guarding the affected entities with locks while doing so.
 */
class EventScheduler implements EventSchedulerInterface
{
    /** Seconds to pause between two scheduling passes. */
    private const PAUSE_BETWEEN_PASSES_IN_SECONDS = 1;

    private RotatingLoggerInterface $logger;

    public function __construct(
        private EventRepositoryInterface $eventRepository,
        private EventQueueInterface $eventQueue,
        private EntityLockManagerInterface $entityLockManager,
        private EventProcessorInterface $eventProcessor,
        private SwitchableEntityManagerInterface $switchableEntityManager,
        private EventStrategyFactoryInterface $eventStrategyFactory,
        LoggerUtilFactoryInterface $loggerUtilFactory
    ) {
        // The logger is resolved once from the factory, using the dedicated event/lock log type.
        $this->logger = $loggerUtilFactory->getRotatingLogger(LoggerTypeEnum::EVENT_AND_ENTITY_LOCK);
    }

    /**
     * Processes all events currently in the queue.
     *
     * Each pass switches the entity manager to the event scheduler type, schedules
     * every queued event, flushes the entity manager and then pauses briefly.
     *
     * @param bool $doInfiniteLoops When true, passes are repeated endlessly;
     *                              when false, exactly one pass is executed.
     */
    public function processEvents(bool $doInfiniteLoops = false): void
    {
        do {
            $this->switchableEntityManager->switchType(EntityManagerTypeEnum::EVENT_SCHEDULER);

            $queueSize = $this->eventQueue->getSize();

            if ($queueSize <= 0) {
                // `continue` in a do-while jumps directly to the loop condition, so the
                // pause at the end of the pass would be skipped. Without pausing here an
                // endless run would busy-spin on an empty queue.
                if ($doInfiniteLoops) {
                    sleep(self::PAUSE_BETWEEN_PASSES_IN_SECONDS);
                }

                continue;
            }

            $this->logger->log('start event scheduling');
            $this->logger->logf('  queue size: %d', $queueSize);

            // The size is re-read on every iteration because scheduling removes
            // events from the queue (and clears it entirely on an exception).
            while ($this->eventQueue->getSize() > 0) {
                $event = $this->eventQueue->getNextEvent();

                $this->logger->logf('  now %s', $event->toShortString());

                $this->scheduleEvent($event);
            }

            $this->switchableEntityManager->flush();

            sleep(self::PAUSE_BETWEEN_PASSES_IN_SECONDS);
        } while ($doInfiniteLoops);
    }

    /**
     * Schedules a single event.
     *
     * If the entities required by the event's strategy can be locked, the event
     * is processed and marked as DONE; otherwise its retry counter is increased.
     * An exception during any of these steps marks the event as ERROR and clears
     * the whole queue. In every case the event is saved and removed from the queue.
     */
    private function scheduleEvent(EventInterface $event): void
    {
        try {
            $eventStrategy = $this->eventStrategyFactory->createEventStrategy($event->getType());

            if ($this->entityLockManager->lockEntities($eventStrategy->getEntitiesToLock($event))) {
                $this->logger->log('    processing...');
                $this->eventProcessor->processEvent($event, $eventStrategy);
                $event->setState(EventStateEnum::DONE);
                $this->logger->log('    done.');
            } else {
                $this->logger->log('    could not lock entities, postponed');
                $event->increaseRetries();
            }
        } catch (Exception $e) {
            $event->setState(EventStateEnum::ERROR);
            // Remaining queued events are dropped once one event has failed.
            $this->eventQueue->clear();
            $this->logger->logf('    event caused exception: %s', $e->getMessage());
        }

        // Persist the resulting state / retry count, then take the event off the queue.
        $this->eventRepository->save($event);
        $this->eventQueue->removeEvent($event);
    }
}
86