Passed
Pull Request — master (#31)
by Daniel
04:39
created

EventQueueConsumer::dequeueEvent()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 5
c 1
b 0
f 1
dl 0
loc 11
rs 10
cc 2
nc 2
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Jellyfish\Event;
6
7
use Jellyfish\Event\Command\EventQueueConsumeCommand;
8
use Jellyfish\Process\ProcessFactoryInterface;
9
use Jellyfish\Queue\QueueClientInterface;
10
11
use function array_key_exists;
12
13
/**
 * Reads events for a given event name / listener identifier pair from the
 * queue whose name is produced by the event queue name generator.
 *
 * Events can be read directly (single or in bulk) through the queue client,
 * or consumption can be handed to a process that is created by the process
 * factory from the event queue consume console command.
 */
class EventQueueConsumer implements EventQueueConsumerInterface
{
    /**
     * @var \Jellyfish\Process\ProcessFactoryInterface
     */
    protected $processFactory;

    /**
     * @var \Jellyfish\Event\EventMapperInterface
     */
    protected $eventMapper;

    /**
     * @var \Jellyfish\Event\EventQueueNameGeneratorInterface
     */
    protected $eventQueueNameGenerator;

    /**
     * @var \Jellyfish\Queue\QueueClientInterface
     */
    protected $queueClient;

    /**
     * Processes created so far, keyed by event queue name.
     *
     * @var \Jellyfish\Process\ProcessInterface[]
     */
    protected $processList;

    /**
     * Path of the console executable, derived from the root directory.
     *
     * @var string
     */
    protected $pathToConsole;

    /**
     * @param \Jellyfish\Process\ProcessFactoryInterface $processFactory
     * @param \Jellyfish\Event\EventMapperInterface $eventMapper
     * @param \Jellyfish\Event\EventQueueNameGeneratorInterface $eventQueueNameGenerator
     * @param \Jellyfish\Queue\QueueClientInterface $queueClient
     * @param string $rootDir Root directory that contains "vendor/bin/console";
     *                        a trailing directory separator is optional.
     */
    public function __construct(
        ProcessFactoryInterface $processFactory,
        EventMapperInterface $eventMapper,
        EventQueueNameGeneratorInterface $eventQueueNameGenerator,
        QueueClientInterface $queueClient,
        string $rootDir
    ) {
        $this->processFactory = $processFactory;
        $this->eventMapper = $eventMapper;
        $this->eventQueueNameGenerator = $eventQueueNameGenerator;
        $this->queueClient = $queueClient;
        $this->processList = [];

        // Without this guard a root directory such as "/app" would produce
        // "/appvendor/bin/console". An empty string is left alone so the
        // resulting path stays relative, and a value that already ends in
        // "/" or "\" is used unchanged.
        $needsSeparator = $rootDir !== ''
            && !\in_array(\substr($rootDir, -1), ['/', '\\'], true);

        $this->pathToConsole = \sprintf(
            '%s%svendor/bin/console',
            $rootDir,
            $needsSeparator ? '/' : ''
        );
    }

    /**
     * Starts the process that runs the event queue consume command for the
     * given event name and listener identifier.
     *
     * The process is created on the first call for a queue name and reused
     * on later calls; start() is invoked on it on every call.
     *
     * @param string $eventName
     * @param string $listenerIdentifier
     *
     * @return \Jellyfish\Event\EventQueueConsumerInterface
     */
    public function dequeueAsProcess(string $eventName, string $listenerIdentifier): EventQueueConsumerInterface
    {
        $queueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);

        if (!array_key_exists($queueName, $this->processList)) {
            $this->processList[$queueName] = $this->processFactory->create([
                $this->pathToConsole,
                EventQueueConsumeCommand::NAME,
                $eventName,
                $listenerIdentifier,
            ]);
        }

        $this->processList[$queueName]->start();

        return $this;
    }

    /**
     * Receives one message from the event queue and maps it to an event.
     *
     * @param string $eventName
     * @param string $listenerIdentifier
     *
     * @return \Jellyfish\Event\EventInterface|null Null when the queue client returned no message.
     */
    public function dequeue(string $eventName, string $listenerIdentifier): ?EventInterface
    {
        $queueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
        $message = $this->queueClient->receiveMessage($queueName);

        return $message === null
            ? null
            : $this->eventMapper->fromMessage($message);
    }

    /**
     * Receives messages from the event queue, passing $chunkSize to the
     * queue client, and maps each of them to an event.
     *
     * @param string $eventName
     * @param string $listenerIdentifier
     * @param int $chunkSize
     *
     * @return \Jellyfish\Event\EventInterface[] Events in the order the messages were received,
     *                                           re-indexed from 0.
     */
    public function dequeueBulk(string $eventName, string $listenerIdentifier, int $chunkSize): array
    {
        $queueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
        $events = [];

        foreach ($this->queueClient->receiveMessages($queueName, $chunkSize) as $message) {
            $events[] = $this->eventMapper->fromMessage($message);
        }

        return $events;
    }
}
127