Completed
Push — master ( de5c9f...2ef464 )
by Daniel
21s queued 11s
created

EventQueueConsumer::dequeueEventAsProcess()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 7
c 1
b 0
f 1
dl 0
loc 13
rs 10
cc 2
nc 2
nop 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Jellyfish\Event;
6
7
use Jellyfish\Event\Command\EventQueueConsumeCommand;
8
use Jellyfish\Process\ProcessFactoryInterface;
9
use Jellyfish\Queue\QueueClientInterface;
10
11
use function array_key_exists;
12
use function sprintf;
13
14
/**
 * Consumes events from listener-specific event queues.
 *
 * Events can be received directly through the queue client (single or in
 * bulk), or consumption can be delegated to a separate process that is
 * created through the process factory and started.
 */
class EventQueueConsumer implements EventQueueConsumerInterface
{
    /**
     * Factory used to create the processes started by dequeueAsProcess().
     *
     * @var \Jellyfish\Process\ProcessFactoryInterface
     */
    protected $processFactory;

    /**
     * Converts received queue messages into events.
     *
     * @var \Jellyfish\Event\EventMapperInterface
     */
    protected $eventMapper;

    /**
     * Generates the queue name from an event name and a listener identifier.
     *
     * @var \Jellyfish\Event\EventQueueNameGeneratorInterface
     */
    protected $eventQueueNameGenerator;

    /**
     * Client the queue messages are received from.
     *
     * @var \Jellyfish\Queue\QueueClientInterface
     */
    protected $queueClient;

    /**
     * Processes created so far, keyed by event queue name.
     *
     * @var \Jellyfish\Process\ProcessInterface[]
     */
    protected $processList;

    /**
     * Console path used as the first element of every process command.
     *
     * @var string
     */
    protected $pathToConsole;

    /**
     * Stores the collaborators and derives the console path.
     *
     * The root directory is prefixed verbatim to "vendor/bin/console"; no
     * directory separator is inserted between the two parts.
     *
     * @param \Jellyfish\Process\ProcessFactoryInterface $processFactory
     * @param \Jellyfish\Event\EventMapperInterface $eventMapper
     * @param \Jellyfish\Event\EventQueueNameGeneratorInterface $eventQueueNameGenerator
     * @param \Jellyfish\Queue\QueueClientInterface $queueClient
     * @param string $rootDir
     */
    public function __construct(
        ProcessFactoryInterface $processFactory,
        EventMapperInterface $eventMapper,
        EventQueueNameGeneratorInterface $eventQueueNameGenerator,
        QueueClientInterface $queueClient,
        string $rootDir
    ) {
        $this->processList = [];
        $this->pathToConsole = sprintf('%svendor/bin/console', $rootDir);

        $this->queueClient = $queueClient;
        $this->eventQueueNameGenerator = $eventQueueNameGenerator;
        $this->eventMapper = $eventMapper;
        $this->processFactory = $processFactory;
    }

    /**
     * Starts the process that consumes the queue of the given event/listener pair.
     *
     * A process is created only the first time a queue name is seen and is
     * reused afterwards; start() is invoked on every call.
     *
     * @param string $eventName
     * @param string $listenerIdentifier
     *
     * @return \Jellyfish\Event\EventQueueConsumerInterface
     */
    public function dequeueAsProcess(string $eventName, string $listenerIdentifier): EventQueueConsumerInterface
    {
        $queueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);

        if (array_key_exists($queueName, $this->processList) === false) {
            $this->processList[$queueName] = $this->processFactory->create([
                $this->pathToConsole,
                EventQueueConsumeCommand::NAME,
                $eventName,
                $listenerIdentifier,
            ]);
        }

        $this->processList[$queueName]->start();

        return $this;
    }

    /**
     * Receives a single message from the queue and maps it to an event.
     *
     * @param string $eventName
     * @param string $listenerIdentifier
     *
     * @return \Jellyfish\Event\EventInterface|null Null when the queue client returned no message.
     */
    public function dequeue(string $eventName, string $listenerIdentifier): ?EventInterface
    {
        $queueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
        $receivedMessage = $this->queueClient->receiveMessage($queueName);

        return $receivedMessage === null
            ? null
            : $this->eventMapper->fromMessage($receivedMessage);
    }

    /**
     * Receives up to a chunk of messages from the queue and maps each one to an event.
     *
     * @param string $eventName
     * @param string $listenerIdentifier
     * @param int $chunkSize Passed through to the queue client.
     *
     * @return \Jellyfish\Event\EventInterface[]
     */
    public function dequeueBulk(string $eventName, string $listenerIdentifier, int $chunkSize): array
    {
        $queueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
        $receivedMessages = $this->queueClient->receiveMessages($queueName, $chunkSize);

        $mappedEvents = [];

        foreach ($receivedMessages as $receivedMessage) {
            $mappedEvents[] = $this->eventMapper->fromMessage($receivedMessage);
        }

        return $mappedEvents;
    }
}
128