Passed
Pull Request — master (#6)
by Daniel
02:58
created

EventQueueConsumer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 4
1
<?php
2
3
namespace Jellyfish\Event;
4
5
use Jellyfish\Event\Command\EventQueueConsumeCommand;
6
use Jellyfish\Process\ProcessFactoryInterface;
7
use Jellyfish\Queue\QueueClientInterface;
8
9
class EventQueueConsumer implements EventQueueConsumerInterface
10
{
11
    /**
12
     * @var \Jellyfish\Process\ProcessFactoryInterface
13
     */
14
    protected $processFactory;
15
16
    /**
17
     * @var \Jellyfish\Event\EventMapperInterface
18
     */
19
    protected $eventMapper;
20
21
    /**
22
     * @var \Jellyfish\Event\EventQueueNameGeneratorInterface
23
     */
24
    protected $eventQueueNameGenerator;
25
26
    /**
27
     * @var \Jellyfish\Queue\QueueClientInterface
28
     */
29
    protected $queueClient;
30
31
    /**
32
     * @var \Jellyfish\Process\ProcessInterface[]
33
     */
34
    protected $processList;
35
36
    /**
37
     * @param \Jellyfish\Process\ProcessFactoryInterface $processFactory
38
     * @param \Jellyfish\Event\EventMapperInterface $eventMapper
39
     * @param \Jellyfish\Event\EventQueueNameGeneratorInterface $eventQueueNameGenerator
40
     * @param \Jellyfish\Queue\QueueClientInterface $queueClient
41
     */
42
    public function __construct(
43
        ProcessFactoryInterface $processFactory,
44
        EventMapperInterface $eventMapper,
45
        EventQueueNameGeneratorInterface $eventQueueNameGenerator,
46
        QueueClientInterface $queueClient
47
    ) {
48
        $this->processFactory = $processFactory;
49
        $this->eventMapper = $eventMapper;
50
        $this->eventQueueNameGenerator = $eventQueueNameGenerator;
51
        $this->queueClient = $queueClient;
52
        $this->processList = [];
53
    }
54
55
    /**
56
     * @param string $eventName
57
     * @param string $listenerIdentifier
58
     *
59
     * @return \Jellyfish\Event\EventQueueConsumerInterface
60
     */
61
    public function dequeueEventAsProcess(string $eventName, string $listenerIdentifier): EventQueueConsumerInterface
62
    {
63
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
64
65
        if (!\array_key_exists($eventQueueName, $this->processList)) {
66
            $command = ['', EventQueueConsumeCommand::NAME, $eventName, $listenerIdentifier];
67
            $this->processList[$eventQueueName] = $this->processFactory->create($command);
68
        }
69
70
        $process = $this->processList[$eventQueueName];
71
72
        if (!$process->isLocked()) {
73
            $process->start();
74
        }
75
76
        return $this;
77
    }
78
79
    /**
80
     * @param string $eventName
81
     * @param string $listenerIdentifier
82
     *
83
     * @return \Jellyfish\Event\EventInterface|null
84
     */
85
    public function dequeueEvent(string $eventName, string $listenerIdentifier): ?EventInterface
86
    {
87
        $eventQueueName = $this->eventQueueNameGenerator->generate($eventName, $listenerIdentifier);
88
89
        $message = $this->queueClient->receiveMessage($eventQueueName);
90
91
        if ($message === null) {
92
            return null;
93
        }
94
95
        return $this->eventMapper->fromMessage($message);
96
    }
97
}
98