Completed
Push — master ( 0e978b...bede48 )
by Andrew
31s
created

TornadoHandler.open()   B

Complexity

Conditions 5

Size

Total Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 5
dl 0
loc 43
rs 8.0894
c 3
b 0
f 0
1
import json
2
import logging
3
from datetime import timedelta
4
from itertools import chain
5
from numbers import Number
6
from threading import Thread
7
8
from django.conf import settings
9
from django.core.exceptions import ValidationError
10
from django.db.models import F, Q
11
from redis_sessions.session import SessionStore
12
from tornado import ioloop
13
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
14
from tornado.web import asynchronous
15
from tornado.websocket import WebSocketHandler, WebSocketClosedError
16
17
from chat.cookies_middleware import create_id
18
from chat.models import User, Message, UserJoinedInfo, IpAddress
19
from chat.py2_3 import str_type, urlparse
20
from chat.tornado.anti_spam import AntiSpam
21
from chat.tornado.constants import VarNames, HandlerNames, Actions
22
from chat.tornado.message_creator import MessagesCreator
23
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler
24
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms, \
25
	get_message_images_videos, get_or_create_ip_wrapper, create_ip_structure, get_history_message_query
26
27
# Module-wide session store; TornadoHandler.open() uses it to check that a session key exists.
sessionStore = SessionStore()
28
29
# Base logger for this module; TornadoHandler.open() wraps it in a per-connection LoggerAdapter.
parent_logger = logging.getLogger(__name__)
30
31
32
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler):
33
34
	def __init__(self, *args, **kwargs):
		"""
		Initialise the parent handlers and the per-connection state.

		All positional and keyword arguments are forwarded unchanged to the parent classes.
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		# Plain flags: the socket is neither connected nor restored yet.
		self.restored_connection = False
		self.__connected__ = False
		# Helper objects owned by this handler instance.
		self.__http_client__ = AsyncHTTPClient()
		self.anti_spam = AntiSpam()
40
41
	@property
	def connected(self):
		"""Connection flag: set to True at the end of open(), reset to False by disconnect()."""
		is_connected = self.__connected__
		return is_connected

	@connected.setter
	def connected(self, value):
		"""Store the new value of the connection flag."""
		self.__connected__ = value
48
49
	@property
	def http_client(self):
		"""
		Read-only access to the HTTP client created in __init__.

		@type: AsyncHTTPClient
		"""
		client = self.__http_client__
		return client
55
56
	def data_received(self, chunk):
		"""Intentionally a no-op: the received chunk is ignored."""
58
59
	def on_message(self, json_message):
		"""
		Validate an incoming websocket frame and dispatch it to its handler.

		The frame is parsed as JSON, its event name is looked up in
		``self.process_ws_message`` and, when the frame names a channel, that channel
		must be one of ``self.channels``.

		A ValidationError is reported back to the client as a growl message; any
		other exception propagates to the caller.

		:param json_message: raw text of the frame
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise Exception('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			payload = json.loads(json_message)
			event = payload[VarNames.EVENT]
			if event not in self.process_ws_message:
				raise Exception("event {} is unknown".format(event))
			target_channel = payload.get(VarNames.CHANNEL)
			# Frames without a channel skip the access check entirely.
			if target_channel and target_channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(target_channel, self.channels))
			handler = self.process_ws_message[event]
			handler(payload)
		except ValidationError as e:
			growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.ws_write(growl)
77
78
	def publish_logout(self, channel, log_data):
		"""
		Record the online state of ``channel`` and publish a logout if the user left it.

		:param channel: channel whose online state is read from redis
		:param log_data: dict mutated in place; gets an entry for ``channel``
		:return: True when a logout message was published, otherwise None
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		still_online, online_users = self.get_online_and_status_from_redis(channel)
		log_data[channel] = {'online': online_users, 'is_online': still_online}
		if still_online:
			return None
		logout_message = self.room_online(online_users, Actions.LOGOUT, channel)
		self.publish(logout_message, channel)
		return True
87
88
	def on_close(self):
		"""
		Clean up after the websocket has been closed.

		Unsubscribes from redis when subscribed, removes this connection id from
		every numeric channel, publishes logouts and updates the last read message
		when the socket was connected, then hands the collected data to disconnect().
		"""
		if not self.async_redis.subscribed:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		else:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		for channel in self.channels:
			# Only numeric channels are processed here; every other channel is skipped.
			if isinstance(channel, Number):
				self.sync_redis.srem(channel, self.id)
				if self.connected:
					self.publish_logout(channel, log_data)
		if self.connected:
			updated = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", updated)
		serialized_log = json.dumps(log_data)
		self.disconnect(serialized_log)
105
106
	def disconnect(self, log_data, tries=0):
		"""
		Closes the async redis connection if no operation is in progress,
		otherwise schedules another attempt through an IOLoop timeout.
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: value logged once the connection is finally closed
		:param tries: number of retries already made; the method passes it to itself
		"""
		self.connected = False
		# Snapshot the channels only when there is something to snapshot. A retry
		# re-enters this method with self.channels already emptied and must not
		# overwrite the snapshot with an empty list.
		if tries == 0 or self.channels:
			self.closed_channels = self.channels
			self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# NOTE: timedelta's first positional argument is days, so this delay is ~0.864 s.
			retry_delay = timedelta(days=0.00001)
			ioloop.IOLoop.instance().add_timeout(retry_delay, self.disconnect, log_data, tries + 1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
120
121
	def generate_self_id(self):
		"""
		Assign ``self.id`` and decide whether this is a restored connection.

		When the user opens a new tab in the browser, wsHandler.wsConnectionId stores
		the id of the current ws. If the ws loses its connection it can reconnect
		with the same id, and TornadoHandler can restore webrtc_connections to the
		previous state.
		"""
		requested_id = self.get_argument('id', None)
		self.id, issued_id = create_id(self.user_id, requested_id)
		if issued_id == requested_id:
			# The client's id was accepted as is: treat it as a reconnect.
			self.restored_connection = True
			return
		self.restored_connection = False
		# Tell the client which id it has been given.
		self.ws_write(self.set_ws_id(issued_id, self.id))
134
135
	def open(self):
		"""
		Authenticate a new websocket by its session cookie and subscribe it to its channels.

		With a known session key the handler resolves the user, connects to redis,
		builds ``self.channels`` (own channel, connection id and every room of the
		user), starts listening, sends the rooms together with offline/history
		messages to the client and finally marks itself as connected.

		With a missing or unknown session key the connection is closed.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		# get_cookie() yields None when the cookie is absent; reject it right away
		# instead of querying the session store with None.
		if not session_key or not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		self.generate_self_id()
		log_params = {
			'id': self.id,
			'ip': self.ip
		}
		self._logger = logging.LoggerAdapter(parent_logger, log_params)
		cookies = ["{}={}".format(k, self.request.cookies[k].value) for k in self.request.cookies]
		self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, ";".join(cookies))
		self.async_redis.connect()
		user_db = do_db(User.objects.get, id=self.user_id)
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = get_users_in_current_user_rooms(self.user_id)
		# get all missed messages
		self.channels = []  # py2 doesn't support clear()
		self.channels.append(self.channel)
		self.channels.append(self.id)
		rooms_online = {}
		was_online = False
		for room_id in user_rooms:
			self.channels.append(room_id)
			rooms_online[room_id] = self.get_is_online(room_id)
			# The user counts as "was online" if any room already reported them online.
			was_online = was_online or rooms_online[room_id][0]
		self.listen(self.channels)
		off_messages, history = self.get_offline_messages(user_rooms, was_online)
		for room_id in user_rooms:
			# NOTE(review): the result of this call is discarded; kept in case it has side effects - confirm.
			self.get_is_online(room_id)
			self.add_online_user(room_id, rooms_online[room_id][0], rooms_online[room_id][1])
			user_rooms[room_id][VarNames.LOAD_MESSAGES_HISTORY] = history.get(room_id)
			user_rooms[room_id][VarNames.LOAD_MESSAGES_OFFLINE] = off_messages.get(room_id)
		self.ws_write(self.set_room(user_rooms))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		# self.save_ip()
178
179
	def get_offline_messages(self, user_rooms, was_online):
		"""
		Collect the messages to send to a freshly opened connection.

		Unread messages (id greater than the user's last read message, not deleted)
		are queried only when ``was_online`` is false. History messages are queried
		only when the query built from the ``messages`` request argument has
		conditions. On a restored connection everything is returned as offline
		messages and the history is empty.

		:param user_rooms: rooms of the user, forwarded to get_history_message_query
		:param was_online: whether the user was already online before this connection
		:return: tuple ``(offline, history)`` of dicts mapping room id to a list of prepared messages
		"""
		history_query = get_history_message_query(self.get_argument('messages', None), user_rooms, self.restored_connection)
		if not was_online:
			unread = Message.objects.filter(
				id__gt=F('room__roomusers__last_read_message_id'),
				deleted=False,
				room__roomusers__user_id=self.user_id
			)
		else:
			unread = []
		if history_query.children:
			requested = Message.objects.filter(history_query)
			combined = list(chain(unread, requested))
			self.logger.info("Offline messages IDs: %s, history messages: %s", [m.id for m in unread], [m.id for m in requested])
		else:
			requested = []
			combined = unread
		if self.restored_connection:
			unread = combined
			requested = []
		attachments = get_message_images_videos(combined)
		offline_by_room = {}
		history_by_room = {}
		self.set_video_images_messages(attachments, unread, offline_by_room)
		self.set_video_images_messages(attachments, requested, history_by_room)
		return offline_by_room, history_by_room
205
206
	def set_video_images_messages(self, imv, inm, outm):
		"""
		Prepare every message of ``inm`` and group the results by room in ``outm``.

		:param imv: passed to MessagesCreator.prepare_img_video together with each message id
		:param inm: iterable of messages; each must expose ``id`` and ``room_id``
		:param outm: dict mutated in place, mapping room id to a list of prepared messages
		"""
		for msg in inm:
			attachments = MessagesCreator.prepare_img_video(imv, msg.id)
			prepared = self.create_message(msg, attachments)
			room_bucket = outm.setdefault(msg.room_id, [])
			room_bucket.append(prepared)
211
212
	def check_origin(self, origin):
		"""
		Check whether the host of the Origin header matches the host of the Host header.

		Ports are ignored and the comparison is case-insensitive. A missing or
		unparsable Origin or Host never matches.

		:param origin: value of the Origin header sent by the browser
		:return: True if both host names are equal, otherwise False
		"""
		browser_set = self.request.headers.get("Host")
		if not origin or not browser_set:
			return False
		try:
			# .hostname drops the port and the brackets of IPv6 literals, which a
			# plain split(':') would mangle.
			origin_domain = urlparse(origin).hostname
			browser_domain = urlparse('//' + browser_set).hostname
		except ValueError:
			return False
		if not origin_domain or not browser_domain:
			return False
		return browser_domain.lower() == origin_domain.lower()
222
223
	def save_ip(self):
		"""
		This code is not used anymore

		Creates a UserJoinedInfo row for the current ip and user unless one exists.
		"""
		already_known = do_db(UserJoinedInfo.objects.filter(
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)
		if already_known:
			return
		ip_record = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
		if ip_record is not None:
			UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
232
233
	@asynchronous
	def fetch_and_save_ip_http(self):
		"""
		This code is not used anymore

		Requests settings.IP_API_URL for the current ip and stores the result
		together with a UserJoinedInfo row once the response arrives.
		"""
		def on_response(response):
			try:
				ip_record = create_ip_structure(self.ip, response.body)
			except Exception as e:
				self.logger.error("Error while creating ip with country info, because %s", e)
				# Fall back to a bare ip record.
				ip_record = IpAddress.objects.create(ip=self.ip)
			UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)

		lookup_url = settings.IP_API_URL % self.ip
		lookup_request = HTTPRequest(lookup_url, method="GET")
		self.http_client.fetch(lookup_request, callback=on_response)
247
248
	def ws_write(self, message):
		"""
		Sends a message to the client; a closed websocket is logged instead of raised.

		:type self: MessagesHandler
		:param message: dict (serialised to JSON first) or a ready text string
		:raises ValueError: if message is neither a dict nor a string
		"""
		if isinstance(message, dict):
			message = json.dumps(message)
		if not isinstance(message, str_type):
			raise ValueError('Wrong message type : %s' % str(message))
		self.logger.debug(">> %.1000s", message)
		# Only the send itself can hit a closed socket, so only it is guarded.
		try:
			self.write_message(message)
		except WebSocketClosedError as e:
			# Hand the text to the logger as is: str() on a non-ASCII unicode
			# message raises UnicodeEncodeError on Python 2.
			self.logger.warning("%s. Can't send message << %s >> ", e, message)
264
265
	def get_client_ip(self):
		"""Return the X-Real-IP request header when it is set and non-empty, otherwise the request's remote_ip."""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip