Completed
Push — master ( 3dfec0...098987 )
by Sam
03:00
created

PersistenceManager   B

Complexity

Total Complexity 31

Size/Duplication

Total Lines 378
Duplicated Lines 0 %

Coupling/Cohesion

Components 3
Dependencies 18

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 31
c 3
b 0
f 0
lcom 3
cbo 18
dl 0
loc 378
rs 7.1866

15 Methods

Rating   Name   Duplication   Size   Complexity  
A getSubscribedEvents() 0 11 1
A onMainLoopStarted() 0 8 1
B onInstanceSeen() 0 27 2
B onConnectionSeen() 0 29 3
B onInputSeen() 0 32 2
B onSubscriptionSeen() 0 58 7
B onSubscriptionStateChange() 0 41 4
A onUserSeen() 0 14 2
A onChannelSeen() 0 15 2
A hasInstance() 0 4 1
A hasConnection() 0 5 1
A hasInput() 0 4 1
A hasUser() 0 4 1
A hasChannel() 0 5 1
A hasSubscription() 0 14 2
1
<?php
2
3
namespace Jalle19\StatusManager\Manager;
4
5
use Jalle19\StatusManager\Database;
6
use Jalle19\StatusManager\Database\Channel;
7
use Jalle19\StatusManager\Database\ChannelQuery;
8
use Jalle19\StatusManager\Database\Connection;
9
use Jalle19\StatusManager\Database\ConnectionQuery;
10
use Jalle19\StatusManager\Database\Input;
11
use Jalle19\StatusManager\Database\InputQuery;
12
use Jalle19\StatusManager\Database\InstanceQuery;
13
use Jalle19\StatusManager\Database\Subscription;
14
use Jalle19\StatusManager\Database\SubscriptionQuery;
15
use Jalle19\StatusManager\Database\User;
16
use Jalle19\StatusManager\Database\UserQuery;
17
use Jalle19\StatusManager\Event\ConnectionSeenEvent;
18
use Jalle19\StatusManager\Event\Events;
19
use Jalle19\StatusManager\Event\InputSeenEvent;
20
use Jalle19\StatusManager\Event\InstanceSeenEvent;
21
use Jalle19\StatusManager\Event\SubscriptionSeenEvent;
22
use Jalle19\StatusManager\Event\SubscriptionStateChangeEvent;
23
use Jalle19\StatusManager\Subscription\StateChange;
24
use Jalle19\tvheadend\model\ConnectionStatus;
25
use Jalle19\tvheadend\model\SubscriptionStatus;
26
use Jalle19\tvheadend\Tvheadend;
27
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
28
29
/**
30
 * Handles persisting of objects to the database
31
 *
32
 * @package   Jalle19\StatusManager\Manager
33
 * @copyright Copyright &copy; Sam Stenvall 2015-
34
 * @license   https://www.gnu.org/licenses/gpl.html The GNU General Public License v2.0
35
 */
36
class PersistenceManager extends AbstractManager implements EventSubscriberInterface
37
{
38
39
	/**
40
	 * @inheritdoc
41
	 */
42
	public static function getSubscribedEvents()
43
	{
44
		return [
45
			Events::MAIN_LOOP_STARTING        => 'onMainLoopStarted',
46
			Events::INSTANCE_SEEN             => 'onInstanceSeen',
47
			Events::CONNECTION_SEEN           => 'onConnectionSeen',
48
			Events::INPUT_SEEN                => 'onInputSeen',
49
			Events::SUBSCRIPTION_SEEN         => 'onSubscriptionSeen',
50
			Events::SUBSCRIPTION_STATE_CHANGE => 'onSubscriptionStateChange',
51
		];
52
	}
53
54
55
	/**
56
	 * Removes stale subscriptions that haven't received a stop event
57
	 */
58
	public function onMainLoopStarted()
59
	{
60
		$numRemoved = SubscriptionQuery::create()->filterByStopped(null)->delete();
61
62
		$this->logger->info('Removed {numRemoved} stale subscriptions', [
63
			'numRemoved' => $numRemoved,
64
		]);
65
	}
66
67
68
	/**
69
	 * @param InstanceSeenEvent $event
70
	 *
71
	 * @throws \Propel\Runtime\Exception\PropelException
72
	 */
73
	public function onInstanceSeen(InstanceSeenEvent $event)
74
	{
75
		$instance = $event->getInstance();
76
77
		if ($this->hasInstance($instance))
78
			return;
79
80
		$instanceModel = new Database\Instance();
81
		$instanceModel->setPrimaryKey($instance->getHostname());
82
		$instanceModel->save();
83
84
		$this->logger->info('Stored new instance {instanceName}', [
85
			'instanceName' => $instance->getHostname(),
86
		]);
87
88
		// Create a special user for eventual DVR subscriptions
89
		$user = new User();
90
		$user->setInstance($instanceModel);
91
		$user->setName(User::NAME_DVR);
92
		$user->save();
93
94
		$this->logger
95
			->info('Stored new special user (instance: {instanceName}, user: {userName})', [
96
				'instanceName' => $instance->getHostname(),
97
				'userName'     => $user->getName(),
98
			]);
99
	}
100
101
102
	/**
103
	 * @param ConnectionSeenEvent $event
104
	 */
105
	public function onConnectionSeen(ConnectionSeenEvent $event)
106
	{
107
		$instanceName     = $event->getInstance();
108
		$connectionStatus = $event->getConnection();
109
110
		if ($this->hasConnection($instanceName, $connectionStatus))
111
			return;
112
113
		$user = null;
114
115
		// Find the user object when applicable
116
		if (!$connectionStatus->isAnonymous())
117
		{
118
			$this->onUserSeen($instanceName, $connectionStatus->user);
119
120
			$user = UserQuery::create()->filterByInstanceName($instanceName)->filterByName($connectionStatus->user)
121
			                 ->findOne();
122
		}
123
124
		$connection = new Connection();
125
		$connection->setInstanceName($instanceName)->setPeer($connectionStatus->peer)
126
		           ->setUser($user)
127
		           ->setStarted($connectionStatus->started)->setType($connectionStatus->type)->save();
128
129
		$this->logger->info('Stored new connection (instance: {instanceName}, peer: {peer})', [
130
			'instanceName' => $instanceName,
131
			'peer'         => $connectionStatus->peer,
132
		]);
133
	}
134
135
136
	/**
137
	 * @param InputSeenEvent $event
138
	 */
139
	public function onInputSeen(InputSeenEvent $event)
140
	{
141
		$instanceName = $event->getInstance();
142
		$inputStatus  = $event->getInput();
143
144
		// Update the input and started fields for existing inputs
145
		if ($this->hasInput($inputStatus->uuid))
146
		{
147
			$input = InputQuery::create()->findPk($inputStatus->uuid);
148
			$input->setStarted(new \DateTime())->setWeight($inputStatus->weight);
149
150
			return;
151
		}
152
153
		$input = new Input();
154
		$input->setPrimaryKey($inputStatus->uuid);
155
		$input->setInstanceName($instanceName)
156
		      ->setStarted(new \DateTime())
157
		      ->setInput($inputStatus->input)
158
		      ->setWeight($inputStatus->weight)
159
		      ->setNetwork(Input::parseNetwork($inputStatus))
160
		      ->setMux(Input::parseMux($inputStatus))->save();
161
162
		$this->logger
163
			->info('Stored new input (instance: {instanceName}, network: {network}, mux: {mux}, weight: {weight})',
164
				[
165
					'instanceName' => $instanceName,
166
					'network'      => $input->getNetwork(),
167
					'mux'          => $input->getMux(),
168
					'weight'       => $input->getWeight(),
169
				]);
170
	}
171
172
173
	/**
174
	 * @param SubscriptionSeenEvent $event
175
	 *
176
	 * @throws \Propel\Runtime\Exception\PropelException
177
	 */
178
	public function onSubscriptionSeen(SubscriptionSeenEvent $event)
179
	{
180
		$instanceName = $event->getInstance();
181
		$status       = $event->getSubscription();
182
183
		// Ignore certain subscriptions
184
		if (in_array($status->getType(), [SubscriptionStatus::TYPE_EPGGRAB, SubscriptionStatus::TYPE_SERVICE_OR_MUX]))
185
			return;
186
187
		// Determine the username to store for the subscription
188
		$username = $status->username;
189
190
		switch ($status->getType())
191
		{
192
			case SubscriptionStatus::TYPE_RECORDING:
193
				$username = 'dvr';
194
				break;
195
		}
196
197
		// Get the instance, user and channel
198
		$instance = InstanceQuery::create()->findPk($instanceName);
199
		$user     = UserQuery::create()->filterByInstance($instance)->filterByName($username)->findOne();
200
201
		// Ensure the channel exists
202
		$this->onChannelSeen($instanceName, $status->channel);
203
		$channel = ChannelQuery::create()->filterByInstance($instance)->filterByName($status->channel)->findOne();
204
205
		if ($this->hasSubscription($instance, $user, $channel, $status))
206
			return;
207
208
		// Try to determine which input is used by the subscription
209
		$input = InputQuery::create()->filterBySubscriptionStatus($instanceName, $status)->findOne();
210
211
		if ($input === null)
212
		{
213
			$this->logger
214
				->warning('Got subscription that cannot be tied to an input ({instanceName}, user: {userName}, channel: {channelName})',
215
					[
216
						'instanceName' => $instanceName,
217
						'userName'     => $user !== null ? $user->getName() : 'N/A',
218
						'channelName'  => $channel->getName(),
219
					]);
220
		}
221
222
		$subscription = new Subscription();
223
		$subscription->setInstance($instance)->setInput($input)->setUser($user)->setChannel($channel)
224
		             ->setSubscriptionId($status->id)->setStarted($status->start)->setTitle($status->title)
225
		             ->setService($status->service);
226
		$subscription->save();
227
228
		$this->logger
229
			->info('Stored new subscription (instance: {instanceName}, user: {userName}, channel: {channelName})',
230
				[
231
					'instanceName' => $instanceName,
232
					'userName'     => $user !== null ? $user->getName() : 'N/A',
233
					'channelName'  => $channel->getName(),
234
				]);
235
	}
236
237
238
	/**
239
	 * @param SubscriptionStateChangeEvent $event
240
	 */
241
	public function onSubscriptionStateChange(SubscriptionStateChangeEvent $event)
242
	{
243
		$instanceName = $event->getInstance();
244
		$stateChange  = $event->getStateChange();
245
246
		// We only need to persist subscription stops
247
		if ($stateChange->getState() === StateChange::STATE_SUBSCRIPTION_STARTED)
248
			return;
249
250
		// Find the latest matching subscription
251
		$subscription = SubscriptionQuery::create()->filterByInstanceName($instanceName)
252
		                                 ->filterBySubscriptionId($stateChange->getSubscriptionId())
253
		                                 ->addDescendingOrderByColumn('started')->findOne();
254
255
		// EPG grab subscriptions are not stored so we don't want to log these with a high level
256
		if ($subscription === null)
257
		{
258
			$this->logger
259
				->debug('Got subscription stop without a matching start (instance: {instanceName}, subscription: {subscriptionId})',
260
					[
261
						'instanceName'   => $instanceName,
262
						'subscriptionId' => $stateChange->getSubscriptionId(),
263
					]);
264
265
			return;
266
		}
267
268
		$subscription->setStopped(new \DateTime());
269
		$subscription->save();
270
271
		$user    = $subscription->getUser();
272
		$channel = $subscription->getChannel();
273
274
		$this->logger
275
			->info('Stored subscription stop (instance: {instanceName}, user: {userName}, channel: {channelName})',
276
				[
277
					'instanceName' => $instanceName,
278
					'userName'     => $user !== null ? $user->getName() : 'N/A',
279
					'channelName'  => $channel->getName(),
280
				]);
281
	}
282
283
284
	/**
285
	 * @param string $instanceName
286
	 * @param string $userName
287
	 *
288
	 * @throws \Propel\Runtime\Exception\PropelException
289
	 */
290
	private function onUserSeen($instanceName, $userName)
291
	{
292
		if ($this->hasUser($instanceName, $userName))
293
			return;
294
295
		$user = new User();
296
		$user->setInstanceName($instanceName)->setName($userName);
297
		$user->save();
298
299
		$this->logger->info('Stored new user (instance: {instanceName}, username: {userName})', [
300
			'instanceName' => $instanceName,
301
			'userName'     => $userName,
302
		]);
303
	}
304
305
306
	/**
307
	 * @param string $instanceName
308
	 * @param string $channelName
309
	 *
310
	 * @throws \Propel\Runtime\Exception\PropelException
311
	 */
312
	private function onChannelSeen($instanceName, $channelName)
313
	{
314
		if ($this->hasChannel($instanceName, $channelName))
315
			return;
316
317
		$channel = new Channel();
318
		$channel->setInstanceName($instanceName)->setName($channelName);
319
		$channel->save();
320
321
		$this->logger
322
			->info('Stored new channel (instance: {instanceName}, name: {channelName})', [
323
				'instanceName' => $instanceName,
324
				'channelName'  => $channelName,
325
			]);
326
	}
327
328
329
	/**
330
	 * @param Tvheadend $instance
331
	 *
332
	 * @return bool whether the instance exists in the database
333
	 */
334
	private function hasInstance(Tvheadend $instance)
335
	{
336
		return InstanceQuery::create()->findPk($instance->getHostname()) !== null;
337
	}
338
339
340
	/**
341
	 * @param                  $instanceName
342
	 * @param ConnectionStatus $connectionStatus
343
	 *
344
	 * @return bool whether the connection exists in the database
345
	 */
346
	private function hasConnection($instanceName, ConnectionStatus $connectionStatus)
347
	{
348
		return ConnectionQuery::create()->filterByInstanceName($instanceName)->filterByPeer($connectionStatus->peer)
349
		                      ->filterByStarted($connectionStatus->started)->findOne() !== null;
350
	}
351
352
353
	/**
354
	 * @param string $uuid
355
	 *
356
	 * @return bool
357
	 */
358
	private function hasInput($uuid)
359
	{
360
		return InputQuery::create()->findPk($uuid) !== null;
361
	}
362
363
364
	/**
365
	 * @param string $instanceName
366
	 * @param string $userName
367
	 *
368
	 * @return bool
369
	 */
370
	private function hasUser($instanceName, $userName)
371
	{
372
		return UserQuery::create()->filterByInstanceName($instanceName)->filterByName($userName)->findOne() !== null;
373
	}
374
375
376
	/**
377
	 * @param string $instanceName
378
	 * @param string $channelName
379
	 *
380
	 * @return bool
381
	 */
382
	private function hasChannel($instanceName, $channelName)
383
	{
384
		return ChannelQuery::create()->filterByInstanceName($instanceName)->filterByName($channelName)
385
		                   ->findOne() !== null;
386
	}
387
388
389
	/**
390
	 * @param Database\Instance  $instance
391
	 * @param User|null          $user
392
	 * @param Channel            $channel
393
	 * @param SubscriptionStatus $subscription
394
	 *
395
	 * @return bool
396
	 * @throws \Propel\Runtime\Exception\PropelException
397
	 */
398
	private function hasSubscription(
399
		Database\Instance $instance,
400
		$user,
401
		Channel $channel,
402
		SubscriptionStatus $subscription
403
	) {
404
		// Not all subscriptions are tied to a user
405
		$userId = $user !== null ? $user->getId() : null;
406
407
		return SubscriptionQuery::create()->filterByInstance($instance)->filterByUserId($userId)
408
		                        ->filterByChannel($channel)
409
		                        ->filterBySubscriptionId($subscription->id)->filterByStarted($subscription->start)
410
		                        ->findOne() !== null;
411
	}
412
413
}
414