Communicator::unbindTransport()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 20
rs 9.6
c 0
b 0
f 0
cc 3
nc 3
nop 2
1
<?php
2
/**
3
 * Communicator (https://github.com/waltertamboer/communicator)
4
 *
5
 * @link https://github.com/waltertamboer/communicator for the canonical source repository
6
 * @copyright Copyright (c) 2017 Communicator (https://github.com/waltertamboer/communicator)
7
 * @license https://github.com/waltertamboer/communicator/blob/master/LICENSE.md MIT
8
 */
9
10
namespace Communicator;
11
12
use Communicator\Transport\TransportInterface;
13
use InvalidArgumentException;
14
15
/**
 * The Communicator class is the entry point for broadcasting messages.
 *
 * Transports are bound to named channels. When a message is broadcast, every
 * registered listener is invoked in priority order; the built-in listener
 * (see onSend(), registered at priority 0) hands the message to the transports
 * bound to the message's channel.
 */
final class Communicator
{
    /**
     * The channels over which messages can be broadcasted, keyed by channel
     * name. Each entry holds the transports bound to that channel.
     *
     * @var array<string, TransportInterface[]>
     */
    private $channels;

    /**
     * The event listeners that are called when a message is broadcasted,
     * grouped by priority.
     *
     * @var array<int, callable[]>
     */
    private $listeners;

    /**
     * Initializes a new instance of this class.
     *
     * Registers onSend() as a listener at priority 0 so that the actual
     * delivery takes part in the regular listener ordering.
     */
    public function __construct()
    {
        $this->channels = [];
        $this->listeners = [];

        $this->addListener([$this, 'onSend'], 0);
    }

    /**
     * Attaches a listener to the communicator.
     *
     * Listeners are invoked as $listener(array $recipients, Message $message).
     *
     * @param callable $listener The listener to attach.
     * @param int $priority The priority to add the listener at; higher priorities are called first.
     */
    public function addListener(callable $listener, int $priority = 0): void
    {
        $this->listeners[$priority][] = $listener;
    }

    /**
     * Detaches a listener from the communicator.
     *
     * Every registration of the listener is removed, regardless of the
     * priority it was added at. Nothing happens when the listener is unknown.
     *
     * @param callable $listener The listener to detach.
     */
    public function removeListener(callable $listener): void
    {
        foreach ($this->listeners as $priority => $priorityListeners) {
            foreach ($priorityListeners as $index => $priorityListener) {
                if ($priorityListener === $listener) {
                    unset($this->listeners[$priority][$index]);
                }
            }
        }
    }

    /**
     * Binds a transport to a communication channel.
     *
     * The channel is created on first use.
     *
     * @param string $channel The channel to bind to.
     * @param TransportInterface $transport The transport to bind.
     * @return void
     */
    public function bindTransport($channel, TransportInterface $transport): void
    {
        if (!array_key_exists($channel, $this->channels)) {
            $this->channels[$channel] = [];
        }

        $this->channels[$channel][] = $transport;
    }

    /**
     * Unbinds a transport from a communication channel.
     *
     * @param string $channel The channel to unbind from.
     * @param TransportInterface $transport The transport to unbind.
     * @return void
     * @throws InvalidArgumentException When the channel is unknown or the transport is not bound to it.
     */
    public function unbindTransport($channel, TransportInterface $transport): void
    {
        if (!array_key_exists($channel, $this->channels)) {
            throw new InvalidArgumentException(sprintf(
                'The channel "%s" does not exists.',
                $channel
            ));
        }

        // Strict search: match the exact instance. A loose search compares
        // objects by their properties and could remove a different transport
        // that merely looks the same.
        $index = array_search($transport, $this->channels[$channel], true);

        if ($index === false) {
            throw new InvalidArgumentException(sprintf(
                'The provided transport is not bound for channel "%s"',
                $channel
            ));
        }

        unset($this->channels[$channel][$index]);
    }

    /**
     * Broadcasts a message to the given channel.
     *
     * Convenience wrapper that builds the Message and passes it to
     * broadcastMessage().
     *
     * @param array $recipients A list with recipients that the message will be broadcasted to.
     * @param string $channel The name of the channel to broadcast the message to.
     * @param array $params The parameters of the message.
     * @param array $options The options that should be provided to the transport.
     * @return void
     */
    public function broadcast(array $recipients, $channel, array $params, array $options = []): void
    {
        $message = new Message($channel, $params, $options);

        $this->broadcastMessage($recipients, $message);
    }

    /**
     * Sends the given message to all bound transports.
     *
     * All listeners are called from the highest to the lowest priority;
     * listeners sharing a priority are called in the order they were added.
     *
     * @param array $recipients A list with recipients that the message will be broadcasted to.
     * @param Message $message The message to broadcast.
     * @return void
     */
    public function broadcastMessage(array $recipients, Message $message): void
    {
        krsort($this->listeners);

        foreach ($this->listeners as $priorityListeners) {
            foreach ($priorityListeners as $priorityListener) {
                // Pass the arguments positionally. A string-keyed argument
                // array is treated as named arguments on PHP 8 and fails for
                // listeners whose parameters carry different names.
                $priorityListener($recipients, $message);
            }
        }
    }

    /**
     * Called when the messages can be send to the list of transports.
     *
     * Does nothing when no transport was ever bound to the message's channel.
     *
     * @param array $recipients A list with recipients that the message will be broadcasted to.
     * @param Message $message The message to broadcast.
     * @return void
     * @internal Registered as a listener by the constructor; not meant to be called directly.
     */
    public function onSend(array $recipients, Message $message): void
    {
        $channel = $message->getChannel();

        if (!array_key_exists($channel, $this->channels)) {
            return;
        }

        /** @var TransportInterface $transport */
        foreach ($this->channels[$channel] as $transport) {
            $transport->send($recipients, $message);
        }
    }
}
176