Completed
Push — master ( 633e29...acef0c )
by Andrew
36s
created

TornadoHandler.get_offline_messages()   C

Complexity

Conditions 7

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 7
c 2
b 0
f 0
dl 0
loc 36
rs 5.5
1
import json
2
import logging
3
from datetime import timedelta
4
from itertools import chain
5
from numbers import Number
6
from threading import Thread
7
8
from django.conf import settings
9
from django.core.exceptions import ValidationError
10
from django.db.models import F, Q
11
from redis_sessions.session import SessionStore
12
from tornado import ioloop
13
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
14
from tornado.web import asynchronous
15
from tornado.websocket import WebSocketHandler, WebSocketClosedError
16
17
from chat.cookies_middleware import create_id
18
from chat.models import User, Message, UserJoinedInfo, IpAddress
19
from chat.py2_3 import str_type, urlparse
20
from chat.tornado.anti_spam import AntiSpam
21
from chat.tornado.constants import VarNames, HandlerNames, Actions
22
from chat.tornado.message_creator import MessagesCreator
23
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler
24
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms, get_message_images_videos, get_or_create_ip_wrapper, create_ip_structure
25
26
sessionStore = SessionStore()
27
28
parent_logger = logging.getLogger(__name__)
29
30
31
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler):
32
33
	def __init__(self, *args, **kwargs):
34
		super(TornadoHandler, self).__init__(*args, **kwargs)
35
		self.__connected__ = False
36
		self.restored_connection = False
37
		self.__http_client__ = AsyncHTTPClient()
38
		self.anti_spam = AntiSpam()
39
40
	@property
41
	def connected(self):
42
		return self.__connected__
43
44
	@connected.setter
45
	def connected(self, value):
46
		self.__connected__ = value
47
48
	@property
49
	def http_client(self):
50
		"""
51
		@type: AsyncHTTPClient
52
		"""
53
		return self.__http_client__
54
55
	def data_received(self, chunk):
56
		pass
57
58
	def on_message(self, json_message):
59
		try:
60
			if not self.connected:
61
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
62
			if not json_message:
63
				raise Exception('Skipping null message')
64
			# self.anti_spam.check_spam(json_message)
65
			self.logger.debug('<< %.1000s', json_message)
66
			message = json.loads(json_message)
67
			if message[VarNames.EVENT] not in self.process_ws_message:
68
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
69
			channel = message.get(VarNames.CHANNEL)
70
			if channel and channel not in self.channels:
71
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
72
			self.process_ws_message[message[VarNames.EVENT]](message)
73
		except ValidationError as e:
74
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
75
			self.ws_write(error_message)
76
77
	def publish_logout(self, channel, log_data):
78
		# seems like async solves problem with connection lost and wrong data status
79
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
80
		is_online, online = self.get_online_and_status_from_redis(channel)
81
		log_data[channel] = {'online': online, 'is_online': is_online}
82
		if not is_online:
83
			message = self.room_online(online, Actions.LOGOUT, channel)
84
			self.publish(message, channel)
85
			return True
86
87
	def on_close(self):
88
		if self.async_redis.subscribed:
89
			self.logger.info("Close event, unsubscribing from %s", self.channels)
90
			self.async_redis.unsubscribe(self.channels)
91
		else:
92
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
93
		log_data = {}
94
		gone_offline = False
95
		for channel in self.channels:
96
			if not isinstance(channel, Number):
97
				continue
98
			self.sync_redis.srem(channel, self.id)
99
			if self.connected:
100
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
101
		if gone_offline:
102
			res = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
103
			self.logger.info("Updated %s last read message", res)
104
		self.disconnect(json.dumps(log_data))
105
106
	def disconnect(self, log_data, tries=0):
107
		"""
108
		Closes a connection if it's not in proggress, otherwice timeouts closing
109
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
110
		"""
111
		self.connected = False
112
		self.closed_channels = self.channels
113
		self.channels = []
114
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
115
			self.logger.debug('Closing a connection timeouts')
116
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
117
		else:
118
			self.logger.info("Close connection result: %s", log_data)
119
			self.async_redis.disconnect()
120
121
	def generate_self_id(self):
122
		"""
123
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
124
		So if ws loses a connection it still can reconnect with same id,
125
		and TornadoHandler can restore webrtc_connections to previous state
126
		"""
127
		conn_arg = self.get_argument('id', None)
128
		self.id, random = create_id(self.user_id, conn_arg)
129
		if random != conn_arg:
130
			self.restored_connection = False
131
			self.ws_write(self.set_ws_id(random, self.id))
132
		else:
133
			self.restored_connection = True
134
135
	def open(self):
136
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
137
		if sessionStore.exists(session_key):
138
			self.ip = self.get_client_ip()
139
			session = SessionStore(session_key)
140
			self.user_id = int(session["_auth_user_id"])
141
			self.generate_self_id()
142
			log_params = {
143
				'id': self.id,
144
				'ip': self.ip
145
			}
146
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
147
			cookies = ["{}={}".format(k, self.request.cookies[k].value) for k in self.request.cookies]
148
			self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, ";".join(cookies))
149
			self.async_redis.connect()
150
			user_db = do_db(User.objects.get, id=self.user_id)
151
			self.sender_name = user_db.username
152
			self.sex = user_db.sex_str
153
			user_rooms = get_users_in_current_user_rooms(self.user_id)
154
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
155
			# get all missed messages
156
			self.channels = []  # py2 doesn't support clear()
157
			self.channels.append(self.channel)
158
			self.channels.append(self.id)
159
			for room_id in user_rooms:
160
				self.channels.append(room_id)
161
			self.listen(self.channels)
162
			off_messages, history = self.get_offline_messages(user_rooms)
163
			for room_id in user_rooms:
164
				self.add_online_user(room_id)
165
				if off_messages.get(room_id) or history.get(room_id):
166
					self.ws_write(self.load_offline_message(off_messages.get(room_id), history.get(room_id), room_id))
167
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
168
			self.connected = True
169
			# self.save_ip()
170
		else:
171
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
172
			self.close(403, "Session key %s has been rejected" % session_key)
173
174
	def get_offline_messages(self, user_rooms):
175
		q_objects = Q()
176
		messages = self.get_argument('messages', None)
177
		if messages:
178
			pmessages = json.loads(messages)
179
		else:
180
			pmessages = {}
181
		for room_id in user_rooms:
182
			room_hf = pmessages.get(str(room_id))
183
			if room_hf:
184
				h = room_hf['h']
185
				f = room_hf['f']
186
				if not self.restored_connection:
187
					q_objects.add(Q(id__gte=h, room_id=room_id, deleted=False), Q.OR)
188
				else:
189
					q_objects.add(Q(room_id=room_id, deleted=False) & (( Q(id__gte=h) & Q(id__lte=f) & Q(edited_times__gt=0)) | Q(id__gt=f)), Q.OR)
190
		off_messages = Message.objects.filter(
191
			id__gt=F('room__roomusers__last_read_message_id'),
192
			deleted=False,
193
			room__roomusers__user_id=self.user_id
194
		)
195
		off = {}
196
		history = {}
197
		if len(q_objects.children) > 0:
198
			history_messages = Message.objects.filter(q_objects)
199
			all = list(chain(off_messages, history_messages))
200
		else:
201
			history_messages = []
202
			all = off_messages
203
		if self.restored_connection:
204
			off_messages = all
205
			history_messages = []
206
		imv = get_message_images_videos(all)
207
		self.set_video_images_messages(imv, off_messages, off)
208
		self.set_video_images_messages(imv, history_messages, history)
209
		return off, history
210
211
	def set_video_images_messages(self, imv, inm, outm):
212
		for message in inm:
213
			files = MessagesCreator.prepare_img_video(imv, message.id)
214
			prep_m = self.create_message(message, files)
215
			outm.setdefault(message.room_id, []).append(prep_m)
216
217
	def check_origin(self, origin):
218
		"""
219
		check whether browser set domain matches origin
220
		"""
221
		parsed_origin = urlparse(origin)
222
		origin = parsed_origin.netloc
223
		origin_domain = origin.split(':')[0].lower()
224
		browser_set = self.request.headers.get("Host")
225
		browser_domain = browser_set.split(':')[0]
226
		return browser_domain == origin_domain
227
228
	def save_ip(self):
229
		"""
230
		This code is not used anymore
231
		"""
232
		if not do_db(UserJoinedInfo.objects.filter(
233
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists):
234
			res = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
235
			if res is not None:
236
				UserJoinedInfo.objects.create(ip=res, user_id=self.user_id)
237
238
	@asynchronous
239
	def fetch_and_save_ip_http(self):
240
		"""
241
			This code is not used anymore
242
		"""
243
		def fetch_response(response):
244
			try:
245
				ip_record = create_ip_structure(self.ip, response.body)
246
			except Exception as e:
247
				self.logger.error("Error while creating ip with country info, because %s", e)
248
				ip_record = IpAddress.objects.create(ip=self.ip)
249
			UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
250
		r = HTTPRequest(settings.IP_API_URL % self.ip, method="GET")
251
		self.http_client.fetch(r, callback=fetch_response)
252
253
	def ws_write(self, message):
254
		"""
255
		Tries to send message, doesn't throw exception outside
256
		:type self: MessagesHandler
257
		:type message object
258
		"""
259
		# self.logger.debug('<< THREAD %s >>', os.getppid())
260
		try:
261
			if isinstance(message, dict):
262
				message = json.dumps(message)
263
			if not isinstance(message, str_type):
264
				raise ValueError('Wrong message type : %s' % str(message))
265
			self.logger.debug(">> %.1000s", message)
266
			self.write_message(message)
267
		except WebSocketClosedError as e:
268
			self.logger.warning("%s. Can't send message << %s >> ", e, str(message))
269
270
	def get_client_ip(self):
271
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip