Completed
Push — master ( 226f71...6d1fe2 )
by Andrew
34s
created

encode_message()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 11
rs 9.4285
c 0
b 0
f 0
1
import json
2
from datetime import datetime
3
4
import logging
5
6
import redis
7
import tornadoredis
8
9
from chat.models import get_milliseconds
10
from chat.settings import ALL_REDIS_ROOM, REDIS_PORT, REDIS_HOST
11
from chat.settings_base import ALL_ROOM_ID
12
from chat.tornado.constants import RedisPrefix
13
from chat.tornado.message_creator import MessagesCreator
14
15
logger = logging.getLogger(__name__)
16
17
18
def new_read(instance, *args, **kwargs):
19
	try:
20
		return instance.old_read(*args, **kwargs)
21
	except Exception as e:
22
		redis_online = sync_redis.hgetall(ALL_REDIS_ROOM)
23
		logger.warning("Error occurred during reading tornadoredis connection. Redis online  %s", redis_online)
24
		raise e
25
26
27
def patch_read(tornado_redis):
28
	fabric = type(tornado_redis.connection.read)
29
	tornado_redis.connection.old_read = tornado_redis.connection.read
30
	tornado_redis.connection.read = fabric(new_read, tornado_redis.connection)
31
32
33
def new_hget(instance, *args, **kwargs):
34
	res = instance.hget(*args, **kwargs)
35
	return res.decode('utf-8') if res else None
36
37
38
def patch_hget(arg_red):
39
	fabric = type(arg_red.hget)
40
	arg_red.shget = fabric(new_hget, arg_red)
41
42
43
def patch_hgetall(arg_red):
44
	fabric = type(arg_red.hgetall)
45
	arg_red.shgetall = fabric(new_hgetall, arg_red)
46
47
48
def new_hgetall(instance, *args, **kwargs):
49
	res = instance.hgetall(*args, **kwargs) # neither key or value are null
50
	return {k.decode('utf-8'): res[k].decode('utf-8') for k in res}
51
52
53
def patch_smembers(arg_red):
54
	fabric = type(arg_red.smembers)
55
	arg_red.ssmembers = fabric(new_smembers, arg_red)
56
57
58
def new_smembers(instance, *args, **kwargs):
59
	res = instance.smembers(*args, **kwargs) # neither key or value are null
60
	return [k.decode('utf-8') for k in res]
61
62
63
def encode_message(message, parsable):
64
	"""
65
	@param parsable: Marks message with prefix to specify that
66
	it should be decoded and proccesed before sending to client
67
	@param message: message to mark
68
	@return: marked message
69
	"""
70
	jsoned_mess = json.dumps(message)
71
	if parsable:
72
		jsoned_mess = RedisPrefix.PARSABLE_PREFIX + jsoned_mess
73
	return jsoned_mess
74
75
76
def remove_parsable_prefix(message):
77
	if message.startswith(RedisPrefix.PARSABLE_PREFIX):
78
		return message[1:]
79
80
81
def ping_online():
82
	message = encode_message(MessagesCreator.ping_client(get_milliseconds()), True)
83
	logger.info("Pinging clients: %s", message)
84
	async_redis_publisher.publish(ALL_ROOM_ID, message)
85
86
87
# # global connection to read synchronously
88
sync_redis = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT)
89
patch_hget(sync_redis)
90
patch_hgetall(sync_redis)
91
patch_smembers(sync_redis)
92
# patch(sync_redis)
93
# Redis connection cannot be shared between publishers and subscribers.
94
async_redis_publisher = tornadoredis.Client(host=REDIS_HOST, port=REDIS_PORT)
95
patch_read(async_redis_publisher)