RedisCommandBusConnector::send()   C
last analyzed

Complexity

Conditions 7
Paths 18

Size

Total Lines 37
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 7.7656

Importance

Changes 4
Bugs 0 Features 0
Metric Value
c 4
b 0
f 0
dl 0
loc 37
ccs 18
cts 24
cp 0.75
rs 6.7273
cc 7
eloc 21
nc 18
nop 3
crap 7.7656
1
<?php
2
/*
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The software is based on the Axon Framework project which is
 * licensed under the Apache 2.0 license. For more information on the Axon Framework
 * see <http://www.axonframework.org/>.
 *
 * This software consists of voluntary contributions made by many individuals
 * and is licensed under the MIT license. For more information, see
 * <http://www.governor-framework.org/>.
 */
23
24
namespace Governor\Framework\CommandHandling\Distributed;
25
26
use Governor\Framework\CommandHandling\NoHandlerForCommandException;
27
use Governor\Framework\Serializer\MessageSerializer;
28
use Governor\Framework\CommandHandling\CommandBusInterface;
29
use Governor\Framework\Serializer\SerializerInterface;
30
use Governor\Framework\CommandHandling\CommandCallbackInterface;
31
use Governor\Framework\CommandHandling\CommandHandlerInterface;
32
use Governor\Framework\CommandHandling\CommandMessageInterface;
33
34
/**
 * Command bus connector that routes commands between nodes through a RedisTemplate.
 *
 * Commands whose routing destination equals this node's name are dispatched on the
 * local command bus segment; all other commands are serialized and enqueued for the
 * destination node via the template.
 */
class RedisCommandBusConnector implements CommandBusConnectorInterface
{

    /**
     * Template used for routing lookups, cluster subscriptions and command queues.
     *
     * @var RedisTemplate
     */
    private $template;

    /**
     * Command bus that handles the commands dispatched on this node.
     *
     * @var CommandBusInterface
     */
    private $localSegment;

    /**
     * Message serializer wrapping the serializer passed to the constructor.
     *
     * @var MessageSerializer
     */
    private $serializer;

    /**
     * @param RedisTemplate $template Template used to talk to the cluster.
     * @param CommandBusInterface $localSegment Command bus for locally handled commands.
     * @param SerializerInterface $serializer Serializer that gets wrapped in a MessageSerializer.
     */
    public function __construct(
        RedisTemplate $template,
        CommandBusInterface $localSegment,
        SerializerInterface $serializer
    ) {
        $this->template = $template;
        $this->localSegment = $localSegment;
        $this->serializer = new MessageSerializer($serializer);
    }

    /**
     * {@inheritdoc}
     *
     * Resolves the destination node for the command name / routing key pair. When no
     * destination is stored yet, one is chosen from the subscribed nodes and stored.
     * A command destined for this node is dispatched on the local segment; any other
     * command is enqueued for the destination node. If a callback is given, the reply
     * is read and reported to it; a null reply is reported as a CommandTimeoutException.
     *
     * @throws NoHandlerForCommandException When no node is subscribed to the command.
     */
    public function send($routingKey, CommandMessageInterface $command, CommandCallbackInterface $callback = null)
    {
        $destination = $this->template->getRoutingDestination($command->getCommandName(), $routingKey);

        if (null === $destination) {
            $destination = $this->findSuitableNode($command);
            $this->template->setRoutingDestination($destination, $command->getCommandName(), $routingKey);
        }

        // dispatch locally if destination matches this node
        if ($this->template->getNodeName() === $destination) {
            $this->localSegment->dispatch($command, $callback);

            return;
        }

        // A reply is only awaited when the caller supplied a callback to report it to.
        $awaitReply = null !== $callback;
        $dispatchMessage = new DispatchMessage($command, $this->serializer, $awaitReply);

        $this->template->enqueueCommand($destination, $dispatchMessage->toBytes());

        // Explicit null guard: everything below calls methods on $callback.
        if (null === $callback) {
            return;
        }

        $reply = $this->template->readCommandReply($command->getIdentifier());

        if (null === $reply) {
            $callback->onFailure(new CommandTimeoutException($command->getIdentifier()));

            return;
        }

        // NOTE(review): index 1 is presumably the payload of a [key, payload] pair
        // returned by readCommandReply() - confirm against RedisTemplate.
        $replyMessage = ReplyMessage::fromBytes($this->serializer, $reply[1]);

        if ($replyMessage->isSuccess()) {
            $callback->onSuccess($replyMessage->getReturnValue());
        } else {
            $callback->onFailure(new CommandDispatchException($replyMessage->getError()));
        }
    }

    /**
     * Picks the node that should receive the given command.
     *
     * Currently this is simply the first entry of the subscriptions that the
     * template reports for the command name.
     *
     * @param CommandMessageInterface $command The command to find a node for.
     *
     * @return mixed The first subscribed node as returned by the template.
     *
     * @throws NoHandlerForCommandException When the template reports no subscriptions.
     */
    private function findSuitableNode(CommandMessageInterface $command)
    {
        $nodes = $this->template->getSubscriptions($command->getCommandName());

        if (empty($nodes)) {
            throw new NoHandlerForCommandException(
                sprintf(
                    "No handler in cluster was subscribed for command [%s]",
                    $command->getCommandName()
                )
            );
        }

        return $nodes[0]; // TODO temporary something more elaborate :)
    }

    /**
     * Subscribe the given <code>handler</code> to commands of type <code>commandType</code> to the local segment of the
     * command bus.
     * <p/>
     * If a subscription already exists for the given type, the behavior is undefined. Implementations may throw an
     * Exception to refuse duplicate subscription or alternatively decide whether the existing or new
     * <code>handler</code> gets the subscription.
     *
     * @param string $commandName The name of the command to subscribe the handler to
     * @param CommandHandlerInterface $handler The handler instance that handles the given type of command
     */
    public function subscribe($commandName, CommandHandlerInterface $handler)
    {
        $this->localSegment->subscribe($commandName, $handler);
    }

    /**
     * Unsubscribe the given <code>handler</code> to commands of type <code>commandType</code>. If the handler is not
     * currently assigned to that type of command, no action is taken.
     *
     * @param string $commandName The name of the command the handler is subscribed to
     * @param CommandHandlerInterface $handler The handler instance to unsubscribe from the CommandBus
     */
    public function unsubscribe($commandName, CommandHandlerInterface $handler)
    {
        $this->localSegment->unsubscribe($commandName, $handler);
    }

    /**
     * Passes every command key of the local segment's subscriptions to the
     * template's subscribe() method.
     */
    public function saveSubscriptions()
    {
        // Only the keys are needed; the handler values are not used here.
        foreach ($this->localSegment->getSubscriptions() as $command => $handler) {
            $this->template->subscribe($command);
        }
    }

    /**
     * Passes every command key of the local segment's subscriptions to the
     * template's unsubscribe() method.
     */
    public function clearSubscriptions()
    {
        // Only the keys are needed; the handler values are not used here.
        foreach ($this->localSegment->getSubscriptions() as $command => $handler) {
            $this->template->unsubscribe($command);
        }
    }

    /**
     * Returns the node name reported by the template.
     *
     * @return string
     */
    public function getNodeName()
    {
        return $this->template->getNodeName();
    }

}