Completed
Push — master ( 8d3325...7753d5 )
by Andrew
01:03 queued 24s
created

MessagesCreator.reply_webrtc()   A

Complexity

Conditions 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 11
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
32
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
33
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
34
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers
35
36
PY3 = sys.version > '3'
37
str_type = str if PY3 else basestring
38
39
40
sessionStore = SessionStore()
41
42
parent_logger = logging.getLogger(__name__)
43
base_logger = logging.LoggerAdapter(parent_logger, {
44
	'id': 0,
45
	'ip': '000.000.000.000'
46
})
47
48
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
49
#CONNECTION_POOL = tornadoredis.ConnectionPool(
50
#	max_connections=500,
51
#	wait_for_available=True)
52
53
54
class Actions(object):
	"""Values carried in the ``action`` field of websocket/redis messages."""

	# online users / socket identity
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	SET_WS_ID = 'setWsId'

	# chat messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'
	ADD_USER = 'addUserToDom'

	# webrtc (file transfer and calls)
	WEBRTC = 'sendRtcData'
	SET_WEBRTC_ID = 'setConnectionId'
	SET_WEBRTC_ERROR = 'setError'
	OFFER_FILE_CONNECTION = 'offerFile'
	OFFER_CALL_CONNECTION = 'offerCall'
	REPLY_FILE_CONNECTION = 'replyFile'
	REPLY_CALL_CONNECTION = 'replyCall'
	ACCEPT_FILE = 'acceptFile'
	ACCEPT_CALL = 'acceptCall'
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
84
85
86
class VarNames(object):
	"""Dictionary keys used in the message payloads."""

	# routing
	EVENT = 'action'
	HANDLER_NAME = 'handler'
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'  # same key as CHANNEL

	# user
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# message
	MESSAGE_ID = 'id'
	CONTENT = 'content'
	TIME = 'time'
	IMG = 'image'
	FILE_NAME = 'filename'
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# room
	ROOM_ID = 'roomId'
	ROOM_NAME = 'name'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'

	# webrtc
	WEBRTC_QUED_ID = 'id'  # same key as MESSAGE_ID
	WEBRTC_OPPONENT_ID = 'opponentWsId'
	CONNECTION_ID = 'connId'
108
109
110
class HandlerNames(object):
	"""Values for the ``handler`` field of outgoing messages.

	Declared as a new-style class so it matches ``Actions``/``VarNames``
	when the module runs under Python 2.
	"""

	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	PEER_CONNECTION = 'peerConnection'
	WEBRTC_TRANSFER = 'webrtcTransfer'
	WS = 'ws'
118
119
120
class WebRtcRedisStates(object):
	"""Per-socket status values stored in redis for a webrtc connection.

	Declared as a new-style class so it matches ``Actions``/``VarNames``
	when the module runs under Python 2.
	"""

	RESPONDED = 'responded'
	READY = 'ready'
	OFFERED = 'offered'
	CLOSED = 'closed'
125
126
127
class RedisPrefix(object):
	"""Constants and helpers for naming redis channels.

	Declared as a new-style class so it matches ``Actions``/``VarNames``
	when the module runs under Python 2.
	"""

	USER_ID_CHANNEL_PREFIX = 'u'
	DEFAULT_CHANNEL = ALL_ROOM_ID
	CONNECTION_ID_LENGTH = 8  # should be secure

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: value appended (via ``str``) to the user-channel prefix
		:return: ``USER_ID_CHANNEL_PREFIX + str(key)``, e.g. ``'u5'`` for ``5``
		"""
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
135
136
137
class MessagesCreator(object):
	"""
	Builds the dict payloads that are later serialized and sent out.

	The builders read ``self.user_id``, ``self.sender_name``, ``self.sex``
	and ``self.id``; the constructor only sets placeholder values for them.
	"""

	def __init__(self, *args, **kwargs):
		self.user_id = 0  # anonymous by default
		self.id = None  # child init
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Common envelope reused by several builders below.

		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		envelope = {VarNames.EVENT: event}
		envelope[VarNames.CONTENT] = content
		envelope[VarNames.USER_ID] = self.user_id
		envelope[VarNames.TIME] = get_milliseconds()
		envelope[VarNames.HANDLER_NAME] = handler
		return envelope

	def reply_webrtc(self, event, connection_id):
		"""
		:return: {"action": event, "connId": connection_id, "userId": self.user_id,
		"user": self.sender_name, "opponentWsId": self.id, "handler": "webrtcTransfer"}
		"""
		reply = {
			VarNames.EVENT: event,
			VarNames.CONNECTION_ID: connection_id,
		}
		reply.update({
			VarNames.USER_ID: self.user_id,
			VarNames.USER: self.sender_name,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
		})
		return reply

	def set_ws_id(self, random, self_id):
		"""
		:return: {"handler": "ws", "action": "setWsId", "content": random,
		"opponentWsId": self_id}
		"""
		payload = {}
		payload[VarNames.HANDLER_NAME] = HandlerNames.WS
		payload[VarNames.EVENT] = Actions.SET_WS_ID
		payload[VarNames.CONTENT] = random
		payload[VarNames.WEBRTC_OPPONENT_ID] = self_id
		return payload

	def room_online(self, online, event, channel):
		"""
		Envelope from ``default`` (handler "chat", content ``online``) extended
		with "channel", "user" (``self.sender_name``) and "sex" (``self.sex``).
		"""
		message = self.default(online, event, HandlerNames.CHAT)
		message.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return message

	def offer_webrtc(self, content, connection_id, room_id, action):
		"""
		Envelope from ``default`` (handler "webrtc") extended with "user",
		"connId", "opponentWsId" (``self.id``) and "channel" (``room_id``).
		"""
		offer = self.default(content, action, HandlerNames.WEBRTC)
		offer.update({
			VarNames.USER: self.sender_name,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.CHANNEL: room_id,
		})
		return offer

	def set_connection_id(self, qued_id, connection_id):
		"""
		:return: {"action": "setConnectionId", "handler": "webrtc",
		"connId": connection_id, "id": qued_id}
		"""
		payload = {}
		payload[VarNames.EVENT] = Actions.SET_WEBRTC_ID
		payload[VarNames.HANDLER_NAME] = HandlerNames.WEBRTC
		payload[VarNames.CONNECTION_ID] = connection_id
		payload[VarNames.WEBRTC_QUED_ID] = qued_id
		return payload

	def set_webrtc_error(self, error, connection_id, qued_id=None):
		"""
		Envelope from ``default`` (action "setError", handler "peerConnection")
		with "connId" added; "id" is added only when ``qued_id`` is truthy.
		"""
		# TODO file/call
		failure = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION)
		failure[VarNames.CONNECTION_ID] = connection_id
		if not qued_id:
			return failure
		failure[VarNames.WEBRTC_QUED_ID] = qued_id
		return failure

	@classmethod
	def create_message(cls, message):
		"""
		:param message: object exposing sender_id, content, time, id and img
		:return: {"userId", "content", "time", "id"} plus "image" (``img.url``)
		when ``message.img.name`` is truthy
		"""
		serialized = {}
		serialized[VarNames.USER_ID] = message.sender_id
		serialized[VarNames.CONTENT] = message.content
		serialized[VarNames.TIME] = message.time
		serialized[VarNames.MESSAGE_ID] = message.id
		if message.img.name:
			serialized[VarNames.IMG] = message.img.url
		return serialized

	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		:param message: same kind of object ``create_message`` accepts, also
		exposing room_id
		:return: ``create_message(message)`` extended with "action" (``event``),
		"channel" (``message.room_id``) and "handler" ("chat")
		"""
		outgoing = cls.create_message(message)
		outgoing.update({
			VarNames.EVENT: event,
			VarNames.CHANNEL: message.room_id,
			VarNames.HANDLER_NAME: HandlerNames.CHAT,
		})
		return outgoing

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		:type messages: list[Messages] or QuerySet[Messages]
		:type channel: str
		:return: {"content": [serialized messages], "action": "loadMessages",
		"channel": channel, "handler": "chat"}
		"""
		serialized = [cls.create_message(item) for item in messages]
		payload = {VarNames.CONTENT: serialized}
		payload[VarNames.EVENT] = Actions.GET_MESSAGES
		payload[VarNames.CHANNEL] = channel
		payload[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		return payload

	@property
	def channel(self):
		"""Redis channel name derived from ``self.user_id``."""
		own_user_id = self.user_id
		return RedisPrefix.generate_user(own_user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: {"action": "addDirectChannel", "roomId": room_id,
		"users": [self.user_id, other_user_id], "handler": "channels"}
		"""
		members = [self.user_id, other_user_id]
		payload = {VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL}
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.ROOM_USERS] = members
		payload[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		return payload

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: {"action": "addRoom", "roomId": room_id, "users": [self.user_id],
		"handler": "channels", "name": room_name}
		"""
		payload = {VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL}
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.ROOM_USERS] = [self.user_id]
		payload[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		payload[VarNames.ROOM_NAME] = room_name
		return payload

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:return: {"action": "inviteUser", "roomId": room_id, "userId": user_id,
		"handler": "channels", "name": room_name, "content": users}
		"""
		payload = {VarNames.EVENT: Actions.INVITE_USER}
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = user_id
		payload[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		payload[VarNames.ROOM_NAME] = room_name
		payload[VarNames.CONTENT] = users
		return payload

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: mapping holding the "sex" and "user" keys,
		e.g. SEX: 'Alien', USER: 'Andrew'
		:return: {"action": "addUserToDom", "channel": channel, "userId": user_id,
		"handler": "chat", "sex": content["sex"], "user": content["user"]}
		"""
		payload = {VarNames.EVENT: Actions.ADD_USER}
		payload[VarNames.CHANNEL] = channel
		payload[VarNames.USER_ID] = user_id
		payload[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		payload[VarNames.GENDER] = content[VarNames.GENDER]
		payload[VarNames.USER] = content[VarNames.USER]
		return payload

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: {"action": "deleteRoom", "roomId": room_id, "userId": self.user_id,
		"handler": "channels", "time": get_milliseconds()}
		"""
		payload = {VarNames.EVENT: Actions.DELETE_ROOM}
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		payload[VarNames.TIME] = get_milliseconds()
		return payload

	def load_offline_message(self, offline_messages, channel_key):
		"""
		Envelope from ``default`` (action "loadOfflineMessages", handler "chat")
		with "channel" set to ``channel_key``.
		"""
		payload = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		payload[VarNames.CHANNEL] = channel_key
		return payload
307
308
309
class MessagesHandler(MessagesCreator):
310
311
	def __init__(self, *args, **kwargs):
		"""
		Set up per-connection state, the redis clients and the two dispatch
		tables that map an action name to the method handling it.
		"""
		self.closed_channels = None
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.webrtc_ids = {}
		self.ip = None
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self._logger = None
		# own client for this handler; it must exist before it gets patched
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
		self.patch_tornadoredis()

		# looked up by the "action" of a message, see pub_sub_message for the post table
		incoming_handlers = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		self.pre_process_message = incoming_handlers

		# applied to prefix-marked pub/sub messages before they reach the client
		published_handlers = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
		}
		self.post_process_message = published_handlers
351
352
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wrap ``self.async_redis.connection.readline`` so that any exception it
		raises is logged together with this handler's state and then re-raised.

		The original bound method is kept on the connection as ``old_read``.
		"""
		fabric = type(self.async_redis.connection.readline)  # bound-method type
		self.async_redis.connection.old_read = self.async_redis.connection.readline

		def new_read(new_self, callback=None):
			try:
				return new_self.old_read(callback=callback)
			except Exception as e:
				# log the failure first, so it is recorded even if the
				# diagnostic redis lookup below fails as well
				self.logger.error(e)
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
				self.logger.error(
					"Exception info: "
					"self.id: %s ;;; "
					"self.connected = '%s';;; "
					"Redis default channel online = '%s';;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					self.id, self.connected, current_online, self.channels, self.closed_channels
				)
				# bare raise keeps the original traceback (``raise e`` resets it on py2)
				raise

		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
373
374
	@property
	def connected(self):
		"""
		Abstract property: subclasses are expected to override it.

		:raises NotImplementedError: always, in this base implementation
		"""
		# NotImplemented is a comparison sentinel, not an exception;
		# raising it produces a TypeError instead of the intended error
		raise NotImplementedError

	@connected.setter
	def connected(self, value):
		"""
		Abstract setter: subclasses are expected to override it.

		:raises NotImplementedError: always, in this base implementation
		"""
		raise NotImplementedError
381
382
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe this handler's redis client to ``channels`` and, once the
		subscribe task has completed, route incoming messages to
		``pub_sub_message``.
		"""
		redis_client = self.async_redis
		yield tornado.gen.Task(redis_client.subscribe, channels)
		redis_client.listen(self.pub_sub_message)
387
388
	@property
	def logger(self):
		"""Logger of this handler; the module-level ``base_logger`` while ``_logger`` is unset."""
		return self._logger or base_logger
391
392
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` in ``self.channels`` and subscribe the redis client to it."""
		self.channels.append(channel)
		subscription = (channel,)
		yield tornado.gen.Task(self.async_redis.subscribe, subscription)
397
398
	def evaluate(self, query_set):
		"""
		Call ``len`` on ``query_set`` through ``do_db`` and return the same object.

		:param query_set: presumably a lazy Django QuerySet that ``len`` forces
		to execute — confirm against callers
		:return: the ``query_set`` that was passed in
		"""
		force_evaluation = len
		self.do_db(force_evaluation, query_set)
		return query_set
401
402
	def do_db(self, callback, *args, **kwargs):
		"""
		Run ``callback(*args, **kwargs)``, retrying once after closing the
		Django connection when the error says the MySQL server has gone away.

		:param callback: callable performing the database work
		:return: whatever ``callback`` returns
		:raises OperationalError, InterfaceError: any other database error,
		or a failure of the single retry
		"""
		try:
			return callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:
			if 'MySQL server has gone away' not in str(e):
				# bare raise keeps the original traceback (``raise e`` resets it on py2)
				raise
			self.logger.warning('%s, reconnecting', e)
			# closing makes Django open a fresh connection on next use
			connection.close()
			return callback(*args, **kwargs)
412
413
	def execute_query(self, query, *args, **kwargs):
		"""
		Execute raw SQL on the Django connection and return the rows as dicts.

		:param query: SQL text; extra positional/keyword arguments are passed
		straight to ``cursor.execute``
		:return: list with one ``{column_name: value}`` dict per fetched row
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			# column names are the same for every row, so build them once
			columns = [col[0] for col in cursor.description or ()]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# release the cursor even when execute/fetch fails
			cursor.close()
421
422
	def get_online_from_redis(self, channel, check_self_online=False):
		"""
		Read the members of the redis set ``channel`` and extract user ids.

		Each member is utf-8 decoded and the integer before the first ``:`` is
		taken as the user id (the ``:`` separator is specified in
		cookies_middleware.py create_id).

		:rtype : list
		:return: list of distinct user ids, or ``(list, bool)`` when
		``check_self_online`` is set; the bool is True when a member other than
		``self.id`` belongs to ``self.user_id``
		"""
		members = self.sync_redis.smembers(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, members)
		user_ids = set()
		seen_on_other_socket = False
		for raw_member in members or ():  # py2 iteritems
			member = raw_member.decode('utf-8')
			member_user_id = int(member.partition(':')[0])
			if member != self.id and member_user_id == self.user_id:
				seen_on_other_socket = True
			user_ids.add(member_user_id)
		online_ids = list(user_ids)
		if check_self_online:
			return online_ids, seen_on_other_socket
		return online_ids
442
443
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Add ``self.id`` to the redis set ``room_id`` and announce who is online.

		When no other socket of this user is in the set, a LOGIN message is
		published to the whole room (and ``offline_messages``, if any, are
		written to this socket). Otherwise the online list is written to this
		socket only.

		:param room_id: redis key / channel of the room
		:param offline_messages: optional payload for ``load_offline_message``
		"""
		self.async_redis_publisher.sadd(room_id, self.id)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, True)
		if not is_online:  # first tab of this user
			# the sadd above may already be visible in the read, so only add
			# the own user id when it is missing to avoid a duplicate entry
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.ws_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.ws_write(online_user_names_mes)
471
472
	def publish(self, message, channel, parsable=False):
		"""
		Serialize ``message`` to JSON and publish it to the redis ``channel``.

		:param parsable: when true the JSON is marked via ``encode`` so that
		receivers post-process it before forwarding it to the client
		"""
		payload = json.dumps(message)
		self.logger.debug('<%s> %s', channel, payload)
		outgoing = self.encode(payload) if parsable else payload
		self.async_redis_publisher.publish(channel, outgoing)
478
479
	def encode(self, message):
		"""
		Prepend ``self.parsable_prefix`` to flag a message that has to be
		decoded and processed before it is sent to the client.

		@param message: message to mark
		@return: marked message
		"""
		marker = self.parsable_prefix
		return marker + message
487
488
	def remove_parsable_prefix(self, message):
		"""
		Strip the marker added by ``encode``.

		@param message: raw string received from redis
		@return: message without ``self.parsable_prefix``, or None when the
		message does not start with it
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# slice by the real prefix length instead of assuming one character
			return message[len(prefix):]
		return None
491
492
	def pub_sub_message(self, message):
		"""
		Callback for messages arriving from the redis subscription.

		Non-string bodies are ignored. A body carrying the parsable prefix is
		JSON-decoded and passed to the ``post_process_message`` handler of its
		action; it is forwarded to the client (without prefix) unless that
		handler returns a truthy value. Any other string is forwarded as is.
		"""
		body = message.body
		if not isinstance(body, str_type):  # subscribe event
			return
		stripped = self.remove_parsable_prefix(body)
		if not stripped:
			self.ws_write(body)
			return
		parsed = json.loads(stripped)
		post_handler = self.post_process_message[parsed[VarNames.EVENT]]
		handled = post_handler(parsed)
		if not handled:
			self.ws_write(stripped)
503
504
	def ws_write(self, message):
		"""
		Abstract hook for writing ``message`` to the client socket.

		:raises NotImplementedError: always here; per the error text the
		WebSocketHandler subclass provides the implementation
		"""
		raise NotImplementedError('WebSocketHandler implements')
506
507
	def process_send_message(self, message):
		"""
		Store an incoming chat message and publish it to its channel.

		:type message: dict
		:param message: needs "channel" and "content"; "image" (with optional
		"filename") is passed through ``extract_photo`` when present
		"""
		room_id = message[VarNames.CHANNEL]
		stored = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		stored.room_id = room_id
		if VarNames.IMG in message:
			file_name = message.get(VarNames.FILE_NAME)
			stored.img = extract_photo(message[VarNames.IMG], file_name)
		self.do_db(stored.save)  # exit on hacked id with exception
		outgoing = self.create_send_message(stored)
		self.publish(outgoing, room_id)
522
523
	def close_file_connection(self, in_message):
		"""
		Close this socket's side of a file-transfer connection.

		Nothing happens when the own status is already CLOSED. Otherwise the
		other side is notified (``close_sender`` when this socket is the one
		stored under WEBRTC_CONNECTION, ``close_receiver`` if not) and the own
		status is set to CLOSED.

		:raises Exception: "Access Denied" when this socket has no status in
		the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if not own_status:
			raise Exception("Access Denied")
		if own_status == WebRtcRedisStates.CLOSED:
			return
		sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if sender_id != self.id:
			self.close_receiver(connection_id, in_message, sender_id)
		else:
			self.close_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
535
536 View Code Duplication
	def close_call_connection(self, in_message):
		"""
		Leave a call: mark the own status CLOSED and notify every participant
		whose status is not CLOSED.

		:raises ValidationError: when the own status is neither READY nor RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		participants = self.sync_redis.shgetall(connection_id)
		if participants[self.id] not in (WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED):
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
		del participants[self.id]
		notification = {
			VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.USER_ID: self.user_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		}
		for ws_id, status in participants.items():
			if status != WebRtcRedisStates.CLOSED:
				self.publish(notification, ws_id)
554
555 View Code Duplication
	def cancel_call_connection(self, in_message):
		"""
		Decline an offered call: mark the own status CLOSED and notify every
		participant whose status is not CLOSED.

		:raises ValidationError: when the own status is not OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		participants = self.sync_redis.shgetall(connection_id)
		if participants[self.id] != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
		del participants[self.id]
		notification = self.reply_webrtc(Actions.CANCEL_CALL_CONNECTION, connection_id)
		for ws_id, status in participants.items():
			if status != WebRtcRedisStates.CLOSED:
				self.publish(notification, ws_id)
567
568
	def close_receiver(self, connection_id, in_message, sender_id): # TODO for call we should close all
		"""
		Forward ``in_message`` to the sender unless the sender is already CLOSED.

		The message is tagged with this socket id as opponent and with the
		peer-connection handler before it is published to ``sender_id``.

		:raises Exception: "Access denied" when the sender has no status in
		the connection
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message.update({
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		})
		self.publish(in_message, sender_id)
576
577
	def close_sender(self, connection_id):
		"""
		Tell every other socket of the connection that is not CLOSED that the
		file connection is being destroyed by this socket.
		"""
		statuses = self.sync_redis.shgetall(connection_id)
		del statuses[self.id]
		notification = {
			VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
		}
		for ws_id, status in statuses.items():
			if status != WebRtcRedisStates.CLOSED:
				self.publish(notification, ws_id)
589
590
	def accept_file(self, in_message):
		"""
		Accept a file transfer: move the own status RESPONDED -> READY and
		notify the sender.

		:raises ValidationError: unless the sender is READY and this socket is
		RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
		redis = self.sync_redis
		sender_ws_id = redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_status = redis.shget(connection_id, sender_ws_id)
		own_status = redis.shget(connection_id, self.id)
		if sender_status != WebRtcRedisStates.READY or own_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		acceptance = {
			VarNames.EVENT: Actions.ACCEPT_FILE,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		}
		self.publish(acceptance, sender_ws_id)
605
606
	# todo if we shgetall and only then do async hset
607
	# todo we can catch an issue when 2 concurrent users accepted the call
608
	# todo but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
609
	# def accept_call(self, in_message):
610
	# 	connection_id = in_message[VarNames.CONNECTION_ID]
611
	# 	channel_status = self.sync_redis.shgetall(connection_id)
612
	# 	if channel_status and channel_status[self.id] == WebRtcRedisStates.RESPONDED:
613
	# 		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
614
	# 		for key in channel_status:  # del channel_status[self.id] not needed as self in responded
615
	# 			if channel_status[key] == WebRtcRedisStates.READY:
616
	# 				self.publish({
617
	# 					VarNames.EVENT: Actions.ACCEPT_CALL,
618
	# 					VarNames.CONNECTION_ID: connection_id,
619
	# 					VarNames.WEBRTC_OPPONENT_ID: self.id,
620
	# 					VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
621
	# 				}, key)
622
	# 	else:
623
	# 		raise ValidationError("Invalid channel status")
624
625 View Code Duplication
	def accept_call(self, in_message):
		"""
		Accept a call: move the own status RESPONDED -> READY and notify every
		other participant whose status is not CLOSED.

		The own status is written synchronously before the participant
		statuses are read back.

		:raises ValidationError: when the own status is not RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if own_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
		participants = self.sync_redis.shgetall(connection_id)
		del participants[self.id]
		acceptance = {
			VarNames.EVENT: Actions.ACCEPT_CALL,
			VarNames.USER_ID: self.user_id,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
		}
		for ws_id, status in participants.items():
			if status != WebRtcRedisStates.CLOSED:
				self.publish(acceptance, ws_id)
644
645
	def offer_webrtc_connection(self, in_message):
		"""
		Start a webrtc connection (file or call) for a room.

		A new connection id is generated and registered in redis with this
		socket as its owner in status READY. The id is written back to this
		socket and the offer is published, prefix-marked, to the room.
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		# use list because sets dont have 1st element which is offerer
		publisher = self.async_redis_publisher
		publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
		publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		offer = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
		self.ws_write(self.set_connection_id(qued_id, connection_id))
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(offer, room_id, True)
658
659 View Code Duplication
	def reply_call_connection(self, in_message):
		"""
		Acknowledge an offered call: move the own status OFFERED -> RESPONDED
		and notify every participant whose status is not CLOSED.

		:raises ValidationError: when the own status is not OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		participants = self.sync_redis.shgetall(connection_id)
		if participants[self.id] != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		del participants[self.id]
		reply = self.reply_webrtc(Actions.REPLY_CALL_CONNECTION, connection_id)
		for ws_id, status in participants.items():
			if status != WebRtcRedisStates.CLOSED:
				self.publish(reply, ws_id)
671
672
	def reply_file_connection(self, in_message):
		"""
		Acknowledge an offered file: move the own status OFFERED -> RESPONDED
		and send the reply to the sender.

		:raises ValidationError: unless the sender is READY and this socket is
		OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		redis = self.sync_redis
		sender_ws_id = redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_status = redis.shget(connection_id, sender_ws_id)
		own_status = redis.shget(connection_id, self.id)
		if sender_status != WebRtcRedisStates.READY or own_status != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id)
		self.publish(reply, sender_ws_id)
682
683
	def proxy_webrtc(self, in_message):
		"""
		Relay a webrtc message to the opponent socket named in it.

		Both this socket and the opponent must be READY in the connection. The
		message is re-tagged with this socket id as opponent and with the
		peer-connection handler before it is published.

		:type in_message: dict
		:raises ValidationError: when either side is not READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		opponent_ws_id = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		own_status = self.sync_redis.shget(connection_id, self.id)
		opponent_status = self.sync_redis.shget(connection_id, opponent_ws_id)
		both_ready = (
			own_status == WebRtcRedisStates.READY
			and opponent_status == WebRtcRedisStates.READY
		)
		if not both_ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all
			# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
			# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				own_status, opponent_status
			))
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"Forwarding message to channel %s, self %s, other status %s",
			opponent_ws_id, own_status, opponent_status
		)
		self.publish(in_message, opponent_ws_id)
703
704
	def create_new_room(self, message):
		"""
		Create a named room, add the current user to it and publish the
		prefix-marked subscribe message to the user's own channel.

		:raises ValidationError: when the room name is empty or longer than
		16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		self.publish(
			self.subscribe_room_channel_message(room.id, room_name),
			self.channel,
			True
		)
713
714
	def invite_user(self, message):
		"""
		Add another user to a non-private room this handler is subscribed to.

		The room receives an "add user" message, and the invited user's own
		channel receives a prefix-marked invite carrying the room's users.

		:raises ValidationError: when the room is not among ``self.channels``,
		is private, or already contains the user
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		members = {}
		for member in room.users.all():
			self.set_js_user_structure(members, member.id, member.username, member.sex)
		added_message = self.add_user_to_room(room_id, user_id, members[user_id])
		self.publish(added_message, room_id)
		invite_message = self.invite_room_channel_message(room_id, user_id, room.name, members)
		invited_channel = RedisPrefix.generate_user(user_id)
		self.publish(invite_message, invited_channel, True)
732
733
	def create_room(self, user_rooms, user_id):
		"""
		Find the direct room shared with ``user_id`` among ``user_rooms`` and
		re-enable it, or create a new room when none exists.

		:param user_rooms: query of the current user's private rooms
		(``room_id`` values)
		:param user_id: the other participant; equal to ``self.user_id`` for
		a room with oneself
		:return: id of the found or newly created room
		:raises ValidationError: via ``update_room`` when the room exists and
		is not disabled
		"""
		is_self_room = self.user_id == user_id
		if is_self_room:
			room_ids = [room['room_id'] for room in self.evaluate(user_rooms)]
			# NOTE(review): execute_query returns a plain list of dicts, which has
			# no ``get``, so ``query_res.get`` below looks like it raises
			# AttributeError on this branch — confirm and fix with the real
			# column names of SELECT_SELF_ROOM
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
		else:
			query_res = RoomUsers.objects.filter(
				user_id=user_id, room__in=user_rooms
			).values('room__id', 'room__disabled')
		try:
			existing = self.do_db(query_res.get)
			room_id = existing['room__id']
			self.update_room(room_id, existing['room__disabled'])
		except RoomUsers.DoesNotExist:
			new_room = Room()
			new_room.save()
			room_id = new_room.id
			if is_self_room:
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
			else:
				RoomUsers.objects.bulk_create([
					RoomUsers(user_id=user_id, room_id=room_id),
					RoomUsers(user_id=self.user_id, room_id=room_id),
				])
		return room_id
756
757
	def update_room(self, room_id, disabled):
		"""
		Re-enable a disabled room.

		:raises ValidationError: when ``disabled`` is falsy, i.e. the room is
		already active
		"""
		if not disabled:
			raise ValidationError('This room already exist')
		Room.objects.filter(id=room_id).update(disabled=False)
762
763
	def create_user_channel(self, message):
		"""
		Open a direct channel with the user named in ``message``.

		The subscribe message is published, prefix-marked, to the own user
		channel and, when it is a different one, to the other user's channel.
		"""
		other_user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		own_private_rooms = Room.users.through.objects.filter(
			user_id=self.user_id, room__name__isnull=True
		).values('room_id')
		# get private room that contains another user from rooms above
		room_id = self.create_room(own_private_rooms, other_user_id)
		subscribe_message = self.subscribe_direct_channel_message(room_id, other_user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(other_user_id)
		if other_channel != self.channel:
			self.publish(subscribe_message, other_channel, True)
774
775
	def delete_channel(self, message):
		"""
		Leave a room.

		A room without a name (private) is disabled. For a named room the
		user's membership row is deleted and a LOGOUT message with the
		remaining online users is published. In both cases a prefix-marked
		unsubscribe message is then published to the room.

		:raises ValidationError: when the room is not among ``self.channels``,
		is the default room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else: # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis set may not contain this user; list.remove would then
			# raise ValueError and abort the leave half-way
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
792
793
	def edit_message(self, data):
		"""
		Edits or deletes a message of the current user and publishes the result
		to the room of that message. A content of None marks the message as deleted,
		any other content replaces the stored one.

		:type data: dict
		:raises ValidationError: if the message doesn't exist, was sent by another user,
		was sent more than 60000 ms ago or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		try:
			message = Message.objects.get(id=message_id)
		except Message.DoesNotExist:
			# on_message reports only ValidationError back to the client,
			# so an unknown id is converted instead of escaping as DoesNotExist
			raise ValidationError("Message %s doesn't exist" % message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		if message.time + 60000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		message.content = data[VarNames.CONTENT]
		# update through a queryset so only the affected column is written
		selector = Message.objects.filter(id=message_id)
		if message.content is None:
			selector.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			action = Actions.EDIT_MESSAGE
			selector.update(content=message.content)
		self.publish(self.create_send_message(message, action), message.room_id)
811
812
	def send_client_new_channel(self, message):
		"""
		Adds the room given in the message to this connection's channels
		and registers the user as online in it.

		:type message: dict
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
816
817
	def set_opponent_call_channel(self, message):
		"""
		Stores the OFFERED state for this connection in the redis hash of the webrtc connection,
		unless the opponent id in the message is this connection's own id.

		:type message: dict
		:return: True if the opponent id equals this connection's id, None otherwise
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		opponent_is_self = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if not opponent_is_self:
			self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
			return None
		return True
822
823
	def send_client_delete_channel(self, message):
		"""
		Stops this connection from following the room given in the message:
		unsubscribes from the room's redis channel, deletes this connection's id
		from the redis hash stored under the room id and drops the room from self.channels.

		:type message: dict
		"""
		closed_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((closed_room_id,))
		self.async_redis_publisher.hdel(closed_room_id, self.id)
		self.channels.remove(closed_room_id)
828
829
	def process_get_messages(self, data):
		"""
		Writes to the client a page of non-deleted messages of a room, ordered by descending pk.
		When a header id is given only messages with a lower id are selected.
		The page size defaults to 10.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		criteria = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			# continue above the oldest message the client already has
			criteria.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*criteria).order_by('-pk')[:count]
		self.ws_write(self.do_db(self.get_messages, messages, room_id))
843
844
	def get_offline_messages(self):
		"""
		Selects non-deleted messages from the current user's rooms whose id is greater
		than the last read message id stored in the user's room link, grouped by room.

		:return: dict that maps a room id to a list of create_message results
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		by_room = {}
		for message in unread:
			if message.room_id not in by_room:
				by_room[message.room_id] = []
			by_room[message.room_id].append(self.create_message(message))
		return by_room
854
855
	def get_users_in_current_user_rooms(self):
		"""
		Builds a dict of all enabled rooms of the current user together with their members:
		{
			room id: {
				VarNames.ROOM_NAME: room name,
				VarNames.ROOM_USERS: {
					user id: structure created by set_js_user_structure,
					...
				}
			},
			...
		}
		"""
		rooms = {}
		own_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		for room in own_rooms:
			rooms[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		# one row per (user, room) pair for every room collected above
		members = User.objects.filter(rooms__in=list(rooms)).values('id', 'username', 'sex', 'rooms__id')
		for member in members:
			room_users = rooms[member['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, member['id'], member['username'], member['sex'])
		return rooms
884
885
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Puts a user description into user_dict under the key user_id.

		:param user_dict: dict that is modified in place
		:param name: value stored under VarNames.USER
		:param sex: key of GENDERS, the looked up value is stored under VarNames.GENDER
		"""
		gender = GENDERS[sex]
		user_dict[user_id] = {VarNames.USER: name, VarNames.GENDER: gender}
890
891
	def save_ip(self):
		"""
		Creates a UserJoinedInfo row for the current user and ip
		unless such a row already exists.
		"""
		criteria = Q(ip__ip=self.ip) & Q(user_id=self.user_id)
		already_saved = self.do_db(UserJoinedInfo.objects.filter(criteria).exists)
		if not already_saved:
			ip_address = get_or_create_ip(self.ip, self.logger)
			UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
900
901
	def publish_logout(self, channel, log_data):
		"""
		Publishes a logout message to the channel if the user is not online in it anymore.
		The online state read from redis is also written to log_data under the channel key.
		See http://programmers.stackexchange.com/questions/294663/how-to-store-online-status

		:param log_data: dict that is modified in place
		:return: True if the logout message was published, None otherwise
		"""
		online, is_online = self.get_online_from_redis(channel, True)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if is_online:
			return None
		self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		return True
910
911
912
class AntiSpam(object):
	"""
	Keeps per-instance statistics about received messages and rejects oversized ones.
	"""

	def __init__(self):
		# hundredths-of-a-second timestamp -> length of the message received at that time
		self.info = {}
		# amount of messages that exceeded MAX_MESSAGE_SIZE
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Records the length of the message and validates it.

		:param json_message: raw message, anything that supports len()
		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		timestamp_key = int(round(time.time() * 100))
		self.info[timestamp_key] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""
		Placeholder for a frequency based check, does nothing for now.
		"""
		# TODO implement me, should end up with
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
931
932
933
class TornadoHandler(WebSocketHandler, MessagesHandler):
934
935
	def __init__(self, *args, **kwargs):
		"""
		Passes all arguments to the parent constructors, then creates the spam checker
		and marks the handler as not connected.
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		self.__connected__ = False
939
940
	@property
	def connected(self):
		"""
		Flag kept in __connected__: open() sets it to True when it finishes,
		disconnect() resets it to False.
		"""
		return self.__connected__

	@connected.setter
	def connected(self, value):
		"""
		:param value: new state of the flag
		"""
		self.__connected__ = value
947
948
	def data_received(self, chunk):
		"""
		Ignores the chunk; presumably present only to override the parent handler's hook.
		"""
		return None
950
951
	def on_message(self, json_message):
		"""
		Handles a raw frame from the client: parses it as json and passes the result
		to the callable registered in pre_process_message under the frame's event.
		A frame that names a channel is only accepted if that channel is in self.channels.
		ValidationError is written back to the client as a growl message,
		any other exception is not caught here.

		:param json_message: raw text received from the websocket
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise Exception('Skipping null message')
			# spam check is switched off: self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			message = json.loads(json_message)
			event = message[VarNames.EVENT]
			if event not in self.pre_process_message:
				raise Exception("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
			self.pre_process_message[event](message)
		except ValidationError as e:
			growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.ws_write(growl)
969
970
	def on_close(self):
		"""
		Cleans up after a closed websocket: unsubscribes from redis channels,
		removes this connection's id from the redis set of every numeric channel,
		publishes logouts while the handler is still marked as connected,
		updates last read messages if a logout was published and finally disconnects redis.
		"""
		if not self.async_redis.subscribed:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		else:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		gone_offline = False
		for channel in self.channels:
			# only numeric channels are handled here, the other ones are skipped
			if isinstance(channel, Number):
				self.sync_redis.srem(channel, self.id)
				if self.connected and self.publish_logout(channel, log_data):
					gone_offline = True
		if gone_offline:
			updated = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", updated)
		self.disconnect(json.dumps(log_data))
988
989
	def disconnect(self, log_data, tries=0):
		"""
		Closes the redis connection if no operation is in progress on it,
		otherwise schedules itself again on the io loop (at most 1000 times).
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: text that is logged when the connection gets closed
		:param tries: number of attempts made so far, used by the rescheduled calls
		"""
		self.connected = False
		if tries == 0:
			# snapshot only on the first attempt: a retry would find self.channels
			# already emptied and overwrite the snapshot with an empty list
			self.closed_channels = self.channels
		self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# NOTE(review): the first positional argument of timedelta is days,
			# so this delay is about 0.86 s rather than 10 microseconds - confirm it is intended
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries + 1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
1003
1004
	def generate_self_id(self):
		"""
		Sets self.id from the user id and the 'id' request argument using create_id.
		If the second value returned by create_id differs from the requested one,
		it is sent to the client together with self.id.
		According to the original note the client keeps that id per browser tab,
		so a websocket that lost its connection can reconnect with the same id.
		"""
		requested_id = self.get_argument('id', None)
		self.id, issued_id = create_id(self.user_id, requested_id)
		if issued_id != requested_id:
			self.ws_write(self.set_ws_id(issued_id, self.id))
1014
1015
	def open(self):
		"""
		Authenticates a new websocket by the session cookie and initializes it:
		resolves the user, sends him his rooms, subscribes to the user, connection and room channels,
		registers the user as online in every room (passing the room's offline messages along)
		and marks the handler as connected. The ip is saved in a separate thread.
		A session key unknown to the session store closes the connection with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		self.generate_self_id()
		self._logger = logging.LoggerAdapter(parent_logger, {'id': self.id, 'ip': self.ip})
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		# a new list instead of clear(), which py2 lists don't have
		self.channels = []
		self.channels.append(self.channel)
		self.channels.append(self.id)
		self.channels.extend(user_rooms)  # iterating the dict yields room ids
		self.listen(self.channels)
		# messages missed while the user was away
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
1050
1051
	def check_origin(self, origin):
		"""
		Accepts a connection only if the domain of the Origin header matches
		the domain of the Host header. Ports are ignored and the comparison is case-insensitive.

		:param origin: value of the Origin header
		:return: True if the domains match, False if they differ or the request has no Host header
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		host_header = self.request.headers.get("Host")
		if not host_header:
			# nothing to compare with, reject instead of failing on None
			return False
		# domains are case-insensitive, so both sides are normalized the same way
		browser_domain = host_header.split(':')[0].lower()
		return browser_domain == origin_domain
1061
1062
	def ws_write(self, message):
		"""
		Sends a message to the client. A dict is serialized to json first.
		Writing to an already closed websocket is logged as an error instead of raised.

		:param message: dict or string
		:raises ValueError: if the message is neither a dict nor a string
		"""
		payload = json.dumps(message) if isinstance(message, dict) else message
		if not isinstance(payload, str_type):
			raise ValueError('Wrong message type : %s' % str(payload))
		self.logger.debug(">> %.1000s", payload)
		try:
			self.write_message(payload)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(payload))
1077
1078
	def get_client_ip(self):
		"""
		:return: value of the X-Real-IP header if it's present and not empty,
		otherwise the remote ip of the request
		"""
		real_ip = self.request.headers.get("X-Real-IP")
		if real_ip:
			return real_ip
		return self.request.remote_ip
1080