Completed
Push — master ( abe4a2...2abbc3 )
by Sam
05:40
created

PersistenceManager::onSubscriptionStateChange()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 40
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 40
rs 8.5806
cc 4
eloc 22
nc 3
nop 1
1
<?php
2
3
namespace Jalle19\StatusManager\Manager;
4
5
use Jalle19\StatusManager\Database;
6
use Jalle19\StatusManager\Database\Channel;
7
use Jalle19\StatusManager\Database\ChannelQuery;
8
use Jalle19\StatusManager\Database\Connection;
9
use Jalle19\StatusManager\Database\ConnectionQuery;
10
use Jalle19\StatusManager\Database\Input;
11
use Jalle19\StatusManager\Database\InputError;
12
use Jalle19\StatusManager\Database\InputQuery;
13
use Jalle19\StatusManager\Database\InstanceQuery;
14
use Jalle19\StatusManager\Database\Subscription;
15
use Jalle19\StatusManager\Database\SubscriptionQuery;
16
use Jalle19\StatusManager\Database\User;
17
use Jalle19\StatusManager\Database\UserQuery;
18
use Jalle19\StatusManager\Event\ConnectionSeenEvent;
19
use Jalle19\StatusManager\Event\Events;
20
use Jalle19\StatusManager\Event\InputSeenEvent;
21
use Jalle19\StatusManager\Event\InstanceSeenEvent;
22
use Jalle19\StatusManager\Event\PersistInputErrorEvent;
23
use Jalle19\StatusManager\Event\SubscriptionSeenEvent;
24
use Jalle19\StatusManager\Event\SubscriptionStateChangeEvent;
25
use Jalle19\StatusManager\Subscription\StateChange;
26
use Jalle19\tvheadend\model\SubscriptionStatus;
27
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
28
29
/**
30
 * Handles persisting of objects to the database
31
 *
32
 * @package   Jalle19\StatusManager\Manager
33
 * @copyright Copyright &copy; Sam Stenvall 2015-
34
 * @license   https://www.gnu.org/licenses/gpl.html The GNU General Public License v2.0
35
 */
36
class PersistenceManager extends AbstractManager implements EventSubscriberInterface
37
{
38
39
	/**
40
	 * @inheritdoc
41
	 */
42
	public static function getSubscribedEvents()
43
	{
44
		return [
45
			Events::MAIN_LOOP_STARTING        => 'onMainLoopStarted',
46
			Events::INSTANCE_SEEN             => 'onInstanceSeen',
47
			Events::CONNECTION_SEEN           => 'onConnectionSeen',
48
			Events::INPUT_SEEN                => 'onInputSeen',
49
			Events::SUBSCRIPTION_SEEN         => 'onSubscriptionSeen',
50
			Events::SUBSCRIPTION_STATE_CHANGE => 'onSubscriptionStateChange',
51
			Events::PERSIST_INPUT_ERROR       => 'onInputError',
52
		];
53
	}
54
55
56
	/**
57
	 * Removes stale subscriptions that haven't received a stop event
58
	 */
59
	public function onMainLoopStarted()
60
	{
61
		$numRemoved = SubscriptionQuery::create()->filterByStopped(null)->delete();
62
63
		$this->logger->info('Removed {numRemoved} stale subscriptions', [
64
			'numRemoved' => $numRemoved,
65
		]);
66
	}
67
68
69
	/**
70
	 * @param InstanceSeenEvent $event
71
	 *
72
	 * @throws \Propel\Runtime\Exception\PropelException
73
	 */
74
	public function onInstanceSeen(InstanceSeenEvent $event)
75
	{
76
		$instance = $event->getInstance();
77
78
		if (InstanceQuery::create()->hasInstance($instance))
79
			return;
80
81
		$instanceModel = new Database\Instance();
82
		$instanceModel->setPrimaryKey($instance->getHostname());
83
		$instanceModel->save();
84
85
		$this->logger->info('Stored new instance {instanceName}', [
86
			'instanceName' => $instance->getHostname(),
87
		]);
88
89
		// Create a special user for eventual DVR subscriptions
90
		$user = new User();
91
		$user->setInstance($instanceModel);
92
		$user->setName(User::NAME_DVR);
93
		$user->save();
94
95
		$this->logger
96
			->info('Stored new special user (instance: {instanceName}, user: {userName})', [
97
				'instanceName' => $instance->getHostname(),
98
				'userName'     => $user->getName(),
99
			]);
100
	}
101
102
103
	/**
104
	 * @param ConnectionSeenEvent $event
105
	 */
106
	public function onConnectionSeen(ConnectionSeenEvent $event)
107
	{
108
		$instanceName     = $event->getInstance();
109
		$connectionStatus = $event->getConnection();
110
111
		if (ConnectionQuery::create()->hasConnection($instanceName, $connectionStatus))
112
			return;
113
114
		$user = null;
115
116
		// Find the user object when applicable
117
		if (!$connectionStatus->isAnonymous())
118
		{
119
			$this->onUserSeen($instanceName, $connectionStatus->user);
120
121
			$user = UserQuery::create()->filterByInstanceName($instanceName)->filterByName($connectionStatus->user)
122
			                 ->findOne();
123
		}
124
125
		$connection = new Connection();
126
		$connection->setInstanceName($instanceName)->setPeer($connectionStatus->peer)
127
		           ->setUser($user)
128
		           ->setStarted($connectionStatus->started)->setType($connectionStatus->type)->save();
129
130
		$this->logger->info('Stored new connection (instance: {instanceName}, peer: {peer})', [
131
			'instanceName' => $instanceName,
132
			'peer'         => $connectionStatus->peer,
133
		]);
134
	}
135
136
137
	/**
138
	 * @param InputSeenEvent $event
139
	 */
140
	public function onInputSeen(InputSeenEvent $event)
141
	{
142
		$instanceName = $event->getInstance();
143
		$inputStatus  = $event->getInputStatus();
144
145
		// Update the input and started fields for existing inputs
146
		if (InputQuery::create()->hasInput($inputStatus->uuid))
147
		{
148
			$input = InputQuery::create()->findPk($inputStatus->uuid);
149
			$input->setStarted(new \DateTime())->setWeight($inputStatus->weight);
150
151
			return;
152
		}
153
154
		$input = new Input();
155
		$input->setInstanceName($instanceName)
156
		      ->setStarted(new \DateTime())
157
		      ->setFromInputStatus($inputStatus)->save();
158
159
		$this->logger
160
			->info('Stored new input (instance: {instanceName}, network: {network}, mux: {mux}, weight: {weight})',
161
				[
162
					'instanceName' => $instanceName,
163
					'network'      => $input->getNetwork(),
164
					'mux'          => $input->getMux(),
165
					'weight'       => $input->getWeight(),
166
				]);
167
	}
168
169
170
	/**
171
	 * @param SubscriptionSeenEvent $event
172
	 *
173
	 * @throws \Propel\Runtime\Exception\PropelException
174
	 */
175
	public function onSubscriptionSeen(SubscriptionSeenEvent $event)
176
	{
177
		$instanceName = $event->getInstance();
178
		$status       = $event->getSubscription();
179
180
		// Ignore certain subscriptions
181
		if (in_array($status->getType(), [SubscriptionStatus::TYPE_EPGGRAB, SubscriptionStatus::TYPE_SERVICE_OR_MUX]))
182
			return;
183
184
		// Determine the username to store for the subscription
185
		$username = $status->username;
186
187
		switch ($status->getType())
188
		{
189
			case SubscriptionStatus::TYPE_RECORDING:
190
				$username = 'dvr';
191
				break;
192
		}
193
194
		// Get the instance, user and channel
195
		$instance = InstanceQuery::create()->findPk($instanceName);
196
		$user     = UserQuery::create()->filterByInstance($instance)->filterByName($username)->findOne();
197
198
		// Ensure the channel exists
199
		$this->onChannelSeen($instanceName, $status->channel);
200
		$channel = ChannelQuery::create()->filterByInstance($instance)->filterByName($status->channel)->findOne();
201
202
		if (SubscriptionQuery::create()->hasSubscription($instance, $user, $channel, $status))
203
			return;
204
205
		// Try to determine which input is used by the subscription
206
		$input = InputQuery::create()->filterBySubscriptionStatus($instanceName, $status)->findOne();
207
208
		if ($input === null)
209
		{
210
			$this->logger
211
				->warning('Got subscription that cannot be tied to an input ({instanceName}, user: {userName}, channel: {channelName})',
212
					[
213
						'instanceName' => $instanceName,
214
						'userName'     => $user !== null ? $user->getName() : 'N/A',
215
						'channelName'  => $channel->getName(),
216
					]);
217
		}
218
219
		$subscription = new Subscription();
220
		$subscription->setInstance($instance)->setInput($input)->setUser($user)->setChannel($channel)
221
		             ->setSubscriptionId($status->id)->setStarted($status->start)->setTitle($status->title)
222
		             ->setService($status->service);
223
		$subscription->save();
224
225
		$this->logger
226
			->info('Stored new subscription (instance: {instanceName}, user: {userName}, channel: {channelName})',
227
				[
228
					'instanceName' => $instanceName,
229
					'userName'     => $user !== null ? $user->getName() : 'N/A',
230
					'channelName'  => $channel->getName(),
231
				]);
232
	}
233
234
235
	/**
236
	 * @param SubscriptionStateChangeEvent $event
237
	 */
238
	public function onSubscriptionStateChange(SubscriptionStateChangeEvent $event)
239
	{
240
		$instanceName = $event->getInstance();
241
		$stateChange  = $event->getStateChange();
242
243
		// We only need to persist subscription stops
244
		if ($stateChange->getState() === StateChange::STATE_SUBSCRIPTION_STARTED)
245
			return;
246
247
		// Find the latest matching subscription
248
		$subscription = SubscriptionQuery::create()
249
		                                 ->getNewestMatching($instanceName, $stateChange->getSubscriptionId());
250
251
		// EPG grab subscriptions are not stored so we don't want to log these with a high level
252
		if ($subscription === null)
253
		{
254
			$this->logger
255
				->debug('Got subscription stop without a matching start (instance: {instanceName}, subscription: {subscriptionId})',
256
					[
257
						'instanceName'   => $instanceName,
258
						'subscriptionId' => $stateChange->getSubscriptionId(),
259
					]);
260
261
			return;
262
		}
263
264
		$subscription->setStopped(new \DateTime());
265
		$subscription->save();
266
267
		$user    = $subscription->getUser();
268
		$channel = $subscription->getChannel();
269
270
		$this->logger
271
			->info('Stored subscription stop (instance: {instanceName}, user: {userName}, channel: {channelName})',
272
				[
273
					'instanceName' => $instanceName,
274
					'userName'     => $user !== null ? $user->getName() : 'N/A',
275
					'channelName'  => $channel->getName(),
276
				]);
277
	}
278
279
280
	/**
281
	 * @param PersistInputErrorEvent $event
282
	 */
283
	public function onInputError(PersistInputErrorEvent $event)
284
	{
285
		$input            = $event->getInput();
286
		$cumulativeErrors = $event->getCumulativeErrors();
287
288
		$inputError = new InputError();
289
		$inputError->setInput($input);
290
		$inputError->setFromInputErrorCumulative($cumulativeErrors);
291
		$inputError->save();
292
		
293
		$this->logger->debug('Persisted input errors (instance: {instanceName}, input: {friendlyName})', [
294
			'instanceName' => $input->getInstanceName(),
295
			'friendlyName' => $input->getFriendlyName(),
296
		]);
297
	}
298
299
300
	/**
301
	 * @param string $instanceName
302
	 * @param string $userName
303
	 *
304
	 * @throws \Propel\Runtime\Exception\PropelException
305
	 */
306
	private function onUserSeen($instanceName, $userName)
307
	{
308
		if (UserQuery::create()->hasUser($instanceName, $userName))
309
			return;
310
311
		$user = new User();
312
		$user->setInstanceName($instanceName)->setName($userName);
313
		$user->save();
314
315
		$this->logger->info('Stored new user (instance: {instanceName}, username: {userName})', [
316
			'instanceName' => $instanceName,
317
			'userName'     => $userName,
318
		]);
319
	}
320
321
322
	/**
323
	 * @param string $instanceName
324
	 * @param string $channelName
325
	 *
326
	 * @throws \Propel\Runtime\Exception\PropelException
327
	 */
328
	private function onChannelSeen($instanceName, $channelName)
329
	{
330
		if (ChannelQuery::create()->hasChannel($instanceName, $channelName))
331
			return;
332
333
		$channel = new Channel();
334
		$channel->setInstanceName($instanceName)->setName($channelName);
335
		$channel->save();
336
337
		$this->logger
338
			->info('Stored new channel (instance: {instanceName}, name: {channelName})', [
339
				'instanceName' => $instanceName,
340
				'channelName'  => $channelName,
341
			]);
342
	}
343
344
}
345