Completed
Push — master ( c2ba48...1fd225 )
by Markus
14s queued 12s
created

EventQueueConsumer::dequeueEvent()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 2
1
<?php
2
3
namespace Jellyfish\Event;
4
5
use Jellyfish\Event\Command\EventQueueConsumeCommand;
6
use Jellyfish\Process\ProcessFactoryInterface;
7
use Jellyfish\Queue\QueueClientInterface;
8
9
class EventQueueConsumer implements EventQueueConsumerInterface
10
{
11
    /**
12
     * @var \Jellyfish\Process\ProcessFactoryInterface
13
     */
14
    protected $processFactory;
15
16
    /**
17
     * @var \Jellyfish\Event\EventMapperInterface
18
     */
19
    protected $eventMapper;
20
21
    /**
22
     * @var \Jellyfish\Event\EventQueueNameGeneratorInterface
23
     */
24
    protected $eventQueueNameGenerator;
25
26
    /**
27
     * @var \Jellyfish\Queue\QueueClientInterface
28
     */
29
    protected $queueClient;
30
31
    /**
32
     * @var \Jellyfish\Process\ProcessInterface[]
33
     */
34
    protected $processList;
35
36
    /**
37
     * @var string
38
     */
39
    protected $pathToConsole;
40
41
    /**
42
     * @param \Jellyfish\Process\ProcessFactoryInterface $processFactory
43
     * @param \Jellyfish\Event\EventMapperInterface $eventMapper
44
     * @param \Jellyfish\Event\EventQueueNameGeneratorInterface $eventQueueNameGenerator
45
     * @param \Jellyfish\Queue\QueueClientInterface $queueClient
46
     * @param string $rootDir
47
     */
48
    public function __construct(
49
        ProcessFactoryInterface $processFactory,
50
        EventMapperInterface $eventMapper,
51
        EventQueueNameGeneratorInterface $eventQueueNameGenerator,
52
        QueueClientInterface $queueClient,
53
        string $rootDir
54
    ) {
55
        $this->processFactory = $processFactory;
56
        $this->eventMapper = $eventMapper;
57
        $this->eventQueueNameGenerator = $eventQueueNameGenerator;
58
        $this->queueClient = $queueClient;
59
        $this->processList = [];
60
        $this->pathToConsole = sprintf('%svendor/bin/console', $rootDir);
61
    }
62
63
    /**
64
     * @param string $eventName
65
     * @param string $listenerIdentifier
66
     *
67
     * @return \Jellyfish\Event\EventQueueConsumerInterface
68
     */
69
    public function dequeueEventAsProcess(string $eventName, string $listenerIdentifier): EventQueueConsumerInterface
70
    {
71
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
72
73
        if (!\array_key_exists($eventQueueName, $this->processList)) {
74
            $command = [$this->pathToConsole, EventQueueConsumeCommand::NAME, $eventName, $listenerIdentifier];
75
            $this->processList[$eventQueueName] = $this->processFactory->create($command);
76
        }
77
78
        $process = $this->processList[$eventQueueName];
79
80
        if (!$process->isLocked()) {
81
            $process->start();
82
        }
83
84
        return $this;
85
    }
86
87
    /**
88
     * @param string $eventName
89
     * @param string $listenerIdentifier
90
     *
91
     * @return \Jellyfish\Event\EventInterface|null
92
     */
93
    public function dequeueEvent(string $eventName, string $listenerIdentifier): ?EventInterface
94
    {
95
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
96
97
        $message = $this->queueClient->receiveMessage($eventQueueName);
98
99
        if ($message === null) {
100
            return null;
101
        }
102
103
        return $this->eventMapper->fromMessage($message);
104
    }
105
}
106