|
1
|
|
|
import json |
|
2
|
|
|
import ssl |
|
3
|
|
|
from random import randint |
|
4
|
|
|
from random import random |
|
5
|
|
|
from threading import Thread |
|
6
|
|
|
from time import sleep |
|
7
|
|
|
|
|
8
|
|
|
from django.conf import settings |
|
9
|
|
|
from django.test import TestCase |
|
10
|
|
|
from websocket import create_connection |
|
11
|
|
|
|
|
12
|
|
|
# class ModelTest(TestCase): |
|
13
|
|
|
# |
|
14
|
|
|
# def test_gender(self): |
|
15
|
|
|
# user = UserProfile(sex_str='Female') |
|
16
|
|
|
# self.assertEqual(user.sex, 2) |
|
17
|
|
|
# user.sex_str = 'Male' |
|
18
|
|
|
# self.assertEqual(user.sex, 1) |
|
19
|
|
|
# user.sex_str = 'WrongString' |
|
20
|
|
|
# self.assertEqual(user.sex, 0) |
|
21
|
|
|
# |
|
22
|
|
|
# |
|
23
|
|
|
# class RegisterUtilsTest(TestCase): |
|
24
|
|
|
# |
|
25
|
|
|
# def test_check_password(self): |
|
26
|
|
|
# self.assertRaises(ValidationError, check_password, "ag") |
|
27
|
|
|
# self.assertRaises(ValidationError, check_password, "") |
|
28
|
|
|
# self.assertRaises(ValidationError, check_password, " ") |
|
29
|
|
|
# self.assertRaises(ValidationError, check_password, " fs ") |
|
30
|
|
|
# check_password("FineP@ssord") |
|
31
|
|
|
# |
|
32
|
|
|
# def test_send_email(self): |
|
33
|
|
|
# up = UserProfile(username='Test', email='[email protected]', sex_str='Mail') |
|
34
|
|
|
# send_email_verification(up, 'Any') |
|
35
|
|
|
# |
|
36
|
|
|
# def test_check_user(self): |
|
37
|
|
|
# self.assertRaises(ValidationError, check_user, "d"*100) |
|
38
|
|
|
# self.assertRaises(ValidationError, check_user, "asdfs,+") |
|
39
|
|
|
# check_user("Fine") |
|
40
|
|
|
# |
|
41
|
|
|
# |
|
42
|
|
|
# class SeleniumBrowserTest(TestCase): |
|
43
|
|
|
# |
|
44
|
|
|
# def test_check_main_page(self): |
|
45
|
|
|
# driver = webdriver.Firefox() |
|
46
|
|
|
# driver.get("localhost:8000") # TODO inject url |
|
47
|
|
|
# assert "chat" in driver.title |
|
48
|
|
|
# elem = driver.find_element_by_id("userNameLabel") |
|
49
|
|
|
# self.assertRegexpMatches(elem.text, "^[a-zA-Z-_0-9]{1,16}$") |
|
50
|
|
|
# driver.close() |
|
51
|
|
|
from chat.global_redis import sync_redis |
|
52
|
|
|
from chat.models import UserProfile |
|
53
|
|
|
from chat.socials import GoogleAuth |
|
54
|
|
|
from chat.tornado.constants import VarNames, Actions |
|
55
|
|
|
|
|
56
|
|
|
|
|
57
|
|
|
class RegisterTest(TestCase):
    """Exercise ``GoogleAuth.download_http_photo`` against an in-memory profile."""

    def test_animals_can_speak(self):
        """Build a ``UserProfile`` and pass it, with a photo URL, to ``GoogleAuth``.

        The test makes no assertions: it passes as long as neither the
        constructors nor ``download_http_photo`` raise.
        """
        # NOTE(review): the email literal looks like a scraping/obfuscation
        # placeholder rather than a real address -- confirm the intended value.
        profile_fields = {
            'name': 'test',
            'surname': 'test',
            'email': '[email protected]',
            'username': 'test',
        }
        photo_url = 'https://lh4.googleusercontent.com/-CuLSUOTQ4Kw/AAAAAAAAAAI/AAAAAAAAANQ/VlgHrqehE90/s96-c/photo.jpg'

        # The profile is only constructed here; this test never calls save() on it.
        profile = UserProfile(**profile_fields)
        GoogleAuth().download_http_photo(photo_url, profile)
|
68
|
|
|
|
|
69
|
|
|
class WebSocketLoadTest(TestCase):
    """Load test that spams a websocket endpoint with chat messages.

    For up to ten session keys found in redis, several threads are started;
    each opens its own websocket connection to ``SITE_TO_SPAM`` and sends a
    burst of ``SEND_MESSAGE`` events to ``settings.ALL_ROOM_ID``.

    Nothing in this class starts a server (see ``setUp``), so one must already
    be listening on ``SITE_TO_SPAM``.
    """

    # host:port of the websocket server the worker threads connect to.
    SITE_TO_SPAM = "127.0.0.1:8888"

    def setUp(self):
        """Do nothing; the environment bootstrap below is disabled."""
        pass
        # Room.objects.create(name=ANONYMOUS_REDIS_ROOM)
        # subprocess.Popen("/usr/bin/redis-server")
        # thread = Thread(target=call_command, args=('start_tornado',))
        # thread.start()

    def threaded_function(self, session, num):
        """Open one websocket connection and send a random burst of messages.

        :param session: session key sent to the server as the session cookie
        :param num: index of this worker for the given session (logging only)
        """
        cookies = '{}={}'.format(settings.SESSION_COOKIE_NAME, session)
        url = "{}://{}".format(settings.WEBSOCKET_PROTOCOL, self.SITE_TO_SPAM)

        # Certificate verification is switched off for the connection.
        ws = create_connection(url, cookie=cookies, sslopt={"cert_reqs": ssl.CERT_NONE})
        try:
            print("Connected #{} with sessions {}".format(num, session))
            for i in range(randint(30, 50)):
                if i % 10 == 0:
                    # Progress line on every tenth message.
                    print("{}#{} sent {}".format(session, num, i))
                # Random pause of up to one second between messages.
                sleep(random())
                ws.send(json.dumps({
                    VarNames.CONTENT: "{}".format(i),
                    VarNames.EVENT: Actions.SEND_MESSAGE,
                    VarNames.ROOM_ID: settings.ALL_ROOM_ID
                }))
        finally:
            # Always release the socket, even if a send fails midway.
            ws.close()

    # def read_session(self):
    #     with open('sessionsids.txt') as f:
    #         lines =f.read().splitlines()
    #         return lines

    def read_session(self):
        """Return every redis key whose length is exactly 32.

        Presumably these are Django session ids -- verify against the redis
        key layout.
        """
        return [k for k in sync_redis.keys() if len(k) == 32]

    def test_simple(self):
        """Run 3-7 sender threads for each of at most ten sessions and wait for them."""
        max_users = 10
        threads = []
        for session in self.read_session()[:max_users]:
            for i in range(randint(3, 7)):
                thread = Thread(target=self.threaded_function, args=(session, i))
                thread.start()
                threads.append(thread)

        # Block until every worker is done so the test does not return while
        # the load is still being generated.
        for thread in threads:
            thread.join()
|
113
|
|
|
|