|
1
|
|
|
import json |
|
2
|
|
|
from datetime import datetime |
|
3
|
|
|
|
|
4
|
|
|
import logging |
|
5
|
|
|
|
|
6
|
|
|
import redis |
|
7
|
|
|
import tornadoredis |
|
8
|
|
|
|
|
9
|
|
|
from chat.models import get_milliseconds |
|
10
|
|
|
from chat.settings import ALL_REDIS_ROOM, REDIS_PORT, REDIS_HOST |
|
11
|
|
|
from chat.settings_base import ALL_ROOM_ID |
|
12
|
|
|
from chat.tornado.constants import RedisPrefix |
|
13
|
|
|
from chat.tornado.message_creator import MessagesCreator |
|
14
|
|
|
|
|
15
|
|
|
logger = logging.getLogger(__name__) |
|
16
|
|
|
|
|
17
|
|
|
|
|
18
|
|
|
def new_read(instance, *args, **kwargs):
    """
    Replacement for a connection's ``read`` that logs failures before re-raising.

    Delegates to ``instance.old_read`` (the original method saved by
    ``patch_read``). If that call raises, the content of the
    ``ALL_REDIS_ROOM`` hash is read through ``sync_redis`` and logged as a
    warning, then the original exception is re-raised.

    @param instance: connection object carrying the saved ``old_read``
    @param args: positional arguments forwarded to ``old_read``
    @param kwargs: keyword arguments forwarded to ``old_read``
    @return: whatever ``old_read`` returns
    @raise Exception: the exception raised by ``old_read``, unchanged
    """
    try:
        return instance.old_read(*args, **kwargs)
    except Exception:
        # The snapshot is diagnostics only; the helper never raises, so a
        # failing lookup cannot replace the read error the caller must see.
        redis_online = _redis_online_snapshot()
        logger.warning("Error occurred during reading tornadoredis connection. Redis online %s", redis_online)
        # Bare raise re-raises the read error with its original traceback.
        raise


def _redis_online_snapshot():
    """Return the ALL_REDIS_ROOM hash for logging, or a placeholder string if it cannot be read."""
    try:
        return sync_redis.hgetall(ALL_REDIS_ROOM)
    except Exception as lookup_error:
        return "<unavailable: %r>" % (lookup_error,)
|
25
|
|
|
|
|
26
|
|
|
|
|
27
|
|
|
def patch_read(tornado_redis):
    """
    Route reads of the client's connection through ``new_read``.

    The connection's current ``read`` is stored on the connection as
    ``old_read``, and ``new_read`` bound to the connection is installed as
    ``read`` in its place.

    @param tornado_redis: client object exposing a ``connection`` attribute
    """
    connection = tornado_redis.connection
    original_read = connection.read
    # The type of the existing ``read`` is used to bind new_read to the connection.
    bind = type(original_read)
    connection.old_read = original_read
    connection.read = bind(new_read, connection)
|
31
|
|
|
|
|
32
|
|
|
|
|
33
|
|
|
def new_hget(instance, *args, **kwargs):
    """
    Call ``instance.hget`` and return its result as text.

    @param instance: object providing ``hget``
    @param args: positional arguments forwarded to ``hget``
    @param kwargs: keyword arguments forwarded to ``hget``
    @return: the result decoded as UTF-8, or None when ``hget`` returned a
        falsy value (None or an empty value)
    """
    raw_value = instance.hget(*args, **kwargs)
    if not raw_value:
        return None
    return raw_value.decode('utf-8')
|
36
|
|
|
|
|
37
|
|
|
|
|
38
|
|
|
def patch_hget(arg_red):
    """
    Add an ``shget`` attribute to the client: ``new_hget`` bound to it.

    @param arg_red: client object that provides ``hget``
    """
    # The type of the existing ``hget`` is used to bind new_hget to this instance.
    bind = type(arg_red.hget)
    decoded_hget = bind(new_hget, arg_red)
    arg_red.shget = decoded_hget
|
41
|
|
|
|
|
42
|
|
|
|
|
43
|
|
|
def patch_hgetall(arg_red):
    """
    Add an ``shgetall`` attribute to the client: ``new_hgetall`` bound to it.

    @param arg_red: client object that provides ``hgetall``
    """
    # The type of the existing ``hgetall`` is used to bind new_hgetall to this instance.
    bind = type(arg_red.hgetall)
    decoded_hgetall = bind(new_hgetall, arg_red)
    arg_red.shgetall = decoded_hgetall
|
46
|
|
|
|
|
47
|
|
|
|
|
48
|
|
|
def new_hgetall(instance, *args, **kwargs):
    """
    Call ``instance.hgetall`` and return the mapping with text keys and values.

    Every key and value is decoded as UTF-8; the original author notes that
    neither keys nor values are null here.

    @param instance: object providing ``hgetall``
    @param args: positional arguments forwarded to ``hgetall``
    @param kwargs: keyword arguments forwarded to ``hgetall``
    @return: dict mapping decoded keys to decoded values
    """
    raw_mapping = instance.hgetall(*args, **kwargs)
    decoded = {}
    for raw_key, raw_value in raw_mapping.items():
        field = raw_key.decode('utf-8')
        decoded[field] = raw_value.decode('utf-8')
    return decoded
|
51
|
|
|
|
|
52
|
|
|
|
|
53
|
|
|
def patch_smembers(arg_red):
    """
    Add an ``ssmembers`` attribute to the client: ``new_smembers`` bound to it.

    @param arg_red: client object that provides ``smembers``
    """
    # The type of the existing ``smembers`` is used to bind new_smembers to this instance.
    bind = type(arg_red.smembers)
    decoded_smembers = bind(new_smembers, arg_red)
    arg_red.ssmembers = decoded_smembers
|
56
|
|
|
|
|
57
|
|
|
|
|
58
|
|
|
def new_smembers(instance, *args, **kwargs):
    """
    Call ``instance.smembers`` and return the members as a list of text.

    Every member is decoded as UTF-8; the original author notes that the
    values are never null here.

    @param instance: object providing ``smembers``
    @param args: positional arguments forwarded to ``smembers``
    @param kwargs: keyword arguments forwarded to ``smembers``
    @return: list of decoded members, in the iteration order of the result
    """
    raw_members = instance.smembers(*args, **kwargs)
    decoded_members = []
    for raw_member in raw_members:
        decoded_members.append(raw_member.decode('utf-8'))
    return decoded_members
|
61
|
|
|
|
|
62
|
|
|
|
|
63
|
|
|
def encode_message(message, parsable):
    """
    Serialize a message to JSON, optionally marking it as parsable.

    @param message: object to serialize with json.dumps
    @param parsable: when true, RedisPrefix.PARSABLE_PREFIX is prepended to
        mark that the message should be decoded and processed before it is
        sent to the client
    @return: the JSON string, prefixed when requested
    """
    payload = json.dumps(message)
    return RedisPrefix.PARSABLE_PREFIX + payload if parsable else payload
|
74
|
|
|
|
|
75
|
|
|
|
|
76
|
|
|
def remove_parsable_prefix(message):
    """
    Strip the leading RedisPrefix.PARSABLE_PREFIX marker from a message.

    @param message: string that may start with RedisPrefix.PARSABLE_PREFIX
    @return: the message without the prefix, or None when the message does
        not start with it
    """
    prefix = RedisPrefix.PARSABLE_PREFIX
    if message.startswith(prefix):
        # Slice by the prefix's real length instead of a hard-coded 1, so the
        # result stays correct if the prefix is ever longer than one character.
        return message[len(prefix):]
    # Unmarked message: there is nothing to strip.
    return None
|
79
|
|
|
|
|
80
|
|
|
|
|
81
|
|
|
def ping_online():
    """
    Publish a ping message to the ALL_ROOM_ID channel.

    The ping is built by MessagesCreator.ping_client from the current
    get_milliseconds() value, encoded as a parsable message, logged, and
    published through async_redis_publisher.
    """
    ping_payload = MessagesCreator.ping_client(get_milliseconds())
    message = encode_message(ping_payload, True)
    logger.info("Pinging clients: %s", message)
    async_redis_publisher.publish(ALL_ROOM_ID, message)
|
85
|
|
|
|
|
86
|
|
|
|
|
87
|
|
|
# Global connection used to read synchronously. The patches below attach the
# decoding variants shget / shgetall / ssmembers to it.
sync_redis = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT)
for _apply_patch in (patch_hget, patch_hgetall, patch_smembers):
    _apply_patch(sync_redis)
del _apply_patch  # keep the loop variable out of the module namespace

# Redis connection cannot be shared between publishers and subscribers.
async_redis_publisher = tornadoredis.Client(host=REDIS_HOST, port=REDIS_PORT)
patch_read(async_redis_publisher)