TornadoHandler.generate_self_id()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 13
rs 9.75
1
import json
2
import logging
3
from datetime import timedelta
4
from itertools import chain
5
from numbers import Number
6
from threading import Thread
7
8
from django.conf import settings
9
from django.core.exceptions import ValidationError
10
from django.db.models import F, Q
11
from redis_sessions.session import SessionStore
12
from tornado import ioloop
13
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
14
from tornado.web import asynchronous
15
from tornado.websocket import WebSocketHandler, WebSocketClosedError
16
17
from chat.cookies_middleware import create_id
18
from chat.models import User, Message, UserJoinedInfo, IpAddress, Room, RoomUsers
19
from chat.py2_3 import str_type, urlparse
20
from chat.tornado.anti_spam import AntiSpam
21
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix
22
from chat.tornado.message_creator import MessagesCreator
23
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler
24
from chat.utils import execute_query, do_db, \
25
	get_message_images_videos, get_or_create_ip_wrapper, create_ip_structure, get_history_message_query
26
27
sessionStore = SessionStore()
28
29
parent_logger = logging.getLogger(__name__)
30
31
32
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler):
33
34
	def __init__(self, *args, **kwargs):
35
		super(TornadoHandler, self).__init__(*args, **kwargs)
36
		self.__connected__ = False
37
		self.restored_connection = False
38
		self.__http_client__ = AsyncHTTPClient()
39
		self.anti_spam = AntiSpam()
40
41
	@property
42
	def connected(self):
43
		return self.__connected__
44
45
	@connected.setter
46
	def connected(self, value):
47
		self.__connected__ = value
48
49
	@property
50
	def http_client(self):
51
		"""
52
		@type: AsyncHTTPClient
53
		"""
54
		return self.__http_client__
55
56
	def data_received(self, chunk):
57
		pass
58
59
	def on_message(self, json_message):
60
		try:
61
			if not self.connected:
62
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
63
			if not json_message:
64
				raise Exception('Skipping null message')
65
			# self.anti_spam.check_spam(json_message)
66
			self.logger.debug('<< %.1000s', json_message)
67
			message = json.loads(json_message)
68
			if message[VarNames.EVENT] not in self.process_ws_message:
69
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
70
			channel = message.get(VarNames.ROOM_ID)
71
			if channel and channel not in self.channels:
72
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
73
			self.process_ws_message[message[VarNames.EVENT]](message)
74
		except ValidationError as e:
75
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
76
			self.ws_write(error_message)
77
78
	def on_close(self):
79
		if self.async_redis.subscribed:
80
			self.logger.info("Close event, unsubscribing from %s", self.channels)
81
			self.async_redis.unsubscribe(self.channels)
82
		else:
83
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
84
		self.async_redis_publisher.srem(RedisPrefix.ONLINE_VAR, self.id)
85
		is_online, online = self.get_online_and_status_from_redis()
86
		if self.connected:
87
			if not is_online:
88
				message = self.room_online(online, Actions.LOGOUT)
89
				self.publish(message, settings.ALL_ROOM_ID)
90
			res = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
91
			self.logger.info("Updated %s last read message", res)
92
		self.disconnect()
93
94
	def disconnect(self, tries=0):
95
		"""
96
		Closes a connection if it's not in proggress, otherwice timeouts closing
97
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
98
		"""
99
		self.connected = False
100
		self.closed_channels = self.channels
101
		self.channels = []
102
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
103
			self.logger.debug('Closing a connection timeouts')
104
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, tries+1)
105
		else:
106
			self.logger.info("Close connection result: %s")
107
			self.async_redis.disconnect()
108
109
	def generate_self_id(self):
110
		"""
111
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
112
		So if ws loses a connection it still can reconnect with same id,
113
		and TornadoHandler can restore webrtc_connections to previous state
114
		"""
115
		conn_arg = self.get_argument('id', None)
116
		self.id, random = create_id(self.user_id, conn_arg)
117
		if random != conn_arg:
118
			self.restored_connection = False
119
			self.ws_write(self.set_ws_id(random, self.id))
120
		else:
121
			self.restored_connection = True
122
123
	def open(self):
124
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
125
		if sessionStore.exists(session_key):
126
			self.ip = self.get_client_ip()
127
			session = SessionStore(session_key)
128
			self.user_id = int(session["_auth_user_id"])
129
			self.generate_self_id()
130
			self._logger = logging.LoggerAdapter(parent_logger, {
131
				'id': self.id,
132
				'ip': self.ip
133
			})
134
			cookies = ["{}={}".format(k, self.request.cookies[k].value) for k in self.request.cookies]
135
			self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, ";".join(cookies))
136
			self.async_redis.connect()
137
			self.async_redis_publisher.sadd(RedisPrefix.ONLINE_VAR, self.id)
138
			# since we add user to online first, latest trigger will always show correct online
139
			was_online, online = self.get_online_and_status_from_redis()
140
			user_db = do_db(User.objects.get, id=self.user_id)
141
			self.sender_name = user_db.username
142
			self.sex = user_db.sex_str
143
			user_rooms1 = Room.objects.filter(users__id=self.user_id, disabled=False)\
144
				.values('id', 'name', 'roomusers__notifications', 'roomusers__volume')
145
			user_rooms = MessagesCreator.create_user_rooms(user_rooms1)
146
			room_ids = [room_id for room_id in user_rooms]
147
			rooms_users = RoomUsers.objects.filter(room_id__in=room_ids).values('user_id', 'room_id')
148
			for ru in rooms_users:
149
				user_rooms[ru['room_id']][VarNames.ROOM_USERS].append(ru['user_id'])
150
			# get all missed messages
151
			self.channels = room_ids  # py2 doesn't support clear()
152
			self.channels.append(self.channel)
153
			self.channels.append(self.id)
154
			self.listen(self.channels)
155
			off_messages, history = self.get_offline_messages(user_rooms, was_online, self.get_argument('history', False))
156
			for room_id in user_rooms:
157
				h = history.get(room_id)
158
				o = off_messages.get(room_id)
159
				if h:
160
					user_rooms[room_id][VarNames.LOAD_MESSAGES_HISTORY] = h
161
				if o:
162
					user_rooms[room_id][VarNames.LOAD_MESSAGES_OFFLINE] = o
163
			user_dict = {}
164
			for user in User.objects.values('id', 'username', 'sex'):
165
				user_dict[user['id']] = RedisPrefix.set_js_user_structure(user['username'], user['sex'])
166
			if self.user_id not in online:
167
				online.append(self.user_id)
168
			self.ws_write(self.set_room(user_rooms, user_dict, online))
169
			if not was_online:  # if a new tab has been opened
170
				online_user_names_mes = self.room_online(online, Actions.LOGIN)
171
				self.logger.info('!! First tab, sending refresh online for all')
172
				self.publish(online_user_names_mes, settings.ALL_ROOM_ID)
173
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
174
			self.connected = True
175
		else:
176
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
177
			self.close(403, "Session key %s has been rejected" % session_key)
178
179
	def get_offline_messages(self, user_rooms, was_online, with_history):
180
		q_objects = get_history_message_query(self.get_argument('messages', None), user_rooms, with_history)
181
		if was_online:
182
			off_messages = []
183
		else:
184
			off_messages = Message.objects.filter(
185
				id__gt=F('room__roomusers__last_read_message_id'),
186
				room__roomusers__user_id=self.user_id
187
			)
188
		off = {}
189
		history = {}
190
		if len(q_objects.children) > 0:
191
			history_messages = Message.objects.filter(q_objects)
192
			all = list(chain(off_messages, history_messages))
193
			self.logger.info("Offline messages IDs: %s, history messages: %s", [m.id for m in off_messages], [m.id for m in history_messages])
194
		else:
195
			history_messages = []
196
			all = off_messages
197
		if self.restored_connection:
198
			off_messages = all
199
			history_messages = []
200
		imv = get_message_images_videos(all)
201
		self.set_video_images_messages(imv, off_messages, off)
202
		self.set_video_images_messages(imv, history_messages, history)
203
		return off, history
204
205
	def set_video_images_messages(self, imv, inm, outm):
206
		for message in inm:
207
			files = MessagesCreator.prepare_img_video(imv, message.id)
208
			prep_m = self.create_message(message, files)
209
			outm.setdefault(message.room_id, []).append(prep_m)
210
211
	def check_origin(self, origin):
212
		"""
213
		check whether browser set domain matches origin
214
		"""
215
		parsed_origin = urlparse(origin)
216
		origin = parsed_origin.netloc
217
		origin_domain = origin.split(':')[0].lower()
218
		browser_set = self.request.headers.get("Host")
219
		browser_domain = browser_set.split(':')[0]
220
		return browser_domain == origin_domain
221
222
	def save_ip(self):
223
		"""
224
		This code is not used anymore
225
		"""
226
		if not do_db(UserJoinedInfo.objects.filter(
227
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists):
228
			res = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
229
			if res is not None:
230
				UserJoinedInfo.objects.create(ip=res, user_id=self.user_id)
231
232
	@asynchronous
233
	def fetch_and_save_ip_http(self):
234
		"""
235
			This code is not used anymore
236
		"""
237
		def fetch_response(response):
238
			try:
239
				ip_record = create_ip_structure(self.ip, response.body)
240
			except Exception as e:
241
				self.logger.error("Error while creating ip with country info, because %s", e)
242
				ip_record = IpAddress.objects.create(ip=self.ip)
243
			UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
244
		r = HTTPRequest(settings.IP_API_URL % self.ip, method="GET")
245
		self.http_client.fetch(r, callback=fetch_response)
246
247
	def ws_write(self, message):
248
		"""
249
		Tries to send message, doesn't throw exception outside
250
		:type self: MessagesHandler
251
		:type message object
252
		"""
253
		# self.logger.debug('<< THREAD %s >>', os.getppid())
254
		try:
255
			if isinstance(message, dict):
256
				message = json.dumps(message)
257
			if not isinstance(message, str_type):
258
				raise ValueError('Wrong message type : %s' % str(message))
259
			self.logger.debug(">> %.1000s", message)
260
			self.write_message(message)
261
		except WebSocketClosedError as e:
262
			self.logger.warning("%s. Can't send message << %s >> ", e, str(message))
263
264
	def get_client_ip(self):
265
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip