1
|
|
|
import json |
2
|
|
|
import logging |
3
|
|
|
from datetime import timedelta |
4
|
|
|
from itertools import chain |
5
|
|
|
from numbers import Number |
6
|
|
|
from threading import Thread |
7
|
|
|
|
8
|
|
|
from django.conf import settings |
9
|
|
|
from django.core.exceptions import ValidationError |
10
|
|
|
from django.db.models import F, Q |
11
|
|
|
from redis_sessions.session import SessionStore |
12
|
|
|
from tornado import ioloop |
13
|
|
|
from tornado.httpclient import AsyncHTTPClient, HTTPRequest |
14
|
|
|
from tornado.web import asynchronous |
15
|
|
|
from tornado.websocket import WebSocketHandler, WebSocketClosedError |
16
|
|
|
|
17
|
|
|
from chat.cookies_middleware import create_id |
18
|
|
|
from chat.models import User, Message, UserJoinedInfo, IpAddress |
19
|
|
|
from chat.py2_3 import str_type, urlparse |
20
|
|
|
from chat.tornado.anti_spam import AntiSpam |
21
|
|
|
from chat.tornado.constants import VarNames, HandlerNames, Actions |
22
|
|
|
from chat.tornado.message_creator import MessagesCreator |
23
|
|
|
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler |
24
|
|
|
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms, get_message_images_videos, get_or_create_ip_wrapper, create_ip_structure |
25
|
|
|
|
26
|
|
|
# Shared session store instance; open() uses it to check that a session key exists.
sessionStore = SessionStore()

# Module logger; open() wraps it in a LoggerAdapter carrying the connection id and ip.
parent_logger = logging.getLogger(__name__)
29
|
|
|
|
30
|
|
|
|
31
|
|
|
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler): |
32
|
|
|
|
33
|
|
|
def __init__(self, *args, **kwargs):
    """
    Initialize per-connection state.

    All positional and keyword arguments are forwarded unchanged to the
    parent initializers.
    """
    super(TornadoHandler, self).__init__(*args, **kwargs)
    # Both flags start as False: `connected` is switched on at the end of a
    # successful open(), `restored_connection` is decided in generate_self_id().
    self.__connected__ = self.restored_connection = False
    self.__http_client__ = AsyncHTTPClient()
    self.anti_spam = AntiSpam()
39
|
|
|
|
40
|
|
|
@property
def connected(self):
    """Whether this websocket has finished its initialization in open()."""
    return self.__connected__

@connected.setter
def connected(self, state):
    """Store the new connection state."""
    self.__connected__ = state
47
|
|
|
|
48
|
|
|
@property
def http_client(self):
    """
    Read-only access to the HTTP client created in __init__.

    :rtype: AsyncHTTPClient
    """
    return self.__http_client__
54
|
|
|
|
55
|
|
|
def data_received(self, chunk):
    """Intentionally a no-op: the received chunk is ignored."""
    return None
57
|
|
|
|
58
|
|
|
def on_message(self, json_message):
    """
    Parse an incoming websocket frame and dispatch it by its event name.

    The frame is decoded as JSON and handed to the callable registered for
    its event in ``self.process_ws_message``.

    A ValidationError (websocket not initialized yet, or a channel that is
    not in ``self.channels``) is not propagated: its text is sent back to
    the client as a growl message. Every other exception (empty frame,
    unknown event, malformed JSON) propagates to the caller.

    :param json_message: raw text payload received from the client
    """
    try:
        if not self.connected:
            raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
        if not json_message:
            raise Exception('Skipping null message')
        # self.anti_spam.check_spam(json_message)
        self.logger.debug('<< %.1000s', json_message)
        message = json.loads(json_message)
        event = message[VarNames.EVENT]
        if event not in self.process_ws_message:
            raise Exception("event {} is unknown".format(event))
        channel = message.get(VarNames.CHANNEL)
        # a message without a channel skips the access check
        if channel and channel not in self.channels:
            raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
        self.process_ws_message[event](message)
    except ValidationError as e:
        self.ws_write(self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL))
76
|
|
|
|
77
|
|
|
def publish_logout(self, channel, log_data):
    """
    Record the online state of ``channel`` and announce a logout if needed.

    :param channel: channel whose online state is read from redis
    :param log_data: dict updated in place with the state found for the channel
    :return: True when a LOGOUT message was published, otherwise None
    """
    # seems like async solves problem with connection lost and wrong data status
    # http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
    is_online, online = self.get_online_and_status_from_redis(channel)
    log_data[channel] = {'online': online, 'is_online': is_online}
    if is_online:
        # still reported as online for this channel, nothing to announce
        return None
    self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
    return True
86
|
|
|
|
87
|
|
|
def on_close(self):
    """
    Clean up after the websocket has been closed.

    Unsubscribes the async redis client when it is subscribed, removes this
    connection id from every numeric channel in sync redis, publishes logout
    for those channels if the connection was initialized, updates the
    last-read message when at least one logout was published, and finally
    calls disconnect() with the collected log data serialized as JSON.
    """
    redis_client = self.async_redis
    if redis_client.subscribed:
        self.logger.info("Close event, unsubscribing from %s", self.channels)
        redis_client.unsubscribe(self.channels)
    else:
        self.logger.info("Close event, not subscribed, channels: %s", self.channels)

    log_data = {}
    gone_offline = False
    for channel in self.channels:
        # only numeric channels are processed here; other entries are skipped
        if isinstance(channel, Number):
            self.sync_redis.srem(channel, self.id)
            if self.connected and self.publish_logout(channel, log_data):
                gone_offline = True

    if gone_offline:
        res = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
        self.logger.info("Updated %s last read message", res)
    self.disconnect(json.dumps(log_data))
105
|
|
|
|
106
|
|
|
def disconnect(self, log_data, tries=0):
    """
    Closes the redis connection if no operation is in progress on it,
    otherwise schedules another attempt on the IOLoop.
    https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

    :param log_data: text written to the log once the connection is closed
    :param tries: number of attempts already made; used by the rescheduled
        calls, external callers leave it at 0
    """
    self.connected = False
    if tries == 0 or self.channels:
        # Snapshot the channels once. On a retry self.channels has already
        # been emptied, and copying it again would wipe the saved list.
        self.closed_channels = self.channels
        self.channels = []
    if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
        self.logger.debug('Closing a connection timeouts')
        # timedelta's first positional argument is days: 0.00001 d is about 0.86 s
        ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
    else:
        self.logger.info("Close connection result: %s", log_data)
        self.async_redis.disconnect()
120
|
|
|
|
121
|
|
|
def generate_self_id(self):
    """
    When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
    So if ws loses a connection it still can reconnect with same id,
    and TornadoHandler can restore webrtc_connections to previous state

    Sets ``self.id`` and ``self.restored_connection``; when the value returned
    by create_id differs from the ``id`` request argument, the new id is sent
    to the client.
    """
    requested = self.get_argument('id', None)
    self.id, issued = create_id(self.user_id, requested)
    is_new = issued != requested
    self.restored_connection = not is_new
    if is_new:
        self.ws_write(self.set_ws_id(issued, self.id))
134
|
|
|
|
135
|
|
|
def open(self):
    """
    Authorize a new websocket by its session cookie and initialize it.

    If the session key is unknown the websocket is closed with code 403.
    Otherwise the handler resolves the client ip and user id, generates the
    connection id, connects to redis, sends the user's rooms to the client,
    passes the channel list (self.channel, the connection id and every room
    id) to listen(), registers the user as online in each room, sends the
    offline messages and history per room, and marks itself as connected.
    """
    session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
    if not sessionStore.exists(session_key):
        self.logger.warning('!! Session key %s has been rejected', str(session_key))
        self.close(403, "Session key %s has been rejected" % session_key)
        return

    self.ip = self.get_client_ip()
    session = SessionStore(session_key)
    self.user_id = int(session["_auth_user_id"])
    self.generate_self_id()
    self._logger = logging.LoggerAdapter(parent_logger, {'id': self.id, 'ip': self.ip})
    # NOTE(review): this writes the session key and every cookie value to the debug log
    cookies = ";".join(
        "{}={}".format(name, self.request.cookies[name].value) for name in self.request.cookies
    )
    self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, cookies)
    self.async_redis.connect()

    user_db = do_db(User.objects.get, id=self.user_id)
    self.sender_name = user_db.username
    self.sex = user_db.sex_str
    user_rooms = get_users_in_current_user_rooms(self.user_id)
    self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))

    # get all missed messages
    self.channels = []  # py2 doesn't support clear()
    self.channels.append(self.channel)
    self.channels.append(self.id)
    self.channels.extend(user_rooms)
    self.listen(self.channels)

    off_messages, history = self.get_offline_messages(user_rooms)
    for room_id in user_rooms:
        self.add_online_user(room_id)
        room_offline = off_messages.get(room_id)
        room_history = history.get(room_id)
        if room_offline or room_history:
            self.ws_write(self.load_offline_message(room_offline, room_history, room_id))
    self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
    self.connected = True
    # self.save_ip()
173
|
|
|
|
174
|
|
|
def get_offline_messages(self, user_rooms):
    """
    Collect the messages that should be pushed to a freshly opened connection.

    Offline messages are the non-deleted messages, in rooms of this user,
    whose id is greater than the user's last read message id.

    History messages are selected from the optional ``messages`` request
    argument: a JSON object keyed by room id whose values carry ``h`` and
    ``f`` (presumably the lowest and highest message id the client already
    holds; confirm against the client code). For a new connection every
    non-deleted message of the room with id >= h is selected. For a restored
    connection only messages edited within [h, f] or newer than f are.

    On a restored connection everything is returned as offline messages and
    the history is empty.

    :param user_rooms: iterable of room ids of the current user
    :return: tuple ``(off, history)`` of dicts mapping room id to a list of
        prepared messages
    """
    raw_state = self.get_argument('messages', None)
    client_state = json.loads(raw_state) if raw_state else {}

    history_filter = Q()
    for room_id in user_rooms:
        room_hf = client_state.get(str(room_id))
        if not room_hf:
            continue
        h = room_hf['h']
        f = room_hf['f']
        if self.restored_connection:
            room_filter = Q(room_id=room_id, deleted=False) & (
                (Q(id__gte=h) & Q(id__lte=f) & Q(edited_times__gt=0)) | Q(id__gt=f)
            )
        else:
            room_filter = Q(id__gte=h, room_id=room_id, deleted=False)
        history_filter.add(room_filter, Q.OR)

    off_messages = Message.objects.filter(
        id__gt=F('room__roomusers__last_read_message_id'),
        deleted=False,
        room__roomusers__user_id=self.user_id
    )
    if history_filter.children:
        history_messages = Message.objects.filter(history_filter)
        all_messages = list(chain(off_messages, history_messages))
    else:
        history_messages = []
        all_messages = off_messages
    if self.restored_connection:
        off_messages = all_messages
        history_messages = []

    imv = get_message_images_videos(all_messages)
    off = {}
    history = {}
    self.set_video_images_messages(imv, off_messages, off)
    self.set_video_images_messages(imv, history_messages, history)
    return off, history
210
|
|
|
|
211
|
|
|
def set_video_images_messages(self, imv, inm, outm):
    """
    Prepare every message of ``inm`` and group the results by room.

    :param imv: value passed through to MessagesCreator.prepare_img_video
    :param inm: iterable of messages exposing ``id`` and ``room_id``
    :param outm: dict filled in place, mapping room id to a list of prepared messages
    """
    for message in inm:
        attachments = MessagesCreator.prepare_img_video(imv, message.id)
        prepared = self.create_message(message, attachments)
        room_bucket = outm.setdefault(message.room_id, [])
        room_bucket.append(prepared)
216
|
|
|
|
217
|
|
|
def check_origin(self, origin):
    """
    Check whether the host of the Origin header matches the Host header.

    Host names are compared case-insensitively and without the port.
    IPv6 literals are compared by their full address.

    :param origin: value of the Origin request header
    :return: True if both headers name the same host, otherwise False
        (also when the Host header is missing or either value cannot be parsed)
    """
    browser_set = self.request.headers.get("Host")
    if not origin or not browser_set:
        return False
    try:
        # .hostname drops the port, lower-cases the name and unwraps [ipv6]
        origin_domain = urlparse(origin).hostname
        # the Host header has no scheme; '//' makes urlparse treat it as netloc
        browser_domain = urlparse('//' + browser_set).hostname
    except ValueError:
        # raised by urlparse for malformed input such as an unbalanced '['
        return False
    return origin_domain is not None and origin_domain == browser_domain
227
|
|
|
|
228
|
|
|
def save_ip(self):
    """
    This code is not used anymore

    Creates a UserJoinedInfo row for the current ip and user unless one
    already exists. The ip record is obtained via get_or_create_ip_wrapper;
    nothing is stored when that returns None.
    """
    already_saved = do_db(UserJoinedInfo.objects.filter(
        Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)
    if already_saved:
        return
    ip_record = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
    if ip_record is not None:
        UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
237
|
|
|
|
238
|
|
|
@asynchronous
def fetch_and_save_ip_http(self):
    """
    This code is not used anymore

    Requests settings.IP_API_URL for the current ip and, in the response
    callback, stores the ip record together with a UserJoinedInfo row.
    """
    def on_response(response):
        try:
            ip_record = create_ip_structure(self.ip, response.body)
        except Exception as e:
            # fall back to a bare ip record when the response cannot be used
            self.logger.error("Error while creating ip with country info, because %s", e)
            ip_record = IpAddress.objects.create(ip=self.ip)
        UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)

    request = HTTPRequest(settings.IP_API_URL % self.ip, method="GET")
    self.http_client.fetch(request, callback=on_response)
252
|
|
|
|
253
|
|
|
def ws_write(self, message):
    """
    Tries to send a message to the client; a closed websocket is logged
    as a warning instead of being raised.

    :type self: MessagesHandler
    :param message: dict (serialized to JSON here) or an already serialized string
    :raises ValueError: if message is neither a dict nor a string
    """
    # self.logger.debug('<< THREAD %s >>', os.getppid())
    try:
        if isinstance(message, dict):
            message = json.dumps(message)
        if not isinstance(message, str_type):
            raise ValueError('Wrong message type : %s' % str(message))
        self.logger.debug(">> %.1000s", message)
        self.write_message(message)
    except WebSocketClosedError as e:
        # Hand the payload to the logger as a lazy argument: calling str() on a
        # non-ascii unicode payload raises UnicodeEncodeError under python 2,
        # which would escape from this handler.
        self.logger.warning("%s. Can't send message << %s >> ", e, message)
269
|
|
|
|
270
|
|
|
def get_client_ip(self):
    """
    Return the client address for this request.

    The ``X-Real-IP`` header wins when it is present and non-empty;
    otherwise the request's ``remote_ip`` is used.
    """
    forwarded_ip = self.request.headers.get("X-Real-IP")
    if forwarded_ip:
        return forwarded_ip
    return self.request.remote_ip