1
|
|
|
import json |
2
|
|
|
import logging |
3
|
|
|
from datetime import timedelta |
4
|
|
|
from numbers import Number |
5
|
|
|
from threading import Thread |
6
|
|
|
|
7
|
|
|
from django.conf import settings |
8
|
|
|
from django.core.exceptions import ValidationError |
9
|
|
|
from django.db.models import F, Q |
10
|
|
|
from redis_sessions.session import SessionStore |
11
|
|
|
from tornado import ioloop |
12
|
|
|
from tornado.websocket import WebSocketHandler, WebSocketClosedError |
13
|
|
|
|
14
|
|
|
from chat.cookies_middleware import create_id |
15
|
|
|
from chat.models import User, Message, UserJoinedInfo |
16
|
|
|
from chat.py2_3 import str_type, urlparse |
17
|
|
|
from chat.settings import UPDATE_LAST_READ_MESSAGE |
18
|
|
|
from chat.tornado.anti_spam import AntiSpam |
19
|
|
|
from chat.tornado.constants import VarNames, HandlerNames, Actions |
20
|
|
|
from chat.tornado.image_utils import get_message_images, prepare_img |
21
|
|
|
from chat.tornado.message_handler import MessagesHandler |
22
|
|
|
from chat.utils import execute_query, do_db, get_or_create_ip, get_users_in_current_user_rooms |
23
|
|
|
|
24
|
|
|
# Module-wide session store; in this file it is only used to check whether a
# session key exists before a websocket is accepted.
sessionStore = SessionStore()

# Base logger of this module; each handler wraps it in a LoggerAdapter that
# carries the connection id and the client ip.
parent_logger = logging.getLogger(__name__)
27
|
|
|
|
28
|
|
|
|
29
|
|
|
class TornadoHandler(WebSocketHandler, MessagesHandler): |
30
|
|
|
|
31
|
|
|
def __init__(self, *args, **kwargs):
    """
    Builds the handler through the parent classes and prepares per-connection
    state: an AntiSpam instance and the "connected" flag, which stays False
    until open() accepts the session.
    """
    super(TornadoHandler, self).__init__(*args, **kwargs)
    self.anti_spam = AntiSpam()
    self.__connected__ = False
35
|
|
|
|
36
|
|
|
@property
def connected(self):
    """
    True once open() has finished initialising this websocket,
    False before that and after disconnect().
    """
    return self.__connected__

@connected.setter
def connected(self, value):
    """Stores the new connection state flag."""
    self.__connected__ = value
43
|
|
|
|
44
|
|
|
def data_received(self, chunk):
    """Intentionally a no-op: the received chunk is ignored."""
46
|
|
|
|
47
|
|
|
def on_message(self, json_message):
    """
    Entry point for every text frame received from the client.

    Decodes the JSON payload, checks that the event is known and that the
    requested channel (if any) is one this connection listens to, then
    dispatches to the handler registered in self.pre_process_message.

    A ValidationError is reported back to the client as a growl message;
    any other exception is not handled here and propagates to the caller.

    :param json_message: raw JSON text sent by the client
    """
    try:
        if not self.connected:
            raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
        if not json_message:
            raise Exception('Skipping null message')
        # anti-spam check is currently switched off:
        # self.anti_spam.check_spam(json_message)
        self.logger.debug('<< %.1000s', json_message)
        message = json.loads(json_message)
        event = message[VarNames.EVENT]
        if event not in self.pre_process_message:
            raise Exception("event {} is unknown".format(event))
        channel = message.get(VarNames.CHANNEL)
        if channel and channel not in self.channels:
            raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
        process = self.pre_process_message[event]
        process(message)
    except ValidationError as e:
        growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
        self.ws_write(growl)
65
|
|
|
|
66
|
|
|
def publish_logout(self, channel, log_data):
    """
    Publishes a logout event to the channel when this user is no longer
    online there.

    seems like async solves problem with connection lost and wrong data status
    http://programmers.stackexchange.com/questions/294663/how-to-store-online-status

    :param channel: channel whose online state is checked
    :param log_data: dict that receives the online state under the channel key
    :return: True if the logout was published, otherwise None
    """
    users_online, still_online = self.get_online_from_redis(channel, True)
    log_data[channel] = {'online': users_online, 'is_online': still_online}
    if still_online:
        # nothing to announce while the user is still reported online
        return None
    logout_message = self.room_online(users_online, Actions.LOGOUT, channel)
    self.publish(logout_message, channel)
    return True
75
|
|
|
|
76
|
|
|
def on_close(self):
    """
    Cleans up after the websocket has been closed.

    Unsubscribes from redis if subscribed, removes this connection id from
    every numeric channel (presumably the room ids - the non-numeric entries
    are skipped), publishes logout events while the socket was connected,
    updates the last read message when a logout was published, and finally
    disconnects the async redis client.
    """
    if not self.async_redis.subscribed:
        self.logger.info("Close event, not subscribed, channels: %s", self.channels)
    else:
        self.logger.info("Close event, unsubscribing from %s", self.channels)
        self.async_redis.unsubscribe(self.channels)
    log_data = {}
    gone_offline = False
    for channel in self.channels:
        if isinstance(channel, Number):
            self.sync_redis.srem(channel, self.id)
            if self.connected:
                if self.publish_logout(channel, log_data):
                    gone_offline = True
    if gone_offline:
        updated = do_db(execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
        self.logger.info("Updated %s last read message", updated)
    self.disconnect(json.dumps(log_data))
94
|
|
|
|
95
|
|
|
def disconnect(self, log_data, tries=0):
    """
    Closes the async redis connection if no request is in progress,
    otherwise reschedules itself on the IOLoop and retries later.
    https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

    :param log_data: text that is logged once the connection is closed
    :param tries: number of retries performed so far; callers leave it at 0
    """
    self.connected = False
    if tries == 0:
        # Snapshot the channels only on the first call: on a rescheduled
        # retry self.channels is already empty and would wipe the snapshot.
        self.closed_channels = self.channels
        self.channels = []
    if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
        self.logger.debug('Closing a connection timeouts')
        # NOTE: the first positional argument of timedelta is days,
        # so this delay is 0.00001 days, i.e. about 0.864 seconds.
        ioloop.IOLoop.instance().add_timeout(
            timedelta(days=0.00001), self.disconnect, log_data, tries + 1)
    else:
        self.logger.info("Close connection result: %s", log_data)
        self.async_redis.disconnect()
109
|
|
|
|
110
|
|
|
def generate_self_id(self):
    """
    When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
    So if ws loses a connection it still can reconnect with same id,
    and TornadoHandler can restore webrtc_connections to previous state

    Reads the optional 'id' request argument, lets create_id() build the
    connection id, and tells the client when the returned value differs
    from the one it sent.
    """
    requested_id = self.get_argument('id', None)
    self.id, issued_id = create_id(self.user_id, requested_id)
    if issued_id == requested_id:
        return
    self.ws_write(self.set_ws_id(issued_id, self.id))
120
|
|
|
|
121
|
|
|
def open(self):
    """
    Called when the websocket connection is opened.

    Rejects the connection with code 403 when the session cookie does not
    belong to an existing session. Otherwise loads the user, sends the room
    list, subscribes to the channels, registers the user as online in every
    room and marks the handler as connected. The ip is saved in a
    separate thread.
    """
    session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
    if not sessionStore.exists(session_key):
        self.logger.warning('!! Session key %s has been rejected', str(session_key))
        self.close(403, "Session key %s has been rejected" % session_key)
        return

    self.ip = self.get_client_ip()
    session = SessionStore(session_key)
    self.user_id = int(session["_auth_user_id"])
    self.generate_self_id()
    # every log record of this connection carries its id and ip
    self._logger = logging.LoggerAdapter(parent_logger, {'id': self.id, 'ip': self.ip})
    self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
    self.async_redis.connect()

    user_db = do_db(User.objects.get, id=self.user_id)
    self.sender_name = user_db.username
    self.sex = user_db.sex_str

    user_rooms = get_users_in_current_user_rooms(self.user_id)
    self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))

    self.channels = []  # py2 doesn't support clear()
    self.channels.extend([self.channel, self.id])
    self.channels.extend(user_rooms)
    self.listen(self.channels)

    # get all missed messages
    off_messages = self.get_offline_messages()
    for room_id in user_rooms:
        self.add_online_user(room_id, off_messages.get(room_id))
    self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
    self.connected = True
    Thread(target=self.save_ip).start()
156
|
|
|
|
157
|
|
|
def get_offline_messages(self):
    """
    Collects the messages this user has not read yet.

    Selects non-deleted messages whose id is greater than the
    last_read_message_id stored for this user in the message's room.

    :return: dict mapping room id to the list of prepared messages
    """
    unread = Message.objects.filter(
        id__gt=F('room__roomusers__last_read_message_id'),
        deleted=False,
        room__roomusers__user_id=self.user_id
    )
    images = do_db(get_message_images, unread)
    by_room = {}
    for msg in unread:
        prepared = self.create_message(msg, prepare_img(images, msg.id))
        if msg.room_id not in by_room:
            by_room[msg.room_id] = []
        by_room[msg.room_id].append(prepared)
    return by_room
169
|
|
|
|
170
|
|
|
def check_origin(self, origin):
    """
    Checks whether the host of the Origin header matches the host of the
    Host header. Ports are ignored and the comparison is case-insensitive.

    :param origin: value of the Origin header sent by the browser
    :return: True if both host names are present and equal, otherwise False
    """
    host_header = self.request.headers.get("Host")
    if not origin or not host_header:
        # a missing header can never match
        return False
    try:
        # .hostname lower-cases the host and strips the port and the
        # brackets of IPv6 literals, which a plain split(':') mishandles
        origin_domain = urlparse(origin).hostname
        browser_domain = urlparse('//' + host_header).hostname
    except ValueError:
        # malformed netloc, e.g. a broken IPv6 literal
        return False
    if not origin_domain or not browser_domain:
        return False
    return browser_domain == origin_domain
180
|
|
|
|
181
|
|
|
def save_ip(self):
    """
    Records that this user joined from the current ip, unless such
    a UserJoinedInfo record already exists.
    """
    already_saved = do_db(UserJoinedInfo.objects.filter(
        Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)
    if already_saved:
        return
    ip_record = get_or_create_ip(self.ip, self.logger)
    UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
190
|
|
|
|
191
|
|
|
def ws_write(self, message):
    """
    Sends a message to the client. A closed websocket is logged instead of
    raising; a message of a wrong type still raises ValueError.

    :type self: MessagesHandler
    :param message: dict (serialized to JSON before sending) or a string
    :raises ValueError: if the message is neither a dict nor a string
    """
    # self.logger.debug('<< THREAD %s >>', os.getppid())
    if isinstance(message, dict):
        message = json.dumps(message)
    if not isinstance(message, str_type):
        raise ValueError('Wrong message type : %s' % str(message))
    try:
        self.logger.debug(">> %.1000s", message)
        self.write_message(message)
    except WebSocketClosedError as e:
        self.logger.error("%s. Can't send << %s >> message", e, str(message))
207
|
|
|
|
208
|
|
|
def get_client_ip(self):
    """
    Returns the client ip: the X-Real-IP request header when it is set and
    non-empty, otherwise the remote ip of the request.
    """
    real_ip = self.request.headers.get("X-Real-IP")
    if real_ip:
        return real_ip
    return self.request.remote_ip