Completed
Push — master ( 5772bc...8baba8 )
by Andrew
59s
created

send_email_verification()   A

Complexity

Conditions 2

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 15
rs 9.4285
c 1
b 0
f 0
1
import base64
2
import json
3
import logging
4
import re
5
import sys
6
from io import BytesIO
7
8
import requests
9
from django.contrib.auth import get_user_model
10
from django.core.exceptions import ValidationError
11
from django.core.files.uploadedfile import InMemoryUploadedFile
12
from django.core.mail import send_mail
13
from django.core.validators import validate_email
14
from django.template import RequestContext
15
from django.template.loader import render_to_string
16
from django.utils.safestring import mark_safe
17
18
from chat import local
19
from chat import settings
20
from chat.models import User, UserProfile, Verification, RoomUsers, IpAddress
21
from chat.settings import ISSUES_REPORT_LINK, SITE_PROTOCOL, ALL_ROOM_ID
22
23
try:  # py2
24
	from urllib import urlopen
25
except ImportError:  # py3
26
	from urllib.request import urlopen
27
28
# a username is 1..MAX_USERNAME_LENGTH symbols out of latin letters, digits, '-' and '_'
USERNAME_REGEX = '^[a-zA-Z-_0-9]{1,' + str(settings.MAX_USERNAME_LENGTH) + '}$'

# optional integrations: every value below is None when the setting is not defined
API_URL = getattr(settings, "IP_API_URL", None)
RECAPTCHA_SECRET_KEY = getattr(settings, "RECAPTCHA_SECRET_KEY", None)
GOOGLE_OAUTH_2_CLIENT_ID = getattr(settings, "GOOGLE_OAUTH_2_CLIENT_ID", None)
GOOGLE_OAUTH_2_HOST = getattr(settings, "GOOGLE_OAUTH_2_HOST", None)
FACEBOOK_ACCESS_TOKEN = getattr(settings, "FACEBOOK_ACCESS_TOKEN", None)

logger = logging.getLogger(__name__)
36
37
38
def is_blank(check_str):
	"""
	Tells whether a string carries no visible content
	:param check_str: value to inspect, may be None
	:return: True for None, an empty string or a whitespace-only string, False otherwise
	"""
	return not (check_str and check_str.strip())
43
44
45
def hide_fields(post, fields, huge=False, fill_with='****'):
	"""
	Builds a copy of post in which the values of the listed fields are masked
	:param post: mapping to copy, it is not modified
	:type post: QueryDict
	:param fields: names of the fields whose values are replaced
	:param huge: if false post.copy() is taken and the listed fields are overwritten in it,
	if true a plain dict is assembled key by key instead
	:param fill_with: placeholder stored in place of every hidden value
	:return: the masked copy
	"""
	if huge:
		# assemble a fresh dict, masking on the fly
		return {key: (fill_with if key in fields else post[key]) for key in post}
	masked = post.copy()
	for name in fields:
		if name in post:  # never add a field that was absent in the source
			masked[name] = fill_with
	return masked
67
68
69
def check_password(password):
	"""
	Checks if password is secure
	:param password: raw password to validate
	:raises ValidationError: if password is blank, or is not a non-whitespace symbol followed by
	at least one more symbol and a closing non-whitespace symbol
	"""
	if is_blank(password):
		raise ValidationError("password can't be empty")
	# raw literal: '\S' inside a plain string is an invalid escape sequence and warns on modern python
	if not re.match(r'^\S.+\S$', password):
		raise ValidationError("password should be at least 3 symbols")
78
79
80
def check_email(email):
	"""
	Validates an email that is about to be registered; an empty value is accepted without checks
	:param email: address to check
	:raises ValidationError: if specified email is registered or not valid
	"""
	if not email:
		return
	validate_email(email)
	# exists() instead of get(): duplicated rows can't fail with MultipleObjectsReturned and there is
	# no need to guess which model's DoesNotExist the lookup raises
	if UserProfile.objects.filter(email=email).exists():
		raise ValidationError('Email {} is already used'.format(email))
93
94
95
def check_user(username):
	"""
	Checks if specified username is free to register
	:type username: str
	:raises ValidationError: if username is blank, doesn't match USERNAME_REGEX or is already used
	"""
	# Skip javascript validation, only summary message
	if is_blank(username):
		raise ValidationError("Username can't be empty")
	if not re.match(USERNAME_REGEX, username):
		raise ValidationError("Username {} doesn't match regex {}".format(username, USERNAME_REGEX))
	# exists() instead of get(): duplicated rows can't fail with MultipleObjectsReturned
	if User.objects.filter(username=username).exists():
		raise ValidationError("Username {} is already used. Please select another one".format(username))
112
113
114
def get_client_ip(request):
	"""
	Resolves the address the request came from
	:param request: object exposing a META mapping
	:return: the last comma separated entry of HTTP_X_FORWARDED_FOR with surrounding whitespace removed,
	or REMOTE_ADDR when that header is absent or empty
	"""
	meta = request.META
	forwarded = meta.get('HTTP_X_FORWARDED_FOR')
	if not forwarded:
		return meta.get('REMOTE_ADDR')
	return forwarded.rsplit(',', 1)[-1].strip()
117
118
119
def check_captcha(request):
	"""
	Validates the reCAPTCHA answer submitted in request.POST['g-recaptcha-response'].
	If RECAPTCHA_SECRET_KEY is not set in settings the validation is skipped
	:type request: WSGIRequest
	:raises ValidationError: if captcha is not valid or not set, or if the verification itself failed
	"""
	if not RECAPTCHA_SECRET_KEY:
		logger.debug('Skipping captcha validation')
		return
	try:
		raw_response = requests.post(
			"https://www.google.com/recaptcha/api/siteverify",
			# sent as a form body so the secret does not become a part of the requested url
			data={
				'secret': RECAPTCHA_SECRET_KEY,
				'response': request.POST.get('g-recaptcha-response'),
				'remoteip': local.client_ip
			},
			verify=True,
			timeout=10  # requests waits indefinitely unless a timeout is given
		)
		response = raw_response.json()
		if not response.get('success', False):
			logger.debug('Captcha is NOT valid, response: %s', response)
			raise ValidationError(response.get('error-codes') or 'This captcha already used')
		logger.debug('Captcha is valid, response: %s', response)
	except ValidationError:
		# our own verdict, must not be re-wrapped into "Unable to check captcha"
		raise
	except Exception as e:
		raise ValidationError('Unable to check captcha because {}'.format(e))
145
146
147
def send_sign_up_email(user, site_address, request):
	"""
	Creates a registration Verification for the user, stores it in user.email_verification
	and mails the confirmation link. Does nothing when user.email is None
	:param user: user to confirm; its username, email and id are read
	:param site_address: host put into the confirmation link, also passed to send_mail as the sender
	:param request: request used to build the RequestContext the html template is rendered with
	"""
	if user.email is None:
		return
	token_holder = Verification(user=user, type_enum=Verification.TypeChoices.register)
	token_holder.save()
	user.email_verification = token_holder
	user.save(update_fields=['email_verification'])
	confirm_url = "{}://{}/confirm_email?token={}".format(SITE_PROTOCOL, site_address, token_holder.token)
	# plain text alternative of the letter
	plain_body = (
		'Hi {}, you have registered pychat'
		'\nTo complete your registration please click on the url bellow: {}'
		'\n\nIf you find any bugs or propositions you can post them {}'
	).format(user.username, confirm_url, ISSUES_REPORT_LINK)
	greetings = mark_safe((
		"You have registered in <b>Pychat</b>. If you find any bugs or propositions you can post them"
		" <a href='{}'>here</a>. To complete your registration please click on the link below."
	).format(ISSUES_REPORT_LINK))
	html_body = render_to_string(
		'sign_up_email.html',
		{
			'username': user.username,
			'magicLink': confirm_url,
			'btnText': "CONFIRM SIGN UP",
			'greetings': greetings
		},
		context_instance=RequestContext(request)
	)
	logger.info('Sending verification email to userId %s (email %s)', user.id, user.email)
	send_mail("Confirm chat registration", plain_body, site_address, [user.email], html_message=html_body)
	logger.info('Email %s has been sent', user.email)
172
173
174
def send_reset_password_email(request, user_profile, verification):
	"""
	Mails the restore password link to user_profile.email as plain text and as html
	:param request: current request; supplies the host and the client ip shown in the letter
	:param user_profile: receiver, its username and email are read
	:param verification: its token goes into the link, its time into the template context
	"""
	restore_url = "{}://{}/restore_password?token={}".format(
		SITE_PROTOCOL, request.get_host(), verification.token)
	plain_body = (
		"{},\n"
		"You requested to change a password on site {}.\n"
		"To proceed click on the link {}\n"
		"If you didn't request the password change just ignore this mail"
	).format(user_profile.username, request.get_host(), restore_url)
	requester = get_or_create_ip(get_client_ip(request), logger)
	greetings = mark_safe(
		"You have requested to send you a magic link to quickly restore password to <b>Pychat</b>. "
		"If it wasn't you, you can safely ignore this email")
	html_body = render_to_string(
		'reset_pass_email.html',
		{
			'username': user_profile.username,
			'magicLink': restore_url,
			'ipInfo': requester.info,
			'ip': requester.ip,
			'btnText': "CHANGE PASSWORD",
			'timeCreated': verification.time,
			'greetings': greetings
		},
		context_instance=RequestContext(request)
	)
	send_mail(
		"Pychat: restore password",
		plain_body,
		request.get_host(),
		(user_profile.email,),
		fail_silently=False,
		html_message=html_body
	)
197
198
199
def extract_photo(image_base64, filename=None):
	"""
	Converts a base64 data url ("data:<type>/<subtype>;base64,<payload>") into an uploaded file object
	:param image_base64: data url holding the image
	:param filename: name of the resulting file; ".<subtype>" is used when it is omitted
	:return: InMemoryUploadedFile for the 'photo' field wrapping the decoded payload
	"""
	# NOTE(review): input that is not a base64 data url leaves this None and the lines below
	# fail with AttributeError - confirm whether callers validate the value beforehand
	base64_type_data = re.search(r'data:(\w+/(\w+));base64,(.*)$', image_base64)
	logger.debug('Parsing base64 image')
	raw_bytes = base64.b64decode(base64_type_data.group(3))
	content_type = base64_type_data.group(1)
	name = filename or ".{}".format(base64_type_data.group(2))
	logger.debug('Base64 filename extension %s, content_type %s', name, content_type)
	return InMemoryUploadedFile(
		BytesIO(raw_bytes),
		field_name='photo',
		name=name,
		content_type=content_type,
		# length of the payload; sys.getsizeof() reported the memory footprint of the BytesIO object
		size=len(raw_bytes),
		charset=None)
215
216
217
def create_user_model(user):
	"""
	Saves the user and adds a RoomUsers row linking it to the room with id ALL_ROOM_ID
	:param user: user model to save
	:return: the same user instance
	"""
	user.save()
	membership = RoomUsers(user_id=user.id, room_id=ALL_ROOM_ID)
	membership.save()
	logger.info('Signed up new user %s, subscribed for channels with id %d', user, ALL_ROOM_ID)
	return user
222
223
224
def get_or_create_ip(ip, logger):
	"""
	Returns the stored record of the ip, creating it when it is missing.
	On creation the details are requested from API_URL; if that fails for any reason
	(including an unset API_URL) the error is logged and a record with the ip alone is created
	@param ip: ip to fetch info from
	@param logger: initialized logger
	@rtype: IpAddress
	"""
	from contextlib import closing

	try:
		ip_address = IpAddress.objects.get(ip=ip)
	except IpAddress.DoesNotExist:
		try:
			if not API_URL:
				raise Exception('api url is absent')
			logger.debug("Creating ip record %s", ip)
			# closing() releases the connection even when reading fails
			# and works with both the py2 and the py3 urlopen
			with closing(urlopen(API_URL % ip)) as f:
				raw_response = f.read().decode("utf-8")
			response = json.loads(raw_response)
			if response['status'] != "success":
				raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
			ip_address = IpAddress.objects.create(
				ip=ip,
				isp=response['isp'],
				country=response['country'],
				region=response['regionName'],
				city=response['city'],
				country_code=response['countryCode']
			)
		except Exception as e:
			# best effort: the record is still created, just without the extra info
			logger.error("Error while creating ip with country info, because %s", e)
			ip_address = IpAddress.objects.create(ip=ip)
	return ip_address
255
256
257
class EmailOrUsernameModelBackend(object):
	"""
	This is a ModelBackend that allows authentication with either a username or an email address.
	"""

	def authenticate(self, username=None, password=None):
		"""
		:param username: username, or email when the value contains '@'
		:param password: raw password to verify
		:return: the matching UserProfile if the password fits, None otherwise
		"""
		if not username:
			# the declared default (None) used to crash on the '@' membership test
			return None
		lookup = {'email': username} if '@' in username else {'username': username}
		try:
			user = UserProfile.objects.get(**lookup)
		except UserProfile.DoesNotExist:
			# the manager raises the exception class of the model it queries
			return None
		return user if user.check_password(password) else None

	def get_user(self, username):
		"""
		:param username: primary key of the user, despite the parameter name
		:return: instance of the active user model, None if there is no such pk
		"""
		try:
			return get_user_model().objects.get(pk=username)
		except get_user_model().DoesNotExist:
			return None
278