1
|
|
|
import json |
2
|
|
|
import logging |
3
|
|
|
from datetime import timedelta |
4
|
|
|
from itertools import chain |
5
|
|
|
from numbers import Number |
6
|
|
|
from threading import Thread |
7
|
|
|
|
8
|
|
|
from django.conf import settings |
9
|
|
|
from django.core.exceptions import ValidationError |
10
|
|
|
from django.db.models import F, Q |
11
|
|
|
from redis_sessions.session import SessionStore |
12
|
|
|
from tornado import ioloop |
13
|
|
|
from tornado.httpclient import AsyncHTTPClient, HTTPRequest |
14
|
|
|
from tornado.web import asynchronous |
15
|
|
|
from tornado.websocket import WebSocketHandler, WebSocketClosedError |
16
|
|
|
|
17
|
|
|
from chat.cookies_middleware import create_id |
18
|
|
|
from chat.models import User, Message, UserJoinedInfo, IpAddress, Room, RoomUsers |
19
|
|
|
from chat.py2_3 import str_type, urlparse |
20
|
|
|
from chat.tornado.anti_spam import AntiSpam |
21
|
|
|
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix |
22
|
|
|
from chat.tornado.message_creator import MessagesCreator |
23
|
|
|
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler |
24
|
|
|
from chat.utils import execute_query, do_db, \ |
25
|
|
|
get_message_images_videos, get_or_create_ip_wrapper, create_ip_structure, get_history_message_query |
26
|
|
|
|
27
|
|
|
# Shared session store instance; open() asks it whether a session key exists.
sessionStore = SessionStore()

# Module logger; open() wraps it in a LoggerAdapter that carries the connection id and ip.
parent_logger = logging.getLogger(__name__)
30
|
|
|
|
31
|
|
|
|
32
|
|
|
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler): |
33
|
|
|
|
34
|
|
|
def __init__(self, *args, **kwargs):
    """
    Passes every argument on to the parent initializers and creates the
    per-connection state of the handler.
    """
    super(TornadoHandler, self).__init__(*args, **kwargs)
    # Both flags start as False: open() turns `connected` on when it finishes,
    # generate_self_id() decides whether the connection was restored.
    self.__connected__ = self.restored_connection = False
    self.anti_spam = AntiSpam()
    self.__http_client__ = AsyncHTTPClient()
40
|
|
|
|
41
|
|
|
@property
def connected(self):
    """
    Flag that open() sets to True once it has finished and that
    disconnect() resets to False; on_message refuses messages while it is False.
    """
    state = self.__connected__
    return state

@connected.setter
def connected(self, value):
    # plain storage, no validation is performed on the new value
    self.__connected__ = value
48
|
|
|
|
49
|
|
|
@property
def http_client(self):
    """
    Read-only access to the HTTP client created in __init__.

    @rtype: AsyncHTTPClient
    """
    client = self.__http_client__
    return client
55
|
|
|
|
56
|
|
|
def data_received(self, chunk):
    """
    Deliberate no-op: the received chunk is ignored.

    :param chunk: piece of streamed data, unused
    """
    return None
58
|
|
|
|
59
|
|
|
def on_message(self, json_message):
    """
    Handles a frame received from the client.

    The JSON payload is parsed, its event is looked up in
    process_ws_message and, when a room id is given, that room must be one of
    the subscribed channels. The message is then passed to the handler
    registered for the event.

    A ValidationError is reported back to the client as a growl message;
    every other exception propagates to the caller.

    :param json_message: raw JSON text received over the websocket
    """
    try:
        if not self.connected:
            raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
        if not json_message:
            raise Exception('Skipping null message')
        # self.anti_spam.check_spam(json_message)
        self.logger.debug('<< %.1000s', json_message)
        message = json.loads(json_message)
        event = message[VarNames.EVENT]
        if event not in self.process_ws_message:
            raise Exception("event {} is unknown".format(event))
        room_id = message.get(VarNames.ROOM_ID)
        if room_id and room_id not in self.channels:
            raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(room_id, self.channels))
        handler = self.process_ws_message[event]
        handler(message)
    except ValidationError as e:
        self.ws_write(self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL))
77
|
|
|
|
78
|
|
|
def on_close(self):
    """
    Called when the websocket is closed.

    Unsubscribes from the redis channels when subscribed and removes this
    connection id from the RedisPrefix.ONLINE_VAR set. If the connection had
    been marked as connected, publishes a LOGOUT notification to
    settings.ALL_ROOM_ID (only when is_online is falsy) and runs the
    UPDATE_LAST_READ_MESSAGE query for the user. Finally hands over to
    disconnect() to release the redis connection.
    """
    # NOTE(review): the indentation of this method was reconstructed - confirm the nesting
    if self.async_redis.subscribed:
        self.logger.info("Close event, unsubscribing from %s", self.channels)
        self.async_redis.unsubscribe(self.channels)
    else:
        self.logger.info("Close event, not subscribed, channels: %s", self.channels)
    # the id is removed from the online set before the online status is read
    self.async_redis_publisher.srem(RedisPrefix.ONLINE_VAR, self.id)
    # presumably is_online tells whether the user still has other open connections - verify
    is_online, online = self.get_online_and_status_from_redis()
    if self.connected:
        if not is_online:
            message = self.room_online(online, Actions.LOGOUT)
            self.publish(message, settings.ALL_ROOM_ID)
        res = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
        self.logger.info("Updated %s last read message", res)
    self.disconnect()
93
|
|
|
|
94
|
|
|
def disconnect(self, tries=0):
    """
    Closes the redis connection if no operation is in progress on it,
    otherwise schedules another attempt on the IOLoop.
    https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

    :param tries: number of attempts already made; only the rescheduled
        call passes a non-zero value, after 1000 attempts the connection is
        closed regardless of its state
    """
    self.connected = False
    if tries == 0:
        # Remember the channels on the first attempt only: on a retry
        # self.channels is already empty and would wipe the remembered list
        self.closed_channels = self.channels
    self.channels = []
    if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
        self.logger.debug('Closing a connection timeouts')
        # the first positional argument of timedelta is days, so the delay is ~0.86 seconds
        ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, tries + 1)
    else:
        # the message has a %s placeholder, so the result has to be supplied to it
        result = self.async_redis.disconnect()
        self.logger.info("Close connection result: %s", result)
108
|
|
|
|
109
|
|
|
def generate_self_id(self):
    """
    Assigns self.id to this connection.

    The browser keeps the id of its websocket (wsHandler.wsConnectionId) and
    sends it back in the 'id' argument when it reconnects, so that
    TornadoHandler can restore webrtc_connections to their previous state.
    When create_id() hands back the same value that was sent, the connection
    is marked as restored; otherwise the new id is sent to the client.
    """
    requested = self.get_argument('id', None)
    self.id, issued = create_id(self.user_id, requested)
    self.restored_connection = issued == requested
    if not self.restored_connection:
        self.ws_write(self.set_ws_id(issued, self.id))
122
|
|
|
|
123
|
|
|
def open(self):
    """
    Called when a websocket connection is opened.

    The session cookie is checked against the session store. For a known
    session the handler resolves the user, registers the connection id in
    the online set, subscribes to the channels of the user's rooms, sends
    the rooms (with offline/history messages), the users and the online list
    to the client and finally marks itself as connected. For an unknown
    session the connection is closed with code 403.
    """
    # NOTE(review): the indentation of this method was reconstructed - confirm the nesting
    session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
    if sessionStore.exists(session_key):
        self.ip = self.get_client_ip()
        session = SessionStore(session_key)
        self.user_id = int(session["_auth_user_id"])
        # sets self.id and self.restored_connection
        self.generate_self_id()
        self._logger = logging.LoggerAdapter(parent_logger, {
            'id': self.id,
            'ip': self.ip
        })
        cookies = ["{}={}".format(k, self.request.cookies[k].value) for k in self.request.cookies]
        # NOTE(review): every cookie value, the session cookie included, ends up in the debug log
        self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, ";".join(cookies))
        self.async_redis.connect()
        self.async_redis_publisher.sadd(RedisPrefix.ONLINE_VAR, self.id)
        # since we add user to online first, latest trigger will always show correct online
        was_online, online = self.get_online_and_status_from_redis()
        user_db = do_db(User.objects.get, id=self.user_id)
        self.sender_name = user_db.username
        self.sex = user_db.sex_str
        user_rooms1 = Room.objects.filter(users__id=self.user_id, disabled=False)\
            .values('id', 'name', 'roomusers__notifications', 'roomusers__volume')
        user_rooms = MessagesCreator.create_user_rooms(user_rooms1)
        # iterating user_rooms yields the room ids (it is indexed by room_id below)
        room_ids = [room_id for room_id in user_rooms]
        rooms_users = RoomUsers.objects.filter(room_id__in=room_ids).values('user_id', 'room_id')
        # the query has to be consumed here, before room_ids gets extra entries appended below
        for ru in rooms_users:
            user_rooms[ru['room_id']][VarNames.ROOM_USERS].append(ru['user_id'])
        # get all missed messages
        self.channels = room_ids  # py2 doesn't support clear()
        # besides the rooms, listen on self.channel and on the connection's own id
        self.channels.append(self.channel)
        self.channels.append(self.id)
        self.listen(self.channels)
        off_messages, history = self.get_offline_messages(user_rooms, was_online, self.get_argument('history', False))
        # attach the messages to their rooms, skipping rooms that have none
        for room_id in user_rooms:
            h = history.get(room_id)
            o = off_messages.get(room_id)
            if h:
                user_rooms[room_id][VarNames.LOAD_MESSAGES_HISTORY] = h
            if o:
                user_rooms[room_id][VarNames.LOAD_MESSAGES_OFFLINE] = o
        # all users are sent to the client, not only the members of the user's rooms
        user_dict = {}
        for user in User.objects.values('id', 'username', 'sex'):
            user_dict[user['id']] = RedisPrefix.set_js_user_structure(user['username'], user['sex'])
        if self.user_id not in online:
            online.append(self.user_id)
        self.ws_write(self.set_room(user_rooms, user_dict, online))
        if not was_online:  # if a new tab has been opened
            online_user_names_mes = self.room_online(online, Actions.LOGIN)
            self.logger.info('!! First tab, sending refresh online for all')
            self.publish(online_user_names_mes, settings.ALL_ROOM_ID)
        self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
        # from now on on_message accepts messages
        self.connected = True
    else:
        self.logger.warning('!! Session key %s has been rejected', str(session_key))
        # NOTE(review): 403 is outside the close code range of RFC 6455 (1000-4999);
        # the client may rely on this value - confirm before changing
        self.close(403, "Session key %s has been rejected" % session_key)
178
|
|
|
|
179
|
|
|
def get_offline_messages(self, user_rooms, was_online, with_history):
    """
    Collects the messages to send to the client right after it connects.

    :param user_rooms: rooms of the user, passed to get_history_message_query
    :param was_online: when truthy no offline messages are looked up
    :param with_history: passed to get_history_message_query
    :return: tuple (offline, history); each is a dict that maps a room id
        to the list of prepared messages of that room
    """
    q_objects = get_history_message_query(self.get_argument('messages', None), user_rooms, with_history)
    if was_online:
        off_messages = []
    else:
        # messages newer than the last one the user has read
        off_messages = Message.objects.filter(
            id__gt=F('room__roomusers__last_read_message_id'),
            room__roomusers__user_id=self.user_id
        )
    if q_objects.children:
        history_messages = Message.objects.filter(q_objects)
        all_messages = list(chain(off_messages, history_messages))
        self.logger.info("Offline messages IDs: %s, history messages: %s", [m.id for m in off_messages], [m.id for m in history_messages])
    else:
        history_messages = []
        all_messages = off_messages
    if self.restored_connection:
        # a restored connection gets everything as offline messages and no history
        off_messages = all_messages
        history_messages = []
    # attachments are fetched once for all messages and shared by both groups
    imv = get_message_images_videos(all_messages)
    off = {}
    history = {}
    self.set_video_images_messages(imv, off_messages, off)
    self.set_video_images_messages(imv, history_messages, history)
    return off, history
204
|
|
|
|
205
|
|
|
def set_video_images_messages(self, imv, inm, outm):
    """
    Prepares every message of inm for sending and groups the results by
    room id inside outm.

    :param imv: passed to MessagesCreator.prepare_img_video together with the message id
    :param inm: iterable of messages that expose id and room_id
    :param outm: dict updated in place, room id -> list of prepared messages
    """
    for msg in inm:
        attachments = MessagesCreator.prepare_img_video(imv, msg.id)
        prepared = self.create_message(msg, attachments)
        room_messages = outm.setdefault(msg.room_id, [])
        room_messages.append(prepared)
210
|
|
|
|
211
|
|
|
def check_origin(self, origin):
    """
    Checks whether the host of the Origin header matches the host of the
    Host header. Ports and letter case are ignored.

    :param origin: value of the Origin header sent by the browser
    :return: True when both headers name the same host; False otherwise,
        including a missing, empty or unparsable header
    """
    host_header = self.request.headers.get("Host")
    if not origin or not host_header:
        return False
    try:
        # .hostname drops the port, lower-cases the name and keeps an IPv6
        # literal whole; splitting on ':' reduced every IPv6 address to '['
        origin_domain = urlparse(origin).hostname
        browser_domain = urlparse('//' + host_header).hostname
    except ValueError:
        # e.g. an unbalanced '[' of an IPv6 literal
        return False
    if not origin_domain or not browser_domain:
        # two empty hosts must not be treated as a match
        return False
    return browser_domain == origin_domain
221
|
|
|
|
222
|
|
|
def save_ip(self):
    """
    This code is not used anymore.

    Creates a UserJoinedInfo row for the current user and ip unless one
    already exists.
    """
    already_saved = do_db(UserJoinedInfo.objects.filter(
        Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)
    if already_saved:
        return
    ip_record = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
    # nothing is created here when the wrapper returns None
    if ip_record is not None:
        UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
231
|
|
|
|
232
|
|
|
@asynchronous
def fetch_and_save_ip_http(self):
    """
    This code is not used anymore.

    Requests settings.IP_API_URL for self.ip and, when the response arrives,
    stores the ip record together with a UserJoinedInfo row for the user.
    """
    def on_response(response):
        try:
            ip_record = create_ip_structure(self.ip, response.body)
        except Exception as e:
            # fall back to a bare ip record when the response can't be used
            self.logger.error("Error while creating ip with country info, because %s", e)
            ip_record = IpAddress.objects.create(ip=self.ip)
        UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)

    api_url = settings.IP_API_URL % self.ip
    self.http_client.fetch(HTTPRequest(api_url, method="GET"), callback=on_response)
246
|
|
|
|
247
|
|
|
def ws_write(self, message):
    """
    Sends a message to the client. A closed websocket is logged as a
    warning instead of being raised to the caller.

    :type self: MessagesHandler
    :param message: a dict, which is serialized to JSON here, or an already
        serialized string
    :raises ValueError: when the message is neither a dict nor a string
    """
    # self.logger.debug('<< THREAD %s >>', os.getppid())
    try:
        payload = json.dumps(message) if isinstance(message, dict) else message
        if not isinstance(payload, str_type):
            raise ValueError('Wrong message type : %s' % str(payload))
        self.logger.debug(">> %.1000s", payload)
        self.write_message(payload)
    except WebSocketClosedError as e:
        self.logger.warning("%s. Can't send message << %s >> ", e, str(payload))
263
|
|
|
|
264
|
|
|
def get_client_ip(self):
    """
    Returns the address of the client: the X-Real-IP header when it is
    present and non-empty, otherwise the remote ip of the request.
    """
    request = self.request
    forwarded_ip = request.headers.get("X-Real-IP")
    if forwarded_ip:
        return forwarded_ip
    return request.remote_ip