|
1
|
|
|
import json |
|
2
|
|
|
import logging |
|
3
|
|
|
from datetime import timedelta |
|
4
|
|
|
from itertools import chain |
|
5
|
|
|
from numbers import Number |
|
6
|
|
|
from threading import Thread |
|
7
|
|
|
|
|
8
|
|
|
from django.conf import settings |
|
9
|
|
|
from django.core.exceptions import ValidationError |
|
10
|
|
|
from django.db.models import F, Q |
|
11
|
|
|
from redis_sessions.session import SessionStore |
|
12
|
|
|
from tornado import ioloop |
|
13
|
|
|
from tornado.httpclient import AsyncHTTPClient, HTTPRequest |
|
14
|
|
|
from tornado.web import asynchronous |
|
15
|
|
|
from tornado.websocket import WebSocketHandler, WebSocketClosedError |
|
16
|
|
|
|
|
17
|
|
|
from chat.cookies_middleware import create_id |
|
18
|
|
|
from chat.models import User, Message, UserJoinedInfo, IpAddress |
|
19
|
|
|
from chat.py2_3 import str_type, urlparse |
|
20
|
|
|
from chat.tornado.anti_spam import AntiSpam |
|
21
|
|
|
from chat.tornado.constants import VarNames, HandlerNames, Actions |
|
22
|
|
|
from chat.tornado.message_creator import MessagesCreator |
|
23
|
|
|
from chat.tornado.message_handler import MessagesHandler, WebRtcMessageHandler |
|
24
|
|
|
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms, \ |
|
25
|
|
|
get_message_images_videos, get_or_create_ip_wrapper, create_ip_structure, get_history_message_query |
|
26
|
|
|
|
|
27
|
|
|
# Shared session store instance; ``TornadoHandler.open`` uses it to check
# whether the session key taken from the cookie exists.
sessionStore = SessionStore()

# Module-level logger; each connection wraps it in a ``logging.LoggerAdapter``
# carrying the connection id and client ip.
parent_logger = logging.getLogger(__name__)
|
30
|
|
|
|
|
31
|
|
|
|
|
32
|
|
|
class TornadoHandler(WebSocketHandler, WebRtcMessageHandler):
    """
    Websocket endpoint: authenticates the client by its session cookie,
    subscribes it to its channels and dispatches incoming events.

    ``logger``, ``channel``, ``channels``, ``async_redis``, ``sync_redis``,
    ``process_ws_message`` and the message-building helpers used below are
    not defined in this class; presumably they come from the
    ``WebRtcMessageHandler`` hierarchy -- confirm against that module.
    """

    def __init__(self, *args, **kwargs):
        super(TornadoHandler, self).__init__(*args, **kwargs)
        # Becomes True only after ``open`` finished its whole handshake.
        self.__connected__ = False
        # True when the client reconnected with an already known ws id.
        self.restored_connection = False
        self.__http_client__ = AsyncHTTPClient()
        self.anti_spam = AntiSpam()

    @property
    def connected(self):
        """Whether ``open`` has completed and ``disconnect`` was not called yet."""
        return self.__connected__

    @connected.setter
    def connected(self, value):
        self.__connected__ = value

    @property
    def http_client(self):
        """
        @type: AsyncHTTPClient
        """
        return self.__http_client__

    def data_received(self, chunk):
        """Intentionally a no-op; streamed request bodies are not used."""
        pass

    def on_message(self, json_message):
        """
        Parse an incoming websocket frame and dispatch it by its event name.

        ``ValidationError`` is reported back to the client as a growl message.
        Every other exception (empty message, unknown event, malformed json,
        missing event key) is deliberately not caught here and propagates
        to the caller.

        :param json_message: raw json text received from the client
        """
        try:
            if not self.connected:
                raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
            if not json_message:
                raise Exception('Skipping null message')
            # self.anti_spam.check_spam(json_message)
            self.logger.debug('<< %.1000s', json_message)
            message = json.loads(json_message)
            event = message[VarNames.EVENT]
            if event not in self.process_ws_message:
                raise Exception("event {} is unknown".format(event))
            channel = message.get(VarNames.CHANNEL)
            # A message may only target a channel this connection listens to.
            if channel and channel not in self.channels:
                raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
            self.process_ws_message[event](message)
        except ValidationError as e:
            error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
            self.ws_write(error_message)

    def publish_logout(self, channel, log_data):
        """
        Record the online state of ``channel`` in ``log_data`` and, when the
        user is no longer online there, publish a logout message.

        :param channel: channel to check
        :param log_data: dict that receives ``{'online': ..., 'is_online': ...}``
            under the ``channel`` key (mutated in place)
        :return: True when a logout message was published, otherwise None
        """
        # seems like async solves problem with connection lost and wrong data status
        # http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
        is_online, online = self.get_online_and_status_from_redis(channel)
        log_data[channel] = {'online': online, 'is_online': is_online}
        if not is_online:
            message = self.room_online(online, Actions.LOGOUT, channel)
            self.publish(message, channel)
            return True

    def on_close(self):
        """
        Unsubscribe from redis, remove this connection id from every numeric
        channel, publish logouts, update the last read message and disconnect.
        """
        if self.async_redis.subscribed:
            self.logger.info("Close event, unsubscribing from %s", self.channels)
            self.async_redis.unsubscribe(self.channels)
        else:
            self.logger.info("Close event, not subscribed, channels: %s", self.channels)
        log_data = {}
        for channel in self.channels:
            # Only numeric channels are handled here; other entries are skipped.
            if not isinstance(channel, Number):
                continue
            self.sync_redis.srem(channel, self.id)
            if self.connected:
                self.publish_logout(channel, log_data)
        if self.connected:
            res = do_db(execute_query, settings.UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
            self.logger.info("Updated %s last read message", res)
        self.disconnect(json.dumps(log_data))

    def disconnect(self, log_data, tries=0):
        """
        Closes the redis connection if it is not in progress, otherwise
        reschedules itself on the io loop and tries again later.
        https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

        :param log_data: text that is logged once the connection is closed
        :param tries: number of retries performed so far (internal counter)
        """
        self.connected = False
        if tries == 0:
            # Snapshot only on the first call: on retries ``self.channels``
            # is already empty and would overwrite the snapshot with [].
            self.closed_channels = self.channels
        self.channels = []
        if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
            self.logger.debug('Closing a connection timeouts')
            # The first positional argument of timedelta is *days*, so this
            # delay is about 0.86 s; the value is kept as in the original.
            # NOTE(review): possibly seconds were intended -- confirm.
            ioloop.IOLoop.instance().add_timeout(timedelta(days=0.00001), self.disconnect, log_data, tries + 1)
        else:
            self.logger.info("Close connection result: %s", log_data)
            self.async_redis.disconnect()

    def generate_self_id(self):
        """
        When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
        So if ws loses a connection it still can reconnect with same id,
        and TornadoHandler can restore webrtc_connections to previous state
        """
        conn_arg = self.get_argument('id', None)
        self.id, random_part = create_id(self.user_id, conn_arg)
        # The id sent by the client was kept only when it comes back unchanged.
        self.restored_connection = random_part == conn_arg
        if not self.restored_connection:
            self.ws_write(self.set_ws_id(random_part, self.id))

    def open(self):
        """
        Authenticate the websocket by the session cookie, subscribe to the
        user's channels and send rooms plus offline/history messages.

        The connection is closed when the session key is missing or unknown.
        """
        session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
        # A missing cookie yields None; never hand that to the session store.
        if not session_key or not sessionStore.exists(session_key):
            self.logger.warning('!! Session key %s has been rejected', str(session_key))
            # NOTE(review): 403 is not among the close codes listed in
            # RFC 6455 section 7.4.1; kept because clients may rely on it.
            self.close(403, "Session key %s has been rejected" % session_key)
            return

        self.ip = self.get_client_ip()
        session = SessionStore(session_key)
        self.user_id = int(session["_auth_user_id"])
        self.generate_self_id()
        log_params = {
            'id': self.id,
            'ip': self.ip
        }
        self._logger = logging.LoggerAdapter(parent_logger, log_params)
        cookies = ["{}={}".format(k, self.request.cookies[k].value) for k in self.request.cookies]
        self.logger.debug("!! Incoming connection, session %s, thread hash %s, cookies: %s", session_key, self.id, ";".join(cookies))
        self.async_redis.connect()
        user_db = do_db(User.objects.get, id=self.user_id)
        self.sender_name = user_db.username
        self.sex = user_db.sex_str
        user_rooms = get_users_in_current_user_rooms(self.user_id)
        self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
        # get all missed messages
        self.channels = []  # py2 doesn't support clear()
        self.channels.append(self.channel)
        self.channels.append(self.id)
        rooms_online = {}
        was_online = False
        for room_id in user_rooms:
            self.channels.append(room_id)
            rooms_online[room_id] = self.get_is_online(room_id)
            was_online = was_online or rooms_online[room_id][0]
        self.listen(self.channels)
        off_messages, history = self.get_offline_messages(user_rooms, was_online)
        for room_id in user_rooms:
            # NOTE(review): the result is discarded; the call is kept in case
            # get_is_online has side effects -- confirm and remove if not.
            self.get_is_online(room_id)
            self.add_online_user(room_id, rooms_online[room_id][0], rooms_online[room_id][1])
            if off_messages.get(room_id) or history.get(room_id):
                self.ws_write(self.load_offline_message(off_messages.get(room_id), history.get(room_id), room_id))
        self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
        self.connected = True
        # self.save_ip()

    def get_offline_messages(self, user_rooms, was_online):
        """
        Collect unread ("offline") and history messages grouped by room id.

        :param user_rooms: rooms of the current user
        :param was_online: when true, no unread messages are queried
        :return: tuple ``(offline, history)`` of dicts ``room_id -> [message]``
        """
        q_objects = get_history_message_query(self.get_argument('messages', None), user_rooms, self.restored_connection)
        if was_online:
            off_messages = []
        else:
            off_messages = Message.objects.filter(
                id__gt=F('room__roomusers__last_read_message_id'),
                deleted=False,
                room__roomusers__user_id=self.user_id
            )
        off = {}
        history = {}
        if q_objects.children:
            history_messages = Message.objects.filter(q_objects)
            all_messages = list(chain(off_messages, history_messages))
            self.logger.info("Offline messages IDs: %s, history messages: %s", [m.id for m in off_messages], [m.id for m in history_messages])
        else:
            history_messages = []
            all_messages = off_messages
        if self.restored_connection:
            # On a restored connection everything is delivered as offline.
            off_messages = all_messages
            history_messages = []
        imv = get_message_images_videos(all_messages)
        self.set_video_images_messages(imv, off_messages, off)
        self.set_video_images_messages(imv, history_messages, history)
        return off, history

    def set_video_images_messages(self, imv, inm, outm):
        """
        Convert every message of ``inm`` and append it to ``outm`` by room id.

        :param imv: value returned by ``get_message_images_videos``
        :param inm: iterable of messages to convert
        :param outm: dict ``room_id -> [prepared message]`` (mutated in place)
        """
        for message in inm:
            files = MessagesCreator.prepare_img_video(imv, message.id)
            prep_m = self.create_message(message, files)
            outm.setdefault(message.room_id, []).append(prep_m)

    def check_origin(self, origin):
        """
        check whether browser set domain matches origin

        Host names are compared without port; ``hostname`` lower-cases both
        sides and handles bracketed IPv6 literals. A missing or malformed
        ``Origin``/``Host`` value is rejected instead of raising.

        :param origin: value of the ``Origin`` request header
        :return: True when the origin host equals the ``Host`` header host
        """
        host_header = self.request.headers.get("Host")
        if not host_header:
            return False
        try:
            origin_domain = urlparse(origin).hostname
            # The leading '//' makes urlparse treat the header as a netloc.
            browser_domain = urlparse('//' + host_header).hostname
        except ValueError:
            return False
        if not origin_domain or not browser_domain:
            return False
        return browser_domain == origin_domain

    def save_ip(self):
        """
        This code is not used anymore
        """
        if not do_db(UserJoinedInfo.objects.filter(
                Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists):
            res = get_or_create_ip_wrapper(self.ip, self.logger, self.fetch_and_save_ip_http)
            if res is not None:
                UserJoinedInfo.objects.create(ip=res, user_id=self.user_id)

    @asynchronous
    def fetch_and_save_ip_http(self):
        """
        This code is not used anymore
        """
        def fetch_response(response):
            try:
                ip_record = create_ip_structure(self.ip, response.body)
            except Exception as e:
                # Fall back to a bare ip record when the lookup result is unusable.
                self.logger.error("Error while creating ip with country info, because %s", e)
                ip_record = IpAddress.objects.create(ip=self.ip)
            UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
        r = HTTPRequest(settings.IP_API_URL % self.ip, method="GET")
        self.http_client.fetch(r, callback=fetch_response)

    def ws_write(self, message):
        """
        Tries to send message; a closed websocket is logged, not raised.

        :type self: MessagesHandler
        :param message: dict (serialized to json here) or an already
            serialized string
        :raises ValueError: when the message is neither a dict nor a string
        """
        # self.logger.debug('<< THREAD %s >>', os.getppid())
        try:
            if isinstance(message, dict):
                message = json.dumps(message)
            if not isinstance(message, str_type):
                raise ValueError('Wrong message type : %s' % str(message))
            self.logger.debug(">> %.1000s", message)
            self.write_message(message)
        except WebSocketClosedError as e:
            self.logger.warning("%s. Can't send message << %s >> ", e, str(message))

    def get_client_ip(self):
        """Return the ``X-Real-IP`` header when set, else the remote ip."""
        return self.request.headers.get("X-Real-IP") or self.request.remote_ip