Completed
Push — master ( 7365f5...41cb1b )
by Andrew
33s
created

TornadoHandler   B

Complexity

Total Complexity 38

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Importance

Changes 9
Bugs 0 Features 0
Metric Value
c 9
b 0
f 0
dl 0
loc 181
rs 8.3999
wmc 38

14 Methods

Rating   Name   Duplication   Size   Complexity  
A data_received() 0 2 1
A save_ip() 0 8 2
B on_close() 0 18 6
A check_origin() 0 10 1
A disconnect() 0 14 3
A ws_write() 0 16 4
A generate_self_id() 0 10 2
A __init__() 0 4 1
B on_message() 0 18 7
A get_offline_messages() 0 12 2
A connected() 0 3 1
A publish_logout() 0 9 2
A get_client_ip() 0 2 1
B open() 0 35 4
1
import json
2
import logging
3
from datetime import timedelta
4
from numbers import Number
5
from threading import Thread
6
7
from django.conf import settings
8
from django.core.exceptions import ValidationError
9
from django.db.models import F, Q
10
from redis_sessions.session import SessionStore
11
from tornado import ioloop
12
from tornado.websocket import WebSocketHandler, WebSocketClosedError
13
14
from chat.cookies_middleware import create_id
15
from chat.models import User, Message, UserJoinedInfo
16
from chat.py2_3 import str_type, urlparse
17
from chat.settings import UPDATE_LAST_READ_MESSAGE
18
from chat.tornado.anti_spam import AntiSpam
19
from chat.tornado.constants import VarNames, HandlerNames, Actions
20
from chat.tornado.image_utils import get_message_images, prepare_img
21
from chat.tornado.message_handler import MessagesHandler
22
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms
23
24
sessionStore = SessionStore()
25
26
parent_logger = logging.getLogger(__name__)
27
28
29
class TornadoHandler(WebSocketHandler, MessagesHandler):
30
31
	def __init__(self, *args, **kwargs):
32
		super(TornadoHandler, self).__init__(*args, **kwargs)
33
		self.__connected__ = False
34
		self.anti_spam = AntiSpam()
35
36
	@property
37
	def connected(self):
38
		return self.__connected__
39
40
	@connected.setter
41
	def connected(self, value):
42
		self.__connected__ = value
43
44
	def data_received(self, chunk):
45
		pass
46
47
	def on_message(self, json_message):
48
		try:
49
			if not self.connected:
50
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
51
			if not json_message:
52
				raise Exception('Skipping null message')
53
			# self.anti_spam.check_spam(json_message)
54
			self.logger.debug('<< %.1000s', json_message)
55
			message = json.loads(json_message)
56
			if message[VarNames.EVENT] not in self.pre_process_message:
57
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
58
			channel = message.get(VarNames.CHANNEL)
59
			if channel and channel not in self.channels:
60
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
61
			self.pre_process_message[message[VarNames.EVENT]](message)
62
		except ValidationError as e:
63
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
64
			self.ws_write(error_message)
65
66
	def publish_logout(self, channel, log_data):
67
		# seems like async solves problem with connection lost and wrong data status
68
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
69
		online, is_online = self.get_online_from_redis(channel, True)
70
		log_data[channel] = {'online': online, 'is_online': is_online}
71
		if not is_online:
72
			message = self.room_online(online, Actions.LOGOUT, channel)
73
			self.publish(message, channel)
74
			return True
75
76
	def on_close(self):
77
		if self.async_redis.subscribed:
78
			self.logger.info("Close event, unsubscribing from %s", self.channels)
79
			self.async_redis.unsubscribe(self.channels)
80
		else:
81
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
82
		log_data = {}
83
		gone_offline = False
84
		for channel in self.channels:
85
			if not isinstance(channel, Number):
86
				continue
87
			self.sync_redis.srem(channel, self.id)
88
			if self.connected:
89
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
90
		if gone_offline:
91
			res = do_db(execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
92
			self.logger.info("Updated %s last read message", res)
93
		self.disconnect(json.dumps(log_data))
94
95
	def disconnect(self, log_data, tries=0):
96
		"""
97
		Closes a connection if it's not in proggress, otherwice timeouts closing
98
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
99
		"""
100
		self.connected = False
101
		self.closed_channels = self.channels
102
		self.channels = []
103
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
104
			self.logger.debug('Closing a connection timeouts')
105
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
106
		else:
107
			self.logger.info("Close connection result: %s", log_data)
108
			self.async_redis.disconnect()
109
110
	def generate_self_id(self):
111
		"""
112
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
113
		So if ws loses a connection it still can reconnect with same id,
114
		and TornadoHandler can restore webrtc_connections to previous state
115
		"""
116
		conn_arg = self.get_argument('id', None)
117
		self.id, random = create_id(self.user_id, conn_arg)
118
		if random != conn_arg:
119
			self.ws_write(self.set_ws_id(random, self.id))
120
121
	def open(self):
122
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
123
		if sessionStore.exists(session_key):
124
			self.ip = self.get_client_ip()
125
			session = SessionStore(session_key)
126
			self.user_id = int(session["_auth_user_id"])
127
			self.generate_self_id()
128
			log_params = {
129
				'id': self.id,
130
				'ip': self.ip
131
			}
132
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
133
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
134
			self.async_redis.connect()
135
			user_db = do_db(User.objects.get, id=self.user_id)
136
			self.sender_name = user_db.username
137
			self.sex = user_db.sex_str
138
			user_rooms = get_users_in_current_user_rooms(self.user_id)
139
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
140
			# get all missed messages
141
			self.channels = []  # py2 doesn't support clear()
142
			self.channels.append(self.channel)
143
			self.channels.append(self.id)
144
			for room_id in user_rooms:
145
				self.channels.append(room_id)
146
			self.listen(self.channels)
147
			off_messages = self.get_offline_messages()
148
			for room_id in user_rooms:
149
				self.add_online_user(room_id, off_messages.get(room_id))
150
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
151
			self.connected = True
152
			Thread(target=self.save_ip).start()
153
		else:
154
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
155
			self.close(403, "Session key %s has been rejected" % session_key)
156
157
	def get_offline_messages(self):
158
		res = {}
159
		off_mess = Message.objects.filter(
160
			id__gt=F('room__roomusers__last_read_message_id'),
161
			deleted=False,
162
			room__roomusers__user_id=self.user_id
163
		)
164
		images = do_db(get_message_images, off_mess)
165
		for message in off_mess:
166
			prep_m = self.create_message(message, prepare_img(images, message.id))
167
			res.setdefault(message.room_id, []).append(prep_m)
168
		return res
169
170
	def check_origin(self, origin):
171
		"""
172
		check whether browser set domain matches origin
173
		"""
174
		parsed_origin = urlparse(origin)
175
		origin = parsed_origin.netloc
176
		origin_domain = origin.split(':')[0].lower()
177
		browser_set = self.request.headers.get("Host")
178
		browser_domain = browser_set.split(':')[0]
179
		return browser_domain == origin_domain
180
181
	def save_ip(self):
182
		if (do_db(UserJoinedInfo.objects.filter(
183
					Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
184
			return
185
		ip_address = get_or_create_ip(self.ip, self.logger)
186
		UserJoinedInfo.objects.create(
187
			ip=ip_address,
188
			user_id=self.user_id
189
		)
190
191
	def ws_write(self, message):
192
		"""
193
		Tries to send message, doesn't throw exception outside
194
		:type self: MessagesHandler
195
		:type message object
196
		"""
197
		# self.logger.debug('<< THREAD %s >>', os.getppid())
198
		try:
199
			if isinstance(message, dict):
200
				message = json.dumps(message)
201
			if not isinstance(message, str_type):
202
				raise ValueError('Wrong message type : %s' % str(message))
203
			self.logger.debug(">> %.1000s", message)
204
			self.write_message(message)
205
		except WebSocketClosedError as e:
206
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
207
208
	def get_client_ip(self):
209
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip