1 | import json |
||
2 | import logging |
||
3 | import re |
||
4 | import urllib |
||
5 | |||
6 | from django.core.exceptions import ValidationError |
||
7 | from django.db import transaction |
||
8 | from django.db.models import Q |
||
9 | from tornado import ioloop |
||
10 | from tornado.gen import engine, Task |
||
11 | from tornado.httpclient import AsyncHTTPClient, HTTPRequest |
||
12 | from tornado.ioloop import IOLoop |
||
13 | from tornado.web import asynchronous |
||
14 | from tornadoredis import Client |
||
15 | from django.conf import settings |
||
16 | from chat.global_redis import remove_parsable_prefix, encode_message |
||
17 | from chat.log_filters import id_generator |
||
18 | from chat.models import Message, Room, RoomUsers, Subscription, User, UserProfile, SubscriptionMessages, MessageHistory, \ |
||
19 | UploadedFile, Image |
||
20 | from chat.py2_3 import str_type, quote |
||
21 | from chat.settings import ALL_ROOM_ID, REDIS_PORT, WEBRTC_CONNECTION, GIPHY_URL, GIPHY_REGEX, FIREBASE_URL, REDIS_HOST |
||
22 | from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates |
||
23 | from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator |
||
24 | from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \ |
||
25 | create_room, get_message_images_videos, update_symbols, up_files_to_img |
||
26 | |||
# Module-level logger, plus an adapter that supplies placeholder 'id'/'ip'
# fields for log records emitted before a handler has its own logger.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(parent_logger, {'id': 0, 'ip': '000.000.000.000'})
||
32 | |||
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
# CONNECTION_POOL = tornadoredis.ConnectionPool(
#     max_connections=500,
#     wait_for_available=True)
||
37 | |||
# Optional third-party API keys; each one is None when the Django settings
# object has no attribute of that name.
GIPHY_API_KEY = getattr(settings, "GIPHY_API_KEY", None)
FIREBASE_API_KEY = getattr(settings, "FIREBASE_API_KEY", None)
||
40 | |||
class MessagesHandler(MessagesCreator):
    """
    Chat logic for a single connection: websocket actions in, redis pub/sub out.

    Incoming websocket messages are dispatched through ``process_ws_message``
    and incoming redis pub/sub messages through ``process_pubsub_message``;
    both map an ``Actions`` value to a bound method of this object.

    ``connected``, ``http_client`` and ``ws_write`` only raise here and must
    be provided by a subclass. The methods below also use ``self.channel``
    and ``self.close``, neither of which is defined in this class.
    """

    def __init__(self, *args, **kwargs):
        """
        Set up per-connection state, the redis clients and both dispatch tables.

        ``*args`` and ``**kwargs`` are accepted but are not forwarded to the
        parent initializer.
        """
        self.closed_channels = None
        super(MessagesHandler, self).__init__()
        self.webrtc_ids = {}
        self.id = None  # child init
        self.sex = None
        self.last_client_ping = 0
        self.sender_name = None
        self.user_id = 0  # anonymous by default
        self.ip = None
        # NOTE(review): imported at call time rather than at module level;
        # the reason is not visible from this file.
        from chat import global_redis
        self.async_redis_publisher = global_redis.async_redis_publisher
        self.sync_redis = global_redis.sync_redis
        self.channels = []
        self._logger = None
        # Dedicated client used for this connection's subscriptions.
        self.async_redis = Client(host=REDIS_HOST, port=REDIS_PORT)
        self.patch_tornadoredis()
        # input websocket messages handlers
        # The handler is determined by @VarNames.EVENT
        self.process_ws_message = {
            Actions.GET_MESSAGES: self.process_get_messages,
            Actions.SEND_MESSAGE: self.process_send_message,
            Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
            Actions.DELETE_ROOM: self.delete_channel,
            Actions.EDIT_MESSAGE: self.edit_message,
            Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
            Actions.INVITE_USER: self.invite_user,
            Actions.PING: self.respond_ping,
            Actions.PONG: self.process_pong_message,
        }
        # Handlers for redis messages, if handler returns true - message won't be sent to client
        # The handler is determined by @VarNames.EVENT
        self.process_pubsub_message = {
            Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
            Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
            Actions.DELETE_ROOM: self.send_client_delete_channel,
            Actions.INVITE_USER: self.send_client_new_channel,
            Actions.PING: self.process_ping_message,
        }

    def patch_tornadoredis(self):  # TODO remove this
        """
        Wrap the redis connection's ``readline`` so read errors do not propagate.

        The original bound method is kept on the connection as ``old_read``.
        The replacement returns ``None`` when the original raises, after
        logging the error together with this handler's id and channels.
        """
        connection = self.async_redis.connection
        # Type of a bound method; calling it binds new_read to the connection.
        fabric = type(connection.readline)
        connection.old_read = connection.readline

        def new_read(new_self, callback=None):
            try:
                return new_self.old_read(callback=callback)
            except Exception as e:
                # The error is deliberately not re-raised (this was already
                # the effective behaviour), but it is no longer dropped
                # silently. Only local state is logged: touching redis or
                # ``self.connected`` here could raise again from inside the
                # error path.
                self.logger.error(
                    "Redis readline failed: %s ;;; "
                    "self.id: %s ;;; "
                    "self.channels = '%s';;; "
                    "self.closed_channels = '%s';;;",
                    e, self.id, self.channels, self.closed_channels
                )
                return None

        connection.readline = fabric(new_read, connection)

    @property
    def connected(self):
        """Connection flag; not implemented in this class."""
        # NotImplemented is a comparison sentinel, not an exception class;
        # raising it produces a TypeError instead of the intended error.
        raise NotImplementedError

    @connected.setter
    def connected(self, value):
        raise NotImplementedError

    @property
    def http_client(self):
        """HTTP client used by the giphy/firebase calls; not implemented in this class."""
        raise NotImplementedError

    @engine
    def listen(self, channels):
        """
        Subscribe to ``channels`` and route pub/sub messages to
        :meth:`on_pub_sub_message`.
        """
        yield Task(self.async_redis.subscribe, channels)
        self.async_redis.listen(self.on_pub_sub_message)

    @property
    def logger(self):
        """Return the handler's own logger if one is set, else ``base_logger``."""
        return self._logger if self._logger else base_logger

    @engine
    def add_channel(self, channel):
        """Remember ``channel`` in ``self.channels`` and subscribe to it."""
        self.channels.append(channel)
        yield Task(self.async_redis.subscribe, (channel,))

    def get_online_from_redis(self, channel):
        """Return only the user-id list from :meth:`get_online_and_status_from_redis`."""
        return self.get_online_and_status_from_redis(channel)[1]

    def get_online_and_status_from_redis(self, channel):
        """
        Read the members stored under ``channel`` and parse them.

        Returns ``(False, [])`` when nothing is stored.

        :rtype : (bool, list)
        """
        online = self.sync_redis.ssmembers(channel)
        self.logger.debug('!! channel %s redis online: %s', channel, online)
        return self.parse_redis_online(online) if online else (False, [])

    def parse_redis_online(self, online):
        """
        Turn connection ids into a set of user ids.

        Each entry is split on ``':'`` and its first part is read as an
        integer user id. The flag is True when some entry other than
        ``self.id`` carries ``self.user_id``.

        :rtype : (bool, list)
        """
        result = set()
        user_is_online = False
        for decoded in online:  # py2 iteritems
            # : char specified in cookies_middleware.py.create_id
            user_id = int(decoded.split(':')[0])
            if user_id == self.user_id and decoded != self.id:
                user_is_online = True
            result.add(user_id)
        return user_is_online, list(result)

    def add_online_user(self, room_id):
        """
        Add this connection id to the redis set stored under ``room_id``.

        If another connection of the same user is already present, the
        online list is written only to this websocket; otherwise a login
        message is published to the whole room.

        :return: if user is online
        """
        self.async_redis_publisher.sadd(room_id, self.id)
        # since we add user to online first, latest trigger will always show correct online
        is_online, online = self.get_online_and_status_from_redis(room_id)
        if is_online:  # Send user names to self
            online_user_names_mes = self.room_online(online, Actions.REFRESH_USER, room_id)
            self.logger.info('!! Second tab, retrieving online for self')
            self.ws_write(online_user_names_mes)
        else:  # if a new tab has been opened
            online.append(self.user_id)
            online_user_names_mes = self.room_online(online, Actions.LOGIN, room_id)
            self.logger.info('!! First tab, sending refresh online for all')
            self.publish(online_user_names_mes, room_id)
        return is_online

    def publish(self, message, channel, parsable=False):
        """
        Encode ``message`` with ``encode_message`` and publish it to ``channel``.

        :param parsable: passed through to ``encode_message``
        """
        jsoned_mess = encode_message(message, parsable)
        self.logger.debug('<%s> %s', channel, jsoned_mess)
        self.async_redis_publisher.publish(channel, jsoned_mess)

    def on_pub_sub_message(self, message):
        """
        All pubsub messages are automatically sent to client.

        String bodies for which ``remove_parsable_prefix`` returns a value
        are first passed to the matching ``process_pubsub_message`` handler,
        and are forwarded only if that handler returns a falsy value.

        :param message: redis message; only its ``body`` is read
        :return:
        """
        data = message.body
        if isinstance(data, str_type):  # not subscribe event
            prefixless_str = remove_parsable_prefix(data)
            if prefixless_str:
                dict_message = json.loads(prefixless_str)
                res = self.process_pubsub_message[dict_message[VarNames.EVENT]](dict_message)
                if not res:
                    self.ws_write(prefixless_str)
            else:
                self.ws_write(data)

    def ws_write(self, message):
        """Write ``message`` to the websocket; not implemented in this class."""
        raise NotImplementedError('WebSocketHandler implements')

    @asynchronous
    def search_giphy(self, message, query, cb):
        """
        Query giphy for ``query`` and call ``cb(message, url)`` with the result.

        ``cb`` receives ``None`` instead of a url when the response cannot
        be parsed or holds no result.
        """
        self.logger.debug("!! Asking giphy for: %s", query)

        def on_giphy_reply(response):
            try:
                self.logger.debug("!! Got giphy response: %s", response.body)
                res = json.loads(response.body)
                giphy = res['data'][0]['images']['downsized_medium']['url']
            except Exception:
                # Any malformed or empty reply simply means "no gif"; a bare
                # except would also trap KeyboardInterrupt/SystemExit.
                giphy = None
            cb(message, giphy)

        url = GIPHY_URL.format(GIPHY_API_KEY, quote(query, safe=''))
        self.http_client.fetch(url, callback=on_giphy_reply)

    def notify_offline(self, channel, message_id):
        """
        Queue and push a notification for users of ``channel`` that are offline.

        Does nothing when no firebase key is configured, for the
        ``ALL_ROOM_ID`` room, or when no active subscription matches.
        """
        # Both cheap checks come first so redis is not queried for nothing.
        if FIREBASE_API_KEY is None or channel == ALL_ROOM_ID:
            return
        online = self.get_online_from_redis(channel)
        offline_users = RoomUsers.objects.filter(
            room_id=channel, notifications=True
        ).exclude(user_id__in=online).values_list('user_id')
        subscriptions = Subscription.objects.filter(user__in=offline_users, inactive=False)
        if not subscriptions:
            return
        new_sub_mess = [
            SubscriptionMessages(message_id=message_id, subscription_id=r.id) for r in subscriptions
        ]
        reg_ids = [r.registration_id for r in subscriptions]
        SubscriptionMessages.objects.bulk_create(new_sub_mess)
        self.post_firebase(reg_ids)

    @asynchronous
    def post_firebase(self, reg_ids):
        """
        POST ``reg_ids`` to firebase and deactivate the subscriptions it rejects.

        A registration id is deactivated when the result at the same index
        reports ``NotRegistered`` or ``InvalidRegistration``.
        """
        def on_reply(response):
            try:
                self.logger.debug("!! FireBase response: %s", response.body)
                response_obj = json.loads(response.body)
                delete = [
                    reg_ids[index]
                    for index, elem in enumerate(response_obj['results'])
                    if elem.get('error') in ['NotRegistered', 'InvalidRegistration']
                ]
                if delete:
                    self.logger.info("Deactivating subscriptions: %s", delete)
                    Subscription.objects.filter(registration_id__in=delete).update(inactive=True)
            except Exception as e:
                self.logger.error("Unable to parse response %s", e)

        headers = {"Content-Type": "application/json", "Authorization": "key=%s" % FIREBASE_API_KEY}
        body = json.dumps({"registration_ids": reg_ids})
        self.logger.debug("!! post_fire_message %s", body)
        r = HTTPRequest(FIREBASE_URL, method="POST", headers=headers, body=body)
        self.http_client.fetch(r, callback=on_reply)

    def isGiphy(self, content):
        """
        Return the first group of ``GIPHY_REGEX`` found in ``content``.

        Returns ``None`` when there is no match, ``content`` is ``None`` or
        no giphy key is configured.
        """
        if GIPHY_API_KEY is not None and content is not None:
            giphy_match = re.search(GIPHY_REGEX, content)
            return giphy_match.group(1) if giphy_match is not None else None
        return None

    def process_send_message(self, message):
        """
        Store a new message, publish it to its room and notify offline users.

        When the content matches the giphy pattern, the message is stored
        only after the giphy lookup has called back.

        :type message: dict
        """
        content = message.get(VarNames.CONTENT)
        giphy_match = self.isGiphy(content)

        # @transaction.atomic mysql has gone away
        def send_message(message, giphy=None):
            # A missing files key is treated as "no files"; passing None to
            # an ``__in`` lookup fails.
            file_ids = message.get(VarNames.FILES) or []
            files = UploadedFile.objects.filter(id__in=file_ids, user_id=self.user_id)
            symbol = get_max_key(files)
            channel = message[VarNames.CHANNEL]
            js_id = message[VarNames.JS_MESSAGE_ID]
            message_db = Message(
                sender_id=self.user_id,
                content=message[VarNames.CONTENT],
                symbol=symbol,
                giphy=giphy,
                room_id=channel
            )
            res_files = []
            do_db(message_db.save)
            if files:
                images = up_files_to_img(files, message_db.id)
                res_files = MessagesCreator.prepare_img_video(images, message_db.id)
            prepared_message = self.create_send_message(
                message_db,
                Actions.PRINT_MESSAGE,
                res_files,
                js_id
            )
            self.publish(prepared_message, channel)
            self.notify_offline(channel, message_db.id)

        if giphy_match is not None:
            self.search_giphy(message, giphy_match, send_message)
        else:
            send_message(message)

    def create_new_room(self, message):
        """
        Create a named room, add the current user and announce it on ``self.channel``.

        :raises ValidationError: if the name is empty or longer than 16 characters
        """
        room_name = message[VarNames.ROOM_NAME]
        if not room_name or len(room_name) > 16:
            raise ValidationError('Incorrect room name "{}"'.format(room_name))
        room = Room(name=room_name)
        do_db(room.save)
        RoomUsers(room_id=room.id, user_id=self.user_id).save()
        subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
        self.publish(subscribe_message, self.channel, True)

    def invite_user(self, message):
        """
        Add a user to a room.

        Publishes the new member to the room channel, then publishes an
        invite to the invited user's own channel.
        """
        room_id = message[VarNames.ROOM_ID]
        user_id = message[VarNames.USER_ID]
        room = get_or_create_room(self.channels, room_id, user_id)
        users_in_room = {
            user.id: RedisPrefix.set_js_user_structure(user.username, user.sex)
            for user in room.users.all()
        }
        self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
        subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
        self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)

    def respond_ping(self, message):
        """Answer a client ping with a pong that carries the same js message id."""
        self.ws_write(self.responde_pong(message[VarNames.JS_MESSAGE_ID]))

    def process_pong_message(self, message):
        """Record the time carried by the client's pong."""
        self.last_client_ping = message[VarNames.TIME]

    def process_ping_message(self, message):
        """
        Schedule a check that closes the connection if no matching pong arrived.

        The check runs after ``settings.PING_CLOSE_SERVER_DELAY`` and
        compares the ping's time with the last recorded pong time.
        """
        def call_check():
            if message[VarNames.TIME] != self.last_client_ping:
                self.close(408, "Ping timeout")

        IOLoop.instance().call_later(settings.PING_CLOSE_SERVER_DELAY, call_check)

    def create_user_channel(self, message):
        """
        Create a direct room with another user and announce it.

        The announcement goes to ``self.channel`` and, when it is a
        different channel, to the other user's channel as well.
        """
        user_id = message[VarNames.USER_ID]
        room_id = create_room(self.user_id, user_id)
        subscribe_message = self.subscribe_direct_channel_message(room_id, user_id, self.user_id != user_id)
        self.publish(subscribe_message, self.channel, True)
        other_channel = RedisPrefix.generate_user(user_id)
        if self.channel != other_channel:
            self.publish(subscribe_message, other_channel, True)

    def delete_channel(self, message):
        """
        Leave a named room or disable an unnamed one, then publish an unsubscribe.

        :raises ValidationError: if the room is not one of this connection's
            channels, is ``ALL_ROOM_ID``, or is already disabled
        """
        room_id = message[VarNames.ROOM_ID]
        if room_id not in self.channels or room_id == ALL_ROOM_ID:
            raise ValidationError('You are not allowed to exit this room')
        room = do_db(Room.objects.get, id=room_id)
        if room.disabled:
            raise ValidationError('Room is already deleted')
        if room.name is None:  # if private then disable
            room.disabled = True
        else:  # if public -> leave the room, delete the link
            RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
            online = self.get_online_from_redis(room_id)
            # The redis set may not list this user; list.remove would raise.
            if self.user_id in online:
                online.remove(self.user_id)
            self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
        room.save()
        message = self.unsubscribe_direct_message(room_id)
        self.publish(message, room_id, True)

    def edit_message(self, data):
        """
        Edit or delete an existing message.

        ``None`` content marks the message as deleted; content matching the
        giphy pattern goes through :meth:`edit_message_giphy`; anything else
        goes through :meth:`edit_message_edit`.
        """
        js_id = data[VarNames.JS_MESSAGE_ID]
        message = do_db(Message.objects.get, id=data[VarNames.MESSAGE_ID])
        validate_edit_message(self.user_id, message)
        message.content = data[VarNames.CONTENT]
        # NOTE(review): the history row is written after ``content`` was
        # overwritten, so it stores the new text with the old giphy;
        # confirm this is intended.
        MessageHistory(message=message, content=message.content, giphy=message.giphy).save()
        message.edited_times += 1
        giphy_match = self.isGiphy(data[VarNames.CONTENT])
        if message.content is None:
            Message.objects.filter(id=data[VarNames.MESSAGE_ID]).update(
                deleted=True, edited_times=message.edited_times
            )
            self.publish(self.create_send_message(message, Actions.DELETE_MESSAGE, None, js_id), message.room_id)
        elif giphy_match is not None:
            self.edit_message_giphy(giphy_match, message, js_id)
        else:
            self.edit_message_edit(data, message, js_id)

    def edit_message_giphy(self, giphy_match, message, js_id):
        """Look up a gif for ``giphy_match``, then store and publish the edit."""
        def edit_giphy(message, giphy):
            do_db(
                Message.objects.filter(id=message.id).update,
                content=message.content,
                symbol=message.symbol,
                giphy=giphy,
                edited_times=message.edited_times
            )
            message.giphy = giphy
            self.publish(self.create_send_message(message, Actions.EDIT_MESSAGE, None, js_id), message.room_id)

        self.search_giphy(message, giphy_match, edit_giphy)

    def edit_message_edit(self, data, message, js_id):
        """Store a plain edit, attach any newly uploaded files and publish it."""
        action = Actions.EDIT_MESSAGE
        message.giphy = None
        # A missing files key is treated as "no files"; passing None to an
        # ``__in`` lookup fails.
        file_ids = data.get(VarNames.FILES) or []
        files = UploadedFile.objects.filter(id__in=file_ids, user_id=self.user_id)
        if files:
            update_symbols(files, message)
            up_files_to_img(files, message.id)
        if message.symbol:  # fetch all, including that we just store
            db_images = Image.objects.filter(message_id=message.id)
            prep_files = MessagesCreator.prepare_img_video(db_images, message.id)
        else:
            prep_files = None
        Message.objects.filter(id=message.id).update(
            content=message.content, symbol=message.symbol, giphy=None, edited_times=message.edited_times
        )
        self.publish(self.create_send_message(message, action, prep_files, js_id), message.room_id)

    def send_client_new_channel(self, message):
        """
        Subscribe to the room named in ``message`` and register as online in it.

        Returns ``None``, so the pub/sub message is still forwarded to the client.
        """
        room_id = message[VarNames.ROOM_ID]
        self.add_channel(room_id)
        self.add_online_user(room_id)

    def send_client_delete_channel(self, message):
        """
        Unsubscribe from the room named in ``message`` and drop this
        connection from its online set.
        """
        room_id = message[VarNames.ROOM_ID]
        self.async_redis.unsubscribe((room_id,))
        # The online list is a redis set (filled with sadd in
        # add_online_user), so members are removed with srem, not hdel.
        self.async_redis_publisher.srem(room_id, self.id)
        self.channels.remove(room_id)

    def process_get_messages(self, data):
        """
        Send the client up to ``count`` non-deleted messages of a room, newest first.

        When a header id is given, only messages with a smaller id are returned.

        :type data: dict
        """
        header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
        count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
        room_id = data[VarNames.CHANNEL]
        self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
        if header_id is None:
            messages = Message.objects.filter(
                Q(room_id=room_id), Q(deleted=False)
            ).order_by('-pk')[:count]
        else:
            messages = Message.objects.filter(
                Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)
            ).order_by('-pk')[:count]
        imv = do_db(get_message_images_videos, messages)
        response = self.get_messages(messages, room_id, imv, MessagesCreator.prepare_img_video)
        self.ws_write(response)
||
426 | |||
427 | |||
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
    """
    Adds WebRTC signalling (file transfer and calls) to :class:`MessagesHandler`.

    State is kept in redis hashes. The ``WEBRTC_CONNECTION`` hash maps a
    connection id to the websocket id that offered it, and a hash named
    after each connection id maps websocket ids to a ``WebRtcRedisStates``
    value.
    """

    def __init__(self, *args, **kwargs):
        """Extend both dispatch tables with the WebRTC actions."""
        super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
        self.process_ws_message.update({
            Actions.WEBRTC: self.proxy_webrtc,
            Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
            Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
            Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
            Actions.ACCEPT_CALL: self.accept_call,
            Actions.ACCEPT_FILE: self.accept_file,
            Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
            Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
            Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
            Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
            Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
        })
        self.process_pubsub_message.update({
            Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
            Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
        })

    def set_opponent_call_channel(self, message):
        """
        Mark this connection as OFFERED for an incoming offer.

        :return: True when the offer came from this very connection, which
            stops it from being forwarded to the client; otherwise None
        """
        connection_id = message[VarNames.CONNECTION_ID]
        if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
            return True
        self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)

    def offer_webrtc_connection(self, in_message):
        """
        Start a new WebRTC connection and offer it to the room.

        A fresh connection id is generated, this connection is recorded as
        its offerer in READY state, the id is sent back to the client, and
        the offer is published to the room channel.
        """
        room_id = in_message[VarNames.CHANNEL]
        content = in_message.get(VarNames.CONTENT)
        qued_id = in_message[VarNames.WEBRTC_QUED_ID]
        connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
        # The offerer is stored separately under WEBRTC_CONNECTION because
        # the per-connection hash has no notion of who came first.
        self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
        self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
        opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
        self_message = self.set_connection_id(qued_id, connection_id)
        self.ws_write(self_message)
        self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
        self.publish(opponents_message, room_id, True)

    def retry_file_connection(self, in_message):
        """
        Ask a READY receiver to retry a file transfer.

        :raises ValidationError: unless this connection is the offerer and
            the receiver is READY
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
        sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
        receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id)
        if receiver_ws_status == WebRtcRedisStates.READY and self.id == sender_ws_id:
            self.publish(self.retry_file(connection_id), opponent_ws_id)
        else:
            raise ValidationError("Invalid channel status.")

    def reply_file_connection(self, in_message):
        """
        Reply to a file offer: move this connection OFFERED -> RESPONDED
        and notify the offerer.

        :raises ValidationError: unless the offerer is READY and this
            connection is OFFERED
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
        sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
        self_ws_status = self.sync_redis.shget(connection_id, self.id)
        if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
            self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
            self.publish(self.reply_webrtc(
                Actions.REPLY_FILE_CONNECTION,
                connection_id,
                HandlerNames.WEBRTC_TRANSFER,
                in_message[VarNames.CONTENT]
            ), sender_ws_id)
        else:
            raise ValidationError("Invalid channel status.")

    def reply_call_connection(self, in_message):
        """Reply to a call offer: OFFERED -> RESPONDED, announced to the other participants."""
        self.send_call_answer(
            in_message,
            WebRtcRedisStates.RESPONDED,
            Actions.REPLY_CALL_CONNECTION,
            [WebRtcRedisStates.OFFERED],
            HandlerNames.WEBRTC_TRANSFER
        )

    def proxy_webrtc(self, in_message):
        """
        Forward a signalling message to the opponent's channel.

        Before publishing, the opponent id in the message is replaced by
        this connection's id and the handler name is set to PEER_CONNECTION.

        :type in_message: dict
        :raises ValidationError: unless both sides are READY
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
        self_channel_status = self.sync_redis.shget(connection_id, self.id)
        opponent_channel_status = self.sync_redis.shget(connection_id, channel)
        if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
            raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
                self_channel_status, opponent_channel_status
            ))  # todo receiver should only accept proxy_webrtc from sender, sender can accept all
        # I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
        # 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
        in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
        in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
        self.logger.debug(
            "!! Forwarding message to channel %s, self %s, other status %s",
            channel,
            self_channel_status,
            opponent_channel_status
        )
        self.publish(in_message, channel)

    def close_file_connection(self, in_message):
        """
        Close this side of a file transfer and mark it CLOSED.

        Nothing happens if this side is already CLOSED.

        :raises Exception: if this connection has no status for the connection id
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        self_channel_status = self.sync_redis.shget(connection_id, self.id)
        if not self_channel_status:
            raise Exception("Access Denied")
        if self_channel_status != WebRtcRedisStates.CLOSED:
            sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
            if sender_id == self.id:
                self.close_file_sender(connection_id)
            else:
                self.close_file_receiver(connection_id, in_message, sender_id)
            self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)

    def close_call_connection(self, in_message):
        """Leave a call: READY or RESPONDED -> CLOSED, announced to the other participants."""
        self.send_call_answer(
            in_message,
            WebRtcRedisStates.CLOSED,
            Actions.CLOSE_CALL_CONNECTION,
            [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
            HandlerNames.PEER_CONNECTION
        )

    def cancel_call_connection(self, in_message):
        """Decline a call that was only offered: OFFERED -> CLOSED."""
        self.send_call_answer(
            in_message,
            WebRtcRedisStates.CLOSED,
            Actions.CANCEL_CALL_CONNECTION,
            [WebRtcRedisStates.OFFERED],
            HandlerNames.WEBRTC_TRANSFER
        )

    def close_file_receiver(self, connection_id, in_message, sender_id):
        """
        Tell the sender that this receiver closed, unless the sender is already CLOSED.

        :raises Exception: if the sender has no status for the connection id
        """
        sender_status = self.sync_redis.shget(connection_id, sender_id)
        if not sender_status:
            raise Exception("Access denied")
        if sender_status != WebRtcRedisStates.CLOSED:
            in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
            in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
            self.publish(in_message, sender_id)

    def close_file_sender(self, connection_id):
        """Tell every other participant that is not yet CLOSED that the sender closed."""
        values = self.sync_redis.shgetall(connection_id)
        # pop() instead of del: the hash is re-read here and may no longer
        # hold this connection's entry.
        values.pop(self.id, None)
        message = self.get_close_file_sender_message(connection_id)
        for ws_id, status in values.items():
            if status != WebRtcRedisStates.CLOSED:
                self.publish(message, ws_id)

    def accept_file(self, in_message):
        """
        Accept a file: mark this connection READY and notify the offerer.

        :raises ValidationError: unless the offerer is READY and this
            connection is RESPONDED or READY
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        content = in_message[VarNames.CONTENT]
        sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
        sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
        self_ws_status = self.sync_redis.shget(connection_id, self.id)
        if sender_ws_status == WebRtcRedisStates.READY \
                and self_ws_status in [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]:
            self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
            self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id)
        else:
            raise ValidationError("Invalid channel status")

    # todo
    # we can use channel_status = self.sync_redis.shgetall(connection_id)
    # and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
    # if we shgetall and only then do async hset
    # we can catch an issue when 2 concurrent users accepted the call
    # but we didn't send them ACCEPT_CALL as they both were in status 'offered'
    def accept_call(self, in_message):
        """
        Accept a call: RESPONDED -> READY, announced to the other participants.

        :raises ValidationError: unless this connection is RESPONDED
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        self_status = self.sync_redis.shget(connection_id, self.id)
        if self_status == WebRtcRedisStates.RESPONDED:
            conn_users = self.sync_redis.shgetall(connection_id)
            self.publish_call_answer(
                conn_users,
                connection_id,
                HandlerNames.WEBRTC_TRANSFER,
                Actions.ACCEPT_CALL,
                WebRtcRedisStates.READY,
                {}
            )
        else:
            raise ValidationError("Invalid channel status")

    def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
        """
        Validate this connection's call state, then set and announce a new one.

        :param status_set: state to store for this connection
        :param reply_action: action put into the published reply
        :param allowed_state: states this connection may currently be in
        :param message_handler: handler name put into the published reply
        :raises ValidationError: if the current state is not allowed, or this
            connection is not part of the call at all
        """
        connection_id = in_message[VarNames.CONNECTION_ID]
        content = in_message.get(VarNames.CONTENT)  # cancel call can skip browser
        conn_users = self.sync_redis.shgetall(connection_id)
        # .get(): a connection that is not a participant is rejected with
        # ValidationError like any other bad state, not with a KeyError.
        if conn_users.get(self.id) in allowed_state:
            self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
        else:
            raise ValidationError("Invalid channel status.")

    def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
        """
        Store ``status_set`` for this connection and notify the others.

        The reply goes to every other entry of ``conn_users`` whose state is
        not CLOSED. This connection's entry is removed from ``conn_users``
        in place.
        """
        self.async_redis_publisher.hset(connection_id, self.id, status_set)
        conn_users.pop(self.id, None)
        message = self.reply_webrtc(reply_action, connection_id, message_handler, content)
        for user, status in conn_users.items():
            if status != WebRtcRedisStates.CLOSED:
                self.publish(message, user)