1
|
|
|
import json |
2
|
|
|
import logging |
3
|
|
|
import sys |
4
|
|
|
from threading import Thread |
5
|
|
|
from urllib.request import urlopen |
6
|
|
|
|
7
|
|
|
import redis |
8
|
|
|
import time |
9
|
|
|
import tornado.gen |
10
|
|
|
import tornado.httpclient |
11
|
|
|
import tornado.ioloop |
12
|
|
|
import tornado.web |
13
|
|
|
import tornado.websocket |
14
|
|
|
import tornadoredis |
15
|
|
|
from django.conf import settings |
16
|
|
|
from django.core.exceptions import ValidationError |
17
|
|
|
from django.db import connection, OperationalError, InterfaceError |
18
|
|
|
from django.db.models import Q |
19
|
|
|
from redis_sessions.session import SessionStore |
20
|
|
|
from tornado.websocket import WebSocketHandler |
21
|
|
|
|
22
|
|
|
from chat.log_filters import id_generator |
23
|
|
|
|
24
|
|
|
# urlparse lives in urllib.parse on Python 3 and in the urlparse module on Python 2.
try:
    from urllib.parse import urlparse  # py3
except ImportError:
    from urlparse import urlparse  # py2
28
|
|
|
|
29
|
|
|
from chat.settings import MAX_MESSAGE_SIZE, ANONYMOUS_REDIS_ROOM |
30
|
|
|
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo |
31
|
|
|
from chat.utils import check_user |
32
|
|
|
|
33
|
|
|
# True when running on Python 3 or newer. Compare the numeric major version:
# comparing the version *string* with '3' is lexicographic and would be wrong
# for a major version of 10 or more.
PY3 = sys.version_info[0] >= 3
34
|
|
|
|
35
|
|
|
# Cookie name configured in the Django settings (mandatory: a missing setting fails at import).
# NOTE(review): not referenced by the visible code — confirm it is still used elsewhere.
user_cookie_name = settings.USER_COOKIE_NAME
# Optional URL template of the IP lookup service; it is formatted with the client IP.
# When it is unset (None) IP records are created without location details.
api_url = getattr(settings, "IP_API_URL", None)
37
|
|
|
|
38
|
|
|
# --- Defaults for users without an account ---------------------------------
ANONYMOUS_GENDER = "Alien"
SESSION_USER_VAR_KEY = "user_name"  # session key that stores an anonymous user's name

# --- Keys of the JSON messages exchanged with the client -------------------
MESSAGE_ID_VAR_NAME = "id"
RECEIVER_USERNAME_VAR_NAME = "receiverName"
RECEIVER_USERID_VAR_NAME = "receiverId"
COUNT_VAR_NAME = "count"
HEADER_ID_VAR_NAME = "headerId"
USER_VAR_NAME = "user"
USER_ID_VAR_NAME = "userId"
TIME_VAR_NAME = "time"
OLD_NAME_VAR_NAME = "oldName"
IS_ANONYMOUS_VAR_NAME = "anonymous"
CONTENT_VAR_NAME = "content"
EVENT_VAR_NAME = "action"
GENDER_VAR_NAME = "sex"

# --- Values carried under the EVENT_VAR_NAME key ---------------------------
REFRESH_USER_EVENT = "onlineUsers"
SYSTEM_MESSAGE_EVENT = "system"
GROWL_MESSAGE_EVENT = "growl"
GET_MESSAGES_EVENT = "messages"
GET_MINE_USERNAME_EVENT = "me"
ROOMS_EVENT = "rooms"  # thread ex "main" , channel ex. 'r:main', "i:3"
LOGIN_EVENT = "joined"
LOGOUT_EVENT = "left"
SEND_MESSAGE_EVENT = "send"
CHANGE_ANONYMOUS_NAME_EVENT = "changed"

# --- Redis channel name templates and the online-users hash key ------------
REDIS_USERNAME_CHANNEL_PREFIX = "u:%s"
REDIS_USERID_CHANNEL_PREFIX = "i:%s"
REDIS_ROOM_CHANNEL_PREFIX = "r:%d"
REDIS_ONLINE_USERS = "online_users"
70
|
|
|
|
71
|
|
|
|
72
|
|
|
# global connection to read synchronously
sync_redis = redis.StrictRedis()
# Redis connection cannot be shared between publishers and subscribers.
async_redis_publisher = tornadoredis.Client()
async_redis_publisher.connect()
# The online-users hash is wiped every time this module is imported.
sync_redis.delete(REDIS_ONLINE_USERS)  # TODO move it somewhere else

# Load the default room for anonymous users, creating it when it does not exist yet.
# This hits the database at import time.
try:
    anonymous_default_room = Room.objects.get(name=ANONYMOUS_REDIS_ROOM)
except Room.DoesNotExist:
    anonymous_default_room = Room()
    anonymous_default_room.name = ANONYMOUS_REDIS_ROOM
    anonymous_default_room.save()

# Redis channel of the default room and the {room id: room name} mapping sent to anonymous users.
ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % anonymous_default_room.id
ANONYMOUS_ROOM_NAMES = {anonymous_default_room.id: anonymous_default_room.name}

# Key-less store, used only to check whether a session key exists.
sessionStore = SessionStore()

logger = logging.getLogger(__name__)

# TODO https://github.com/leporo/tornado-redis#connection-pool-support
# NOTE(review): the pool is not referenced by the visible code — confirm it is still needed.
CONNECTION_POOL = tornadoredis.ConnectionPool(
    max_connections=500,
    wait_for_available=True)
97
|
|
|
|
98
|
|
|
|
99
|
|
|
class MessagesCreator(object):
    """
    Builds the dictionaries that are serialized to JSON and sent to clients.

    Holds the identity of the sender (name, gender, user id); a fresh
    instance describes an anonymous user (``user_id == 0``).
    """

    def __init__(self, *args, **kwargs):
        super(MessagesCreator, self).__init__(*args, **kwargs)
        self.sex = ANONYMOUS_GENDER
        self.sender_name = None
        self.user_id = 0  # anonymous by default

    def online_user_names(self, user_names_dict, action):
        """
        Wrap the online users into a message and stamp it with the sender.

        :type user_names_dict: dict
        :param action: value stored under the event key
        :return: default message extended with the sender name and a flag
            telling whether the sender is anonymous
        """
        payload = self.default(user_names_dict, action)
        payload[USER_VAR_NAME] = self.sender_name
        payload[IS_ANONYMOUS_VAR_NAME] = self.sex == ANONYMOUS_GENDER
        return payload

    def change_user_nickname(self, old_nickname, online):
        """
        Build the "nickname changed" message.

        self.sender_name must already hold the new nickname when this is called.
        :type old_nickname: str
        :type online: dict
        :return: online-users message with the "changed" event plus the old nickname
        """
        payload = self.online_user_names(online, CHANGE_ANONYMOUS_NAME_EVENT)
        payload.update({OLD_NAME_VAR_NAME: old_nickname})
        return payload

    @classmethod
    def default(cls, content, event=SYSTEM_MESSAGE_EVENT):
        """
        Build the basic envelope shared by most messages.

        :return: dict with the event, the content and the value of get_milliseconds()
        """
        envelope = {EVENT_VAR_NAME: event, CONTENT_VAR_NAME: content}
        envelope[TIME_VAR_NAME] = get_milliseconds()
        return envelope

    @classmethod
    def create_send_message(cls, message):
        """
        Serialize a stored message and mark it with the "send" event.

        :type message: Message
        """
        payload = cls.get_message(message)
        payload.update({EVENT_VAR_NAME: SEND_MESSAGE_EVENT})
        return payload

    @classmethod
    def get_message(cls, message):
        """
        Serialize a stored message without an event key.

        :type message: Message
        :return: dict with the sender name and id, content, time and message id;
            the receiver name is added when the message has a receiver
        """
        sender = message.sender
        payload = {
            USER_VAR_NAME: sender.username,
            USER_ID_VAR_NAME: sender.id,
            CONTENT_VAR_NAME: message.content,
            TIME_VAR_NAME: message.time,
            MESSAGE_ID_VAR_NAME: message.id,
        }
        receiver = message.receiver
        if receiver is not None:
            payload[RECEIVER_USERNAME_VAR_NAME] = receiver.username
        return payload

    @classmethod
    def get_messages(cls, messages):
        """
        Serialize several stored messages into one "messages" event.

        :param messages: iterable of Message objects (a list or a QuerySet)
        """
        serialized = []
        for message in messages:
            serialized.append(cls.create_send_message(message))
        return {
            CONTENT_VAR_NAME: serialized,
            EVENT_VAR_NAME: GET_MESSAGES_EVENT,
        }

    def send_anonymous(self, content, receiver_anonymous, receiver_id):
        """
        Build a "send" message that is not backed by a database record.

        The receiver name and id are attached only when a receiver name is given.
        """
        payload = self.default(content, SEND_MESSAGE_EVENT)
        payload[USER_VAR_NAME] = self.sender_name
        if receiver_anonymous is None:
            return payload
        payload[RECEIVER_USERNAME_VAR_NAME] = receiver_anonymous
        payload[RECEIVER_USERID_VAR_NAME] = receiver_id
        return payload

    @property
    def stored_redis_user(self):
        """Sender encoded as "name:sex:id", the value kept in the online-users hash."""
        return '%s:%s:%d' % (self.sender_name, self.sex, self.user_id)

    @property
    def channel(self):
        """Personal redis channel: keyed by user id when it is non-zero, by name otherwise."""
        if self.user_id != 0:
            return REDIS_USERID_CHANNEL_PREFIX % self.user_id
        return REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name

    @staticmethod
    def online_js_structure(name, sex, user_id):
        """Describe one online user as {name: {sex, userId}}."""
        details = {
            GENDER_VAR_NAME: sex,
            USER_ID_VAR_NAME: user_id,
        }
        return {name: details}

    @property
    def online_self_js_structure(self):
        """Online-user entry describing the sender itself."""
        return self.online_js_structure(self.sender_name, self.sex, self.user_id)
210
|
|
|
|
211
|
|
|
|
212
|
|
|
class MessagesHandler(MessagesCreator):
    """
    Chat logic that does not depend on the websocket transport.

    Keeps the online-users hash in redis up to date, resolves the sender from
    the session and handles the events received from the client. A subclass
    has to provide ``safe_write``; ``process_change_username`` additionally
    calls ``self.get_cookie``.
    """

    def __init__(self, *args, **kwargs):
        super(MessagesHandler, self).__init__(*args, **kwargs)
        # short id used to tell connections apart in the log
        self.log_id = str(id(self) % 10000).rjust(4, '0')
        self.ip = None
        log_params = {
            'username': '00000000',
            'id': self.log_id,
            'ip': 'initializing'
        }
        self.logger = logging.LoggerAdapter(logger, log_params)
        # every handler owns a redis client for its subscriptions
        self.async_redis = tornadoredis.Client()
        # event received from the client -> method that handles it
        self.process_message = {
            GET_MINE_USERNAME_EVENT: self.process_change_username,
            GET_MESSAGES_EVENT: self.process_get_messages,
            SEND_MESSAGE_EVENT: self.process_send_message,
        }

    def do_db(self, callback, *arg, **args):
        """
        Call ``callback`` and retry it once after closing the database
        connection when it fails with OperationalError or InterfaceError.

        :return: whatever ``callback`` returns
        """
        try:
            return callback(*arg, **args)
        except (OperationalError, InterfaceError) as e:  # Connection has gone away
            self.logger.warning('%s, reconnecting', e)  # TODO
            connection.close()
            return callback(*arg, **args)

    def get_online_from_redis(self, check_name=None, check_id=None):
        """
        Read the online-users hash from redis.

        :param check_name: user name to look for in connections other than ``check_id``
        :param check_id: hash key of the connection to ignore while looking
        :return: {name: {sex, userId}} for every stored user; when ``check_id``
            is truthy a tuple (that dict, bool) is returned, the bool telling
            whether ``check_name`` is stored under another key
        """
        online = sync_redis.hgetall(REDIS_ONLINE_USERS)
        self.logger.debug('!! redis online: %s', online)
        result = {}
        user_is_online = False
        # values are stored as "name:sex:id" (see stored_redis_user), so parse them
        if online:
            for key, raw_user_sex in online.items():  # py2 iteritems
                # split from the right: only the name may contain ':' itself
                (name, sex, user_id) = raw_user_sex.decode('utf-8').rsplit(':', 2)
                if name == check_name and check_id != int(key.decode('utf-8')):
                    user_is_online = True
                # user_id stays a string here, exactly as it was parsed
                result.update(self.online_js_structure(name, sex, user_id))
        if check_id:
            return result, user_is_online
        return result

    def add_online_user(self):
        """
        Register this connection in redis and announce it.

        The hash looks like
        online_users = { connection_id1: stored_redis_user1, connection_id2: stored_redis_user2 }
        When the name was not online yet, the login event is published to
        everybody; otherwise the online list is written to this connection only.
        """
        online = self.get_online_from_redis()
        async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
        if self.sender_name not in online:
            # Login event, send user names to all
            online.update(self.online_self_js_structure)
            online_user_names_mes = self.online_user_names(online, LOGIN_EVENT)
            self.logger.info('!! First tab, sending refresh online for all')
            self.publish(online_user_names_mes)
        else:
            # Send user names to self
            online_user_names_mes = self.online_user_names(online, REFRESH_USER_EVENT)
            self.logger.info('!! Second tab, retrieving online for self')
            self.safe_write(online_user_names_mes)
        # tell the client its own user name
        username_message = self.default(self.sender_name, GET_MINE_USERNAME_EVENT)
        self.safe_write(username_message)

    def set_username(self, session_key):
        """
        Resolve the sender from the session and send the client its rooms.

        Case registered: loads the user from the database and sets name, gender and id.
        Case anonymous: reuses the name kept in the session or generates a new one
        and saves it to the session; ``user_id`` is reset to 0.
        :param session_key: key of the session to read
        :return: list of redis channels the connection should subscribe to
        """
        session = SessionStore(session_key)
        try:
            self.user_id = int(session["_auth_user_id"])
            user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
        except (KeyError, User.DoesNotExist):
            # Anonymous. The id read from the session must not survive, otherwise
            # self.channel would point to the channel of a non-existing user.
            self.user_id = 0
            self.sender_name = session.get(SESSION_USER_VAR_KEY)
            if self.sender_name is None:
                self.sender_name = id_generator(8)
                session[SESSION_USER_VAR_KEY] = self.sender_name
                session.save()
                self.logger.info("!! A new user log in, created username %s", self.sender_name)
            else:
                self.logger.info("!! Anonymous with name %s has logged", self.sender_name)
            channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
            rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT)
        else:
            self.sender_name = user_db.username
            self.sex = user_db.sex_str
            rooms = user_db.rooms.all()
            room_names = {}
            channels = [self.channel, ]
            for room in rooms:
                room_names[room.id] = room.name
                channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
            rooms_message = self.default(room_names, ROOMS_EVENT)
            self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
        # Written after the try statement instead of inside a ``finally``: an
        # unexpected error now propagates as it is rather than being replaced
        # by an UnboundLocalError on rooms_message.
        self.safe_write(rooms_message)
        return channels

    def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
        """Serialize ``message`` to JSON and publish it to ``channel`` (default room by default)."""
        jsoned_mess = json.dumps(message)
        self.logger.debug('<%s> %s', channel, jsoned_mess)
        async_redis_publisher.publish(channel, jsoned_mess)

    # TODO really parse every single message for 1 action?
    def check_and_finish_change_name(self, message):
        """
        Complete a nickname change of this anonymous sender.

        When ``message`` (JSON text) is a "changed" event whose old name is the
        current sender name, the personal channel subscription is moved to the
        new name and the redis record of this connection is rewritten.
        """
        if self.sex != ANONYMOUS_GENDER:
            return
        parsed_message = json.loads(message)
        if (parsed_message[EVENT_VAR_NAME] == CHANGE_ANONYMOUS_NAME_EVENT
                and parsed_message[OLD_NAME_VAR_NAME] == self.sender_name):
            self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
            self.sender_name = parsed_message[USER_VAR_NAME]
            self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
            async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)

    def new_message(self, message):
        """Forward a message received from redis to the client; integer bodies are skipped."""
        if not isinstance(message.body, int):  # an int body is a subscribe event
            self.safe_write(message.body)
            self.check_and_finish_change_name(message.body)

    def safe_write(self, message):
        """Send ``message`` to the client; has to be implemented by the subclass."""
        raise NotImplementedError('WebSocketHandler implements')

    def process_send_message(self, message):
        """
        Handle the "send" event: store the message when possible and publish it.

        :type message: dict
        :raises ValidationError: when the message is private (receiver id 0)
            but carries no receiver name to address it
        """
        content = message[CONTENT_VAR_NAME]
        receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # if receiver_id is None then it's a public message
        receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
        self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id)
        save_to_db = True
        receiver_channel = None
        if receiver_id is not None and receiver_id != 0:
            receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
        elif receiver_name is not None:
            receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name
            save_to_db = False
        elif receiver_id is not None:
            # receiver id 0 without a name: there is no channel to deliver to.
            # Rejecting here replaces the UnboundLocalError raised further down before.
            raise ValidationError('Private message requires a receiver name or a receiver id')

        if self.user_id != 0 and save_to_db:
            self.logger.debug('!! Saving it to db')
            message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
            self.do_db(message_db.save)  # exit on hacked id with exception
            prepared_message = self.create_send_message(message_db)
        else:
            self.logger.debug('!! NOT saving it')
            prepared_message = self.send_anonymous(content, receiver_name, receiver_id)

        if receiver_id is None:
            self.logger.debug('!! Detected as public')
            self.publish(prepared_message)
        else:
            # the sender gets a copy on its own channel
            self.publish(prepared_message, self.channel)
            self.logger.debug('!! Detected as private, channel %s', receiver_channel)
            if receiver_channel != self.channel:
                self.publish(prepared_message, receiver_channel)

    def process_change_username(self, message):
        """
        Handle the "me" event: validate the requested name and announce the change.

        The new name is written to the session and the change is published;
        ``sender_name`` itself is switched later, in
        check_and_finish_change_name, when the published message comes back.
        A ValidationError is reported to the client as a growl message.
        :type message: dict
        """
        self.logger.info('!! Changing username to %s', message[CONTENT_VAR_NAME])
        new_username = message[CONTENT_VAR_NAME]
        try:
            self.do_db(check_user, new_username)
            online = self.get_online_from_redis()
            if new_username in online:
                self.logger.info('!! This name is already used')
                raise ValidationError('This name is already used by another anonymous!')
            session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
            session = SessionStore(session_key)
            session[SESSION_USER_VAR_KEY] = new_username
            session.save()
            try:
                del online[self.sender_name]
            except KeyError:  # if two or more change_username events in fast time
                pass
            old_username = self.sender_name
            # temporary set username for creating messages with self.online_self_js_structure, change_user_nickname
            self.sender_name = new_username
            online.update(self.online_self_js_structure)
            message_all = self.change_user_nickname(old_username, online)
            self.sender_name = old_username  # see check_and_finish_change_name
            self.publish(message_all)
        except ValidationError as e:
            self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))

    def process_get_messages(self, data):
        """
        Handle the "messages" event: send a page of stored messages to the client.

        Reads the optional ``headerId`` (only older messages are returned) and
        ``count`` (10 by default) keys.
        :type data: dict
        """
        header_id = data.get(HEADER_ID_VAR_NAME, None)
        count = int(data.get(COUNT_VAR_NAME, 10))
        self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
        # Only public messages or private ones sent by / addressed to this user
        filters = [Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)]
        if header_id is not None:
            filters.insert(0, Q(id__lt=header_id))
        messages = Message.objects.filter(*filters).order_by('-pk')[:count]
        # the queryset is evaluated while it is serialized, hence inside do_db
        response = self.do_db(self.get_messages, messages)
        self.safe_write(response)

    def save_ip(self):
        """Record that this user (or anonymous name) joined from ``self.ip``, once per combination."""
        user_id = None if self.user_id == 0 else self.user_id
        anon_name = self.sender_name if self.user_id == 0 else None
        if (self.do_db(UserJoinedInfo.objects.filter(
                Q(ip__ip=self.ip) & Q(anon_name=anon_name) & Q(user_id=user_id)).exists)):
            return
        ip_address = self.get_or_create_ip()
        UserJoinedInfo.objects.create(
            ip=ip_address,
            user_id=user_id,
            anon_name=anon_name
        )

    def get_or_create_ip(self):
        """
        Return the IpAddress record of ``self.ip``, creating it when missing.

        A new record is filled with the data returned by the lookup service
        configured in ``api_url``; when the lookup is not configured or fails
        for any reason, a record holding only the address is created.
        """
        try:
            ip_address = IpAddress.objects.get(ip=self.ip)
        except IpAddress.DoesNotExist:
            try:
                if not api_url:
                    raise Exception('api url is absent')
                self.logger.debug("Creating ip record %s", self.ip)
                f = urlopen(api_url % self.ip)
                try:
                    raw_response = f.read().decode("utf-8")
                finally:
                    f.close()  # do not leak the HTTP connection
                response = json.loads(raw_response)
                if response['status'] != "success":
                    raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
                ip_address = IpAddress.objects.create(
                    ip=self.ip,
                    isp=response['isp'],
                    country=response['country'],
                    region=response['regionName'],
                    city=response['city']
                )
            except Exception as e:
                # best effort: the lookup data is optional, the record is not
                self.logger.error("Error while creating ip with country info, because %s", e)
                ip_address = IpAddress.objects.create(ip=self.ip)
        return ip_address
465
|
|
|
|
466
|
|
|
|
467
|
|
|
class AntiSpam(object):
    """
    Per-connection statistics used to reject unwanted messages.

    ``spammed`` counts the rejected messages; ``info`` maps a timestamp
    (hundredths of a second) to the length of the message received then.
    """

    def __init__(self):
        self.spammed = 0
        self.info = {}

    def check_spam(self, json_message):
        """
        Record the message length and reject messages that are too long.

        :param json_message: raw message as received from the client
        :raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
        """
        size = len(json_message)
        stamp = int(round(time.time() * 100))
        self.info[stamp] = size
        if size <= MAX_MESSAGE_SIZE:
            self.check_timed_spam()
            return
        self.spammed += 1
        raise ValidationError("Message can't exceed %s symbols" % MAX_MESSAGE_SIZE)

    def check_timed_spam(self):
        """Placeholder for rate limiting; does nothing yet."""
        # TODO implement me, e.g.
        # raise ValidationError("You're chatting too much, calm down a bit!")
        return None
486
|
|
|
|
487
|
|
|
|
488
|
|
|
class TornadoHandler(WebSocketHandler, MessagesHandler):
    """
    Websocket endpoint of the chat: binds the chat logic of MessagesHandler
    to a Tornado websocket connection.
    """

    def __init__(self, *args, **kwargs):
        super(TornadoHandler, self).__init__(*args, **kwargs)
        self.connected = False  # set to True at the end of a successful open()
        self.anti_spam = AntiSpam()

    @tornado.gen.engine
    def listen(self, channels):
        """
        Subscribe to ``channels`` and start forwarding their messages to new_message.

        self.channel should be set before calling.
        """
        yield tornado.gen.Task(
            self.async_redis.subscribe, channels)
        self.async_redis.listen(self.new_message)

    def data_received(self, chunk):
        """Does nothing."""
        pass

    def on_message(self, json_message):
        """
        Validate a message received from the client and dispatch it by its action.

        Every rejection (connection not initialized, empty or oversized
        message, invalid JSON, unknown action, error raised by the handler as
        ValidationError) is logged and reported back to the client as a
        system message instead of escaping as an unhandled exception.
        """
        try:
            if not self.connected:
                raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
            if not json_message:
                raise ValidationError('Skipping null message')
            self.anti_spam.check_spam(json_message)
            self.logger.debug('<< %s', json_message)
            try:
                message = json.loads(json_message)
            except ValueError:
                raise ValidationError('Message is not a valid JSON')
            # only JSON objects carry an action
            event = message.get(EVENT_VAR_NAME) if isinstance(message, dict) else None
            try:
                handler = self.process_message[event]
            except (KeyError, TypeError):  # unknown or unhashable action
                raise ValidationError('Unknown action %s' % event)
            handler(message)
        except ValidationError as e:
            # use the per-connection adapter so the record carries username/id/ip
            self.logger.warning("Message won't be send. Reason: %s", e.message)
            self.safe_write(self.default(e.message))

    def on_close(self):
        """
        Remove this connection from the online-users hash and, when no other
        connection is stored under the same user name, publish the logout event.
        The redis subscription is released afterwards in any case.
        """
        try:
            self_id = id(self)
            async_redis_publisher.hdel(REDIS_ONLINE_USERS, self_id)
            if self.connected:
                # seems like async solves problem with connection lost and wrong data status
                # http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
                online, is_online = self.get_online_from_redis(self.sender_name, self_id)
                self.logger.info('!! Closing connection, redis current online %s', online)
                if not is_online:
                    message = self.online_user_names(online, LOGOUT_EVENT)
                    self.logger.debug('!! User closed the last tab, refreshing online for all')
                    self.publish(message)
                else:
                    self.logger.debug('!! User is still online in other tabs')
            else:
                self.logger.warning('Dropping connection for not connected user')
        finally:
            if self.async_redis.subscribed:
                # TODO unsubscribe of all subscribed !IMPORTANT
                self.async_redis.unsubscribe([
                    ANONYMOUS_REDIS_CHANNEL,
                    self.channel
                ])
            # NOTE(review): the nesting of this call was lost in the source; it is
            # assumed to run whether or not the client is subscribed — confirm.
            self.async_redis.disconnect()

    def open(self, *args, **kargs):
        """
        Accept the connection when the session cookie refers to an existing
        session: resolve the user, subscribe to its channels, announce it and
        store its IP in a background thread. Otherwise close the socket.
        """
        session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
        if sessionStore.exists(session_key):
            self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self))
            self.async_redis.connect()
            channels = self.set_username(session_key)
            self.ip = self.get_client_ip()
            log_params = {
                'username': self.sender_name.rjust(8),
                'id': self.log_id,
                'ip': self.ip
            }
            # from now on log records carry the real user name and ip
            self.logger = logging.LoggerAdapter(logger, log_params)
            self.listen(channels)
            self.add_online_user()
            self.connected = True
            Thread(target=self.save_ip).start()
        else:
            self.logger.warning('!! Session key %s has been rejected', str(session_key))
            self.close(403, "Session key %s has been rejected" % session_key)

    def check_origin(self, origin):
        """
        Check whether the domain of the Origin header matches the domain of
        the Host header; ports are ignored and the comparison is
        case-insensitive. A request without a Host header is rejected.
        """
        origin_domain = urlparse(origin).netloc.split(':')[0].lower()
        browser_set = self.request.headers.get("Host")
        if not browser_set:
            return False
        browser_domain = browser_set.split(':')[0].lower()
        return browser_domain == origin_domain

    def safe_write(self, message):
        """
        Send ``message`` to the client; a closed websocket is logged, not raised.

        :param message: dict (serialized to JSON first) or text
        :raises ValueError: when the message is neither a dict nor text
        """
        try:
            if isinstance(message, dict):
                message = json.dumps(message)
            # the unicode check is evaluated on Python 2 only
            if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
                raise ValueError('Wrong message type : %s' % str(message))
            self.logger.debug(">> %s", message)
            self.write_message(message)
        except tornado.websocket.WebSocketClosedError as e:
            self.logger.error("%s. Can't send << %s >> message", e, str(message))

    def get_client_ip(self):
        """
        Return the X-Real-IP header when present, the remote address otherwise.

        NOTE(review): the header is trusted as is — presumably a reverse proxy sets it; confirm.
        """
        x_real_ip = self.request.headers.get("X-Real-IP")
        return x_real_ip or self.request.remote_ip
597
|
|
|
|
598
|
|
|
# Tornado application: every request path is served by the websocket handler.
application = tornado.web.Application([(r'.*', TornadoHandler)])
601
|
|
|
|