1
|
|
|
import json |
2
|
|
|
import logging |
3
|
|
|
import sys |
4
|
|
|
import time |
5
|
|
|
from threading import Thread |
6
|
|
|
from urllib.request import urlopen |
7
|
|
|
|
8
|
|
|
import tornado.gen |
9
|
|
|
import tornado.httpclient |
10
|
|
|
import tornado.ioloop |
11
|
|
|
import tornado.web |
12
|
|
|
import tornado.websocket |
13
|
|
|
import tornadoredis |
14
|
|
|
from django.conf import settings |
15
|
|
|
from django.core.exceptions import ValidationError |
16
|
|
|
from django.db import connection, OperationalError, InterfaceError |
17
|
|
|
from django.db.models import Q |
18
|
|
|
from redis_sessions.session import SessionStore |
19
|
|
|
from tornado.websocket import WebSocketHandler |
20
|
|
|
|
21
|
|
|
from chat.utils import extract_photo |
22
|
|
|
|
23
|
|
|
# Import ``urlparse`` from wherever the running interpreter provides it.
try:
    from urllib.parse import urlparse  # py3
except ImportError:
    from urlparse import urlparse  # py2
27
|
|
|
|
28
|
|
|
from chat.settings import MAX_MESSAGE_SIZE, ALL_REDIS_ROOM |
29
|
|
|
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo |
30
|
|
|
|
31
|
|
|
# True when running under Python 3. The major version is compared as a number:
# comparing ``sys.version`` as a string is lexicographic and would break for a
# two-digit major version.
PY3 = sys.version_info[0] >= 3

user_cookie_name = settings.USER_COOKIE_NAME
# Optional url template for the ip-info service; it is %-formatted with the ip.
api_url = getattr(settings, "IP_API_URL", None)

SESSION_USER_VAR_KEY = 'user_name'

# Keys of the dict payloads exchanged with the client.
MESSAGE_ID_VAR_NAME = 'id'
RECEIVER_USERNAME_VAR_NAME = 'receiverName'
RECEIVER_USERID_VAR_NAME = 'receiverId'
CALL_TYPE_VAR_NAME = 'type'
COUNT_VAR_NAME = 'count'
HEADER_ID_VAR_NAME = 'headerId'
USER_VAR_NAME = 'user'
USER_ID_VAR_NAME = 'userId'
TIME_VAR_NAME = 'time'
OLD_NAME_VAR_NAME = 'oldName'
CONTENT_VAR_NAME = 'content'
IMG_VAR_NAME = 'image'
EVENT_VAR_NAME = 'action'
GENDER_VAR_NAME = 'sex'

# Values stored under EVENT_VAR_NAME.
REFRESH_USER_EVENT = 'onlineUsers'
SYSTEM_MESSAGE_EVENT = 'system'
GROWL_MESSAGE_EVENT = 'growl'
GET_MESSAGES_EVENT = 'messages'
ROOMS_EVENT = 'rooms'  # thread ex "main" , channel ex. 'r:main', "i:3"
LOGIN_EVENT = 'joined'
LOGOUT_EVENT = 'left'
SEND_MESSAGE_EVENT = 'send'
WEBRTC_EVENT = 'webrtc'
CALL_EVENT = 'call'

# Redis channel name templates (per user id / per room id) and the name of the
# redis hash that stores the online users.
REDIS_USERID_CHANNEL_PREFIX = 'i:%s'
REDIS_ROOM_CHANNEL_PREFIX = 'r:%d'
REDIS_ONLINE_USERS = "online_users"

# NOTE: this queries (and possibly writes to) the database at import time.
(default_room, c) = Room.objects.get_or_create(name=ALL_REDIS_ROOM)

ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % default_room.id
ANONYMOUS_ROOM_NAMES = {default_room.id: default_room.name}

sessionStore = SessionStore()

logger = logging.getLogger(__name__)

# TODO https://github.com/leporo/tornado-redis#connection-pool-support
CONNECTION_POOL = tornadoredis.ConnectionPool(
    max_connections=500,
    wait_for_available=True)
81
|
|
|
|
82
|
|
|
|
83
|
|
|
class MessagesCreator(object):
    """Builds the dict payloads that are sent to clients or published to redis.

    Holds the identity of the current sender (``sender_name``, ``sex``,
    ``user_id``) that the payload builders read.
    """

    def __init__(self, *args, **kwargs):
        super(MessagesCreator, self).__init__(*args, **kwargs)
        self.sex = None
        self.sender_name = None
        self.user_id = 0  # anonymous by default

    def default(self, content, event=SYSTEM_MESSAGE_EVENT):
        """
        Build a generic payload stamped with the current sender and time.

        :param content: value stored under the "content" key
        :param event: value stored under the "action" key
        :return: {"action": event, "content": content, "userId": self.user_id,
         "user": self.sender_name, "time": get_milliseconds()}
        """
        return {
            EVENT_VAR_NAME: event,
            CONTENT_VAR_NAME: content,
            USER_ID_VAR_NAME: self.user_id,
            USER_VAR_NAME: self.sender_name,
            TIME_VAR_NAME: get_milliseconds()
        }

    def offer_call(self, content, message_type):
        """
        Build a "call" payload: the ``default`` payload plus a "type" key.

        :param content: value stored under the "content" key
        :param message_type: value stored under the "type" key
        :return: {"action": "call", "content": content, "type": message_type, ...}
        """
        message = self.default(content, CALL_EVENT)
        message[CALL_TYPE_VAR_NAME] = message_type
        return message

    @classmethod
    def create_send_message(cls, message):
        """
        Convert a stored message into a "send" payload.

        :param message: object exposing ``sender``, ``content``, ``time``, ``id``,
         ``img`` and ``receiver`` (a ``Message`` model instance in this module)
        :return: {"user": ..., "userId": ..., "content": ..., "time": ..., "id": ...,
         "action": "send"}; "image" is added when the message has an image and
         "receiverId"/"receiverName" when it has a receiver
        """
        result = {
            USER_VAR_NAME: message.sender.username,
            USER_ID_VAR_NAME: message.sender.id,
            CONTENT_VAR_NAME: message.content,
            TIME_VAR_NAME: message.time,
            MESSAGE_ID_VAR_NAME: message.id,
            EVENT_VAR_NAME: SEND_MESSAGE_EVENT
        }
        # Truthiness instead of ``is not None``: an empty-string name also means
        # "no file", and asking a file field without a file for its url fails.
        if message.img.name:
            result[IMG_VAR_NAME] = message.img.url
        if message.receiver is not None:
            result[RECEIVER_USERID_VAR_NAME] = message.receiver.id
            result[RECEIVER_USERNAME_VAR_NAME] = message.receiver.username
        return result

    @classmethod
    def get_messages(cls, messages):
        """
        Wrap several stored messages into a single "messages" payload.

        :param messages: iterable of messages (list or QuerySet), each converted
         with ``create_send_message``
        :return: {"content": [<send payload>, ...], "action": "messages"}
        """
        return {
            CONTENT_VAR_NAME: [cls.create_send_message(message) for message in messages],
            EVENT_VAR_NAME: GET_MESSAGES_EVENT
        }

    @property
    def stored_redis_user(self):
        """The 'name:sex:id' string under which this user is stored in redis."""
        return '%s:%s:%d' % (self.sender_name, self.sex, self.user_id)

    @property
    def channel(self):
        """Name of the redis channel that belongs to this user id."""
        return REDIS_USERID_CHANNEL_PREFIX % self.user_id

    @staticmethod
    def online_js_structure(name, sex, user_id):
        """
        :return: {name: {"sex": sex, "userId": user_id}}
        """
        return {
            name: {
                GENDER_VAR_NAME: sex,
                USER_ID_VAR_NAME: user_id
            }
        }

    @property
    def online_self_js_structure(self):
        """``online_js_structure`` built for the current sender."""
        return self.online_js_structure(self.sender_name, self.sex, self.user_id)
164
|
|
|
|
165
|
|
|
|
166
|
|
|
class MessagesHandler(MessagesCreator):
    """Processes incoming chat events and talks to the database and redis.

    ``self.sync_redis`` and ``self.async_redis_publisher`` are used here but are
    not created here; ``safe_write`` is abstract. The subclass provides them.
    """

    def __init__(self, *args, **kwargs):
        super(MessagesHandler, self).__init__(*args, **kwargs)
        # short per-instance id that is attached to every log record
        self.log_id = str(id(self) % 10000).rjust(4, '0')
        self.ip = None
        log_params = {
            'username': '00000000',
            'id': self.log_id,
            'ip': 'initializing'
        }
        self.logger = logging.LoggerAdapter(logger, log_params)
        self.async_redis = tornadoredis.Client()
        # dispatch table: value of the "action" key -> handler method
        self.process_message = {
            GET_MESSAGES_EVENT: self.process_get_messages,
            SEND_MESSAGE_EVENT: self.process_send_message,
            CALL_EVENT: self.process_call
        }

    def do_db(self, callback, *arg, **args):
        """
        Call ``callback(*arg, **args)``; when the database connection has gone
        away, close it and call the callback one more time.

        :return: whatever the callback returns
        """
        try:
            return callback(*arg, **args)
        except (OperationalError, InterfaceError) as e:  # Connection has gone away
            self.logger.warning('%s, reconnecting', e)  # TODO
            connection.close()
            return callback(*arg, **args)

    def get_online_from_redis(self, check_name=None, check_id=None):
        """
        Read the online users hash from redis.

        :param check_name: user name to look for among the other connections
        :param check_id: hash key of the connection to ignore while looking
        :rtype : dict
        :return: {name: {"sex": ..., "userId": ...}, ...};
         a (dict, bool) tuple if check_id is present, where the bool tells
         whether check_name is stored under a key other than check_id
        """
        online = self.sync_redis.hgetall(REDIS_ONLINE_USERS)
        self.logger.debug('!! redis online: %s', online)
        result = {}
        user_is_online = False
        # redis stores values in the stored_redis_user format, so parse them
        if online:
            for key, raw_user_sex in online.items():  # py2 iteritems
                # split from the right so a colon inside the name does not
                # break the unpacking
                (name, sex, user_id) = raw_user_sex.decode('utf-8').rsplit(':', 2)
                if name == check_name and check_id != int(key.decode('utf-8')):
                    user_is_online = True
                result.update(self.online_js_structure(name, sex, user_id))
        return (result, user_is_online) if check_id else result

    def add_online_user(self):
        """
        adds to redis
        online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

        When the user name was not online yet, the updated list is published as
        a "joined" event; otherwise the list is written only to this connection.
        :return:
        """
        online = self.get_online_from_redis()
        self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
        if self.sender_name not in online:  # first connection of this user
            online.update(self.online_self_js_structure)
            online_user_names_mes = self.default(online, LOGIN_EVENT)
            self.logger.info('!! First tab, sending refresh online for all')
            self.publish(online_user_names_mes)
        else:  # Send user names to self
            online_user_names_mes = self.default(online, REFRESH_USER_EVENT)
            self.logger.info('!! Second tab, retrieving online for self')
            self.safe_write(online_user_names_mes)

    def set_username(self, session_key):
        """
        Case registered: Fetch userName and its channels from database. returns them
        Also writes the "rooms" payload with the user's room names to the client.

        :param session_key: key of the session that holds "_auth_user_id"
        :return: channels user should subscribe
        """
        session = SessionStore(session_key)
        self.user_id = int(session["_auth_user_id"])
        user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
        self.sender_name = user_db.username
        self.sex = user_db.sex_str
        rooms = user_db.rooms.all()  # do_db is used already
        room_names = {}
        channels = [self.channel, ]
        for room in rooms:
            room_names[room.id] = room.name
            channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
        rooms_message = self.default(room_names, ROOMS_EVENT)
        self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
        self.safe_write(rooms_message)
        return channels

    def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
        """Serialise ``message`` to JSON and publish it to the redis ``channel``."""
        jsoned_mess = json.dumps(message)
        self.logger.debug('<%s> %s', channel, jsoned_mess)
        self.async_redis_publisher.publish(channel, jsoned_mess)

    def new_message(self, message):
        """Forward the body of a redis message to the client, skipping int bodies."""
        if not isinstance(message.body, int):  # an int body is a subscribe event
            self.safe_write(message.body)

    def safe_write(self, message):
        raise NotImplementedError('WebSocketHandler implements')

    def process_send_message(self, message):
        """
        Store a chat message and publish it.

        A message without a receiver id goes to the default channel; otherwise
        it goes to the sender's own channel and to the receiver's channel.

        :type message: dict
        """
        content = message[CONTENT_VAR_NAME]
        receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # if receiver_id is None then it's a public message
        self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
        message_db = Message(
            sender_id=self.user_id,
            content=content,
            receiver_id=receiver_id,
        )
        if IMG_VAR_NAME in message:
            message_db.img = extract_photo(message[IMG_VAR_NAME])
        self.do_db(message_db.save)  # exit on hacked id with exception
        prepared_message = self.create_send_message(message_db)
        if receiver_id is None:
            self.logger.debug('!! Detected as public')
            self.publish(prepared_message)
        else:
            receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
            self.publish(prepared_message, self.channel)
            self.logger.debug('!! Detected as private, channel %s', receiver_channel)
            if receiver_channel != self.channel:  # don't deliver twice when writing to self
                self.publish(prepared_message, receiver_channel)

    def process_call(self, message):
        """
        Publish a call offer to the receiver's channel.

        :type message: dict
        :raises ValidationError: if the message has no receiver id
        """
        receiver_id = message.get(RECEIVER_USERID_VAR_NAME)
        if receiver_id is None:
            # without a receiver the offer would be published to a channel
            # named after ``None``
            raise ValidationError("Can't offer a call without a receiver")
        self.logger.info('!! Offering a call to user with id %s', receiver_id)
        message = self.offer_call(message.get(CONTENT_VAR_NAME), message.get(CALL_TYPE_VAR_NAME))
        self.publish(message, REDIS_USERID_CHANNEL_PREFIX % receiver_id)

    def process_get_messages(self, data):
        """
        Write to the client up to ``count`` (default 10) newest messages that
        are public or sent by / addressed to the current user, optionally only
        those with an id lower than ``headerId``.

        :type data: dict
        :raises ValidationError: if ``count`` is not a non-negative integer
        """
        header_id = data.get(HEADER_ID_VAR_NAME, None)
        try:
            count = int(data.get(COUNT_VAR_NAME, 10))
        except (TypeError, ValueError):
            raise ValidationError('Messages count should be an integer')
        if count < 0:
            raise ValidationError("Messages count can't be negative")
        self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
        # Only public or sent by the user or addressed to the user
        filters = [Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)]
        if header_id is not None:
            filters.insert(0, Q(id__lt=header_id))
        messages = Message.objects.filter(*filters).order_by('-pk')[:count]
        response = self.do_db(self.get_messages, messages)
        self.safe_write(response)

    def save_ip(self):
        """Record that the current user joined from ``self.ip``, once per pair."""
        already_saved = self.do_db(UserJoinedInfo.objects.filter(
            Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)
        if already_saved:
            return
        ip_address = self.get_or_create_ip()
        self.do_db(
            UserJoinedInfo.objects.create,
            ip=ip_address,
            user_id=self.user_id
        )

    def get_or_create_ip(self):
        """
        Return the ``IpAddress`` row for ``self.ip``, creating it when missing.

        A new row is filled with the data returned by ``api_url``; if that
        fails for any reason, a row with only the ip is created instead.
        """
        try:
            ip_address = IpAddress.objects.get(ip=self.ip)
        except IpAddress.DoesNotExist:
            try:
                if not api_url:
                    raise Exception('api url is absent')
                self.logger.debug("Creating ip record %s", self.ip)
                f = urlopen(api_url % self.ip)
                try:
                    raw_response = f.read().decode("utf-8")
                finally:
                    f.close()  # don't leak the http connection
                response = json.loads(raw_response)
                if response['status'] != "success":
                    raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
                ip_address = IpAddress.objects.create(
                    ip=self.ip,
                    isp=response['isp'],
                    country=response['country'],
                    region=response['regionName'],
                    city=response['city'],
                    country_code=response['countryCode']
                )
            except Exception as e:
                # deliberate best effort: the ip is stored even without extra info
                self.logger.error("Error while creating ip with country info, because %s", e)
                ip_address = IpAddress.objects.create(ip=self.ip)
        return ip_address
352
|
|
|
|
353
|
|
|
|
354
|
|
|
class AntiSpam(object):
    """Keeps per-connection statistics about the size of incoming messages."""

    def __init__(self):
        # timestamp in hundredths of a second -> length of the message
        self.info = {}
        # how many oversized messages were seen
        self.spammed = 0

    def check_spam(self, json_message):
        """
        Record the length of ``json_message`` and reject oversized messages.

        :raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
        """
        size = len(json_message)
        stamp = int(round(time.time() * 100))
        self.info[stamp] = size
        if size <= MAX_MESSAGE_SIZE:
            self.check_timed_spam()
            return
        self.spammed += 1
        raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

    def check_timed_spam(self):
        """Placeholder for rate limiting; currently does nothing."""
        # TODO implement me, should raise
        # ValidationError("You're chatting too much, calm down a bit!")
        return None
373
|
|
|
|
374
|
|
|
|
375
|
|
|
class TornadoHandler(WebSocketHandler, MessagesHandler):
    """Websocket endpoint: binds a browser connection to ``MessagesHandler``."""

    def __init__(self, *args, **kwargs):
        super(TornadoHandler, self).__init__(*args, **kwargs)
        self.connected = False
        self.anti_spam = AntiSpam()
        # channels passed to listen(), remembered so on_close can unsubscribe
        self._subscribed_channels = []
        from chat import global_redis
        self.async_redis_publisher = global_redis.async_redis_publisher
        self.sync_redis = global_redis.sync_redis

    @tornado.gen.engine
    def listen(self, channels):
        """
        Subscribe to ``channels`` and forward every redis message to new_message.
        self.channel should been set before calling
        """
        self._subscribed_channels = list(channels)
        yield tornado.gen.Task(
            self.async_redis.subscribe, channels)
        self.async_redis.listen(self.new_message)

    def data_received(self, chunk):
        pass

    def on_message(self, json_message):
        """
        Parse an incoming frame and dispatch it by its "action" key.

        Every validation failure is logged and reported back to the client as
        a "growl" payload instead of being raised.
        """
        try:
            if not self.connected:
                raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
            if not json_message:
                raise ValidationError('Skipping null message')
            # anti spam check is disabled:
            # self.anti_spam.check_spam(json_message)
            self.logger.debug('<< %s', json_message)
            try:
                message = json.loads(json_message)
            except ValueError:
                raise ValidationError('Message is not a valid json')
            if not isinstance(message, dict):
                raise ValidationError('Message should be a json object')
            event = message.get(EVENT_VAR_NAME)  # a missing key is reported as unknown event
            if event not in self.process_message:
                raise ValidationError("event {} is unknown".format(event))
            self.process_message[event](message)
        except ValidationError as e:
            self.logger.warning("Message won't be send, because : %s", e.message)
            self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))

    def on_close(self):
        """
        Remove this connection from the online hash, publish a "left" event
        when no other connection of the same user remains, then release the
        redis subscription.
        """
        try:
            self_id = id(self)
            self.async_redis_publisher.hdel(REDIS_ONLINE_USERS, self_id)
            if self.connected:
                # seems like async solves problem with connection lost and wrong data status
                # http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
                online, is_online = self.get_online_from_redis(self.sender_name, self_id)
                self.logger.info('!! Closing connection, redis current online %s', online)
                if not is_online:
                    message = self.default(online, LOGOUT_EVENT)
                    self.logger.debug('!! User closed the last tab, refreshing online for all')
                    self.publish(message)
                else:
                    self.logger.debug('!! User is still online in other tabs')
            else:
                self.logger.warning('Dropping connection for not connected user')
        finally:
            if self.async_redis.subscribed:
                # unsubscribe from what listen() subscribed to; fall back to the
                # default pair if listen() was not used for the subscription
                channels = self._subscribed_channels or [
                    ANONYMOUS_REDIS_CHANNEL,
                    self.channel
                ]
                self.async_redis.unsubscribe(channels)
            self.async_redis.disconnect()

    def open(self):
        """
        Accept the connection when the session cookie refers to an existing
        session: load the user, subscribe to its channels, mark it online and
        store its ip in a background thread. Otherwise close the socket.
        """
        session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
        if sessionStore.exists(session_key):
            self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self))
            self.async_redis.connect()
            channels = self.set_username(session_key)
            self.ip = self.get_client_ip()
            log_params = {
                'username': self.sender_name.rjust(8),
                'id': self.log_id,
                'ip': self.ip
            }
            self.logger = logging.LoggerAdapter(logger, log_params)
            self.listen(channels)
            self.add_online_user()
            self.connected = True
            # the ip lookup may call an external http service, keep it off the ioloop
            Thread(target=self.save_ip).start()
        else:
            self.logger.warning('!! Session key %s has been rejected', str(session_key))
            # NOTE(review): 403 is outside the 1000-4999 range RFC 6455 defines
            # for close codes - confirm no client relies on it before changing
            self.close(403, "Session key %s has been rejected" % session_key)

    def check_origin(self, origin):
        """
        check whether browser set domain matches origin

        Both host names are compared without port and case-insensitively.
        :return: False when they differ or when the Host header is absent
        """
        origin_domain = urlparse(origin).netloc.split(':')[0].lower()
        browser_set = self.request.headers.get("Host")
        if not browser_set:
            return False
        browser_domain = browser_set.split(':')[0].lower()
        return browser_domain == origin_domain

    def safe_write(self, message):
        """
        Tries to send message, doesn't throw exception outside
        when the websocket is already closed.

        :param message: dict (serialised to JSON here) or an already built string
        :raises ValueError: if message is neither a dict nor a string
        :type self: MessagesHandler
        """
        try:
            if isinstance(message, dict):
                message = json.dumps(message)
            if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):  # noqa: F821
                raise ValueError('Wrong message type : %s' % str(message))
            self.logger.debug(">> %s", message)
            self.write_message(message)
        except tornado.websocket.WebSocketClosedError as e:
            self.logger.error("%s. Can't send << %s >> message", e, str(message))

    def get_client_ip(self):
        """Return the "X-Real-IP" header if it is set, else the socket's remote ip."""
        return self.request.headers.get("X-Real-IP") or self.request.remote_ip
489
|
|
|
|