AsynchronousMessageProducer::__invoke()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 2
1
<?php
2
3
declare(strict_types=1);
4
namespace TomCizek\AsynchronousRouter;
5
6
use Prooph\Common\Messaging\Message;
7
use Prooph\Common\Messaging\MessageConverter;
8
use Prooph\ServiceBus\Async\MessageProducer;
9
use Prooph\ServiceBus\Exception\RuntimeException;
10
use React\Promise\Deferred;
11
12
/**
 * Message producer that publishes messages through an
 * AsynchronousMessageProducerBridge, using a routing key looked up by message name.
 *
 * Routes must be supplied via injectRoutes() before the producer is invoked;
 * a message whose name has no (non-empty) route is rejected with a RuntimeException.
 */
class AsynchronousMessageProducer implements MessageProducer
{
    /** @var AsynchronousMessageProducerBridge */
    private $producerBridge;

    /**
     * Stored by the constructor; not read by any method of this class.
     *
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * Routing keys indexed by message name.
     *
     * Defaults to an empty map so that a producer without injected routes
     * reports "route not found" instead of operating on null.
     *
     * @var array<string, string>
     */
    private $routes = [];

    /**
     * @param AsynchronousMessageProducerBridge $producerBridge   Bridge that performs the actual publishing.
     * @param MessageConverter                  $messageConverter Converter kept on the instance.
     */
    public function __construct(AsynchronousMessageProducerBridge $producerBridge, MessageConverter $messageConverter)
    {
        $this->producerBridge = $producerBridge;
        $this->messageConverter = $messageConverter;
    }

    /**
     * Replaces the whole routing map.
     *
     * @param array<string, string> $routes Routing keys indexed by message name.
     *
     * @return void
     */
    public function injectRoutes(array $routes)
    {
        $this->routes = $routes;
    }

    /**
     * Publishes the message through the bridge with the routing key configured for its name.
     *
     * @param Message       $message  Message to publish.
     * @param Deferred|null $deferred Must be null; a deferred means the caller expects a response.
     *
     * @throws RuntimeException When a deferred is passed, or when no route is configured for the message name.
     */
    public function __invoke(Message $message, ?Deferred $deferred = null): void
    {
        if ($deferred !== null) {
            throw new RuntimeException(__CLASS__ . ' cannot handle query messages which require future responses.');
        }

        $routingKey = $this->getRoutingKeyFromMessageRoute($message);

        $this->producerBridge->publishWithRoutingKey($message, $routingKey);
    }

    /**
     * Looks up the routing key configured for the given message's name.
     *
     * @param Message $message Message whose name selects the route.
     *
     * @return string The configured routing key.
     *
     * @throws RuntimeException When the name has no route, or its route is an empty value.
     */
    private function getRoutingKeyFromMessageRoute(Message $message): string
    {
        $messageName = $message->messageName();

        // empty() treats a missing entry and a falsy one ('' or '0') alike: both count as "not found".
        if (empty($this->routes[$messageName])) {
            throw new RuntimeException(
                sprintf(
                    'Producer route key for message of name "%s" in asynchronous routing not found.',
                    $messageName
                )
            );
        }

        return $this->routes[$messageName];
    }
}
59