1
|
|
|
import json |
2
|
|
|
import logging |
3
|
|
|
import sys |
4
|
|
|
import time |
5
|
|
|
from time import mktime |
6
|
|
|
|
7
|
|
|
import redis |
8
|
|
|
import tornado.gen |
9
|
|
|
import tornado.httpclient |
10
|
|
|
import tornado.ioloop |
11
|
|
|
import tornado.web |
12
|
|
|
import tornado.websocket |
13
|
|
|
import tornadoredis |
14
|
|
|
from django.conf import settings |
15
|
|
|
from django.core.exceptions import ValidationError |
16
|
|
|
from django.db import connection, OperationalError, InterfaceError |
17
|
|
|
from django.db.models import Q |
18
|
|
|
from redis_sessions.session import SessionStore |
19
|
|
|
from tornado.websocket import WebSocketHandler |
20
|
|
|
|
21
|
|
|
from chat.log_filters import id_generator |
22
|
|
|
|
23
|
|
|
try: |
24
|
|
|
from urllib.parse import urlparse # py2 |
25
|
|
|
except ImportError: |
26
|
|
|
from urlparse import urlparse # py3 |
27
|
|
|
|
28
|
|
|
from chat.settings import MAX_MESSAGE_SIZE, ANONYMOUS_REDIS_ROOM |
29
|
|
|
from chat.models import User, Message, Room |
30
|
|
|
from chat.utils import check_user |
31
|
|
|
|
32
|
|
|
PY3 = sys.version > '3' |
33
|
|
|
|
34
|
|
|
user_cookie_name = settings.USER_COOKIE_NAME |
35
|
|
|
|
36
|
|
|
ANONYMOUS_GENDER = 'Alien' |
37
|
|
|
SESSION_USER_VAR_KEY = 'user_name' |
38
|
|
|
|
39
|
|
|
MESSAGE_ID_VAR_NAME = 'id' |
40
|
|
|
RECEIVER_USERNAME_VAR_NAME = 'receiverName' |
41
|
|
|
RECEIVER_USERID_VAR_NAME = 'receiverId' |
42
|
|
|
COUNT_VAR_NAME = 'count' |
43
|
|
|
HEADER_ID_VAR_NAME = 'headerId' |
44
|
|
|
USER_VAR_NAME = 'user' |
45
|
|
|
USER_ID_VAR_NAME = 'userId' |
46
|
|
|
TIME_VAR_NAME = 'time' |
47
|
|
|
OLD_NAME_VAR_NAME = 'oldName' |
48
|
|
|
IS_ANONYMOUS_VAR_NAME = 'anonymous' |
49
|
|
|
CONTENT_VAR_NAME = 'content' |
50
|
|
|
EVENT_VAR_NAME = 'action' |
51
|
|
|
GENDER_VAR_NAME = 'sex' |
52
|
|
|
|
53
|
|
|
REFRESH_USER_EVENT = 'onlineUsers' |
54
|
|
|
SYSTEM_MESSAGE_EVENT = 'system' |
55
|
|
|
GET_MESSAGES_EVENT = 'messages' |
56
|
|
|
GET_MINE_USERNAME_EVENT = 'me' |
57
|
|
|
ROOMS_EVENT = 'rooms' # thread ex "main" , channel ex. 'r:main', "i:3" |
58
|
|
|
LOGIN_EVENT = 'joined' |
59
|
|
|
LOGOUT_EVENT = 'left' |
60
|
|
|
SEND_MESSAGE_EVENT = 'send' |
61
|
|
|
CHANGE_ANONYMOUS_NAME_EVENT = 'changed' |
62
|
|
|
|
63
|
|
|
REDIS_USERNAME_CHANNEL_PREFIX = 'u:%s' |
64
|
|
|
REDIS_USERID_CHANNEL_PREFIX = 'i:%s' |
65
|
|
|
REDIS_ROOM_CHANNEL_PREFIX = 'r:%d' |
66
|
|
|
REDIS_ONLINE_USERS = "online_users" |
67
|
|
|
|
68
|
|
|
|
69
|
|
|
# global connection to read synchronously |
70
|
|
|
sync_redis = redis.StrictRedis() |
71
|
|
|
# Redis connection cannot be shared between publishers and subscribers. |
72
|
|
|
async_redis_publisher = tornadoredis.Client() |
73
|
|
|
async_redis_publisher.connect() |
74
|
|
|
sync_redis.delete(REDIS_ONLINE_USERS) # TODO move it somewhere else |
75
|
|
|
|
76
|
|
|
try: |
77
|
|
|
anonymous_default_room = Room.objects.get(name=ANONYMOUS_REDIS_ROOM) |
78
|
|
|
except Room.DoesNotExist: |
79
|
|
|
anonymous_default_room = Room() |
80
|
|
|
anonymous_default_room.name = ANONYMOUS_REDIS_ROOM |
81
|
|
|
anonymous_default_room.save() |
82
|
|
|
|
83
|
|
|
ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % anonymous_default_room.id |
84
|
|
|
ANONYMOUS_ROOM_NAMES = {anonymous_default_room.id: anonymous_default_room.name} |
85
|
|
|
|
86
|
|
|
sessionStore = SessionStore() |
87
|
|
|
|
88
|
|
|
logger = logging.getLogger(__name__) |
89
|
|
|
|
90
|
|
|
# TODO https://github.com/leporo/tornado-redis#connection-pool-support |
91
|
|
|
CONNECTION_POOL = tornadoredis.ConnectionPool( |
92
|
|
|
max_connections=500, |
93
|
|
|
wait_for_available=True) |
94
|
|
|
|
95
|
|
|
|
96
|
|
|
class MessagesCreator(object): |
97
|
|
|
|
98
|
|
|
def __init__(self, *args, **kwargs): |
99
|
|
|
super(MessagesCreator, self).__init__(*args, **kwargs) |
100
|
|
|
self.sex = ANONYMOUS_GENDER |
101
|
|
|
self.sender_name = None |
102
|
|
|
self.user_id = 0 # anonymous by default |
103
|
|
|
|
104
|
|
|
def online_user_names(self, user_names_dict, action): |
105
|
|
|
""" |
106
|
|
|
:type user_names_dict: dict |
107
|
|
|
:return: { Nick: male, NewName: alien, Joana: female} |
108
|
|
|
""" |
109
|
|
|
default_message = self.default(user_names_dict, action) |
110
|
|
|
default_message.update({ |
111
|
|
|
USER_VAR_NAME: self.sender_name, |
112
|
|
|
IS_ANONYMOUS_VAR_NAME: self.sex == ANONYMOUS_GENDER |
113
|
|
|
}) |
114
|
|
|
return default_message |
115
|
|
|
|
116
|
|
|
def change_user_nickname(self, old_nickname, online): |
117
|
|
|
""" |
118
|
|
|
set self.sender_name to new nickname before call it |
119
|
|
|
:return: {action : changed, content: { Nick: male, NewName: alien}, oldName : OldName, user: NewName} |
120
|
|
|
:type old_nickname: str |
121
|
|
|
:type online: dict |
122
|
|
|
""" |
123
|
|
|
default_message = self.online_user_names(online, CHANGE_ANONYMOUS_NAME_EVENT) |
124
|
|
|
default_message[OLD_NAME_VAR_NAME] = old_nickname |
125
|
|
|
return default_message |
126
|
|
|
|
127
|
|
|
@classmethod |
128
|
|
|
def default(cls, content, event=SYSTEM_MESSAGE_EVENT): |
129
|
|
|
""" |
130
|
|
|
:return: {"action": event, "content": content, "time": "20:48:57"} |
131
|
|
|
""" |
132
|
|
|
return { |
133
|
|
|
EVENT_VAR_NAME: event, |
134
|
|
|
CONTENT_VAR_NAME: content, |
135
|
|
|
TIME_VAR_NAME: cls.get_miliseconds() |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
@classmethod |
139
|
|
|
def create_send_message(cls, message): |
140
|
|
|
""" |
141
|
|
|
:type message: Message |
142
|
|
|
""" |
143
|
|
|
result = cls.get_message(message) |
144
|
|
|
result[EVENT_VAR_NAME] = SEND_MESSAGE_EVENT |
145
|
|
|
return result |
146
|
|
|
|
147
|
|
|
@classmethod |
148
|
|
|
def get_message(cls, message): |
149
|
|
|
""" |
150
|
|
|
:param message: |
151
|
|
|
:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"}, |
152
|
|
|
"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"} |
153
|
|
|
""" |
154
|
|
|
result = { |
155
|
|
|
USER_VAR_NAME: message.sender.username, |
156
|
|
|
USER_ID_VAR_NAME: message.sender.id, |
157
|
|
|
CONTENT_VAR_NAME: message.content, |
158
|
|
|
TIME_VAR_NAME: cls.get_miliseconds(message), |
159
|
|
|
MESSAGE_ID_VAR_NAME: message.id, |
160
|
|
|
} |
161
|
|
|
if message.receiver is not None: |
162
|
|
|
result[RECEIVER_USERNAME_VAR_NAME] = message.receiver.username |
163
|
|
|
return result |
164
|
|
|
|
165
|
|
|
@classmethod |
166
|
|
|
def get_messages(cls, messages): |
167
|
|
|
""" |
168
|
|
|
:type messages: list[Messages] |
169
|
|
|
:type messages: QuerySet[Messages] |
170
|
|
|
""" |
171
|
|
|
return { |
172
|
|
|
CONTENT_VAR_NAME: [cls.create_send_message(message) for message in messages], |
173
|
|
|
EVENT_VAR_NAME: GET_MESSAGES_EVENT |
174
|
|
|
} |
175
|
|
|
|
176
|
|
|
def send_anonymous(self, content, receiver_anonymous, receiver_id): |
177
|
|
|
default_message = self.default(content, SEND_MESSAGE_EVENT) |
178
|
|
|
default_message[USER_VAR_NAME] = self.sender_name |
179
|
|
|
if receiver_anonymous is not None: |
180
|
|
|
default_message[RECEIVER_USERNAME_VAR_NAME] = receiver_anonymous |
181
|
|
|
default_message[RECEIVER_USERID_VAR_NAME] = receiver_id |
182
|
|
|
return default_message |
183
|
|
|
|
184
|
|
|
@property |
185
|
|
|
def stored_redis_user(self): |
186
|
|
|
return '%s:%s:%d' % (self.sender_name, self.sex, self.user_id) |
187
|
|
|
|
188
|
|
|
@property |
189
|
|
|
def channel(self): |
190
|
|
|
if self.user_id == 0: |
191
|
|
|
return REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name |
192
|
|
|
else: |
193
|
|
|
return REDIS_USERID_CHANNEL_PREFIX % self.user_id |
194
|
|
|
|
195
|
|
|
@staticmethod |
196
|
|
|
def online_js_structure(name, sex, user_id): |
197
|
|
|
return { |
198
|
|
|
name: { |
199
|
|
|
GENDER_VAR_NAME: sex, |
200
|
|
|
USER_ID_VAR_NAME: user_id |
201
|
|
|
} |
202
|
|
|
} |
203
|
|
|
|
204
|
|
|
@property |
205
|
|
|
def online_self_js_structure(self): |
206
|
|
|
return self.online_js_structure(self.sender_name, self.sex, self.user_id) |
207
|
|
|
|
208
|
|
|
@staticmethod |
209
|
|
|
def get_miliseconds(dt=None): |
210
|
|
|
if dt is None: |
211
|
|
|
return int(time.time()*1000) |
212
|
|
|
if dt.time.timestamp: |
213
|
|
|
return int(dt.time.timestamp()*1000) |
214
|
|
|
else: |
215
|
|
|
return mktime(dt.time.timetuple())*1000 + int(dt.time.microsecond/1000), |
216
|
|
|
|
217
|
|
|
|
218
|
|
|
class MessagesHandler(MessagesCreator): |
219
|
|
|
|
220
|
|
|
def __init__(self, *args, **kwargs): |
221
|
|
|
super(MessagesHandler, self).__init__(*args, **kwargs) |
222
|
|
|
self.log_id = str(id(self) % 10000).rjust(4, '0') |
223
|
|
|
log_params = { |
224
|
|
|
'username': '00000000', |
225
|
|
|
'id': self.log_id, |
226
|
|
|
'ip': 'initializing' |
227
|
|
|
} |
228
|
|
|
self.logger = logging.LoggerAdapter(logger, log_params) |
229
|
|
|
self.async_redis = tornadoredis.Client() |
230
|
|
|
self.process_message = { |
231
|
|
|
GET_MINE_USERNAME_EVENT: self.process_change_username, |
232
|
|
|
GET_MESSAGES_EVENT: self.process_get_messages, |
233
|
|
|
SEND_MESSAGE_EVENT: self.process_send_message, |
234
|
|
|
} |
235
|
|
|
|
236
|
|
|
def do_db(self, callback, *arg, **args): |
237
|
|
|
try: |
238
|
|
|
return callback(*arg, **args) |
239
|
|
|
except (OperationalError, InterfaceError) as e: # Connection has gone away |
240
|
|
|
self.logger.warning('%s, reconnecting' % e) # TODO |
241
|
|
|
connection.close() |
242
|
|
|
return callback( *arg, **args) |
243
|
|
|
|
244
|
|
|
def get_online_from_redis(self, check_name=None, check_id=None): |
245
|
|
|
""" |
246
|
|
|
:rtype : dict |
247
|
|
|
returns (dict, bool) if check_type is present |
248
|
|
|
""" |
249
|
|
|
online = sync_redis.hgetall(REDIS_ONLINE_USERS) |
250
|
|
|
self.logger.debug('!! redis online: %s', online) |
251
|
|
|
result = {} |
252
|
|
|
user_is_online = False |
253
|
|
|
# redis stores REDIS_USER_FORMAT, so parse them |
254
|
|
|
if online: |
255
|
|
|
for key, raw_user_sex in online.items(): # py2 iteritems |
256
|
|
|
(name, sex, user_id) = raw_user_sex.decode('utf-8').split(':') |
257
|
|
|
if name == check_name and check_id != int(key.decode('utf-8')): |
258
|
|
|
user_is_online = True |
259
|
|
|
result.update(self.online_js_structure(name, sex, user_id)) |
260
|
|
|
if check_id: |
261
|
|
|
return result, user_is_online |
262
|
|
|
else: |
263
|
|
|
return result |
264
|
|
|
|
265
|
|
|
def add_online_user(self): |
266
|
|
|
""" |
267
|
|
|
adds to redis |
268
|
|
|
online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 } |
269
|
|
|
:return: |
270
|
|
|
""" |
271
|
|
|
online = self.get_online_from_redis() |
272
|
|
|
async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user) |
273
|
|
|
first_tab = False |
274
|
|
|
if self.sender_name not in online: # if a new tab has been opened |
275
|
|
|
online.update(self.online_self_js_structure) |
276
|
|
|
first_tab = True |
277
|
|
|
|
278
|
|
|
if first_tab: # Login event, sent user names to all |
279
|
|
|
online_user_names_mes = self.online_user_names(online, LOGIN_EVENT) |
280
|
|
|
self.logger.info('!! First tab, sending refresh online for all') |
281
|
|
|
self.publish(online_user_names_mes) |
282
|
|
|
else: # Send user names to self |
283
|
|
|
online_user_names_mes = self.online_user_names(online, REFRESH_USER_EVENT) |
284
|
|
|
self.logger.info('!! Second tab, retrieving online for self') |
285
|
|
|
self.safe_write(online_user_names_mes) |
286
|
|
|
# send usernamechat |
287
|
|
|
username_message = self.default(self.sender_name, GET_MINE_USERNAME_EVENT) |
288
|
|
|
self.safe_write(username_message) |
289
|
|
|
|
290
|
|
|
def set_username(self, session_key): |
291
|
|
|
""" |
292
|
|
|
Case registered: Fetch userName and its channels from database. returns them |
293
|
|
|
Case anonymous: generates a new name and saves it to session. returns default channel |
294
|
|
|
:return: channels user should subscribe |
295
|
|
|
""" |
296
|
|
|
session = SessionStore(session_key) |
297
|
|
|
try: |
298
|
|
|
self.user_id = int(session["_auth_user_id"]) |
299
|
|
|
user_db = self.do_db(User.objects.get, id=self.user_id) # everything but 0 is a registered user |
300
|
|
|
self.sender_name = user_db.username |
301
|
|
|
self.sex = user_db.sex_str |
302
|
|
|
rooms = user_db.rooms.all() # do_db is used already |
303
|
|
|
room_names = {} |
304
|
|
|
channels = [self.channel, ] |
305
|
|
|
for room in rooms: |
306
|
|
|
room_names[room.id] = room.name |
307
|
|
|
channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id) |
308
|
|
|
rooms_message = self.default(room_names, ROOMS_EVENT) |
309
|
|
|
self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names) |
310
|
|
|
except (KeyError, User.DoesNotExist): |
311
|
|
|
# Anonymous |
312
|
|
|
self.sender_name = session.get(SESSION_USER_VAR_KEY) |
313
|
|
|
if self.sender_name is None: |
314
|
|
|
self.sender_name = id_generator(8) |
315
|
|
|
session[SESSION_USER_VAR_KEY] = self.sender_name |
316
|
|
|
session.save() |
317
|
|
|
self.logger.info("!! A new user log in, created username %s", self.sender_name) |
318
|
|
|
else: |
319
|
|
|
self.logger.info("!! Anonymous with name %s has logged", self.sender_name) |
320
|
|
|
channels = [ANONYMOUS_REDIS_CHANNEL, self.channel] |
321
|
|
|
rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT) |
322
|
|
|
finally: |
323
|
|
|
self.safe_write(rooms_message) |
324
|
|
|
return channels |
325
|
|
|
|
326
|
|
|
def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL): |
327
|
|
|
jsoned_mess = json.dumps(message) |
328
|
|
|
self.logger.debug('<%s> %s', channel, jsoned_mess) |
329
|
|
|
async_redis_publisher.publish(channel, jsoned_mess) |
330
|
|
|
|
331
|
|
|
# TODO really parse every single message for 1 action? |
332
|
|
|
def check_and_finish_change_name(self, message): |
333
|
|
|
if self.sex == ANONYMOUS_GENDER: |
334
|
|
|
parsed_message = json.loads(message) |
335
|
|
|
if parsed_message[EVENT_VAR_NAME] == GET_MINE_USERNAME_EVENT: |
336
|
|
|
self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name) # TODO is it allowed? |
337
|
|
|
self.sender_name = parsed_message[CONTENT_VAR_NAME] |
338
|
|
|
self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name) |
339
|
|
|
async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user) |
340
|
|
|
|
341
|
|
|
def new_message(self, message): |
342
|
|
|
if type(message.body) is not int: # subscribe event |
343
|
|
|
self.safe_write(message.body) |
344
|
|
|
self.check_and_finish_change_name(message.body) |
345
|
|
|
|
346
|
|
|
def safe_write(self, message): |
347
|
|
|
raise NotImplementedError('WebSocketHandler implements') |
348
|
|
|
|
349
|
|
|
def process_send_message(self, message): |
350
|
|
|
""" |
351
|
|
|
:type message: dict |
352
|
|
|
""" |
353
|
|
|
content = message[CONTENT_VAR_NAME] |
354
|
|
|
receiver_id = message.get(RECEIVER_USERID_VAR_NAME) # if receiver_id is None then its a private message |
355
|
|
|
receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME) |
356
|
|
|
self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id) |
357
|
|
|
save_to_db = True |
358
|
|
|
if receiver_id is not None and receiver_id != 0: |
359
|
|
|
receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id |
360
|
|
|
elif receiver_name is not None: |
361
|
|
|
receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name |
362
|
|
|
save_to_db = False |
363
|
|
|
|
364
|
|
|
if self.user_id != 0 and save_to_db: |
365
|
|
|
self.logger.debug('!! Saving it to db') |
366
|
|
|
message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id) |
367
|
|
|
self.do_db(message_db.save) # exit on hacked id with exception |
368
|
|
|
prepared_message = self.create_send_message(message_db) |
369
|
|
|
else: |
370
|
|
|
self.logger.debug('!! NOT saving it') |
371
|
|
|
prepared_message = self.send_anonymous(content, receiver_name, receiver_id) |
372
|
|
|
|
373
|
|
|
if receiver_id is None: |
374
|
|
|
self.logger.debug('!! Detected as public') |
375
|
|
|
self.publish(prepared_message) |
376
|
|
|
else: |
377
|
|
|
self.publish(prepared_message, self.channel) |
378
|
|
|
self.logger.debug('!! Detected as private, channel %s', receiver_channel) |
379
|
|
|
if receiver_channel != self.channel: |
380
|
|
|
self.publish(prepared_message, receiver_channel) |
381
|
|
|
|
382
|
|
|
def process_change_username(self, message): |
383
|
|
|
""" |
384
|
|
|
:type message: dict |
385
|
|
|
""" |
386
|
|
|
self.logger.info('!! Changing username to %s', message[CONTENT_VAR_NAME]) |
387
|
|
|
new_username = message[CONTENT_VAR_NAME] |
388
|
|
|
try: |
389
|
|
|
check_user(new_username) |
390
|
|
|
online = self.get_online_from_redis() |
391
|
|
|
if new_username in online: |
392
|
|
|
self.logger.info('!! This name is already used') |
393
|
|
|
raise ValidationError('Anonymous already has this name') |
394
|
|
|
session_key = self.get_cookie(settings.SESSION_COOKIE_NAME) |
395
|
|
|
session = SessionStore(session_key) |
396
|
|
|
session[SESSION_USER_VAR_KEY] = new_username |
397
|
|
|
session.save() |
398
|
|
|
|
399
|
|
|
del online[self.sender_name] |
400
|
|
|
old_name = self.sender_name |
401
|
|
|
old_channel = self.channel |
402
|
|
|
self.sender_name = new_username # change_user_name required new_username in sender_name |
403
|
|
|
online.update(self.online_self_js_structure) |
404
|
|
|
message_all = self.change_user_nickname(old_name, online) |
405
|
|
|
message_me = self.default(new_username, GET_MINE_USERNAME_EVENT) |
406
|
|
|
# TODO perform ton of checks or emit twice ? |
407
|
|
|
self.publish(message_me, self.channel) # to new user channel |
408
|
|
|
self.publish(message_me, old_channel) # to old user channel |
409
|
|
|
self.publish(message_all) |
410
|
|
|
except ValidationError as e: |
411
|
|
|
self.safe_write(self.default(str(e.message))) |
412
|
|
|
|
413
|
|
|
def process_get_messages(self, data): |
414
|
|
|
""" |
415
|
|
|
:type data: dict |
416
|
|
|
""" |
417
|
|
|
header_id = data.get(HEADER_ID_VAR_NAME, None) |
418
|
|
|
count = int(data.get(COUNT_VAR_NAME, 10)) |
419
|
|
|
self.logger.info('!! Fetching %d messages starting from %s', count, header_id) |
420
|
|
|
if header_id is None: |
421
|
|
|
messages = Message.objects.filter( |
422
|
|
|
Q(receiver=None) # Only public |
423
|
|
|
| Q(sender=self.user_id) # private s |
424
|
|
|
| Q(receiver=self.user_id) # and private |
425
|
|
|
).order_by('-pk')[:count] |
426
|
|
|
else: |
427
|
|
|
messages = Message.objects.filter( |
428
|
|
|
Q(id__lt=header_id), |
429
|
|
|
Q(receiver=None) |
430
|
|
|
| Q(sender=self.user_id) |
431
|
|
|
| Q(receiver=self.user_id) |
432
|
|
|
).order_by('-pk')[:count] |
433
|
|
|
response = self.do_db(self.get_messages, messages) |
434
|
|
|
self.safe_write(response) |
435
|
|
|
|
436
|
|
|
|
437
|
|
|
class AntiSpam: |
438
|
|
|
|
439
|
|
|
def __init__(self): |
440
|
|
|
self.spammed = 0 |
441
|
|
|
self.info = {} |
442
|
|
|
|
443
|
|
|
def check_spam(self, json_message): |
444
|
|
|
message_length = len(json_message) |
445
|
|
|
info_key = int(round(time.time() * 100)) |
446
|
|
|
self.info[info_key] = message_length |
447
|
|
|
if message_length > MAX_MESSAGE_SIZE: |
448
|
|
|
self.spammed += 1 |
449
|
|
|
raise ValidationError("Message can't exceed %s symbols" % MAX_MESSAGE_SIZE) |
450
|
|
|
self.check_timed_spam() |
451
|
|
|
|
452
|
|
|
def check_timed_spam(self): |
453
|
|
|
# TODO implement me |
454
|
|
|
pass |
455
|
|
|
# raise ValidationError("You're chatting too much, calm down a bit!") |
456
|
|
|
|
457
|
|
|
|
458
|
|
|
class TornadoHandler(WebSocketHandler, MessagesHandler): |
459
|
|
|
|
460
|
|
|
def __init__(self, *args, **kwargs): |
461
|
|
|
super(TornadoHandler, self).__init__(*args, **kwargs) |
462
|
|
|
self.connected = False |
463
|
|
|
self.anti_spam = AntiSpam() |
464
|
|
|
|
465
|
|
|
@tornado.gen.engine |
466
|
|
|
def listen(self, channels): |
467
|
|
|
""" |
468
|
|
|
self.channel should been set before calling |
469
|
|
|
""" |
470
|
|
|
yield tornado.gen.Task( |
471
|
|
|
self.async_redis.subscribe, channels) |
472
|
|
|
self.async_redis.listen(self.new_message) |
473
|
|
|
|
474
|
|
|
def data_received(self, chunk): |
475
|
|
|
pass |
476
|
|
|
|
477
|
|
|
def on_message(self, json_message): |
478
|
|
|
try: |
479
|
|
|
if not self.connected: |
480
|
|
|
raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message) |
481
|
|
|
if not json_message: |
482
|
|
|
raise ValidationError('Skipping null message') |
483
|
|
|
self.anti_spam.check_spam(json_message) |
484
|
|
|
self.logger.debug('<< %s', json_message) |
485
|
|
|
message = json.loads(json_message) |
486
|
|
|
self.process_message[message[EVENT_VAR_NAME]](message) |
487
|
|
|
except ValidationError as e: |
488
|
|
|
logger.warning("Message won't be send. Reason: %s", e.message) |
489
|
|
|
self.safe_write(self.default(e.message)) |
490
|
|
|
|
491
|
|
|
def on_close(self): |
492
|
|
|
try: |
493
|
|
|
self_id = id(self) |
494
|
|
|
async_redis_publisher.hdel(REDIS_ONLINE_USERS, self_id) |
495
|
|
|
if self.connected: |
496
|
|
|
# seems like async solves problem with connection lost and wrong data status |
497
|
|
|
# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status |
498
|
|
|
online, is_online = self.get_online_from_redis(self.sender_name, self_id) |
499
|
|
|
self.logger.info('!! Closing connection, redis current online %s', online) |
500
|
|
|
if not is_online: |
501
|
|
|
message = self.online_user_names(online, LOGOUT_EVENT) |
502
|
|
|
self.logger.debug('!! User closed the last tab, refreshing online for all') |
503
|
|
|
self.publish(message) |
504
|
|
|
else: |
505
|
|
|
self.logger.debug('!! User is still online in other tabs') |
506
|
|
|
else: |
507
|
|
|
self.logger.warning('Dropping connection for not connected user') |
508
|
|
|
finally: |
509
|
|
|
if self.async_redis.subscribed: |
510
|
|
|
# TODO unsubscribe of all subscribed !IMPORTANT |
511
|
|
|
self.async_redis.unsubscribe([ |
512
|
|
|
ANONYMOUS_REDIS_CHANNEL, |
513
|
|
|
self.channel |
514
|
|
|
]) |
515
|
|
|
self.async_redis.disconnect() |
516
|
|
|
|
517
|
|
|
def open(self, *args, **kargs): |
518
|
|
|
session_key = self.get_cookie(settings.SESSION_COOKIE_NAME) |
519
|
|
|
if sessionStore.exists(session_key): |
520
|
|
|
self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self)) |
521
|
|
|
self.async_redis.connect() |
522
|
|
|
channels = self.set_username(session_key) |
523
|
|
|
log_params = { |
524
|
|
|
'username': self.sender_name.rjust(8), |
525
|
|
|
'id': self.log_id, |
526
|
|
|
'ip': self.get_client_ip() |
527
|
|
|
} |
528
|
|
|
self.logger = logging.LoggerAdapter(logger, log_params) |
529
|
|
|
self.listen(channels) |
530
|
|
|
self.add_online_user() |
531
|
|
|
self.connected = True |
532
|
|
|
else: |
533
|
|
|
self.logger.warning('!! Session key %s has been rejected', str(session_key)) |
534
|
|
|
self.close(403, "Session key %s has been rejected" % session_key) |
535
|
|
|
|
536
|
|
|
def check_origin(self, origin): |
537
|
|
|
""" |
538
|
|
|
check whether browser set domain matches origin |
539
|
|
|
""" |
540
|
|
|
parsed_origin = urlparse(origin) |
541
|
|
|
origin = parsed_origin.netloc |
542
|
|
|
origin_domain = origin.split(':')[0].lower() |
543
|
|
|
browser_set = self.request.headers.get("Host") |
544
|
|
|
browser_domain = browser_set.split(':')[0] |
545
|
|
|
return browser_domain == origin_domain |
546
|
|
|
|
547
|
|
|
def safe_write(self, message): |
548
|
|
|
""" |
549
|
|
|
Tries to send message, doesn't throw exception outside |
550
|
|
|
:type self: MessagesHandler |
551
|
|
|
""" |
552
|
|
|
try: |
553
|
|
|
if isinstance(message, dict): |
554
|
|
|
message = json.dumps(message) |
555
|
|
|
if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))): |
556
|
|
|
raise ValueError('Wrong message type : %s' % str(message)) |
557
|
|
|
self.logger.debug(">> %s", message) |
558
|
|
|
self.write_message(message) |
559
|
|
|
except tornado.websocket.WebSocketClosedError as e: |
560
|
|
|
self.logger.error("%s. Can't send << %s >> message", e, str(message)) |
561
|
|
|
|
562
|
|
|
def get_client_ip(self): |
563
|
|
|
x_real_ip = self.request.headers.get("X-Real-IP") |
564
|
|
|
return x_real_ip or self.request.remote_ip |
565
|
|
|
|
566
|
|
|
application = tornado.web.Application([ |
567
|
|
|
(r'.*', TornadoHandler), |
568
|
|
|
]) |
569
|
|
|
|