WebSocketLoadTest.setUp()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
import json
2
import ssl
3
from random import randint
4
from random import random
5
from threading import Thread
6
from time import sleep
7
8
from django.conf import settings
9
from django.test import TestCase
10
from websocket import create_connection
11
12
# class ModelTest(TestCase):
13
#
14
# 	def test_gender(self):
15
# 		user = UserProfile(sex_str='Female')
16
# 		self.assertEqual(user.sex, 2)
17
# 		user.sex_str = 'Male'
18
# 		self.assertEqual(user.sex, 1)
19
# 		user.sex_str = 'WrongString'
20
# 		self.assertEqual(user.sex, 0)
21
#
22
#
23
# class RegisterUtilsTest(TestCase):
24
#
25
# 	def test_check_password(self):
26
# 		self.assertRaises(ValidationError, check_password, "ag")
27
# 		self.assertRaises(ValidationError, check_password, "")
28
# 		self.assertRaises(ValidationError, check_password, "  	")
29
# 		self.assertRaises(ValidationError, check_password, "  fs	")
30
# 		check_password("FineP@ssord")
31
#
32
# 	def test_send_email(self):
33
# 		up = UserProfile(username='Test', email='[email protected]', sex_str='Mail')
34
# 		send_email_verification(up, 'Any')
35
#
36
# 	def test_check_user(self):
37
# 		self.assertRaises(ValidationError, check_user, "d"*100)
38
# 		self.assertRaises(ValidationError, check_user, "asdfs,+")
39
# 		check_user("Fine")
40
#
41
#
42
# class SeleniumBrowserTest(TestCase):
43
#
44
# 	def test_check_main_page(self):
45
# 		driver = webdriver.Firefox()
46
# 		driver.get("localhost:8000")  # TODO inject url
47
# 		assert "chat" in driver.title
48
# 		elem = driver.find_element_by_id("userNameLabel")
49
# 		self.assertRegexpMatches(elem.text, "^[a-zA-Z-_0-9]{1,16}$")
50
# 		driver.close()
51
from chat.global_redis import sync_redis
52
from chat.models import UserProfile
53
from chat.socials import GoogleAuth
54
from chat.tornado.constants import VarNames, Actions
55
56
57
class RegisterTest(TestCase):
58
59
    def test_animals_can_speak(self):
60
		user_profile = UserProfile(
61
			 name='test',
62
			 surname='test',
63
			 email='[email protected]',
64
			 username='test'
65
		 )
66
		gauth = GoogleAuth()
67
		gauth.download_http_photo('https://lh4.googleusercontent.com/-CuLSUOTQ4Kw/AAAAAAAAAAI/AAAAAAAAANQ/VlgHrqehE90/s96-c/photo.jpg', user_profile)
68
69
class WebSocketLoadTest(TestCase):
70
71
	SITE_TO_SPAM = "127.0.0.1:8888"
72
73
	def setUp(self):
74
		pass
75
		# Room.objects.create(name=ANONYMOUS_REDIS_ROOM)
76
		# subprocess.Popen("/usr/bin/redis-server")
77
		# thread = Thread(target=call_command, args=('start_tornado',))
78
		# thread.start()
79
80
	def threaded_function(self, session, num):
81
		cookies = '{}={}'.format(settings.SESSION_COOKIE_NAME, session)
82
83
		ws = create_connection("{}://{}".format(settings.WEBSOCKET_PROTOCOL, self.SITE_TO_SPAM), cookie=cookies, sslopt={"cert_reqs": ssl.CERT_NONE})
84
		print("Connected #{}  with sessions {}".format(num, session))
85
		for i in range(randint(30, 50)):
86
			if i % 10 == 0:
87
				print("{}#{} sent {}".format(session, num, i))
88
			sleep(random())
89
			ws.send(json.dumps({
90
				VarNames.CONTENT: "{}".format(i),
91
				VarNames.EVENT: Actions.SEND_MESSAGE,
92
				VarNames.ROOM_ID: settings.ALL_ROOM_ID
93
			}))
94
95
	# def read_session(self):
96
	# 	with open('sessionsids.txt') as f:
97
	# 		lines =f.read().splitlines()
98
	# 		return lines
99
100
101
	def read_session(self):
102
		return [k for k in sync_redis.keys() if len(k) == 32]
103
104
	def test_simple(self):
105
		max_users = 10
106
		for session in self.read_session():
107
			max_users -= 1
108
			if max_users < 0:
109
				break
110
			for i in range(randint(3, 7)):
111
				thread = Thread(target=self.threaded_function, args=(session, i))
112
				thread.start()
113