Completed
Push — master ( 9c0fcf...cbcb64 )
by Tobias
05:38
created

DirectPublisher::__destruct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace Happyr\SimpleBusBundle\Message\Publisher;
4
5
use Happyr\Mq2phpBundle\Service\ConsumerWrapper;
6
use Happyr\SimpleBusBundle\Message\DelayedMessage;
7
use SimpleBus\Asynchronous\Publisher\Publisher;
8
use SimpleBus\Serialization\Envelope\Serializer\MessageInEnvelopSerializer;
9
10
class DirectPublisher implements Publisher
11
{
12
    /**
13
     * @var ConsumerWrapper
14
     */
15
    private $consumer;
16
17
    /**
18
     * @var MessageInEnvelopSerializer
19
     */
20
    private $serializer;
21
22
    /**
23
     * @var string
24
     */
25
    private $queueName;
26
27
    private $messages = [];
28
29
    private $delayedMessages = [];
30
31
    /**
32
     * @param ConsumerWrapper $consumer
33
     */
34
    public function __construct(ConsumerWrapper $consumer, MessageInEnvelopSerializer $serializer, string $queueName)
35
    {
36
        $this->consumer = $consumer;
37
        $this->serializer = $serializer;
38
        $this->queueName = $queueName;
39
    }
40
41
    public function publish($message)
42
    {
43
        if ($message instanceof DelayedMessage) {
44
            $this->delayedMessages[$message->getDelayedTime()][] = $this->serializer->wrapAndSerialize($message);
45
        } else {
46
            $this->messages[] = $this->serializer->wrapAndSerialize($message);
47
        }
48
    }
49
50
    public function consume()
51
    {
52
        foreach ($this->messages as $message) {
53
            $this->doConsume($message);
54
        }
55
56
        ksort($this->delayedMessages);
57
        foreach ($this->delayedMessages as $messages) {
58
            foreach ($messages as $message) {
59
                $this->doConsume($message);
60
            }
61
        }
62
    }
63
64
    public function __destruct()
65
    {
66
        $this->consume();
67
    }
68
69
    /**
70
     * @param string $data
71
     */
72
    private function doConsume(string $data)
73
    {
74
        $message = json_decode($data, true);
75
        $body = $message['body'];
76
77
        $this->consumer->consume($this->queueName, $body);
78
    }
79
}
80