Completed
Pull Request — master (#143)
by Kristof
09:50 queued 04:32
created

AMQPPublisher::setContentType()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 21
Code Lines 11

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 21
rs 9.0534
cc 4
eloc 11
nc 4
nop 2
1
<?php
2
/**
3
 * @file
4
 */
5
6
namespace CultuurNet\UDB3\EventHandling;
7
8
use Broadway\Domain\DomainMessage;
9
use Broadway\EventHandling\EventListenerInterface;
10
use Broadway\Serializer\SerializableInterface;
11
use CultuurNet\UDB3\EventHandling\DomainMessage\SpecificationInterface;
12
use PhpAmqpLib\Channel\AMQPChannel;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use Psr\Log\LoggerAwareInterface;
15
use Psr\Log\LoggerAwareTrait;
16
use Psr\Log\NullLogger;
17
18
class AMQPPublisher implements EventListenerInterface, LoggerAwareInterface
19
{
20
    use LoggerAwareTrait;
21
22
    /**
23
     * @var string
24
     */
25
    private $exchange;
26
27
    /**
28
     * @var SpecificationInterface
29
     */
30
    private $domainMessageSpecification;
31
32
    /**
33
     * @var AMQPChannel
34
     */
35
    private $channel;
36
37
    /**
38
     * @var string[]
39
     */
40
    private $payloadClassToContentTypeMap;
41
42
    /**
43
     * @param $exchange
44
     * @param SpecificationInterface $domainMessageSpecification
45
     */
46
    public function __construct(
47
        AMQPChannel $channel,
48
        $exchange,
49
        SpecificationInterface $domainMessageSpecification
50
    ) {
51
        $this->channel = $channel;
52
        $this->exchange = $exchange;
53
        $this->domainMessageSpecification = $domainMessageSpecification;
54
        $this->logger = new NullLogger();
55
    }
56
57
    /**
58
     * @inheritdoc
59
     */
60
    public function handle(DomainMessage $domainMessage)
61
    {
62
        if ($this->domainMessageSpecification->isSatisfiedBy($domainMessage)) {
63
            $this->publishWithAMQP($domainMessage);
64
            return;
65
        }
66
67
        $this->logger->info(
68
            'specification ' . get_class($this->domainMessageSpecification) . ' was not satisified by message'
69
        );
70
    }
71
72
    /**
73
     * @param DomainMessage $domainMessage
74
     */
75
    private function publishWithAMQP(DomainMessage $domainMessage)
76
    {
77
        $key = null;
78
79
        $message = $this->createAMQPMessage($domainMessage);
80
81
        $this->logger->info('publishing to exchange ' . $this->exchange);
82
83
        $this->channel->basic_publish(
84
            $message,
85
            $this->exchange,
86
            $key
87
        );
88
    }
89
90
    /**
91
     * @param DomainMessage $domainMessage
92
     * @return AMQPMessage
93
     */
94
    private function createAMQPMessage(DomainMessage $domainMessage)
95
    {
96
        $body = $this->createAMQPBody($domainMessage);
97
        $properties = $this->createAMQPProperties($domainMessage);
98
99
        return new AMQPMessage($body, $properties);
100
    }
101
102
    /**
103
     * @param DomainMessage $domainMessage
104
     * @return string
105
     */
106
    private function createAMQPBody(DomainMessage $domainMessage)
107
    {
108
        $payload = $domainMessage->getPayload();
109
110
        if ($payload instanceof SerializableInterface) {
111
            return json_encode(
112
                $payload->serialize()
113
            );
114
        }
115
116
        throw new \RuntimeException(
117
            'Unable to serialize ' . get_class($payload)
118
        );
119
    }
120
121
    /**
122
     * @param DomainMessage $domainMessage
123
     * @return array
124
     */
125
    private function createAMQPProperties(DomainMessage $domainMessage)
126
    {
127
        $properties = [];
128
129
        $properties['content_type'] = $this->getContentType($domainMessage);
130
        $properties['correlation_id'] = $this->getCorrelationId($domainMessage);
131
132
        return $properties;
133
    }
134
135
    private function getCorrelationId(DomainMessage $domainMessage)
136
    {
137
        return $domainMessage->getId() . '-' . $domainMessage->getPlayhead();
138
    }
139
140
    private function getContentType(DomainMessage $domainMessage)
141
    {
142
        $payload = $domainMessage->getPayload();
143
        $payloadClass = get_class($payload);
144
145
        if (isset($this->payloadClassToContentTypeMap[$payloadClass])) {
146
            return $this->payloadClassToContentTypeMap[$payloadClass];
147
        }
148
149
        throw new \RuntimeException(
150
            'Unable to find the content type of ' . $payloadClass
151
        );
152
    }
153
154
    /**
155
     * @param string $payloadClass
156
     * @param string $contentType
157
     * @return static
158
     */
159
    public function withContentType($payloadClass, $contentType)
160
    {
161
        $c = clone $this;
162
        $c->setContentType($payloadClass, $contentType);
163
        return $c;
164
    }
165
166
    /**
167
     * @param string $payloadClass
168
     * @param string $contentType
169
     */
170
    private function setContentType($payloadClass, $contentType)
171
    {
172
        if (!is_string($payloadClass)) {
173
            throw new \InvalidArgumentException(
174
                'Value for argument payloadClass should be a string'
175
            );
176
        }
177
178
        if (!is_string($contentType)) {
179
            throw new \InvalidArgumentException(
180
                'Value for argument contentType should be a string'
181
            );
182
        }
183
184
        if (isset($this->payloadClassToContentTypeMap[$payloadClass])) {
185
            throw new \RuntimeException(
186
                'Content type for class ' . $payloadClass . ' was already set to ' . $this->payloadClassToContentTypeMap[$payloadClass]
187
            );
188
        }
189
        $this->payloadClassToContentTypeMap[$payloadClass] = $contentType;
190
    }
191
}
192