Completed
Pull Request — master (#6)
by Daniel
02:39
created

EventQueueConsumer   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 94
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 6
eloc 26
dl 0
loc 94
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 1
A dequeueEventAsProcess() 0 16 3
A dequeueEvent() 0 11 2
1
<?php
2
3
namespace Jellyfish\Event;
4
5
use Jellyfish\Event\Command\EventQueueConsumeCommand;
6
use Jellyfish\Process\ProcessFactoryInterface;
7
use Jellyfish\Queue\QueueClientInterface;
8
9
class EventQueueConsumer implements EventQueueConsumerInterface
10
{
11
    /**
12
     * @var \Jellyfish\Process\ProcessFactoryInterface
13
     */
14
    protected $processFactory;
15
16
    /**
17
     * @var \Jellyfish\Event\EventMapperInterface
18
     */
19
    protected $eventMapper;
20
21
    /**
22
     * @var \Jellyfish\Event\EventQueueNameGeneratorInterface
23
     */
24
    protected $eventQueueNameGenerator;
25
26
    /**
27
     * @var \Jellyfish\Queue\QueueClientInterface
28
     */
29
    protected $queueClient;
30
31
    /**
32
     * @var \Jellyfish\Process\ProcessInterface[]
33
     */
34
    protected $processList;
35
36
    /**
37
     * @var string
38
     */
39
    protected $rootDir;
40
41
    /**
42
     * @param \Jellyfish\Process\ProcessFactoryInterface $processFactory
43
     * @param \Jellyfish\Event\EventMapperInterface $eventMapper
44
     * @param \Jellyfish\Event\EventQueueNameGeneratorInterface $eventQueueNameGenerator
45
     * @param \Jellyfish\Queue\QueueClientInterface $queueClient
46
     */
47
    public function __construct(
48
        ProcessFactoryInterface $processFactory,
49
        EventMapperInterface $eventMapper,
50
        EventQueueNameGeneratorInterface $eventQueueNameGenerator,
51
        QueueClientInterface $queueClient,
52
        string $rootDir
53
    ) {
54
        $this->processFactory = $processFactory;
55
        $this->eventMapper = $eventMapper;
56
        $this->eventQueueNameGenerator = $eventQueueNameGenerator;
57
        $this->queueClient = $queueClient;
58
        $this->processList = [];
59
        $this->rootDir = $rootDir;
60
    }
61
62
    /**
63
     * @param string $eventName
64
     * @param string $listenerIdentifier
65
     *
66
     * @return \Jellyfish\Event\EventQueueConsumerInterface
67
     */
68
    public function dequeueEventAsProcess(string $eventName, string $listenerIdentifier): EventQueueConsumerInterface
69
    {
70
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
71
72
        if (!\array_key_exists($eventQueueName, $this->processList)) {
73
            $command = [$this->rootDir . 'vendor/bin/console', EventQueueConsumeCommand::NAME, $eventName, $listenerIdentifier];
74
            $this->processList[$eventQueueName] = $this->processFactory->create($command);
75
        }
76
77
        $process = $this->processList[$eventQueueName];
78
79
        if (!$process->isLocked()) {
80
            $process->start();
81
        }
82
83
        return $this;
84
    }
85
86
    /**
87
     * @param string $eventName
88
     * @param string $listenerIdentifier
89
     *
90
     * @return \Jellyfish\Event\EventInterface|null
91
     */
92
    public function dequeueEvent(string $eventName, string $listenerIdentifier): ?EventInterface
93
    {
94
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
95
96
        $message = $this->queueClient->receiveMessage($eventQueueName);
97
98
        if ($message === null) {
99
            return null;
100
        }
101
102
        return $this->eventMapper->fromMessage($message);
103
    }
104
}
105