Completed
Push — master ( 5b5cdc...a6b764 )
by Sam
02:28
created

PersistenceManager::hasInstance()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
namespace Jalle19\StatusManager\Manager;
4
5
use Jalle19\StatusManager\Database;
6
use Jalle19\StatusManager\Database\Channel;
7
use Jalle19\StatusManager\Database\ChannelQuery;
8
use Jalle19\StatusManager\Database\Connection;
9
use Jalle19\StatusManager\Database\ConnectionQuery;
10
use Jalle19\StatusManager\Database\Input;
11
use Jalle19\StatusManager\Database\InputQuery;
12
use Jalle19\StatusManager\Database\InstanceQuery;
13
use Jalle19\StatusManager\Database\Subscription;
14
use Jalle19\StatusManager\Database\SubscriptionQuery;
15
use Jalle19\StatusManager\Database\User;
16
use Jalle19\StatusManager\Database\UserQuery;
17
use Jalle19\StatusManager\Event\ConnectionSeenEvent;
18
use Jalle19\StatusManager\Event\Events;
19
use Jalle19\StatusManager\Event\InputSeenEvent;
20
use Jalle19\StatusManager\Event\InstanceSeenEvent;
21
use Jalle19\StatusManager\Event\SubscriptionSeenEvent;
22
use Jalle19\StatusManager\Event\SubscriptionStateChangeEvent;
23
use Jalle19\StatusManager\Subscription\StateChange;
24
use Jalle19\tvheadend\model\SubscriptionStatus;
25
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
26
27
/**
28
 * Handles persisting of objects to the database
29
 *
30
 * @package   Jalle19\StatusManager\Manager
31
 * @copyright Copyright &copy; Sam Stenvall 2015-
32
 * @license   https://www.gnu.org/licenses/gpl.html The GNU General Public License v2.0
33
 */
34
class PersistenceManager extends AbstractManager implements EventSubscriberInterface
35
{
36
37
	/**
38
	 * @inheritdoc
39
	 */
40
	public static function getSubscribedEvents()
41
	{
42
		return [
43
			Events::MAIN_LOOP_STARTING        => 'onMainLoopStarted',
44
			Events::INSTANCE_SEEN             => 'onInstanceSeen',
45
			Events::CONNECTION_SEEN           => 'onConnectionSeen',
46
			Events::INPUT_SEEN                => 'onInputSeen',
47
			Events::SUBSCRIPTION_SEEN         => 'onSubscriptionSeen',
48
			Events::SUBSCRIPTION_STATE_CHANGE => 'onSubscriptionStateChange',
49
		];
50
	}
51
52
53
	/**
54
	 * Removes stale subscriptions that haven't received a stop event
55
	 */
56
	public function onMainLoopStarted()
57
	{
58
		$numRemoved = SubscriptionQuery::create()->filterByStopped(null)->delete();
59
60
		$this->logger->info('Removed {numRemoved} stale subscriptions', [
61
			'numRemoved' => $numRemoved,
62
		]);
63
	}
64
65
66
	/**
67
	 * @param InstanceSeenEvent $event
68
	 *
69
	 * @throws \Propel\Runtime\Exception\PropelException
70
	 */
71
	public function onInstanceSeen(InstanceSeenEvent $event)
72
	{
73
		$instance = $event->getInstance();
74
75
		if (InstanceQuery::create()->hasInstance($instance))
76
			return;
77
78
		$instanceModel = new Database\Instance();
79
		$instanceModel->setPrimaryKey($instance->getHostname());
80
		$instanceModel->save();
81
82
		$this->logger->info('Stored new instance {instanceName}', [
83
			'instanceName' => $instance->getHostname(),
84
		]);
85
86
		// Create a special user for eventual DVR subscriptions
87
		$user = new User();
88
		$user->setInstance($instanceModel);
89
		$user->setName(User::NAME_DVR);
90
		$user->save();
91
92
		$this->logger
93
			->info('Stored new special user (instance: {instanceName}, user: {userName})', [
94
				'instanceName' => $instance->getHostname(),
95
				'userName'     => $user->getName(),
96
			]);
97
	}
98
99
100
	/**
101
	 * @param ConnectionSeenEvent $event
102
	 */
103
	public function onConnectionSeen(ConnectionSeenEvent $event)
104
	{
105
		$instanceName     = $event->getInstance();
106
		$connectionStatus = $event->getConnection();
107
108
		if (ConnectionQuery::create()->hasConnection($instanceName, $connectionStatus))
109
			return;
110
111
		$user = null;
112
113
		// Find the user object when applicable
114
		if (!$connectionStatus->isAnonymous())
115
		{
116
			$this->onUserSeen($instanceName, $connectionStatus->user);
117
118
			$user = UserQuery::create()->filterByInstanceName($instanceName)->filterByName($connectionStatus->user)
119
			                 ->findOne();
120
		}
121
122
		$connection = new Connection();
123
		$connection->setInstanceName($instanceName)->setPeer($connectionStatus->peer)
124
		           ->setUser($user)
125
		           ->setStarted($connectionStatus->started)->setType($connectionStatus->type)->save();
126
127
		$this->logger->info('Stored new connection (instance: {instanceName}, peer: {peer})', [
128
			'instanceName' => $instanceName,
129
			'peer'         => $connectionStatus->peer,
130
		]);
131
	}
132
133
134
	/**
135
	 * @param InputSeenEvent $event
136
	 */
137
	public function onInputSeen(InputSeenEvent $event)
138
	{
139
		$instanceName = $event->getInstance();
140
		$inputStatus  = $event->getInput();
141
142
		// Update the input and started fields for existing inputs
143
		if (InputQuery::create()->hasInput($inputStatus->uuid))
144
		{
145
			$input = InputQuery::create()->findPk($inputStatus->uuid);
146
			$input->setStarted(new \DateTime())->setWeight($inputStatus->weight);
147
148
			return;
149
		}
150
151
		$input = new Input();
152
		$input->setInstanceName($instanceName)
153
		      ->setStarted(new \DateTime())
154
		      ->setFromInputStatus($inputStatus)->save();
155
156
		$this->logger
157
			->info('Stored new input (instance: {instanceName}, network: {network}, mux: {mux}, weight: {weight})',
158
				[
159
					'instanceName' => $instanceName,
160
					'network'      => $input->getNetwork(),
161
					'mux'          => $input->getMux(),
162
					'weight'       => $input->getWeight(),
163
				]);
164
	}
165
166
167
	/**
168
	 * @param SubscriptionSeenEvent $event
169
	 *
170
	 * @throws \Propel\Runtime\Exception\PropelException
171
	 */
172
	public function onSubscriptionSeen(SubscriptionSeenEvent $event)
173
	{
174
		$instanceName = $event->getInstance();
175
		$status       = $event->getSubscription();
176
177
		// Ignore certain subscriptions
178
		if (in_array($status->getType(), [SubscriptionStatus::TYPE_EPGGRAB, SubscriptionStatus::TYPE_SERVICE_OR_MUX]))
179
			return;
180
181
		// Determine the username to store for the subscription
182
		$username = $status->username;
183
184
		switch ($status->getType())
185
		{
186
			case SubscriptionStatus::TYPE_RECORDING:
187
				$username = 'dvr';
188
				break;
189
		}
190
191
		// Get the instance, user and channel
192
		$instance = InstanceQuery::create()->findPk($instanceName);
193
		$user     = UserQuery::create()->filterByInstance($instance)->filterByName($username)->findOne();
194
195
		// Ensure the channel exists
196
		$this->onChannelSeen($instanceName, $status->channel);
197
		$channel = ChannelQuery::create()->filterByInstance($instance)->filterByName($status->channel)->findOne();
198
199
		if (SubscriptionQuery::create()->hasSubscription($instance, $user, $channel, $status))
200
			return;
201
202
		// Try to determine which input is used by the subscription
203
		$input = InputQuery::create()->filterBySubscriptionStatus($instanceName, $status)->findOne();
204
205
		if ($input === null)
206
		{
207
			$this->logger
208
				->warning('Got subscription that cannot be tied to an input ({instanceName}, user: {userName}, channel: {channelName})',
209
					[
210
						'instanceName' => $instanceName,
211
						'userName'     => $user !== null ? $user->getName() : 'N/A',
212
						'channelName'  => $channel->getName(),
213
					]);
214
		}
215
216
		$subscription = new Subscription();
217
		$subscription->setInstance($instance)->setInput($input)->setUser($user)->setChannel($channel)
218
		             ->setSubscriptionId($status->id)->setStarted($status->start)->setTitle($status->title)
219
		             ->setService($status->service);
220
		$subscription->save();
221
222
		$this->logger
223
			->info('Stored new subscription (instance: {instanceName}, user: {userName}, channel: {channelName})',
224
				[
225
					'instanceName' => $instanceName,
226
					'userName'     => $user !== null ? $user->getName() : 'N/A',
227
					'channelName'  => $channel->getName(),
228
				]);
229
	}
230
231
232
	/**
233
	 * @param SubscriptionStateChangeEvent $event
234
	 */
235
	public function onSubscriptionStateChange(SubscriptionStateChangeEvent $event)
236
	{
237
		$instanceName = $event->getInstance();
238
		$stateChange  = $event->getStateChange();
239
240
		// We only need to persist subscription stops
241
		if ($stateChange->getState() === StateChange::STATE_SUBSCRIPTION_STARTED)
242
			return;
243
244
		// Find the latest matching subscription
245
		$subscription = SubscriptionQuery::create()
246
		                                 ->getNewestMatching($instanceName, $stateChange->getSubscriptionId());
247
248
		// EPG grab subscriptions are not stored so we don't want to log these with a high level
249
		if ($subscription === null)
250
		{
251
			$this->logger
252
				->debug('Got subscription stop without a matching start (instance: {instanceName}, subscription: {subscriptionId})',
253
					[
254
						'instanceName'   => $instanceName,
255
						'subscriptionId' => $stateChange->getSubscriptionId(),
256
					]);
257
258
			return;
259
		}
260
261
		$subscription->setStopped(new \DateTime());
262
		$subscription->save();
263
264
		$user    = $subscription->getUser();
265
		$channel = $subscription->getChannel();
266
267
		$this->logger
268
			->info('Stored subscription stop (instance: {instanceName}, user: {userName}, channel: {channelName})',
269
				[
270
					'instanceName' => $instanceName,
271
					'userName'     => $user !== null ? $user->getName() : 'N/A',
272
					'channelName'  => $channel->getName(),
273
				]);
274
	}
275
276
277
	/**
278
	 * @param string $instanceName
279
	 * @param string $userName
280
	 *
281
	 * @throws \Propel\Runtime\Exception\PropelException
282
	 */
283
	private function onUserSeen($instanceName, $userName)
284
	{
285
		if (UserQuery::create()->hasUser($instanceName, $userName))
286
			return;
287
288
		$user = new User();
289
		$user->setInstanceName($instanceName)->setName($userName);
290
		$user->save();
291
292
		$this->logger->info('Stored new user (instance: {instanceName}, username: {userName})', [
293
			'instanceName' => $instanceName,
294
			'userName'     => $userName,
295
		]);
296
	}
297
298
299
	/**
300
	 * @param string $instanceName
301
	 * @param string $channelName
302
	 *
303
	 * @throws \Propel\Runtime\Exception\PropelException
304
	 */
305
	private function onChannelSeen($instanceName, $channelName)
306
	{
307
		if (ChannelQuery::create()->hasChannel($instanceName, $channelName))
308
			return;
309
310
		$channel = new Channel();
311
		$channel->setInstanceName($instanceName)->setName($channelName);
312
		$channel->save();
313
314
		$this->logger
315
			->info('Stored new channel (instance: {instanceName}, name: {channelName})', [
316
				'instanceName' => $instanceName,
317
				'channelName'  => $channelName,
318
			]);
319
	}
320
321
}
322