Completed
Push — master ( 741841...b4e10c )
by Alexey
36:54
created

Broadcast   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 6

Importance

Changes 0
Metric Value
wmc 22
lcom 2
cbo 6
dl 0
loc 181
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A on() 0 11 1
B process() 0 37 8
B emit() 0 35 6
A publish() 0 4 1
A channelName() 0 4 1
A channels() 0 14 3
A setProcessMiddlewares() 0 4 1
1
<?php
2
3
namespace SfCod\SocketIoBundle\Service;
4
5
use Exception;
6
use HTMLPurifier;
7
use Psr\Log\LoggerInterface;
8
use SfCod\SocketIoBundle\Events\EventInterface;
9
use SfCod\SocketIoBundle\events\EventPolicyInterface;
10
use SfCod\SocketIoBundle\Events\EventPublisherInterface;
11
use SfCod\SocketIoBundle\events\EventRoomInterface;
12
use SfCod\SocketIoBundle\Events\EventSubscriberInterface;
13
use SfCod\SocketIoBundle\Middleware\Process\ProcessMiddlewareInterface;
14
15
/**
16
 * Class Broadcast.
17
 *
18
 * @package SfCod\SocketIoBundle
19
 */
20
class Broadcast
21
{
22
    /**
23
     * @var array
24
     */
25
    protected static $channels = [];
26
    /**
27
     * @var ProcessMiddlewareInterface[]
28
     */
29
    protected $processMiddlewares = [];
30
    /**
31
     * @var RedisDriver
32
     */
33
    protected $redis;
34
    /**
35
     * @var LoggerInterface
36
     */
37
    protected $logger;
38
    /**
39
     * @var EventManager
40
     */
41
    protected $manager;
42
    /**
43
     * @var Process
44
     */
45
    protected $process;
46
47
    /**
48
     * Broadcast constructor.
49
     */
50
    public function __construct(RedisDriver $redis, EventManager $manager, LoggerInterface $logger, Process $process)
51
    {
52
        $this->redis = $redis;
53
        $this->logger = $logger;
54
        $this->manager = $manager;
55
        $this->process = $process;
56
    }
57
58
    /**
59
     * Subscribe to event from client.
60
     *
61
     * @return \Symfony\Component\Process\Process
62
     *
63
     * @throws Exception
64
     */
65
    public function on(string $event, array $data)
66
    {
67
        // Clear data
68
        array_walk_recursive($data, function (&$item, $key) {
0 ignored issues
show
Unused Code introduced by
The parameter $key is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
69
            $item = (new HtmlPurifier())->purify($item);
70
        });
71
72
        $this->logger->info(json_encode(['type' => 'on', 'name' => $event, 'data' => $data]));
73
74
        return $this->process->run($event, $data);
75
    }
76
77
    /**
78
     * Run process.
79
     */
80
    public function process(string $handler, array $data)
81
    {
82
        try {
83
            /** @var EventInterface|EventSubscriberInterface|EventPolicyInterface $eventHandler */
84
            $eventHandler = $this->manager->resolve($handler); //container->get(sprintf('socketio.%s', $handler));
85
86
            if (false === $eventHandler instanceof EventInterface) {
87
                throw new Exception('Event should implement EventInterface');
88
            }
89
90
            $eventHandler->setPayload($data);
91
92
            if (false === $eventHandler instanceof EventSubscriberInterface) {
93
                throw new Exception('Event should implement EventSubscriberInterface');
94
            }
95
96
            foreach ($this->processMiddlewares as $processMiddleware) {
97
                if (false === $processMiddleware($handler, $data)) {
98
                    $this->logger->info(json_encode(['type' => 'process_terminated', 'name' => $handler, 'data' => $data]));
99
100
                    return;
101
                }
102
            }
103
104
            if (true === $eventHandler instanceof EventPolicyInterface && false === $eventHandler->can($data)) {
105
                $this->logger->info(json_encode(['type' => 'process_policy_forbidden', 'name' => $handler, 'data' => $data]));
106
107
                return;
108
            }
109
110
            $this->logger->info(json_encode(['type' => 'process', 'name' => $handler, 'data' => $data]));
111
112
            $eventHandler->handle();
113
        } catch (Exception $e) {
114
            $this->logger->error($e);
115
        }
116
    }
117
118
    /**
119
     * Emit event to client.
120
     *
121
     * @throws Exception
122
     */
123
    public function emit(string $event, array $data)
124
    {
125
        $this->logger->info(json_encode(['type' => 'emit', 'name' => $event, 'data' => $data]));
126
127
        try {
128
            /** @var EventInterface|EventPublisherInterface|EventRoomInterface $eventHandler */
129
            $eventHandler = $this->manager->resolve($event); // container->get(sprintf('socketio.%s', $event));
130
131
            if (false === $eventHandler instanceof EventInterface) {
132
                throw new Exception('Event should implement EventInterface');
133
            }
134
135
            $eventHandler->setPayload($data);
136
137
            if (false === $eventHandler instanceof EventPublisherInterface) {
138
                throw new Exception('Event should implement EventPublisherInterface');
139
            }
140
141
            $data = $eventHandler->fire();
142
143
            if ($eventHandler instanceof EventRoomInterface) {
144
                $data['room'] = $eventHandler->room();
145
            }
146
147
            $eventHandlerClass = get_class($eventHandler);
148
            foreach ($eventHandlerClass::broadcastOn() as $channel) {
149
                $this->publish($this->channelName($channel), [
150
                    'name' => $eventHandlerClass::name(),
151
                    'data' => $data,
152
                ]);
153
            }
154
        } catch (Exception $e) {
155
            $this->logger->error($e);
156
        }
157
    }
158
159
    /**
160
     * Publish data to redis channel.
161
     */
162
    protected function publish(string $channel, array $data)
163
    {
164
        $this->redis->getClient(true)->publish($channel, json_encode($data));
165
    }
166
167
    /**
168
     * Prepare channel name.
169
     */
170
    protected function channelName(string $name): string
171
    {
172
        return $name . getenv('SOCKET_IO_NSP');
173
    }
174
175
    /**
176
     * Redis channels names.
177
     */
178
    public function channels(): array
179
    {
180
        if (empty(self::$channels)) {
181
            foreach ($this->manager->getList() as $eventHandlerClass) {
182
                self::$channels = array_merge(self::$channels, $eventHandlerClass::broadcastOn());
183
            }
184
            self::$channels = array_unique(self::$channels);
185
            self::$channels = array_map(function ($channel) {
186
                return $this->channelName($channel);
187
            }, self::$channels);
188
        }
189
190
        return self::$channels;
191
    }
192
193
    /**
194
     * @param ProcessMiddlewareInterface[] $processMiddlewares
195
     */
196
    public function setProcessMiddlewares($processMiddlewares)
197
    {
198
        $this->processMiddlewares = $processMiddlewares;
199
    }
200
}
201