Completed
Push — master ( 15eca4...cb1015 )
by Andrew
01:31 queued 13s
created

TornadoHandler.publish_logout()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 9
rs 9.6666
c 0
b 0
f 0
1
import json
2
import logging
3
from datetime import timedelta
4
from itertools import chain
5
from numbers import Number
6
from threading import Thread
7
8
from django.conf import settings
9
from django.core.exceptions import ValidationError
10
from django.db.models import F, Q
11
from redis_sessions.session import SessionStore
12
from tornado import ioloop
13
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
14
from tornado.web import asynchronous
15
from tornado.websocket import WebSocketHandler, WebSocketClosedError
16
17
from chat.cookies_middleware import create_id
18
from chat.models import User, Message, UserJoinedInfo, IpAddress
19
from chat.py2_3 import str_type, urlparse
20
from chat.tornado.anti_spam import AntiSpam
21
from chat.tornado.constants import VarNames, HandlerNames, Actions
22
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler
23
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms, get_message_images, \
24
	prepare_img, get_or_create_ip_wrapper, create_ip_structure
25
26
# Shared session store; TornadoHandler.open() asks it whether the session key
# taken from the request cookie exists.
sessionStore = SessionStore()
27
28
# Module logger; TornadoHandler.open() wraps it in a LoggerAdapter carrying the
# connection id and the client ip.
parent_logger = logging.getLogger(__name__)
29
30
31
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler):
32
33
	def __init__(self, *args, **kwargs):
		"""
		Passes all arguments on to the parent handlers and creates the
		per-connection helpers: an AntiSpam instance, an AsyncHTTPClient and
		the "connected" flag, which starts as False.
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		self.__http_client__ = AsyncHTTPClient()
		# open() switches this to True once the connection is fully set up
		self.__connected__ = False
38
39
	@property
40
	def connected(self):
41
		return self.__connected__
42
43
	@connected.setter
44
	def connected(self, value):
45
		self.__connected__ = value
46
47
	@property
48
	def http_client(self):
49
		"""
50
		@type: AsyncHTTPClient
51
		"""
52
		return self.__http_client__
53
54
	def data_received(self, chunk):
		"""Deliberately does nothing with the received chunk."""
		return None
56
57
	def on_message(self, json_message):
		"""
		Handles a raw frame received from the client.

		The frame is parsed as JSON and passed to the handler registered in
		``self.pre_process_message`` under the message's event name.
		A ValidationError (handler not connected yet, channel not in
		``self.channels``, or raised by the event handler) is sent back to the
		client as a growl message. Any other exception is not caught here.

		:param json_message: text of the frame
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise Exception('Skipping null message')
			# anti-spam check is currently switched off:
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			message = json.loads(json_message)
			event = message[VarNames.EVENT]
			if event not in self.pre_process_message:
				raise Exception("event {} is unknown".format(event))
			# a message without a channel skips the access check
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
			handler = self.pre_process_message[event]
			handler(message)
		except ValidationError as e:
			growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.ws_write(growl)
75
76
	def publish_logout(self, channel, log_data):
		"""
		Stores the online state of ``channel`` in ``log_data`` and publishes a
		LOGOUT message to the channel when the ``is_online`` flag returned by
		``get_online_and_status_from_redis`` is false.

		:param channel: channel to check
		:param log_data: dict that receives ``{'online': ..., 'is_online': ...}`` under ``channel``
		:return: True when the logout message was published, otherwise None
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		still_online, online = self.get_online_and_status_from_redis(channel)
		log_data[channel] = {'online': online, 'is_online': still_online}
		if still_online:
			return None
		self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		return True
85
86
	def on_close(self):
		"""
		Called when the websocket is closed.

		Unsubscribes from redis if subscribed, removes this connection id from
		every numeric channel and, if the handler was connected, lets
		publish_logout() announce the logout there. When at least one logout was
		published, UPDATE_LAST_READ_MESSAGE is executed for the user. Finally
		the redis connection is closed via disconnect().
		"""
		if not self.async_redis.subscribed:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		else:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		gone_offline = False
		for channel in self.channels:
			# only numeric channels are processed here, the rest are skipped
			if isinstance(channel, Number):
				self.sync_redis.srem(channel, self.id)
				if self.connected and self.publish_logout(channel, log_data):
					gone_offline = True
		if gone_offline:
			updated = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", updated)
		self.disconnect(json.dumps(log_data))
104
105
	def disconnect(self, log_data, tries=0):
		"""
		Closes the redis connection if it's not in progress, otherwise
		postpones the closing and tries again later.
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: text written to the log once the connection is closed
		:param tries: number of postponed attempts so far; external callers leave it at 0
		"""
		self.connected = False
		if tries == 0:
			# Remember the channels only on the first attempt: on a retry
			# self.channels has already been emptied, and copying it again
			# would overwrite the saved list with [].
			self.closed_channels = self.channels
		self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# NOTE: the first positional argument of timedelta is days,
			# so the retry delay is 0.00001 days (about 0.86 s)
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
119
120
	def generate_self_id(self):
		"""
		Assigns ``self.id`` for this connection.

		When user opens new tab in browser wsHandler.wsConnectionId stores Id of
		current ws, and sends it as the ``id`` argument. So if ws loses a
		connection it still can reconnect with same id, and TornadoHandler can
		restore webrtc_connections to previous state.
		If create_id() returns a different value than the one the client sent,
		the new value is written back to the client.
		"""
		requested_id = self.get_argument('id', None)
		self.id, assigned_id = create_id(self.user_id, requested_id)
		if assigned_id == requested_id:
			return
		self.ws_write(self.set_ws_id(assigned_id, self.id))
130
131
	def open(self):
		"""
		Called when a websocket connection is opened.

		Closes the socket with code 403 when the session key from the cookie
		does not exist in the session store. Otherwise it loads the user, sends
		the room list, subscribes to the user's channels, sends offline and
		history messages per room, marks the handler as connected and saves the
		client ip.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		self.generate_self_id()
		# from here on log records carry the connection id and the client ip
		self._logger = logging.LoggerAdapter(parent_logger, {'id': self.id, 'ip': self.ip})
		cookies = ";".join(
			["{}={}".format(name, self.request.cookies[name].value) for name in self.request.cookies]
		)
		self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, cookies)
		self.async_redis.connect()
		user_db = do_db(User.objects.get, id=self.user_id)
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = get_users_in_current_user_rooms(self.user_id)
		self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		# get all missed messages
		self.channels = []  # py2 doesn't support clear()
		self.channels.extend((self.channel, self.id))
		self.channels.extend(user_rooms)
		self.listen(self.channels)
		off_messages, history = self.get_offline_messages(user_rooms)
		for room_id in user_rooms:
			is_online = self.add_online_user(room_id)
			room_history = history.get(room_id)
			# there're no offline messages if we're online
			room_offline = [] if is_online else off_messages.get(room_id)
			if room_offline or room_history:
				self.ws_write(self.load_offline_message(room_offline, room_history, room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		self.save_ip()
171
172
	def get_offline_messages(self, user_rooms):
		"""
		Loads the messages that are sent to the client right after connecting.

		:param user_rooms: iterable of room ids of the current user
		:return: tuple ``(off, history)``; both are dicts mapping a room id to a
		list of messages built by ``create_message``. ``off`` contains non-deleted
		messages with an id above the user's last read message id, ``history``
		contains messages of a room with an id not lower than the value of the
		cookie named after that room id.
		"""
		history_filter = Q()
		for room_id in user_rooms:
			first_id = self.get_cookie(str(room_id))
			if first_id is not None:
				history_filter.add(Q(id__gte=first_id, room_id=room_id), Q.OR)
		off_messages = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		if history_filter.children:
			history_messages = Message.objects.filter(history_filter)
			image_source = list(chain(off_messages, history_messages))
		else:
			# no room cookie was sent, so there is no history to load
			history_messages = []
			image_source = off_messages
		images = get_message_images(image_source)

		def group_by_room(messages):
			# room id -> list of prepared messages, in iteration order
			grouped = {}
			for message in messages:
				prepared = self.create_message(message, prepare_img(images, message.id))
				grouped.setdefault(message.room_id, []).append(prepared)
			return grouped

		off = group_by_room(off_messages)
		history = group_by_room(history_messages)
		return off, history
199
200
	def check_origin(self, origin):
		"""
		Checks whether the host of the ``Origin`` header matches the host of
		the request's ``Host`` header. Ports are ignored.

		Host names are compared case-insensitively and IPv6 literals are
		compared by address. The check fails when either host is missing or
		cannot be parsed.

		:param origin: value of the Origin header
		:return: True if both headers name the same host, otherwise False
		"""
		def hostname(url):
			try:
				# .hostname lower-cases the host, drops the port and strips
				# the brackets of an IPv6 literal; split(':') would reduce
				# every IPv6 literal to '['
				return urlparse(url).hostname
			except ValueError:  # e.g. malformed IPv6 literal
				return None

		origin_domain = hostname(origin)
		browser_set = self.request.headers.get("Host")
		# the Host header has no scheme; the '//' prefix makes urlparse read it as a netloc
		browser_domain = hostname('//' + browser_set) if browser_set else None
		if not origin_domain or not browser_domain:
			return False
		return browser_domain == origin_domain
210
211
	def save_ip(self):
		"""
		Creates a UserJoinedInfo row for the current user and ip unless one
		already exists.

		Nothing is created here when get_or_create_ip_wrapper() returns None.
		NOTE(review): presumably the wrapper then calls the
		fetch_and_save_ip_http callback passed to it - confirm.
		"""
		known = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		if do_db(known.exists):
			return
		ip_record = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
		if ip_record is not None:
			UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
217
218
	@asynchronous
	def fetch_and_save_ip_http(self):
		"""
		Requests ``settings.IP_API_URL`` for the client ip and, when the
		response arrives, stores a UserJoinedInfo row for the current user.

		If the ip record cannot be built from the response body, the error is
		logged and an IpAddress holding only the ip is created instead.
		"""
		def on_response(response):
			try:
				ip_record = create_ip_structure(self.ip, response.body)
			except Exception as e:
				self.logger.error("Error while creating ip with country info, because %s", e)
				# fall back to a record without the extra info
				ip_record = IpAddress.objects.create(ip=self.ip)
			UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)

		request = HTTPRequest(settings.IP_API_URL % self.ip, method="GET")
		self.http_client.fetch(request, callback=on_response)
229
230
	def ws_write(self, message):
		"""
		Sends a message to the client.

		A dict is serialized to JSON first. If the websocket is already
		closed, the error is logged instead of being raised.

		:type self: MessagesHandler
		:param message: dict or string to send
		:raises ValueError: if message is neither a dict nor a string
		"""
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			if not isinstance(message, str_type):
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %.1000s", message)
			self.write_message(message)
		except WebSocketClosedError as e:
			# The message is passed to the logger as-is: str() on a unicode
			# string with non-ascii characters raises UnicodeEncodeError under
			# Python 2, which would escape from this handler.
			self.logger.error("%s. Can't send << %s >> message", e, message)
246
247
	def get_client_ip(self):
		"""
		Returns the value of the ``X-Real-IP`` request header when it is present
		and non-empty, otherwise the remote ip of the request.
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip