Completed
Push — master ( 6ca66a...a573ac )
by Andrew
01:29
created

get_client_ip()   A

Complexity

Conditions 2

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 3
rs 10
1
import base64
2
import logging
3
import re
4
import sys
5
from io import BytesIO
6
from threading import Thread
7
8
import requests
9
from django.core.exceptions import ValidationError
10
from django.core.files.uploadedfile import InMemoryUploadedFile
11
from django.core.mail import send_mail
12
from django.core.validators import validate_email
13
14
from chat import local
15
from chat import settings
16
from chat.models import User, UserProfile, Room, Verification
17
from chat.settings import ISSUES_REPORT_LINK, ALL_REDIS_ROOM, SITE_PROTOCOL
18
19
# Pattern for a valid username: 1 to settings.MAX_USERNAME_LENGTH characters,
# each a latin letter, a digit, '-' or '_'.
USERNAME_REGEX = '^[a-zA-Z-_0-9]{1,' + str(settings.MAX_USERNAME_LENGTH) + '}$'


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
22
23
24
def is_blank(check_str):
	"""
	:param check_str: value to test, may be None
	:return: True if the value is None, empty or contains only whitespace, False otherwise
	"""
	return not (check_str and check_str.strip())
29
30
31
def hide_fields(post, *fields, huge=False, fill_with='****'):
	"""
	Returns a copy of the mapping where the values of the given fields are masked.
	The passed object itself is not modified.
	:param post: Object that will be copied
	:type post: QueryDict
	:param fields: names of the fields whose values are replaced with fill_with
	:param huge: if false the object is cloned with post.copy() and the fields are overwritten in the clone;
	if true a plain dict is built key by key instead of cloning
	:param fill_with: value that is stored in place of the hidden ones
	:return: the copy with masked fields, fields that are absent in post are not added
	"""
	if huge:
		# build a fresh dict, substituting the placeholder for the hidden keys
		return {key: (fill_with if key in fields else post[key]) for key in post}
	masked = post.copy()
	for name in fields:
		# only fields that are present in the source get masked
		if name in post:
			masked[name] = fill_with
	return masked
52
53
54
def check_password(password):
	"""
	Checks if password is secure
	:param password: raw password entered by the user
	:raises ValidationError exception if password is blank or is rejected by the pattern below
	"""
	if is_blank(password):
		raise ValidationError("password can't be empty")
	# Raw string: '\S' is not a valid escape in a plain literal and triggers a SyntaxWarning on recent Pythons.
	# The pattern demands a non-whitespace first and last character with at least one character between them.
	if not re.match(r'^\S.+\S$', password):
		raise ValidationError("password should be at least 3 symbols")
63
64
65
def check_email(email):
	"""
	Checks that the email is well formed and is not registered yet.
	Empty values (None or '') are accepted without any check.
	:param email: address to validate
	:raises ValidationError if specified email is registered or not valid
	"""
	if not email:
		return
	validate_email(email)
	# exists() instead of get(): no DoesNotExist to catch (the old code caught User.DoesNotExist
	# while querying UserProfile) and several profiles with the same email
	# are reported as 'already used' instead of failing with MultipleObjectsReturned
	if UserProfile.objects.filter(email=email).exists():
		raise ValidationError('Email {} is already used'.format(email))
78
79
80
def check_user(username):
	"""
	Checks if specified username is free to register
	:type username str
	:raises ValidationError exception if username is blank, doesn't match USERNAME_REGEX or is already taken
	"""
	# Skip javascript validation, only summary message
	if is_blank(username):
		raise ValidationError("Username can't be empty")
	# fullmatch, not match: '$' also matches right before a trailing newline,
	# so match() would let a username like 'bob\n' through
	if re.fullmatch(USERNAME_REGEX, username) is None:
		raise ValidationError("Username {} doesn't match regex {}".format(username, USERNAME_REGEX))
	try:
		# theoretically can throw returning 'more than 1' error
		User.objects.get(username=username)
	except User.DoesNotExist:
		# nobody owns this username, it is free to register
		return
	raise ValidationError("Username {} is already used. Please select another one".format(username))
97
98
99
def get_client_ip(request):
	"""
	:param request: object with a META mapping of request headers
	:return: the last comma separated entry of HTTP_X_FORWARDED_FOR with surrounding whitespace stripped,
	or REMOTE_ADDR when that header is missing or empty
	"""
	forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
	if not forwarded:
		return request.META.get('REMOTE_ADDR')
	return forwarded.rsplit(',', 1)[-1].strip()
102
103
104
def check_captcha(request):
	"""
	Validates the 'g-recaptcha-response' POST field against the Google siteverify endpoint.
	If RECAPTCHA_SECRET_KEY is enabled in settings validates request with it, otherwise validation is skipped
	:type request: WSGIRequest
	:raises ValidationError: if captcha is not valid or not set, or if the verification request itself fails
	"""
	captcha_private_key = getattr(settings, "RECAPTCHA_SECRET_KEY", None)
	if not captcha_private_key:
		logger.debug('Skipping captcha validation')
		return
	try:
		captcha_rs = request.POST.get('g-recaptcha-response')
		url = "https://www.google.com/recaptcha/api/siteverify"
		payload = {
			'secret': captcha_private_key,
			'response': captcha_rs,
			'remoteip': local.client_ip
		}
		# data= sends the secret in the form body rather than in the URL query string
		# timeout: requests waits forever by default, which would hang the request on a stalled connection
		raw_response = requests.post(url, data=payload, timeout=10, verify=True)
		response = raw_response.json()
		if not response.get('success', False):
			logger.debug('Captcha is NOT valid, response: %s', response)
			raise ValidationError(response.get('error-codes') or 'This captcha already used')
		logger.debug('Captcha is valid, response: %s', response)
	except ValidationError:
		# a rejected captcha is reported as is, not re-wrapped as 'Unable to check captcha'
		raise
	except Exception as e:
		raise ValidationError('Unable to check captcha because {}'.format(e))
130
131
132
def send_email_verification(user, site_address):
	"""
	Creates a registration Verification for the user, stores it in user.email_verification
	and mails the confirmation link from a separate thread.
	Does nothing if user.email is None.
	:param user: user that is being registered
	:param site_address: address used in the confirmation link, in the report link and as the sender of the mail
	"""
	if user.email is None:
		return
	verification = Verification(user=user, type_enum=Verification.TypeChoices.register)
	verification.save()
	user.email_verification = verification
	user.save(update_fields=['email_verification'])

	template = (
		'Hi {}, you have registered pychat'
		'\nTo complete your registration click on the url bellow: {}://{}/confirm_email?token={}'
		'\n\nIf you find any bugs or propositions you can post them {}/report_issue or {}')
	body = template.format(
		user.username, SITE_PROTOCOL, site_address, verification.token, site_address, ISSUES_REPORT_LINK)

	logger.info('Sending verification email to userId %s (email %s)', user.id, user.email)
	# send_mail runs in its own thread so the caller doesn't wait for the mail to be sent
	Thread(
		target=send_mail,
		args=("Confirm chat registration", body, site_address, [user.email])
	).start()
149
150
151
def extract_photo(image_base64):
	"""
	Converts a base64 data url ('data:<type>/<subtype>;base64,<payload>') to an uploaded file object.
	A string that doesn't contain this pattern makes re.search return None,
	so the group() call below raises AttributeError.
	:param image_base64: data url string
	:return: InMemoryUploadedFile for the field 'photo', named after the subtype (e.g. 'png')
	"""
	base64_type_data = re.search(r'data:(\w+/(\w+));base64,(.*)$', image_base64)
	logger.debug('Parsing base64 image')
	decoded = base64.b64decode(base64_type_data.group(3))
	content_type = base64_type_data.group(1)
	name = base64_type_data.group(2)
	logger.debug('Base64 filename extension %s, content_type %s', name, content_type)
	return InMemoryUploadedFile(
		BytesIO(decoded),
		field_name='photo',
		name=name,
		content_type=content_type,
		# payload length in bytes; sys.getsizeof() measured the BytesIO python object, not the image
		size=len(decoded),
		charset=None)
167
168
169
def create_user_profile(email, password, sex, username):
	"""
	Creates and saves a UserProfile and subscribes it to the ALL_REDIS_ROOM room,
	which is created if it doesn't exist.
	:param email: email of the new user
	:param password: raw password, stored via set_password
	:param sex: value for the sex_str field
	:param username: name of the new user
	:return: saved UserProfile
	"""
	profile = UserProfile(username=username, email=email, sex_str=sex)
	profile.set_password(password)
	main_room, _ = Room.objects.get_or_create(name=ALL_REDIS_ROOM)
	# the profile has to be saved before a room can be attached to it
	profile.save()
	profile.rooms.add(main_room)
	profile.save()
	logger.info('Signed up new user %s, subscribed for channels %s', profile, main_room.name)
	return profile