EventQueueConsumer::dequeueAsProcess()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
c 0
b 0
f 0
dl 0
loc 13
rs 10
cc 2
nc 2
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Jellyfish\Event;
6
7
use Jellyfish\Event\Command\EventQueueConsumeCommand;
8
use Jellyfish\Process\ProcessFactoryInterface;
9
use Jellyfish\Queue\DestinationFactoryInterface;
10
use Jellyfish\Queue\DestinationInterface;
11
use Jellyfish\Queue\QueueClientInterface;
12
13
use function array_key_exists;
14
use function sprintf;
15
16
class EventQueueConsumer implements EventQueueConsumerInterface
17
{
18
    /**
19
     * @var \Jellyfish\Process\ProcessFactoryInterface
20
     */
21
    protected $processFactory;
22
23
    /**
24
     * @var \Jellyfish\Event\EventMapperInterface
25
     */
26
    protected $eventMapper;
27
28
    /**
29
     * @var \Jellyfish\Event\EventQueueNameGeneratorInterface
30
     */
31
    protected $eventQueueNameGenerator;
32
33
    /**
34
     * @var \Jellyfish\Queue\QueueClientInterface
35
     */
36
    protected $queueClient;
37
38
    /**
39
     * @var \Jellyfish\Process\ProcessInterface[]
40
     */
41
    protected $processList;
42
43
    /**
44
     * @var string
45
     */
46
    protected $pathToConsole;
47
48
    /**
49
     * @var \Jellyfish\Queue\DestinationFactoryInterface
50
     */
51
    protected $destinationFactory;
52
53
    /**
54
     * @param \Jellyfish\Process\ProcessFactoryInterface $processFactory
55
     * @param \Jellyfish\Event\EventMapperInterface $eventMapper
56
     * @param \Jellyfish\Event\EventQueueNameGeneratorInterface $eventQueueNameGenerator
57
     * @param \Jellyfish\Queue\QueueClientInterface $queueClient
58
     * @param \Jellyfish\Queue\DestinationFactoryInterface $destinationFactory
59
     * @param string $rootDir
60
     */
61
    public function __construct(
62
        ProcessFactoryInterface $processFactory,
63
        EventMapperInterface $eventMapper,
64
        EventQueueNameGeneratorInterface $eventQueueNameGenerator,
65
        QueueClientInterface $queueClient,
66
        DestinationFactoryInterface $destinationFactory,
67
        string $rootDir
68
    ) {
69
        $this->processFactory = $processFactory;
70
        $this->eventMapper = $eventMapper;
71
        $this->eventQueueNameGenerator = $eventQueueNameGenerator;
72
        $this->queueClient = $queueClient;
73
        $this->destinationFactory = $destinationFactory;
74
        $this->processList = [];
75
        $this->pathToConsole = sprintf('%svendor/bin/console', $rootDir);
76
    }
77
78
    /**
79
     * @param string $eventName
80
     * @param string $listenerIdentifier
81
     *
82
     * @return \Jellyfish\Event\EventQueueConsumerInterface
83
     */
84
    public function dequeueAsProcess(string $eventName, string $listenerIdentifier): EventQueueConsumerInterface
85
    {
86
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
87
88
        if (!array_key_exists($eventQueueName, $this->processList)) {
89
            $command = [$this->pathToConsole, EventQueueConsumeCommand::NAME, $eventName, $listenerIdentifier];
90
            $this->processList[$eventQueueName] = $this->processFactory->create($command);
91
        }
92
93
        $process = $this->processList[$eventQueueName];
94
        $process->start();
95
96
        return $this;
97
    }
98
99
    /**
100
     * @param string $eventName
101
     * @param string $listenerIdentifier
102
     *
103
     * @return \Jellyfish\Event\EventInterface|null
104
     */
105
    public function dequeue(string $eventName, string $listenerIdentifier): ?EventInterface
106
    {
107
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
108
109
        $destination = $this->destinationFactory->create()
110
            ->setName($eventQueueName)
111
            ->setType(DestinationInterface::TYPE_FANOUT)
112
            ->setProperty('bind', $eventName);
113
114
        $message = $this->queueClient->receiveMessage($destination);
115
116
        if ($message === null) {
117
            return null;
118
        }
119
120
        return $this->eventMapper->fromMessage($message);
121
    }
122
123
    /**
124
     * @param string $eventName
125
     * @param string $listenerIdentifier
126
     * @param int $chunkSize
127
     *
128
     * @return \Jellyfish\Event\EventInterface[]
129
     */
130
    public function dequeueBulk(string $eventName, string $listenerIdentifier, int $chunkSize): array
131
    {
132
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
133
134
        $destination = $this->destinationFactory->create()
135
            ->setName($eventQueueName)
136
            ->setType(DestinationInterface::TYPE_FANOUT)
137
            ->setProperty('bind', $eventName);
138
139
        $messages = $this->queueClient->receiveMessages($destination, $chunkSize);
140
141
        $events = [];
142
143
        foreach ($messages as $message) {
144
            $events[] = $this->eventMapper->fromMessage($message);
145
        }
146
147
        return $events;
148
    }
149
}
150