Completed
Push — master ( 6a09d7...558842 )
by Sam
04:37
created

StatusManager   C

Complexity

Total Complexity 18

Size/Duplication

Total Lines 269
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 20

Importance

Changes 20
Bugs 1 Features 4
Metric Value
wmc 18
c 20
b 1
f 4
lcom 1
cbo 20
dl 0
loc 269
rs 6.4705

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 17 2
A configureEventDispatcher() 0 17 2
A run() 0 50 2
B handleInstanceUpdates() 0 41 6
B getStatusMessages() 0 70 6
1
<?php
2
3
namespace Jalle19\StatusManager;
4
5
use Jalle19\StatusManager\Database;
6
use Jalle19\StatusManager\Event\ConnectionSeenEvent;
7
use Jalle19\StatusManager\Event\Events;
8
use Jalle19\StatusManager\Event\InputSeenEvent;
9
use Jalle19\StatusManager\Event\InstanceSeenEvent;
10
use Jalle19\StatusManager\Event\InstanceStatusUpdatesEvent;
11
use Jalle19\StatusManager\Event\SubscriptionSeenEvent;
12
use Jalle19\StatusManager\Event\SubscriptionStateChangeEvent;
13
use Jalle19\StatusManager\Subscription\StateChangeParser;
14
use Psr\Log\LoggerInterface;
15
use Ratchet\Http\HttpServer;
16
use Ratchet\Server\IoServer;
17
use Ratchet\WebSocket\WsServer;
18
use Symfony\Component\EventDispatcher\EventDispatcher;
19
20
/**
 * Class StatusManager
 *
 * Core of the application: it polls the configured instances on a periodic
 * timer, converts whatever they report into events for the registered
 * listeners and runs the WebSocket server.
 *
 * @package   Jalle19\StatusManager
 * @copyright Copyright &copy; Sam Stenvall 2015-
 * @license   https://www.gnu.org/licenses/gpl.html The GNU General Public License v2.0
 */
class StatusManager
{

	/**
	 * The number of cycles to wait until retrying an unreachable instance
	 */
	const UNREACHABLE_CYCLES_UNTIL_RETRY = 10;

	/**
	 * @var Configuration the configuration
	 */
	private $_configuration;

	/**
	 * @var \SplObjectStorage the instances to connect to and their individual state
	 */
	private $_instances;

	/**
	 * @var LoggerInterface the logger
	 */
	private $_logger;

	/**
	 * @var IoServer the Websocket server
	 */
	private $_websocket;

	/**
	 * @var WebSocketManager the WebSocket manager; it is handed to the WebSocket
	 *                       server and also listens for instance status updates
	 */
	private $_webSocketManager;

	/**
	 * @var EventDispatcher the event dispatcher
	 */
	private $_eventDispatcher;

	/**
	 * @var PersistenceManager the persistence manager
	 */
	private $_persistenceManager;


	/**
	 * StatusManager constructor.
	 *
	 * Pairs every configured instance with a fresh InstanceState, creates the
	 * persistence and WebSocket managers and wires up the event dispatcher.
	 *
	 * @param Configuration   $configuration
	 * @param LoggerInterface $logger
	 */
	public function __construct(Configuration $configuration, LoggerInterface $logger)
	{
		$this->_configuration = $configuration;
		$this->_logger        = $logger;
		$this->_instances     = new \SplObjectStorage();

		// Attach a state to each instance
		foreach ($this->_configuration->getInstances() as $instance)
		{
			$this->_instances->attach($instance, new InstanceState());
		}

		// Create manager instances
		$this->_persistenceManager = new PersistenceManager($logger);
		$this->_webSocketManager   = new WebSocketManager($logger);

		// Configure the event dispatcher
		$this->configureEventDispatcher();
	}


	/**
	 * Configures the event dispatcher and attaches event listeners to it
	 *
	 * Each definition is a triple of event name, listener object and the name
	 * of the method on that object which should handle the event.
	 */
	private function configureEventDispatcher()
	{
		$this->_eventDispatcher = new EventDispatcher();

		$eventDefinitions = [
			[Events::MAIN_LOOP_STARTING, $this->_persistenceManager, 'onMainLoopStarted'],
			[Events::INSTANCE_STATUS_UPDATES, $this->_webSocketManager, 'onInstanceStatusUpdates'],
			[Events::INSTANCE_SEEN, $this->_persistenceManager, 'onInstanceSeen'],
			[Events::CONNECTION_SEEN, $this->_persistenceManager, 'onConnectionSeen'],
			[Events::INPUT_SEEN, $this->_persistenceManager, 'onInputSeen'],
			[Events::SUBSCRIPTION_SEEN, $this->_persistenceManager, 'onSubscriptionSeen'],
			[Events::SUBSCRIPTION_STATE_CHANGE, $this->_persistenceManager, 'onSubscriptionStateChange'],
		];

		foreach ($eventDefinitions as $eventDefinition)
		{
			$this->_eventDispatcher->addListener($eventDefinition[0], [$eventDefinition[1], $eventDefinition[2]]);
		}
	}


	/**
	 * Runs the application
	 *
	 * Creates the WebSocket server, registers the periodic instance polling on
	 * its event loop, announces the configured instances and finally starts
	 * the server.
	 */
	public function run()
	{
		// Configure the WebSocket server
		$address = $this->_configuration->getListenAddress();
		$port    = $this->_configuration->getListenPort();

		$this->_websocket = IoServer::factory(
			new HttpServer(new WsServer($this->_webSocketManager)),
			$port,
			$address
		);

		// Add the instance polling mechanism to the event loop
		$this->_websocket->loop->addPeriodicTimer($this->_configuration->getUpdateInterval(),
			[$this, 'handleInstanceUpdates']);

		// Log information about the database
		$this->_logger->debug('Using database at {databasePath}', [
			'databasePath' => $this->_configuration->getDatabasePath(),
		]);

		// Log information about the configured instances
		$instances = $this->_configuration->getInstances();

		$this->_logger->info('Managing {instances} instances:', [
			'instances' => count($instances),
		]);

		foreach ($instances as $configuredInstance)
		{
			$instance = $configuredInstance->getInstance();

			$this->_logger->info('  {address}:{port}', [
				'address' => $instance->getHostname(),
				'port'    => $instance->getPort(),
			]);

			$this->_eventDispatcher->dispatch(Events::INSTANCE_SEEN, new InstanceSeenEvent($instance));
		}

		$this->_eventDispatcher->dispatch(Events::MAIN_LOOP_STARTING);

		// Start the main loop
		$this->_logger->info('Starting the Websocket server on {address}:{port}', [
			'address' => $address,
			'port'    => $port,
		]);

		$this->_websocket->run();
	}


	/**
	 * Handles the updates polled from the instances
	 *
	 * For every collected instance status one event is dispatched per
	 * connection, input, subscription and subscription state change. Once all
	 * instances have been processed, the complete collection is dispatched as
	 * a single instance status updates event.
	 *
	 * Must remain public: it is registered as the periodic timer callback in
	 * run() and is therefore invoked from outside this class.
	 */
	public function handleInstanceUpdates()
	{
		$statusCollection = $this->getStatusMessages();

		foreach ($statusCollection->getInstanceStatuses() as $instanceStatus)
		{
			$instanceName = $instanceStatus->getInstanceName();

			$this->_logger->debug('Got status updates from {instanceName}', [
				'instanceName' => $instanceName,
			]);

			// Persist connections
			foreach ($instanceStatus->getConnections() as $connection)
			{
				$this->_eventDispatcher->dispatch(Events::CONNECTION_SEEN,
					new ConnectionSeenEvent($instanceName, $connection));
			}

			// Persist inputs
			foreach ($instanceStatus->getInputs() as $input)
			{
				$this->_eventDispatcher->dispatch(Events::INPUT_SEEN, new InputSeenEvent($instanceName, $input));
			}

			// Persist running subscriptions
			foreach ($instanceStatus->getSubscriptions() as $subscription)
			{
				$this->_eventDispatcher->dispatch(Events::SUBSCRIPTION_SEEN,
					new SubscriptionSeenEvent($instanceName, $subscription));
			}

			// Handle subscription state changes
			foreach ($instanceStatus->getSubscriptionStateChanges() as $subscriptionStateChange)
			{
				$this->_eventDispatcher->dispatch(Events::SUBSCRIPTION_STATE_CHANGE,
					new SubscriptionStateChangeEvent($instanceName, $subscriptionStateChange));
			}
		}

		$this->_eventDispatcher->dispatch(Events::INSTANCE_STATUS_UPDATES,
			new InstanceStatusUpdatesEvent($statusCollection));
	}


	/**
	 * Retrieves and returns all status messages for the configured
	 * instances
	 *
	 * Instances whose state reports them as reachable are polled; all others
	 * only have their retry cycle advanced, so the returned collection holds
	 * statuses solely for instances that were polled successfully.
	 *
	 * @return InstanceStatusCollection
	 */
	private function getStatusMessages()
	{
		$collection = new InstanceStatusCollection();

		foreach ($this->_instances as $instance)
		{
			/* @var Instance $instance */
			$tvheadend    = $instance->getInstance();
			$instanceName = $instance->getName();

			/* @var InstanceState $instanceState */
			$instanceState = $this->_instances[$instance];

			if ($instanceState->isReachable())
			{
				// Collect statuses from currently reachable instances
				$this->pollInstance($tvheadend, $instanceName, $instanceState, $collection);
			}
			else
			{
				// Wait for some cycles and then mark unreachable instances as maybe reachable
				$this->advanceRetryCycle($instanceName, $instanceState);
			}
		}

		return $collection;
	}


	/**
	 * Polls a single instance and adds its status to the collection. Any
	 * exception thrown while doing so is logged and flags the instance as
	 * unreachable instead of propagating.
	 *
	 * @param object                   $tvheadend     the client object returned by the
	 *                                                configured instance's getInstance()
	 * @param string                   $instanceName  the name of the instance (assumed to
	 *                                                be a string, it is only used for
	 *                                                logging and as the status name)
	 * @param InstanceState            $instanceState the state tracked for the instance
	 * @param InstanceStatusCollection $collection    the collection to add the status to
	 */
	private function pollInstance(
		$tvheadend,
		$instanceName,
		InstanceState $instanceState,
		InstanceStatusCollection $collection
	)
	{
		try
		{
			$collection->add(new InstanceStatus(
				$instanceName,
				$tvheadend->getInputStatus(),
				$tvheadend->getSubscriptionStatus(),
				$tvheadend->getConnectionStatus(),
				StateChangeParser::parseStateChanges($tvheadend->getLogMessages())));

			// Update reachability state now that we know the instance is reachable
			if ($instanceState->getReachability() === InstanceState::MAYBE_REACHABLE)
			{
				$this->_logger->info('Instance {instanceName} is now reachable, will start polling for updates',
					[
						'instanceName' => $instanceName,
					]);

				$instanceState->setReachability(InstanceState::REACHABLE);
			}
		}
		catch (\Exception $e)
		{
			// Mark the instance as unreachable. The message is assembled from two
			// literals so that the log line does not end up containing the line
			// break and indentation of the source file.
			$message = 'Instance {instanceName} not reachable, will wait for {cycles} cycles before retrying. '
				. 'The exception was: {exception}';

			$this->_logger->alert($message, [
				'instanceName' => $instanceName,
				'cycles'       => self::UNREACHABLE_CYCLES_UNTIL_RETRY,
				'exception'    => $e->getMessage(),
			]);

			$instanceState->setReachability(InstanceState::UNREACHABLE);
		}
	}


	/**
	 * Advances the retry cycle of an instance that is currently not being
	 * polled: either bumps its retry counter or, once enough cycles have
	 * passed, flags it as maybe reachable and resets the counter.
	 *
	 * @param string        $instanceName  the name of the instance, used for logging
	 * @param InstanceState $instanceState the state tracked for the instance
	 */
	private function advanceRetryCycle($instanceName, InstanceState $instanceState)
	{
		// ">=" instead of "===" so that a counter which somehow ends up past the
		// threshold still triggers a retry instead of growing forever
		if ($instanceState->getRetryCount() >= self::UNREACHABLE_CYCLES_UNTIL_RETRY - 1)
		{
			$instanceState->setReachability(InstanceState::MAYBE_REACHABLE);
			$instanceState->resetRetryCount();

			$this->_logger->info('Retrying instance {instanceName} during next cycle', [
				'instanceName' => $instanceName,
			]);
		}
		else
		{
			$instanceState->incrementRetryCount();
		}
	}

}
295