Completed
Push — master ( c0bcf5...f9d889 )
by Andrew
30s
created

MessagesCreator.prepare_img()   A

Complexity

Conditions 4

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 4
rs 9.2
c 0
b 0
f 0
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
32
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
33
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers, Image
34
35
PY3 = sys.version > '3'
36
str_type = str if PY3 else basestring
37
38
39
sessionStore = SessionStore()
40
41
parent_logger = logging.getLogger(__name__)
42
base_logger = logging.LoggerAdapter(parent_logger, {
43
	'id': 0,
44
	'ip': '000.000.000.000'
45
})
46
47
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
48
#CONNECTION_POOL = tornadoredis.ConnectionPool(
49
#	max_connections=500,
50
#	wait_for_available=True)
51
52
53
class Actions(object):
54
	LOGIN = 'addOnlineUser'
55
	SET_WS_ID = 'setWsId'
56
	LOGOUT = 'removeOnlineUser'
57
	SEND_MESSAGE = 'sendMessage'
58
	PRINT_MESSAGE = 'printMessage'
59
	WEBRTC = 'sendRtcData'
60
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
61
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
62
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
63
	ACCEPT_CALL = 'acceptCall'
64
	ACCEPT_FILE = 'acceptFile'
65
	ROOMS = 'setRooms'
66
	REFRESH_USER = 'setOnlineUsers'
67
	GROWL_MESSAGE = 'growl'
68
	GET_MESSAGES = 'loadMessages'
69
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
70
	DELETE_ROOM = 'deleteRoom'
71
	EDIT_MESSAGE = 'editMessage'
72
	DELETE_MESSAGE = 'deleteMessage'
73
	CREATE_ROOM_CHANNEL = 'addRoom'
74
	INVITE_USER = 'inviteUser'
75
	ADD_USER = 'addUserToDom'
76
	OFFLINE_MESSAGES = 'loadOfflineMessages'
77
	SET_WEBRTC_ID = 'setConnectionId'
78
	SET_WEBRTC_ERROR = 'setError'
79
	OFFER_FILE_CONNECTION = 'offerFile'
80
	OFFER_CALL_CONNECTION = 'offerCall'
81
	REPLY_FILE_CONNECTION = 'replyFile'
82
	REPLY_CALL_CONNECTION = 'replyCall'
83
84
85
class VarNames(object):
86
	WEBRTC_QUED_ID = 'id'
87
	USER = 'user'
88
	USER_ID = 'userId'
89
	TIME = 'time'
90
	CONTENT = 'content'
91
	IMG = 'images'
92
	IMG_B64 = 'b64'
93
	IMG_FILE_NAME = 'fileName'
94
	EVENT = 'action'
95
	MESSAGE_ID = 'id'
96
	GENDER = 'sex'
97
	ROOM_NAME = 'name'
98
	ROOM_ID = 'roomId'
99
	ROOM_USERS = 'users'
100
	CHANNEL = 'channel'
101
	WEBRTC_OPPONENT_ID = 'opponentWsId'
102
	GET_MESSAGES_COUNT = 'count'
103
	GET_MESSAGES_HEADER_ID = 'headerId'
104
	CHANNEL_NAME = 'channel'
105
	IS_ROOM_PRIVATE = 'private'
106
	CONNECTION_ID = 'connId'
107
	HANDLER_NAME = 'handler'
108
109
110
class HandlerNames:
111
	CHANNELS = 'channels'
112
	CHAT = 'chat'
113
	GROWL = 'growl'
114
	WEBRTC = 'webrtc'
115
	PEER_CONNECTION = 'peerConnection'
116
	WEBRTC_TRANSFER = 'webrtcTransfer'
117
	WS = 'ws'
118
119
120
class WebRtcRedisStates:
121
	RESPONDED = 'responded'
122
	READY = 'ready'
123
	OFFERED = 'offered'
124
	CLOSED = 'closed'
125
126
127
class RedisPrefix:
128
	USER_ID_CHANNEL_PREFIX = 'u'
129
	DEFAULT_CHANNEL = ALL_ROOM_ID
130
	CONNECTION_ID_LENGTH = 8  # should be secure
131
132
	@classmethod
133
	def generate_user(cls, key):
134
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
135
136
137
class MessagesCreator(object):
138
139
	def __init__(self, *args, **kwargs):
140
		self.sex = None
141
		self.sender_name = None
142
		self.id = None  # child init
143
		self.user_id = 0  # anonymous by default
144
145
	def default(self, content, event, handler):
146
		"""
147
		:return: {"action": event, "content": content, "time": "20:48:57"}
148
		"""
149
		return {
150
			VarNames.EVENT: event,
151
			VarNames.CONTENT: content,
152
			VarNames.USER_ID: self.user_id,
153
			VarNames.TIME: get_milliseconds(),
154
			VarNames.HANDLER_NAME: handler
155
		}
156
157
	def reply_webrtc(self, event, connection_id):
158
		"""
159
		:return: {"action": event, "content": content, "time": "20:48:57"}
160
		"""
161
		return {
162
			VarNames.EVENT: event,
163
			VarNames.CONNECTION_ID: connection_id,
164
			VarNames.USER_ID: self.user_id,
165
			VarNames.USER: self.sender_name,
166
			VarNames.WEBRTC_OPPONENT_ID: self.id,
167
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
168
		}
169
170
	def set_ws_id(self, random, self_id):
171
		return {
172
			VarNames.HANDLER_NAME: HandlerNames.WS,
173
			VarNames.EVENT: Actions.SET_WS_ID,
174
			VarNames.CONTENT: random,
175
			VarNames.WEBRTC_OPPONENT_ID: self_id
176
		}
177
178
	def room_online(self, online, event, channel):
179
		"""
180
		:return: {"action": event, "content": content, "time": "20:48:57"}
181
		"""
182
		room_less = self.default(online, event, HandlerNames.CHAT)
183
		room_less[VarNames.CHANNEL_NAME] = channel
184
		room_less[VarNames.USER] = self.sender_name
185
		room_less[VarNames.GENDER] = self.sex
186
		return room_less
187
188
	def offer_webrtc(self, content, connection_id, room_id, action):
189
		"""
190
		:return: {"action": "call", "content": content, "time": "20:48:57"}
191
		"""
192
		message = self.default(content, action, HandlerNames.WEBRTC)
193
		message[VarNames.USER] = self.sender_name
194
		message[VarNames.CONNECTION_ID] = connection_id
195
		message[VarNames.WEBRTC_OPPONENT_ID] = self.id
196
		message[VarNames.CHANNEL] = room_id
197
		return message
198
199
	def set_connection_id(self, qued_id, connection_id):
200
		return {
201
			VarNames.EVENT: Actions.SET_WEBRTC_ID,
202
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC,
203
			VarNames.CONNECTION_ID: connection_id,
204
			VarNames.WEBRTC_QUED_ID: qued_id
205
		}
206
207
	def set_webrtc_error(self, error, connection_id, qued_id=None):
208
		message = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION) # TODO file/call
209
		message[VarNames.CONNECTION_ID] = connection_id
210
		if qued_id:
211
			message[VarNames.WEBRTC_QUED_ID] = qued_id
212
		return message
213
214
	@classmethod
215
	def create_message(cls, message, images):
216
		res = {
217
			VarNames.USER_ID: message.sender_id,
218
			VarNames.CONTENT: message.content,
219
			VarNames.TIME: message.time,
220
			VarNames.MESSAGE_ID: message.id,
221
			VarNames.IMG: images
222
		}
223
		return res
224
225
	@classmethod
226
	def create_send_message(cls, message, event, imgs):
227
		"""
228
		:param message:
229
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
230
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
231
		"""
232
		res = cls.create_message(message, imgs)
233
		res[VarNames.EVENT] = event
234
		res[VarNames.CHANNEL] = message.room_id
235
		res[VarNames.HANDLER_NAME] = HandlerNames.CHAT
236
		return res
237
238
	@classmethod
239
	def append_images(cls, messages, images):
240
		res_mess = []
241
		for message in messages:
242
			res_images = cls.prepare_img(images, message.id)
243
			res_mess.append(cls.create_message(message, res_images))
244
		return res_mess
245
246
	@classmethod
247
	def prepare_img(cls, images, message_id):
248
		if images:
249
			return {x.symbol: x.img.url for x in images if x.message_id == message_id}
250
251
	@classmethod
252
	def get_messages(cls, messages, channel, images):
253
		"""
254
		:type messages: list[Messages]
255
		:type channel: str
256
		:type messages: QuerySet[Messages]
257
		"""
258
		return {
259
			VarNames.CONTENT: cls.append_images(messages, images),
260
			VarNames.EVENT: Actions.GET_MESSAGES,
261
			VarNames.CHANNEL: channel,
262
			VarNames.HANDLER_NAME: HandlerNames.CHAT
263
		}
264
265
	@property
266
	def channel(self):
267
		return RedisPrefix.generate_user(self.user_id)
268
269
	def subscribe_direct_channel_message(self, room_id, other_user_id):
270
		return {
271
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
272
			VarNames.ROOM_ID: room_id,
273
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
274
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS
275
		}
276
277
	def subscribe_room_channel_message(self, room_id, room_name):
278
		return {
279
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
280
			VarNames.ROOM_ID: room_id,
281
			VarNames.ROOM_USERS: [self.user_id],
282
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
283
			VarNames.ROOM_NAME: room_name
284
		}
285
286
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
287
		return {
288
			VarNames.EVENT: Actions.INVITE_USER,
289
			VarNames.ROOM_ID: room_id,
290
			VarNames.USER_ID: user_id,
291
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
292
			VarNames.ROOM_NAME: room_name,
293
			VarNames.CONTENT: users
294
		}
295
296
	def add_user_to_room(self, channel, user_id, content):
297
		return {
298
			VarNames.EVENT: Actions.ADD_USER,
299
			VarNames.CHANNEL: channel,
300
			VarNames.USER_ID: user_id,
301
			VarNames.HANDLER_NAME: HandlerNames.CHAT,
302
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
303
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
304
		}
305
306
	def unsubscribe_direct_message(self, room_id):
307
		return {
308
			VarNames.EVENT: Actions.DELETE_ROOM,
309
			VarNames.ROOM_ID: room_id,
310
			VarNames.USER_ID: self.user_id,
311
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
312
			VarNames.TIME: get_milliseconds()
313
		}
314
315
	def load_offline_message(self, offline_messages, channel_key):
316
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
317
		res[VarNames.CHANNEL] = channel_key
318
		return res
319
320
321
class MessagesHandler(MessagesCreator):
322
323
	def __init__(self, *args, **kwargs):
324
		self.closed_channels = None
325
		self.parsable_prefix = 'p'
326
		super(MessagesHandler, self).__init__(*args, **kwargs)
327
		self.webrtc_ids = {}
328
		self.ip = None
329
		from chat import global_redis
330
		self.async_redis_publisher = global_redis.async_redis_publisher
331
		self.sync_redis = global_redis.sync_redis
332
		self.channels = []
333
		self._logger = None
334
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
335
		self.patch_tornadoredis()
336
		self.pre_process_message = {
337
			Actions.GET_MESSAGES: self.process_get_messages,
338
			Actions.SEND_MESSAGE: self.process_send_message,
339
			Actions.WEBRTC: self.proxy_webrtc,
340
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
341
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
342
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
343
			Actions.ACCEPT_CALL: self.accept_call,
344
			Actions.ACCEPT_FILE: self.accept_file,
345
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
346
			Actions.DELETE_ROOM: self.delete_channel,
347
			Actions.EDIT_MESSAGE: self.edit_message,
348
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
349
			Actions.INVITE_USER: self.invite_user,
350
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
351
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
352
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
353
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
354
		}
355
		self.post_process_message = {
356
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
357
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
358
			Actions.DELETE_ROOM: self.send_client_delete_channel,
359
			Actions.INVITE_USER: self.send_client_new_channel,
360
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
361
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
362
		}
363
364
	def patch_tornadoredis(self):  # TODO remove this
365
		fabric = type(self.async_redis.connection.readline)
366
		self.async_redis.connection.old_read = self.async_redis.connection.readline
367
		def new_read(new_self, callback=None):
368
			try:
369
				return new_self.old_read(callback=callback)
370
			except Exception as e:
371
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
372
				self.logger.error(e)
373
				self.logger.error(
374
					"Exception info: "
375
					"self.id: %s ;;; "
376
					"self.connected = '%s';;; "
377
					"Redis default channel online = '%s';;; "
378
					"self.channels = '%s';;; "
379
					"self.closed_channels  = '%s';;;",
380
					self.id, self.connected, current_online, self.channels, self.closed_channels
381
				)
382
				raise e
383
384
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
385
386
	@property
387
	def connected(self):
388
		raise NotImplemented
389
390
	@connected.setter
391
	def connected(self, value):
392
		raise NotImplemented
393
394
	@tornado.gen.engine
395
	def listen(self, channels):
396
		yield tornado.gen.Task(
397
			self.async_redis.subscribe, channels)
398
		self.async_redis.listen(self.pub_sub_message)
399
400
	@property
401
	def logger(self):
402
		return self._logger if self._logger else base_logger
403
404
	@tornado.gen.engine
405
	def add_channel(self, channel):
406
		self.channels.append(channel)
407
		yield tornado.gen.Task(
408
			self.async_redis.subscribe, (channel,))
409
410
	def evaluate(self, query_set):
411
		self.do_db(len, query_set)
412
		return query_set
413
414
	def do_db(self, callback, *args, **kwargs):
415
		try:
416
			return callback(*args, **kwargs)
417
		except (OperationalError, InterfaceError) as e:
418
			if 'MySQL server has gone away' in str(e):
419
				self.logger.warning('%s, reconnecting' % e)
420
				connection.close()
421
				return callback(*args, **kwargs)
422
			else:
423
				raise e
424
425
	def execute_query(self, query, *args, **kwargs):
426
		cursor = connection.cursor()
427
		cursor.execute(query, *args, **kwargs)
428
		desc = cursor.description
429
		return [
430
			dict(zip([col[0] for col in desc], row))
431
			for row in cursor.fetchall()
432
			]
433
434
	def get_online_from_redis(self, channel, check_self_online=False):
435
		"""
436
		:rtype : dict
437
		returns (dict, bool) if check_type is present
438
		"""
439
		online = self.sync_redis.smembers(channel)
440
		self.logger.debug('!! channel %s redis online: %s', channel, online)
441
		result = set()
442
		user_is_online = False
443
		# redis stores8 REDIS_USER_FORMAT, so parse them
444
		if online:
445
			for raw in online:  # py2 iteritems
446
				decoded = raw.decode('utf-8')
447
				# : char specified in cookies_middleware.py.create_id
448
				user_id = int(decoded.split(':')[0])
449
				if user_id == self.user_id and decoded != self.id:
450
					user_is_online = True
451
				result.add(user_id)
452
		result = list(result)
453
		return (result, user_is_online) if check_self_online else result
454
455
	def add_online_user(self, room_id, offline_messages=None):
456
		"""
457
		adds to redis
458
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
459
		:return:
460
		"""
461
		self.async_redis_publisher.sadd(room_id, self.id)
462
		# since we add user to online first, latest trigger will always show correct online
463
		online, is_online = self.get_online_from_redis(room_id, True)
464
		if not is_online:  # if a new tab has been opened
465
			online.append(self.user_id)
466
			online_user_names_mes = self.room_online(
467
				online,
468
				Actions.LOGIN,
469
				room_id
470
			)
471
			self.logger.info('!! First tab, sending refresh online for all')
472
			self.publish(online_user_names_mes, room_id)
473
			if offline_messages:
474
				self.ws_write(self.load_offline_message(offline_messages, room_id))
475
		else:  # Send user names to self
476
			online_user_names_mes = self.room_online(
477
				online,
478
				Actions.REFRESH_USER,
479
				room_id
480
			)
481
			self.logger.info('!! Second tab, retrieving online for self')
482
			self.ws_write(online_user_names_mes)
483
484
	def publish(self, message, channel, parsable=False):
485
		jsoned_mess = json.dumps(message)
486
		self.logger.debug('<%s> %s', channel, jsoned_mess)
487
		if parsable:
488
			jsoned_mess = self.encode(jsoned_mess)
489
		self.async_redis_publisher.publish(channel, jsoned_mess)
490
491
	def encode(self, message):
492
		"""
493
		Marks message with prefix to specify that
494
		it should be decoded and proccesed before sending to client
495
		@param message: message to mark
496
		@return: marked message
497
		"""
498
		return self.parsable_prefix + message
499
500
	def remove_parsable_prefix(self, message):
501
		if message.startswith(self.parsable_prefix):
502
			return message[1:]
503
504
	def pub_sub_message(self, message):
505
		data = message.body
506
		if isinstance(data, str_type):  # subscribe event
507
			prefixless_str = self.remove_parsable_prefix(data)
508
			if prefixless_str:
509
				dict_message = json.loads(prefixless_str)
510
				res = self.post_process_message[dict_message[VarNames.EVENT]](dict_message)
511
				if not res:
512
					self.ws_write(prefixless_str)
513
			else:
514
				self.ws_write(data)
515
516
	def ws_write(self, message):
517
		raise NotImplementedError('WebSocketHandler implements')
518
519
	def process_send_message(self, message):
520
		"""
521
		:type message: dict
522
		"""
523
		channel = message[VarNames.CHANNEL]
524
		message_db = Message(
525
			sender_id=self.user_id,
526
			content=message[VarNames.CONTENT]
527
		)
528
		message_db.room_id = channel
529
		self.do_db(message_db.save)
530
		imgs = message.get(VarNames.IMG)
531
		res_imgs = []
532
		if imgs:
533
			for k in imgs:
534
				img = extract_photo(imgs[k][VarNames.IMG_B64], message.get(VarNames.IMG_FILE_NAME))
535
				res_imgs.append(Image(message_id=message_db.id, img=img, symbol=k))
536 View Code Duplication
			Image.objects.bulk_create(res_imgs)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
537
		prepared_message = self.create_send_message(message_db, Actions.PRINT_MESSAGE, self.prepare_img(res_imgs, message_db.id))
538
		self.publish(prepared_message, channel)
539
540
	def close_file_connection(self, in_message):
541
		connection_id = in_message[VarNames.CONNECTION_ID]
542
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
543
		if not self_channel_status:
544
			raise Exception("Access Denied")
545
		if self_channel_status != WebRtcRedisStates.CLOSED:
546
			sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
547
			if sender_id == self.id:
548
				self.close_sender(connection_id)
549
			else:
550
				self.close_receiver(connection_id, in_message, sender_id)
551
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
552
553
	def close_call_connection(self, in_message):
554
		connection_id = in_message[VarNames.CONNECTION_ID]
555
		conn_users = self.sync_redis.shgetall(connection_id)
556 View Code Duplication
		if conn_users[self.id] in [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
557
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
558
			del conn_users[self.id]
559
			message = {
560
				VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
561
				VarNames.CONNECTION_ID: connection_id,
562
				VarNames.USER_ID: self.user_id,
563
				VarNames.WEBRTC_OPPONENT_ID: self.id,
564
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
565
			}
566
			for user in conn_users:
567
				if conn_users[user] != WebRtcRedisStates.CLOSED:
568
					self.publish(message, user)
569
		else:
570
			raise ValidationError("Invalid channel status.")
571
572
	def cancel_call_connection(self, in_message):
573
		connection_id = in_message[VarNames.CONNECTION_ID]
574
		conn_users = self.sync_redis.shgetall(connection_id)
575
		if conn_users[self.id] == WebRtcRedisStates.OFFERED:
576
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
577
			del conn_users[self.id]
578
			message = self.reply_webrtc(Actions.CANCEL_CALL_CONNECTION, connection_id)
579
			for user in conn_users:
580
				if conn_users[user] != WebRtcRedisStates.CLOSED:
581
					self.publish(message, user)
582
		else:
583
			raise ValidationError("Invalid channel status.")
584
585
	def close_receiver(self, connection_id, in_message, sender_id): # TODO for call we should close all
586
		sender_status = self.sync_redis.shget(connection_id, sender_id)
587
		if not sender_status:
588
			raise Exception("Access denied")
589
		if sender_status != WebRtcRedisStates.CLOSED:
590
			in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
591
			in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
592
			self.publish(in_message, sender_id)
593
594
	def close_sender(self, connection_id):
595
		values = self.sync_redis.shgetall(connection_id)
596
		del values[self.id]
597
		for ws_id in values:
598
			if values[ws_id] == WebRtcRedisStates.CLOSED:
599
				continue
600
			self.publish({
601
				VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
602
				VarNames.CONNECTION_ID: connection_id,
603
				VarNames.WEBRTC_OPPONENT_ID: self.id,
604
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
605
			}, ws_id)
606
607
	def accept_file(self, in_message):
608
		connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
609
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
610
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
611
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
612
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.RESPONDED:
613
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
614
			self.publish({
615
				VarNames.EVENT: Actions.ACCEPT_FILE,
616
				VarNames.CONNECTION_ID: connection_id,
617
				VarNames.WEBRTC_OPPONENT_ID: self.id,
618
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
619
			}, sender_ws_id)
620
		else:
621
			raise ValidationError("Invalid channel status")
622
623
	# todo if we shgetall and only then do async hset
624
	# todo we can catch an issue when 2 concurrent users accepted the call
625 View Code Duplication
	# todo but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
626
	# def accept_call(self, in_message):
627
	# 	connection_id = in_message[VarNames.CONNECTION_ID]
628
	# 	channel_status = self.sync_redis.shgetall(connection_id)
629
	# 	if channel_status and channel_status[self.id] == WebRtcRedisStates.RESPONDED:
630
	# 		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
631
	# 		for key in channel_status:  # del channel_status[self.id] not needed as self in responded
632
	# 			if channel_status[key] == WebRtcRedisStates.READY:
633
	# 				self.publish({
634
	# 					VarNames.EVENT: Actions.ACCEPT_CALL,
635
	# 					VarNames.CONNECTION_ID: connection_id,
636
	# 					VarNames.WEBRTC_OPPONENT_ID: self.id,
637
	# 					VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
638
	# 				}, key)
639
	# 	else:
640
	# 		raise ValidationError("Invalid channel status")
641
642
	def accept_call(self, in_message):
643
		connection_id = in_message[VarNames.CONNECTION_ID]
644
		self_status = self.sync_redis.shget(connection_id, self.id)
645 View Code Duplication
		if self_status == WebRtcRedisStates.RESPONDED:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
646
			self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
647
			channel_status = self.sync_redis.shgetall(connection_id)
648
			del channel_status[self.id]
649
			message = {
650
				VarNames.EVENT: Actions.ACCEPT_CALL,
651
				VarNames.USER_ID: self.user_id,
652
				VarNames.CONNECTION_ID: connection_id,
653
				VarNames.WEBRTC_OPPONENT_ID: self.id,
654
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
655
			}
656
			for key in channel_status:
657
				if channel_status[key] != WebRtcRedisStates.CLOSED:
658
					self.publish(message, key)
659
		else:
660
			raise ValidationError("Invalid channel status")
661
662
	def offer_webrtc_connection(self, in_message):
663
		room_id = in_message[VarNames.CHANNEL]
664
		content = in_message.get(VarNames.CONTENT)
665
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
666
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
667
		# use list because sets dont have 1st element which is offerer
668
		self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
669
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
670
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
671
		self_message = self.set_connection_id(qued_id, connection_id)
672
		self.ws_write(self_message)
673
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
674
		self.publish(opponents_message, room_id, True)
675
676
	def reply_call_connection(self, in_message):
677
		connection_id = in_message[VarNames.CONNECTION_ID]
678
		conn_users = self.sync_redis.shgetall(connection_id)
679
		if conn_users[self.id] == WebRtcRedisStates.OFFERED:
680
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
681
			del conn_users[self.id]
682
			message = self.reply_webrtc(Actions.REPLY_CALL_CONNECTION, connection_id)
683
			for user in conn_users:
684
				if conn_users[user] != WebRtcRedisStates.CLOSED:
685
					self.publish(message, user)
686
		else:
687
			raise ValidationError("Invalid channel status.")
688
689
	def reply_file_connection(self, in_message):
690
		connection_id = in_message[VarNames.CONNECTION_ID]
691
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
692
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
693
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
694
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
695
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
696
			self.publish(self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id), sender_ws_id)
697
		else:
698
			raise ValidationError("Invalid channel status.")
699
700
	def proxy_webrtc(self, in_message):
701
		"""
702
		:type in_message: dict
703
		"""
704
		connection_id = in_message[VarNames.CONNECTION_ID]
705
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
706
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
707
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
708
		if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
709
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
710
				self_channel_status, opponent_channel_status
711
			)) # todo receiver should only accept proxy_webrtc from sender, sender can accept all
712
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
713
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
714
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
715
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
716
		self.logger.debug("Forwarding message to channel %s, self %s, other status %s",
717
			channel, self_channel_status, opponent_channel_status
718
		)
719
		self.publish(in_message, channel)
720
721
	def create_new_room(self, message):
722
		room_name = message[VarNames.ROOM_NAME]
723
		if not room_name or len(room_name) > 16:
724
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
725
		room = Room(name=room_name)
726
		self.do_db(room.save)
727
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
728
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
729
		self.publish(subscribe_message, self.channel, True)
730
731
	def invite_user(self, message):
732
		room_id = message[VarNames.ROOM_ID]
733
		user_id = message[VarNames.USER_ID]
734
		if room_id not in self.channels:
735
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
736
		room = self.do_db(Room.objects.get, id=room_id)
737
		if room.is_private:
738
			raise ValidationError("You can't add users to direct room, create a new room instead")
739
		try:
740
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
741
		except IntegrityError:
742
			raise ValidationError("User is already in channel")
743
		users_in_room = {}
744
		for user in room.users.all():
745
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
746
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
747
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
748
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
749
750
	def create_room(self, user_rooms, user_id):
751
		if self.user_id == user_id:
752
			room_ids = list([room['room_id'] for room in self.evaluate(user_rooms)])
753
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
754
		else:
755
			rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
756
			query_res = rooms_query.values('room__id', 'room__disabled')
757
		try:
758
			room = self.do_db(query_res.get)
759
			room_id = room['room__id']
760
			self.update_room(room_id, room['room__disabled'])
761
		except RoomUsers.DoesNotExist:
762
			room = Room()
763
			room.save()
764
			room_id = room.id
765
			if self.user_id == user_id:
766
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
767
			else:
768
				RoomUsers.objects.bulk_create([
769
					RoomUsers(user_id=user_id, room_id=room_id),
770
					RoomUsers(user_id=self.user_id, room_id=room_id),
771
				])
772
		return room_id
773
774
	def update_room(self, room_id, disabled):
775
		if not disabled:
776
			raise ValidationError('This room already exist')
777
		else:
778
			Room.objects.filter(id=room_id).update(disabled=False)
779
780
	def create_user_channel(self, message):
781
		user_id = message[VarNames.USER_ID]
782
		# get all self private rooms ids
783
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
784
		# get private room that contains another user from rooms above
785
		room_id = self.create_room(user_rooms, user_id)
786
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
787
		self.publish(subscribe_message, self.channel, True)
788
		other_channel = RedisPrefix.generate_user(user_id)
789
		if self.channel != other_channel:
790
			self.publish(subscribe_message, other_channel, True)
791
792
	def delete_channel(self, message):
793
		room_id = message[VarNames.ROOM_ID]
794
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
795
			raise ValidationError('You are not allowed to exit this room')
796
		room = self.do_db(Room.objects.get, id=room_id)
797
		if room.disabled:
798
			raise ValidationError('Room is already deleted')
799
		if room.name is None:  # if private then disable
800
			room.disabled = True
801
		else: # if public -> leave the room, delete the link
802
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
803
			online = self.get_online_from_redis(room_id)
804
			online.remove(self.user_id)
805
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
806
		room.save()
807
		message = self.unsubscribe_direct_message(room_id)
808
		self.publish(message, room_id, True)
809
810
	def edit_message(self, data):
811
		message_id = data[VarNames.MESSAGE_ID]
812
		message = Message.objects.get(id=message_id)
813
		if message.sender_id != self.user_id:
814
			raise ValidationError("You can only edit your messages")
815
		if message.time + 60000 < get_milliseconds():
816
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
817
		if message.deleted:
818
			raise ValidationError("Already deleted")
819
		message.content = data[VarNames.CONTENT]
820
		selector = Message.objects.filter(id=message_id)
821
		if message.content is None:
822
			selector.update(deleted=True)
823
			action = Actions.DELETE_MESSAGE
824
		else:
825
			action = Actions.EDIT_MESSAGE
826
			selector.update(content=message.content)
827
		self.publish(self.create_send_message(message, action, None), message.room_id) # TODO we're deleting old images
828
829
	def send_client_new_channel(self, message):
830
		room_id = message[VarNames.ROOM_ID]
831
		self.add_channel(room_id)
832
		self.add_online_user(room_id)
833
834
	def set_opponent_call_channel(self, message):
835
		connection_id = message[VarNames.CONNECTION_ID]
836
		if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
837
			return True
838
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
839
840
	def send_client_delete_channel(self, message):
841
		room_id = message[VarNames.ROOM_ID]
842
		self.async_redis.unsubscribe((room_id,))
843
		self.async_redis_publisher.hdel(room_id, self.id)
844
		self.channels.remove(room_id)
845
846
	def process_get_messages(self, data):
847
		"""
848
		:type data: dict
849
		"""
850
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
851
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
852
		room_id = data[VarNames.CHANNEL]
853
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
854
		if header_id is None:
855
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
856
		else:
857
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
858
		images = self.do_db(self.get_message_images, messages)
859
		response = self.get_messages(messages, room_id, images)
860
		self.ws_write(response)
861
862
	def get_message_images(self, messages):
863
		ids =[message.id for message in messages]
864
		images = Image.objects.filter(message_id__in=ids)
865
		self.logger.info('!! Messages have %d images', len(images))
866
		return images
867
868
	def get_offline_messages(self):
869
		res = {}
870
		off_mess = Message.objects.filter(
871
			id__gt=F('room__roomusers__last_read_message_id'),
872
			deleted=False,
873
			room__roomusers__user_id=self.user_id
874
		)
875
		images = self.do_db(self.get_message_images, off_mess)
876
		for message in off_mess:
877
			prep_m = self.create_message(message, self.prepare_img(images, message.id))
878
			res.setdefault(message.room_id, []).append(prep_m)
879
		return res
880
881
	def get_users_in_current_user_rooms(self):
882
		"""
883
		{
884
			"ROOM_ID:1": {
885
				"name": "All",
886
				"users": {
887
					"USER_ID:admin": {
888
						"name": "USER_NAME:admin",
889
						"sex": "SEX:Secret"
890
					},
891
					"USER_ID_2": {
892
						"name": "USER_NAME:Mike",
893
						"sex": "Male"
894
					}
895
				},
896
				"isPrivate": true
897
			}
898
		}
899
		"""
900
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
901
		res = {room['id']: {
902
				VarNames.ROOM_NAME: room['name'],
903
				VarNames.ROOM_USERS: {}
904
			} for room in user_rooms}
905
		room_ids = (room_id for room_id in res)
906
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
907
		for user in rooms_users:
908
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
909
		return res
910
911
	def set_js_user_structure(self, user_dict, user_id, name, sex):
912
		user_dict[user_id] = {
913
			VarNames.USER: name,
914
			VarNames.GENDER: GENDERS[sex]
915
		}
916
917
	def save_ip(self):
918
		if (self.do_db(UserJoinedInfo.objects.filter(
919
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
920
			return
921
		ip_address = get_or_create_ip(self.ip, self.logger)
922
		UserJoinedInfo.objects.create(
923
			ip=ip_address,
924
			user_id=self.user_id
925
		)
926
927
	def publish_logout(self, channel, log_data):
928
		# seems like async solves problem with connection lost and wrong data status
929
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
930
		online, is_online = self.get_online_from_redis(channel, True)
931
		log_data[channel] = {'online': online, 'is_online': is_online}
932
		if not is_online:
933
			message = self.room_online(online, Actions.LOGOUT, channel)
934
			self.publish(message, channel)
935
			return True
936
937
938
class AntiSpam(object):
939
940
	def __init__(self):
941
		self.spammed = 0
942
		self.info = {}
943
944
	def check_spam(self, json_message):
945
		message_length = len(json_message)
946
		info_key = int(round(time.time() * 100))
947
		self.info[info_key] = message_length
948
		if message_length > MAX_MESSAGE_SIZE:
949
			self.spammed += 1
950
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
951
		self.check_timed_spam()
952
953
	def check_timed_spam(self):
954
		# TODO implement me
955
		pass
956
		# raise ValidationError("You're chatting too much, calm down a bit!")
957
958
959
class TornadoHandler(WebSocketHandler, MessagesHandler):
960
961
	def __init__(self, *args, **kwargs):
962
		super(TornadoHandler, self).__init__(*args, **kwargs)
963
		self.__connected__ = False
964
		self.anti_spam = AntiSpam()
965
966
	@property
967
	def connected(self):
968
		return self.__connected__
969
970
	@connected.setter
971
	def connected(self, value):
972
		self.__connected__ = value
973
974
	def data_received(self, chunk):
975
		pass
976
977
	def on_message(self, json_message):
978
		try:
979
			if not self.connected:
980
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
981
			if not json_message:
982
				raise Exception('Skipping null message')
983
			# self.anti_spam.check_spam(json_message)
984
			self.logger.debug('<< %.1000s', json_message)
985
			message = json.loads(json_message)
986
			if message[VarNames.EVENT] not in self.pre_process_message:
987
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
988
			channel = message.get(VarNames.CHANNEL)
989
			if channel and channel not in self.channels:
990
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
991
			self.pre_process_message[message[VarNames.EVENT]](message)
992
		except ValidationError as e:
993
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
994
			self.ws_write(error_message)
995
996
	def on_close(self):
997
		if self.async_redis.subscribed:
998
			self.logger.info("Close event, unsubscribing from %s", self.channels)
999
			self.async_redis.unsubscribe(self.channels)
1000
		else:
1001
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
1002
		log_data = {}
1003
		gone_offline = False
1004
		for channel in self.channels:
1005
			if not isinstance(channel, Number):
1006
				continue
1007
			self.sync_redis.srem(channel, self.id)
1008
			if self.connected:
1009
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
1010
		if gone_offline:
1011
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
1012
			self.logger.info("Updated %s last read message", res)
1013
		self.disconnect(json.dumps(log_data))
1014
1015
	def disconnect(self, log_data, tries=0):
1016
		"""
1017
		Closes a connection if it's not in proggress, otherwice timeouts closing
1018
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
1019
		"""
1020
		self.connected = False
1021
		self.closed_channels = self.channels
1022
		self.channels = []
1023
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
1024
			self.logger.debug('Closing a connection timeouts')
1025
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
1026
		else:
1027
			self.logger.info("Close connection result: %s", log_data)
1028
			self.async_redis.disconnect()
1029
1030
	def generate_self_id(self):
1031
		"""
1032
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
1033
		So if ws loses a connection it still can reconnect with same id,
1034
		and TornadoHandler can restore webrtc_connections to previous state
1035
		"""
1036
		conn_arg = self.get_argument('id', None)
1037
		self.id, random = create_id(self.user_id, conn_arg)
1038
		if random != conn_arg:
1039
			self.ws_write(self.set_ws_id(random, self.id))
1040
1041
	def open(self):
1042
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
1043
		if sessionStore.exists(session_key):
1044
			self.ip = self.get_client_ip()
1045
			session = SessionStore(session_key)
1046
			self.user_id = int(session["_auth_user_id"])
1047
			self.generate_self_id()
1048
			log_params = {
1049
				'id': self.id,
1050
				'ip': self.ip
1051
			}
1052
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
1053
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
1054
			self.async_redis.connect()
1055
			user_db = self.do_db(User.objects.get, id=self.user_id)
1056
			self.sender_name = user_db.username
1057
			self.sex = user_db.sex_str
1058
			user_rooms = self.get_users_in_current_user_rooms()
1059
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
1060
			# get all missed messages
1061
			self.channels = []  # py2 doesn't support clear()
1062
			self.channels.append(self.channel)
1063
			self.channels.append(self.id)
1064
			for room_id in user_rooms:
1065
				self.channels.append(room_id)
1066
			self.listen(self.channels)
1067
			off_messages = self.get_offline_messages()
1068
			for room_id in user_rooms:
1069
				self.add_online_user(room_id, off_messages.get(room_id))
1070
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
1071
			self.connected = True
1072
			Thread(target=self.save_ip).start()
1073
		else:
1074
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
1075
			self.close(403, "Session key %s has been rejected" % session_key)
1076
1077
	def check_origin(self, origin):
1078
		"""
1079
		check whether browser set domain matches origin
1080
		"""
1081
		parsed_origin = urlparse(origin)
1082
		origin = parsed_origin.netloc
1083
		origin_domain = origin.split(':')[0].lower()
1084
		browser_set = self.request.headers.get("Host")
1085
		browser_domain = browser_set.split(':')[0]
1086
		return browser_domain == origin_domain
1087
1088
	def ws_write(self, message):
1089
		"""
1090
		Tries to send message, doesn't throw exception outside
1091
		:type self: MessagesHandler
1092
		"""
1093
		# self.logger.debug('<< THREAD %s >>', os.getppid())
1094
		try:
1095
			if isinstance(message, dict):
1096
				message = json.dumps(message)
1097
			if not isinstance(message, str_type):
1098
				raise ValueError('Wrong message type : %s' % str(message))
1099
			self.logger.debug(">> %.1000s", message)
1100
			self.write_message(message)
1101
		except tornado.websocket.WebSocketClosedError as e:
1102
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
1103
1104
	def get_client_ip(self):
1105
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
1106