Completed
Push — master ( 896f50...904355 )
by Andrew
35s
created

TornadoHandler.http_client()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
1
import json
2
import logging
3
from numbers import Number
4
from threading import Thread
5
6
from datetime import timedelta
7
from django.conf import settings
8
from django.core.exceptions import ValidationError
9
from django.db.models import F, Q
10
from redis_sessions.session import SessionStore
11
from tornado import ioloop
12
from tornado.httpclient import AsyncHTTPClient
13
from tornado.websocket import WebSocketHandler, WebSocketClosedError
14
15
from chat.cookies_middleware import create_id
16
from chat.models import User, Message, UserJoinedInfo
17
from chat.py2_3 import str_type, urlparse
18
from chat.settings import UPDATE_LAST_READ_MESSAGE
19
from chat.tornado.anti_spam import AntiSpam
20
from chat.tornado.constants import VarNames, HandlerNames, Actions
21
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler
22
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms, get_message_images, \
23
	prepare_img
24
25
sessionStore = SessionStore()
26
27
parent_logger = logging.getLogger(__name__)
28
29
30
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler):
31
32
	def __init__(self, *args, **kwargs):
33
		super(TornadoHandler, self).__init__(*args, **kwargs)
34
		self.__connected__ = False
35
		self.__http_client__ = AsyncHTTPClient()
36
		self.anti_spam = AntiSpam()
37
38
	@property
39
	def connected(self):
40
		return self.__connected__
41
42
	@connected.setter
43
	def connected(self, value):
44
		self.__connected__ = value
45
46
	@property
47
	def http_client(self):
48
		"""
49
		@type: AsyncHTTPClient
50
		"""
51
		return self.__http_client__
52
53
	def data_received(self, chunk):
54
		pass
55
56
	def on_message(self, json_message):
57
		try:
58
			if not self.connected:
59
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
60
			if not json_message:
61
				raise Exception('Skipping null message')
62
			# self.anti_spam.check_spam(json_message)
63
			self.logger.debug('<< %.1000s', json_message)
64
			message = json.loads(json_message)
65
			if message[VarNames.EVENT] not in self.pre_process_message:
66
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
67
			channel = message.get(VarNames.CHANNEL)
68
			if channel and channel not in self.channels:
69
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
70
			self.pre_process_message[message[VarNames.EVENT]](message)
71
		except ValidationError as e:
72
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
73
			self.ws_write(error_message)
74
75
	def publish_logout(self, channel, log_data):
76
		# seems like async solves problem with connection lost and wrong data status
77
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
78
		is_online, online = self.get_online_and_status_from_redis(channel)
79
		log_data[channel] = {'online': online, 'is_online': is_online}
80
		if not is_online:
81
			message = self.room_online(online, Actions.LOGOUT, channel)
82
			self.publish(message, channel)
83
			return True
84
85
	def on_close(self):
86
		if self.async_redis.subscribed:
87
			self.logger.info("Close event, unsubscribing from %s", self.channels)
88
			self.async_redis.unsubscribe(self.channels)
89
		else:
90
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
91
		log_data = {}
92
		gone_offline = False
93
		for channel in self.channels:
94
			if not isinstance(channel, Number):
95
				continue
96
			self.sync_redis.srem(channel, self.id)
97
			if self.connected:
98
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
99
		if gone_offline:
100
			res = do_db(execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
101
			self.logger.info("Updated %s last read message", res)
102
		self.disconnect(json.dumps(log_data))
103
104
	def disconnect(self, log_data, tries=0):
105
		"""
106
		Closes a connection if it's not in proggress, otherwice timeouts closing
107
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
108
		"""
109
		self.connected = False
110
		self.closed_channels = self.channels
111
		self.channels = []
112
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
113
			self.logger.debug('Closing a connection timeouts')
114
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
115
		else:
116
			self.logger.info("Close connection result: %s", log_data)
117
			self.async_redis.disconnect()
118
119
	def generate_self_id(self):
120
		"""
121
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
122
		So if ws loses a connection it still can reconnect with same id,
123
		and TornadoHandler can restore webrtc_connections to previous state
124
		"""
125
		conn_arg = self.get_argument('id', None)
126
		self.id, random = create_id(self.user_id, conn_arg)
127
		if random != conn_arg:
128
			self.ws_write(self.set_ws_id(random, self.id))
129
130
	def open(self):
131
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
132
		if sessionStore.exists(session_key):
133
			self.ip = self.get_client_ip()
134
			session = SessionStore(session_key)
135
			self.user_id = int(session["_auth_user_id"])
136
			self.generate_self_id()
137
			log_params = {
138
				'id': self.id,
139
				'ip': self.ip
140
			}
141
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
142
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
143
			self.async_redis.connect()
144
			user_db = do_db(User.objects.get, id=self.user_id)
145
			self.sender_name = user_db.username
146
			self.sex = user_db.sex_str
147
			user_rooms = get_users_in_current_user_rooms(self.user_id)
148
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
149
			# get all missed messages
150
			self.channels = []  # py2 doesn't support clear()
151
			self.channels.append(self.channel)
152
			self.channels.append(self.id)
153
			for room_id in user_rooms:
154
				self.channels.append(room_id)
155
			self.listen(self.channels)
156
			off_messages = self.get_offline_messages()
157
			for room_id in user_rooms:
158
				self.add_online_user(room_id, off_messages.get(room_id))
159
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
160
			self.connected = True
161
			Thread(target=self.save_ip).start()
162
		else:
163
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
164
			self.close(403, "Session key %s has been rejected" % session_key)
165
166
	def get_offline_messages(self):
167
		res = {}
168
		off_mess = Message.objects.filter(
169
			id__gt=F('room__roomusers__last_read_message_id'),
170
			deleted=False,
171
			room__roomusers__user_id=self.user_id
172
		)
173
		images = do_db(get_message_images, off_mess)
174
		for message in off_mess:
175
			prep_m = self.create_message(message, prepare_img(images, message.id))
176
			res.setdefault(message.room_id, []).append(prep_m)
177
		return res
178
179
	def check_origin(self, origin):
180
		"""
181
		check whether browser set domain matches origin
182
		"""
183
		parsed_origin = urlparse(origin)
184
		origin = parsed_origin.netloc
185
		origin_domain = origin.split(':')[0].lower()
186
		browser_set = self.request.headers.get("Host")
187
		browser_domain = browser_set.split(':')[0]
188
		return browser_domain == origin_domain
189
190
	def save_ip(self):
191
		if (do_db(UserJoinedInfo.objects.filter(
192
					Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
193
			return
194
		ip_address = get_or_create_ip(self.ip, self.logger)
195
		UserJoinedInfo.objects.create(
196
			ip=ip_address,
197
			user_id=self.user_id
198
		)
199
200
	def ws_write(self, message):
201
		"""
202
		Tries to send message, doesn't throw exception outside
203
		:type self: MessagesHandler
204
		:type message object
205
		"""
206
		# self.logger.debug('<< THREAD %s >>', os.getppid())
207
		try:
208
			if isinstance(message, dict):
209
				message = json.dumps(message)
210
			if not isinstance(message, str_type):
211
				raise ValueError('Wrong message type : %s' % str(message))
212
			self.logger.debug(">> %.1000s", message)
213
			self.write_message(message)
214
		except WebSocketClosedError as e:
215
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
216
217
	def get_client_ip(self):
218
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip