1
|
|
|
import json |
2
|
|
|
import ssl |
3
|
|
|
from random import randint |
4
|
|
|
from random import random |
5
|
|
|
from threading import Thread |
6
|
|
|
from time import sleep |
7
|
|
|
|
8
|
|
|
from django.conf import settings |
9
|
|
|
from django.test import TestCase |
10
|
|
|
from websocket import create_connection |
11
|
|
|
|
12
|
|
|
# class ModelTest(TestCase): |
13
|
|
|
# |
14
|
|
|
# def test_gender(self): |
15
|
|
|
# user = UserProfile(sex_str='Female') |
16
|
|
|
# self.assertEqual(user.sex, 2) |
17
|
|
|
# user.sex_str = 'Male' |
18
|
|
|
# self.assertEqual(user.sex, 1) |
19
|
|
|
# user.sex_str = 'WrongString' |
20
|
|
|
# self.assertEqual(user.sex, 0) |
21
|
|
|
# |
22
|
|
|
# |
23
|
|
|
# class RegisterUtilsTest(TestCase): |
24
|
|
|
# |
25
|
|
|
# def test_check_password(self): |
26
|
|
|
# self.assertRaises(ValidationError, check_password, "ag") |
27
|
|
|
# self.assertRaises(ValidationError, check_password, "") |
28
|
|
|
# self.assertRaises(ValidationError, check_password, " ") |
29
|
|
|
# self.assertRaises(ValidationError, check_password, " fs ") |
30
|
|
|
# check_password("FineP@ssord") |
31
|
|
|
# |
32
|
|
|
# def test_send_email(self): |
33
|
|
|
# up = UserProfile(username='Test', email='[email protected]', sex_str='Mail') |
34
|
|
|
# send_email_verification(up, 'Any') |
35
|
|
|
# |
36
|
|
|
# def test_check_user(self): |
37
|
|
|
# self.assertRaises(ValidationError, check_user, "d"*100) |
38
|
|
|
# self.assertRaises(ValidationError, check_user, "asdfs,+") |
39
|
|
|
# check_user("Fine") |
40
|
|
|
# |
41
|
|
|
# |
42
|
|
|
# class SeleniumBrowserTest(TestCase): |
43
|
|
|
# |
44
|
|
|
# def test_check_main_page(self): |
45
|
|
|
# driver = webdriver.Firefox() |
46
|
|
|
# driver.get("localhost:8000") # TODO inject url |
47
|
|
|
# assert "chat" in driver.title |
48
|
|
|
# elem = driver.find_element_by_id("userNameLabel") |
49
|
|
|
# self.assertRegexpMatches(elem.text, "^[a-zA-Z-_0-9]{1,16}$") |
50
|
|
|
# driver.close() |
51
|
|
|
from chat.global_redis import sync_redis |
52
|
|
|
from chat.models import UserProfile |
53
|
|
|
from chat.socials import GoogleAuth |
54
|
|
|
from chat.tornado.constants import VarNames, Actions |
55
|
|
|
|
56
|
|
|
|
57
|
|
|
class RegisterTest(TestCase):
    """Smoke test for ``GoogleAuth.download_http_photo``."""

    def test_animals_can_speak(self):
        """Call ``download_http_photo`` for an in-memory ``UserProfile``.

        The profile is only constructed, never explicitly saved here. The
        test contains no assertions, so it fails only if one of the calls
        raises.
        NOTE(review): ``download_http_photo`` presumably fetches the URL
        over the network -- confirm before running this offline.
        """
        photo_url = 'https://lh4.googleusercontent.com/-CuLSUOTQ4Kw/AAAAAAAAAAI/AAAAAAAAANQ/VlgHrqehE90/s96-c/photo.jpg'
        profile_fields = {
            'name': 'test',
            'surname': 'test',
            'email': '[email protected]',
            'username': 'test',
        }
        profile = UserProfile(**profile_fields)
        GoogleAuth().download_http_photo(photo_url, profile)
68
|
|
|
|
69
|
|
|
class WebSocketLoadTest(TestCase):
    """Load test that spams a websocket server with chat messages.

    For at most ten session ids found in redis, ``test_simple`` starts
    between 3 and 7 client threads. Each thread opens a websocket to
    ``SITE_TO_SPAM`` carrying the session id as the session cookie and
    sends 30-50 numbered messages to ``settings.ALL_ROOM_ID``.

    Nothing in this class starts the target server; the commented-out
    lines in ``setUp`` show how it was once bootstrapped.
    """

    # host:port of the websocket server that receives the generated traffic
    SITE_TO_SPAM = "127.0.0.1:8888"

    def setUp(self):
        """No-op; the environment bootstrap below is intentionally disabled."""
        pass
        # Room.objects.create(name=ANONYMOUS_REDIS_ROOM)
        # subprocess.Popen("/usr/bin/redis-server")
        # thread = Thread(target=call_command, args=('start_tornado',))
        # thread.start()

    def threaded_function(self, session, num):
        """Run one simulated client: connect, send a burst of messages, close.

        :param session: session id placed into the session cookie
        :param num: index of this client among the clients sharing
            ``session``; used only in the progress output
        """
        cookies = '{}={}'.format(settings.SESSION_COOKIE_NAME, session)
        url = "{}://{}".format(settings.WEBSOCKET_PROTOCOL, self.SITE_TO_SPAM)
        # CERT_NONE: the peer certificate is not verified
        ws = create_connection(url, cookie=cookies, sslopt={"cert_reqs": ssl.CERT_NONE})
        try:
            print("Connected #{} with sessions {}".format(num, session))
            for i in range(randint(30, 50)):
                if i % 10 == 0:
                    print("{}#{} sent {}".format(session, num, i))
                # random pause of up to one second between messages
                sleep(random())
                ws.send(json.dumps({
                    VarNames.CONTENT: "{}".format(i),
                    VarNames.EVENT: Actions.SEND_MESSAGE,
                    VarNames.ROOM_ID: settings.ALL_ROOM_ID
                }))
        finally:
            # always release the socket, even when connecting succeeded but
            # sending failed half-way through
            ws.close()

    # def read_session(self):
    #     with open('sessionsids.txt') as f:
    #         lines =f.read().splitlines()
    #         return lines

    def read_session(self):
        """Return every redis key that is exactly 32 characters long.

        NOTE(review): the 32-character filter presumably selects Django
        session keys -- confirm against how sessions are stored.

        :return: list of matching keys as text
        """
        sessions = []
        for key in sync_redis.keys():
            if len(key) != 32:
                continue
            # A redis client may hand keys back as bytes on Python 3; formatted
            # into the cookie they would render as "b'...'". Decoding is a
            # no-op path when the client already returns text.
            if isinstance(key, bytes):
                key = key.decode('utf-8', 'replace')
            sessions.append(key)
        return sessions

    def test_simple(self):
        """Start the client threads and wait until all of them have finished."""
        max_users = 10
        threads = []
        for session in self.read_session()[:max_users]:
            for i in range(randint(3, 7)):
                thread = Thread(target=self.threaded_function, args=(session, i))
                thread.start()
                threads.append(thread)
        # Without joining, the test would return (and teardown would run)
        # while the clients are still sending.
        for thread in threads:
            thread.join()
113
|
|
|
|