Completed
Push — master ( 6a09d7...558842 )
by Sam
04:37
created

PersistenceManager::onMainLoopStarted()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 0
loc 8
rs 9.4285
cc 1
eloc 4
nc 1
nop 0
1
<?php
2
3
namespace Jalle19\StatusManager;
4
5
use Jalle19\StatusManager\Database;
6
use Jalle19\StatusManager\Database\Channel;
7
use Jalle19\StatusManager\Database\ChannelQuery;
8
use Jalle19\StatusManager\Database\Connection;
9
use Jalle19\StatusManager\Database\ConnectionQuery;
10
use Jalle19\StatusManager\Database\Input;
11
use Jalle19\StatusManager\Database\InputQuery;
12
use Jalle19\StatusManager\Database\InstanceQuery;
13
use Jalle19\StatusManager\Database\Subscription;
14
use Jalle19\StatusManager\Database\SubscriptionQuery;
15
use Jalle19\StatusManager\Database\User;
16
use Jalle19\StatusManager\Database\UserQuery;
17
use Jalle19\StatusManager\Event\ConnectionSeenEvent;
18
use Jalle19\StatusManager\Event\InputSeenEvent;
19
use Jalle19\StatusManager\Event\InstanceSeenEvent;
20
use Jalle19\StatusManager\Event\SubscriptionSeenEvent;
21
use Jalle19\StatusManager\Event\SubscriptionStateChangeEvent;
22
use Jalle19\StatusManager\Subscription\StateChange;
23
use Jalle19\tvheadend\model\ConnectionStatus;
24
use Jalle19\tvheadend\model\SubscriptionStatus;
25
use Jalle19\tvheadend\Tvheadend;
26
use Psr\Log\LoggerInterface;
27
28
/**
29
 * Handles persisting of objects to the database
30
 *
31
 * @package   Jalle19\StatusManager
32
 * @copyright Copyright &copy; Sam Stenvall 2015-
33
 * @license   https://www.gnu.org/licenses/gpl.html The GNU General Public License v2.0
34
 */
35
class PersistenceManager
36
{
37
38
	/**
39
	 * @var LoggerInterface
40
	 */
41
	private $_logger;
42
43
44
	/**
	 * Creates a new persistence manager.
	 *
	 * @param LoggerInterface $logger the logger that persistence operations are reported to
	 */
	public function __construct(LoggerInterface $logger)
	{
		$this->_logger = $logger;
	}
51
52
53
	/**
	 * Removes stale subscriptions, i.e. subscriptions whose "stopped" value is still null because
	 * they never received a stop event, and logs how many were removed.
	 *
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	public function onMainLoopStarted()
	{
		// delete() honours the filter on the query. deleteAll() ignores any filter and removes
		// every row in the table, which would wipe the complete subscription history.
		$numRemoved = SubscriptionQuery::create()->filterByStopped(null)->delete();

		$this->_logger->info('Removed {numRemoved} stale subscriptions', [
			'numRemoved' => $numRemoved,
		]);
	}
64
65
66
	/**
	 * Stores an instance the first time it is seen, together with the special DVR user that
	 * belongs to it. Instances that already exist in the database are left untouched.
	 *
	 * @param InstanceSeenEvent $event the event carrying the instance that was seen
	 *
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	public function onInstanceSeen(InstanceSeenEvent $event)
	{
		$instance = $event->getInstance();

		// Known instances need no further handling
		if ($this->hasInstance($instance))
		{
			return;
		}

		$hostname = $instance->getHostname();

		// The hostname doubles as the primary key of the instance
		$instanceModel = new Database\Instance();
		$instanceModel->setPrimaryKey($hostname);
		$instanceModel->save();

		$this->_logger->info('Stored new instance {instanceName}', ['instanceName' => $hostname]);

		// Each instance gets a special user for eventual DVR subscriptions
		$dvrUser = new User();
		$dvrUser->setInstance($instanceModel);
		$dvrUser->setName(User::NAME_DVR);
		$dvrUser->save();

		$logContext = [
			'instanceName' => $hostname,
			'userName'     => $dvrUser->getName(),
		];

		$this->_logger->info('Stored new special user (instance: {instanceName}, user: {userName})', $logContext);
	}
97
98
99
	/**
	 * Stores a connection the first time it is seen. For non-anonymous connections the user is
	 * stored first (when necessary) and then linked to the connection.
	 *
	 * @param ConnectionSeenEvent $event the event carrying the instance name and connection status
	 */
	public function onConnectionSeen(ConnectionSeenEvent $event)
	{
		$instanceName     = $event->getInstance();
		$connectionStatus = $event->getConnection();

		// Connections that have already been stored are skipped
		if ($this->hasConnection($instanceName, $connectionStatus))
		{
			return;
		}

		// Anonymous connections are stored without a user
		$user = null;

		if (!$connectionStatus->isAnonymous())
		{
			$userName = $connectionStatus->user;

			// Make sure the user exists before looking it up
			$this->onUserSeen($instanceName, $userName);

			$userQuery = UserQuery::create()->filterByInstanceName($instanceName)->filterByName($userName);
			$user      = $userQuery->findOne();
		}

		$connection = new Connection();
		$connection->setInstanceName($instanceName);
		$connection->setPeer($connectionStatus->peer);
		$connection->setUser($user);
		$connection->setStarted($connectionStatus->started);
		$connection->setType($connectionStatus->type);
		$connection->save();

		$logContext = [
			'instanceName' => $instanceName,
			'peer'         => $connectionStatus->peer,
		];

		$this->_logger->info('Stored new connection (instance: {instanceName}, peer: {peer})', $logContext);
	}
131
132
133
	/**
	 * Stores an input the first time it is seen. For inputs that already exist only the started
	 * and weight fields are refreshed.
	 *
	 * @param InputSeenEvent $event the event carrying the instance name and input status
	 *
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	public function onInputSeen(InputSeenEvent $event)
	{
		$instanceName = $event->getInstance();
		$inputStatus  = $event->getInput();

		// Update the started and weight fields for existing inputs
		if ($this->hasInput($inputStatus->uuid))
		{
			$input = InputQuery::create()->findPk($inputStatus->uuid);
			$input->setStarted(new \DateTime())->setWeight($inputStatus->weight);

			// The setters only change the in-memory object, the changes must be saved explicitly
			$input->save();

			return;
		}

		// The UUID reported by the input status is used as the primary key
		$input = new Input();
		$input->setPrimaryKey($inputStatus->uuid);
		$input->setInstanceName($instanceName)
		      ->setStarted(new \DateTime())
		      ->setInput($inputStatus->input)
		      ->setWeight($inputStatus->weight)
		      ->setNetwork(Input::parseNetwork($inputStatus))
		      ->setMux(Input::parseMux($inputStatus))->save();

		$this->_logger->info('Stored new input (instance: {instanceName}, network: {network}, mux: {mux}, weight: {weight})',
			[
				'instanceName' => $instanceName,
				'network'      => $input->getNetwork(),
				'mux'          => $input->getMux(),
				'weight'       => $input->getWeight(),
			]);
	}
167
168
169
	/**
	 * Stores a subscription the first time it is seen. EPG grab and service/mux subscriptions are
	 * ignored. The channel is stored first when necessary, and the subscription is tied to the
	 * input it uses when one can be determined.
	 *
	 * @param SubscriptionSeenEvent $event the event carrying the instance name and subscription status
	 *
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	public function onSubscriptionSeen(SubscriptionSeenEvent $event)
	{
		$instanceName = $event->getInstance();
		$status       = $event->getSubscription();
		$type         = $status->getType();

		// Ignore certain subscriptions
		$ignoredTypes = [SubscriptionStatus::TYPE_EPGGRAB, SubscriptionStatus::TYPE_SERVICE_OR_MUX];

		if (in_array($type, $ignoredTypes, true))
			return;

		// Recordings are stored under the special DVR user that is created together with the
		// instance, all other subscriptions use the username reported by the subscription
		$username = $type === SubscriptionStatus::TYPE_RECORDING ? User::NAME_DVR : $status->username;

		// Get the instance and user, the user may be null since not all subscriptions have one
		$instance = InstanceQuery::create()->findPk($instanceName);
		$user     = UserQuery::create()->filterByInstance($instance)->filterByName($username)->findOne();

		// Ensure the channel exists
		$this->onChannelSeen($instanceName, $status->channel);
		$channel = ChannelQuery::create()->filterByInstance($instance)->filterByName($status->channel)->findOne();

		if ($this->hasSubscription($instance, $user, $channel, $status))
			return;

		// Both log messages below share the same placeholders
		$logContext = [
			'instanceName' => $instanceName,
			'userName'     => $user !== null ? $user->getName() : 'N/A',
			'channelName'  => $channel->getName(),
		];

		// Try to determine which input is used by the subscription
		$input = InputQuery::create()->filterBySubscriptionStatus($instanceName, $status)->findOne();

		// The subscription is stored even when no input is found, the situation is merely reported
		if ($input === null)
		{
			$this->_logger->warning('Got subscription that cannot be tied to an input ({instanceName}, user: {userName}, channel: {channelName})',
				$logContext);
		}

		$subscription = new Subscription();
		$subscription->setInstance($instance)->setInput($input)->setUser($user)->setChannel($channel)
		             ->setSubscriptionId($status->id)->setStarted($status->start)->setTitle($status->title)
		             ->setService($status->service);
		$subscription->save();

		$this->_logger->info('Stored new subscription (instance: {instanceName}, user: {userName}, channel: {channelName})',
			$logContext);
	}
230
231
232
	/**
	 * Marks the latest matching subscription as stopped when a subscription stop is reported.
	 * State changes that report a subscription start are ignored.
	 *
	 * @param SubscriptionStateChangeEvent $event the event carrying the instance name and state change
	 */
	public function onSubscriptionStateChange(SubscriptionStateChangeEvent $event)
	{
		$instanceName = $event->getInstance();
		$stateChange  = $event->getStateChange();

		// Only stops have to be persisted here
		if ($stateChange->getState() === StateChange::STATE_SUBSCRIPTION_STARTED)
		{
			return;
		}

		$subscriptionId = $stateChange->getSubscriptionId();

		// Order by start time so that the most recent matching subscription is picked
		$query = SubscriptionQuery::create()
			->filterByInstanceName($instanceName)
			->filterBySubscriptionId($subscriptionId)
			->addDescendingOrderByColumn('started');

		$subscription = $query->findOne();

		if ($subscription === null)
		{
			// EPG grab subscriptions are never stored, so a missing start is only worth a debug message
			$this->_logger->debug('Got subscription stop without a matching start (instance: {instanceName}, subscription: {subscriptionId})',
				[
					'instanceName'   => $instanceName,
					'subscriptionId' => $subscriptionId,
				]);

			return;
		}

		$subscription->setStopped(new \DateTime());
		$subscription->save();

		// Not every subscription has a user
		$user        = $subscription->getUser();
		$userName    = $user === null ? 'N/A' : $user->getName();
		$channelName = $subscription->getChannel()->getName();

		$this->_logger->info('Stored subscription stop (instance: {instanceName}, user: {userName}, channel: {channelName})',
			[
				'instanceName' => $instanceName,
				'userName'     => $userName,
				'channelName'  => $channelName,
			]);
	}
274
275
	/**
	 * Stores the user for the given instance unless it has been stored already.
	 *
	 * @param string $instanceName the name of the instance the user belongs to
	 * @param string $userName     the name of the user
	 *
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	private function onUserSeen($instanceName, $userName)
	{
		$alreadyStored = $this->hasUser($instanceName, $userName);

		if (!$alreadyStored)
		{
			$user = new User();
			$user->setInstanceName($instanceName);
			$user->setName($userName);
			$user->save();

			$logContext = [
				'instanceName' => $instanceName,
				'userName'     => $userName,
			];

			$this->_logger->info('Stored new user (instance: {instanceName}, username: {userName})', $logContext);
		}
	}
295
296
297
	/**
	 * Stores the channel for the given instance unless it has been stored already.
	 *
	 * @param string $instanceName the name of the instance the channel belongs to
	 * @param string $channelName  the name of the channel
	 *
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	private function onChannelSeen($instanceName, $channelName)
	{
		$alreadyStored = $this->hasChannel($instanceName, $channelName);

		if (!$alreadyStored)
		{
			$channel = new Channel();
			$channel->setInstanceName($instanceName);
			$channel->setName($channelName);
			$channel->save();

			$logContext = [
				'instanceName' => $instanceName,
				'channelName'  => $channelName,
			];

			$this->_logger->info('Stored new channel (instance: {instanceName}, name: {channelName})', $logContext);
		}
	}
317
318
319
	/**
	 * Checks whether the instance has been stored, using its hostname as the primary key.
	 *
	 * @param Tvheadend $instance the instance to look for
	 *
	 * @return bool whether the instance exists in the database
	 */
	private function hasInstance(Tvheadend $instance)
	{
		$storedInstance = InstanceQuery::create()->findPk($instance->getHostname());

		return $storedInstance !== null;
	}
328
329
330
	/**
	 * Checks whether a connection with the same instance name, peer and start time has been stored.
	 *
	 * @param string           $instanceName     the name of the instance the connection belongs to
	 * @param ConnectionStatus $connectionStatus the connection status to look for
	 *
	 * @return bool whether the connection exists in the database
	 */
	private function hasConnection($instanceName, ConnectionStatus $connectionStatus)
	{
		$query = ConnectionQuery::create()
			->filterByInstanceName($instanceName)
			->filterByPeer($connectionStatus->peer)
			->filterByStarted($connectionStatus->started);

		return $query->findOne() !== null;
	}
341
342
343
	/**
	 * Checks whether an input with the given UUID as primary key has been stored.
	 *
	 * @param string $uuid the UUID of the input
	 *
	 * @return bool whether the input exists in the database
	 */
	private function hasInput($uuid)
	{
		$storedInput = InputQuery::create()->findPk($uuid);

		return $storedInput !== null;
	}
352
353
354
	/**
	 * Checks whether a user with the given name has been stored for the given instance.
	 *
	 * @param string $instanceName the name of the instance the user belongs to
	 * @param string $userName     the name of the user
	 *
	 * @return bool whether the user exists in the database
	 */
	private function hasUser($instanceName, $userName)
	{
		$query = UserQuery::create()
			->filterByInstanceName($instanceName)
			->filterByName($userName);

		return $query->findOne() !== null;
	}
364
365
366
	/**
	 * Checks whether a channel with the given name has been stored for the given instance.
	 *
	 * @param string $instanceName the name of the instance the channel belongs to
	 * @param string $channelName  the name of the channel
	 *
	 * @return bool whether the channel exists in the database
	 */
	private function hasChannel($instanceName, $channelName)
	{
		$query = ChannelQuery::create()
			->filterByInstanceName($instanceName)
			->filterByName($channelName);

		return $query->findOne() !== null;
	}
377
378
379
	/**
	 * Checks whether a subscription matching the given instance, user, channel, subscription ID
	 * and start time has been stored.
	 *
	 * @param Database\Instance  $instance     the instance the subscription belongs to
	 * @param User|null          $user         the user of the subscription, null when there is none
	 * @param Channel            $channel      the channel of the subscription
	 * @param SubscriptionStatus $subscription the subscription status to look for
	 *
	 * @return bool whether the subscription exists in the database
	 * @throws \Propel\Runtime\Exception\PropelException
	 */
	private function hasSubscription(
		Database\Instance $instance,
		$user,
		Channel $channel,
		SubscriptionStatus $subscription
	) {
		// Subscriptions without a user are matched against a null user ID
		$userId = null;

		if ($user !== null)
		{
			$userId = $user->getId();
		}

		$query = SubscriptionQuery::create()
			->filterByInstance($instance)
			->filterByUserId($userId)
			->filterByChannel($channel)
			->filterBySubscriptionId($subscription->id)
			->filterByStarted($subscription->start);

		return $query->findOne() !== null;
	}
402
403
}
404