Completed
Push — master ( 37a203...0fb357 )
by Andrew
35s
created

get_max_key()   A

Complexity

Conditions 2

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 2
c 1
b 1
f 0
dl 0
loc 2
rs 10
1
import base64
2
import json
3
import logging
4
import re
5
import sys
6
from io import BytesIO
7
8
import requests
9
from django.contrib.auth import get_user_model
10
from django.core.exceptions import ValidationError
11
from django.core.files.uploadedfile import InMemoryUploadedFile
12
from django.core.mail import send_mail
13
from django.core.validators import validate_email
14
from django.template import RequestContext
15
from django.template.loader import render_to_string
16
from django.utils.safestring import mark_safe
17
18
from chat import local
19
from chat import settings
20
from chat.models import User, UserProfile, Verification, RoomUsers, IpAddress
21
from chat.settings import ISSUES_REPORT_LINK, SITE_PROTOCOL, ALL_ROOM_ID
22
23
try:  # py2
24
	from urllib import urlopen
25
except ImportError:  # py3
26
	from urllib.request import urlopen
27
28
# Pattern of an acceptable username: latin letters, digits, "-" and "_",
# from 1 up to settings.MAX_USERNAME_LENGTH characters
USERNAME_REGEX = '^[a-zA-Z-_0-9]{1,' + str(settings.MAX_USERNAME_LENGTH) + '}$'

# Optional settings: every value below is None when the setting is not declared
API_URL = getattr(settings, "IP_API_URL", None)
RECAPTCHA_SECRET_KEY = getattr(settings, "RECAPTCHA_SECRET_KEY", None)
GOOGLE_OAUTH_2_CLIENT_ID = getattr(settings, "GOOGLE_OAUTH_2_CLIENT_ID", None)
GOOGLE_OAUTH_2_HOST = getattr(settings, "GOOGLE_OAUTH_2_HOST", None)
FACEBOOK_ACCESS_TOKEN = getattr(settings, "FACEBOOK_ACCESS_TOKEN", None)

# Module-level logger shared by the helpers of this file
logger = logging.getLogger(__name__)
36
37
38
def is_blank(check_str):
	"""
	Tells whether a value carries no visible text.
	:param check_str: string to inspect, may be None
	:return: True for a falsy value or a whitespace-only string, False otherwise
	"""
	return not (check_str and check_str.strip())
43
44
45
def hide_fields(post, fields, huge=False, fill_with='****'):
	"""
	Builds a copy of ``post`` in which the values of the given fields are masked.
	:param post: mapping to copy, it is left untouched
	:type post: QueryDict
	:param fields: names of the fields whose values are replaced
	:param huge: if false the result is ``post.copy()`` with the present fields overwritten,
	if true a new plain dict is assembled key by key
	:param fill_with: value stored in place of every hidden field
	:return: the copy with hidden fields set to ``fill_with``
	"""
	if huge:
		# assemble a new dict entry by entry, masking on the fly
		return {key: (fill_with if key in fields else post[key]) for key in post}
	# mask inside a copy; fields that are missing in post are not added
	masked = post.copy()
	for key in fields:
		if key in post:
			masked[key] = fill_with
	return masked
67
68
69
def check_password(password):
	"""
	Checks if password is acceptable: it must not be blank and must be at least
	3 characters long, starting and ending with a non-whitespace character.
	:param password: raw password to validate
	:raises ValidationError exception if password is not valid
	"""
	if is_blank(password):
		raise ValidationError("password can't be empty")
	# raw literal: '\S' inside a plain string is an invalid escape sequence on Python 3
	if not re.match(r'^\S.+\S$', password):
		raise ValidationError("password should be at least 3 symbols")
78
79
80
def check_email(email):
	"""
	Checks that the email is well-formed and is not used by an existing profile.
	An empty or missing email is accepted without any check.
	:param email: address to validate, may be None or empty
	:raises ValidationError if specified email is registered or not valid
	"""
	if not email:
		return
	validate_email(email)
	# exists() instead of get(): does not depend on which DoesNotExist class is raised
	# and does not fail with MultipleObjectsReturned when several profiles share the address
	if UserProfile.objects.filter(email=email).exists():
		raise ValidationError('Email {} is already used'.format(email))
93
94
95
def check_user(username):
	"""
	Checks if specified username is free to register
	:type username str
	:raises ValidationError exception if username is blank, doesn't match USERNAME_REGEX
	or is already taken
	"""
	# Skip javascript validation, only summary message
	if is_blank(username):
		raise ValidationError("Username can't be empty")
	if not re.match(USERNAME_REGEX, username):
		raise ValidationError("Username {} doesn't match regex {}".format(username, USERNAME_REGEX))
	# exists() instead of get(): duplicated rows are reported as "already used"
	# rather than escaping as MultipleObjectsReturned
	if User.objects.filter(username=username).exists():
		raise ValidationError("Username {} is already used. Please select another one".format(username))
112
113
114
def get_client_ip(request):
	"""
	Picks the client address from the request metadata.
	:param request: object exposing a ``META`` mapping
	:return: the last comma separated entry of ``HTTP_X_FORWARDED_FOR`` (stripped) when that
	header is present and not empty, otherwise the value of ``REMOTE_ADDR`` (None if absent)
	"""
	forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
	if not forwarded:
		return request.META.get('REMOTE_ADDR')
	return forwarded.rsplit(',', 1)[-1].strip()
117
118
119
def check_captcha(request):
	"""
	Validates the reCAPTCHA answer submitted with the request.
	If RECAPTCHA_SECRET_KEY is not set in settings the validation is skipped.
	:type request: WSGIRequest
	:raises ValidationError: if captcha is rejected or the verification request fails
	"""
	if not RECAPTCHA_SECRET_KEY:
		logger.debug('Skipping captcha validation')
		return
	try:
		captcha_rs = request.POST.get('g-recaptcha-response')
		url = "https://www.google.com/recaptcha/api/siteverify"
		params = {
			'secret': RECAPTCHA_SECRET_KEY,
			'response': captcha_rs,
			'remoteip': local.client_ip
		}
		# data=: the secret travels in the POST body, not in the query string of the url
		raw_response = requests.post(url, data=params, verify=True)
		response = raw_response.json()
		if not response.get('success', False):
			# log the parsed answer, the Response object itself prints only the status
			logger.debug('Captcha is NOT valid, response: %s', response)
			raise ValidationError(response.get('error-codes') or 'This captcha already used')
		logger.debug('Captcha is valid, response: %s', response)
	except ValidationError:
		# a rejected captcha is reported as is instead of being wrapped as a transport failure
		raise
	except Exception as e:
		raise ValidationError('Unable to check captcha because {}'.format(e))
145
146
147
def send_sign_up_email(user, site_address, request):
	"""
	Sends the registration confirmation email; does nothing when user.email is None.
	A Verification of type ``register`` is saved and stored in ``user.email_verification``,
	its token is embedded into the confirmation link of the message.
	:param user: user that has just signed up
	:param site_address: host used in the confirmation link and passed to send_mail as sender
	:param request: request used to build the template context
	"""
	if user.email is None:
		return
	verification = Verification(user=user, type_enum=Verification.TypeChoices.register)
	verification.save()
	user.email_verification = verification
	user.save(update_fields=['email_verification'])
	confirm_link = "{}://{}/confirm_email?token={}".format(SITE_PROTOCOL, site_address, verification.token)
	# plain text version of the message
	plain_template = (
		'Hi {}, you have registered pychat'
		'\nTo complete your registration please click on the url bellow: {}'
		'\n\nIf you find any bugs or propositions you can post them {}')
	plain_text = plain_template.format(user.username, confirm_link, ISSUES_REPORT_LINK)
	# html version of the message
	greetings_template = (
		"You have registered in <b>Pychat</b>. If you find any bugs or propositions you can post them"
		" <a href='{}'>here</a>. To complete your registration please click on the link below.")
	greetings = mark_safe(greetings_template.format(ISSUES_REPORT_LINK))
	html_message = render_to_string(
		'sign_up_email.html',
		{
			'username': user.username,
			'magicLink': confirm_link,
			'btnText': "CONFIRM SIGN UP",
			'greetings': greetings
		},
		context_instance=RequestContext(request))
	logger.info('Sending verification email to userId %s (email %s)', user.id, user.email)
	send_mail("Confirm chat registration", plain_text, site_address, [user.email, ], html_message=html_message)
	logger.info('Email %s has been sent', user.email)
172
173
174
def send_reset_password_email(request, user_profile, verification):
	"""
	Sends the email with a link that lets the user restore the password.
	:param request: request that asked for the reset, source of the host and the client ip
	:param user_profile: profile whose username and email are used for the message
	:param verification: verification whose token and time are put into the message
	"""
	host = request.get_host()
	restore_link = "{}://{}/restore_password?token={}".format(SITE_PROTOCOL, host, verification.token)
	# plain text version of the message
	plain_template = (
		"{},\n"
		"You requested to change a password on site {}.\n"
		"To proceed click on the link {}\n"
		"If you didn't request the password change just ignore this mail")
	plain_text = plain_template.format(user_profile.username, host, restore_link)
	# details about the address the reset was requested from
	ip_info = get_or_create_ip(get_client_ip(request), logger)
	greetings = mark_safe(
		"You have requested to send you a magic link to quickly restore password to <b>Pychat</b>. "
		"If it wasn't you, you can safely ignore this email")
	html_message = render_to_string(
		'reset_pass_email.html',
		{
			'username': user_profile.username,
			'magicLink': restore_link,
			'ipInfo': ip_info.info,
			'ip': ip_info.ip,
			'btnText': "CHANGE PASSWORD",
			'timeCreated': verification.time,
			'greetings': greetings
		},
		context_instance=RequestContext(request))
	send_mail(
		"Pychat: restore password", plain_text, host, (user_profile.email,),
		fail_silently=False, html_message=html_message)
197
198
199
def extract_photo(image_base64, filename=None):
	"""
	Converts a base64 data URI into an in-memory uploaded file.
	:param image_base64: string containing ``data:<type>/<subtype>;base64,<payload>``
	:param filename: name of the resulting file, ".<subtype>" is used when it's omitted
	:return: InMemoryUploadedFile for the field 'photo' wrapping the decoded bytes
	:raises AttributeError: if image_base64 doesn't contain such a data URI
	"""
	base64_type_data = re.search(r'data:(\w+/(\w+));base64,(.*)$', image_base64)
	logger.debug('Parsing base64 image')
	content = base64.b64decode(base64_type_data.group(3))
	content_type = base64_type_data.group(1)
	name = filename or ".{}".format(base64_type_data.group(2))
	logger.debug('Base64 filename extension %s, content_type %s', name, content_type)
	image = InMemoryUploadedFile(
		BytesIO(content),
		field_name='photo',
		name=name,
		content_type=content_type,
		# number of decoded bytes; sys.getsizeof() would report the memory taken by
		# the BytesIO object instead of the length of the image
		size=len(content),
		charset=None)
	return image
215
216
217
def get_max_key(dictionary):
	"""
	:param dictionary: mapping to inspect, may be empty or None
	:return: the greatest key of the mapping or None when the mapping is falsy
	"""
	if not dictionary:
		return None
	return max(dictionary.keys())
219
220
221
def create_user_model(user):
	"""
	Saves a new user and subscribes it to the room with id ALL_ROOM_ID.
	:param user: unsaved user instance
	:return: the same user instance after it has been saved
	"""
	user.save()
	# the user gets an id only after save(), so the subscription goes second
	subscription = RoomUsers(user_id=user.id, room_id=ALL_ROOM_ID)
	subscription.save()
	logger.info('Signed up new user %s, subscribed for channels with id %d', user, ALL_ROOM_ID)
	return user
226
227
228
def get_or_create_ip(ip, logger):
	"""
	Returns the stored record of an ip address, creating it when it's seen for the first time.
	A new record is filled with the data fetched from API_URL; if that lookup fails
	for any reason a record holding only the ip is created instead.
	:param ip: ip to fetch info for
	:param logger: initialized logger used to report the lookup
	:rtype: IpAddress
	"""
	from contextlib import closing  # local import, used only to close the api response

	try:
		ip_address = IpAddress.objects.get(ip=ip)
	except IpAddress.DoesNotExist:
		try:
			if not API_URL:
				raise Exception('api url is absent')
			logger.debug("Creating ip record %s", ip)
			# closing(): the response is released even if read() or decode() fails
			with closing(urlopen(API_URL % ip)) as f:
				raw_response = f.read().decode("utf-8")
			response = json.loads(raw_response)
			if response['status'] != "success":
				raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
			ip_address = IpAddress.objects.create(
				ip=ip,
				isp=response['isp'],
				country=response['country'],
				region=response['regionName'],
				city=response['city'],
				country_code=response['countryCode']
			)
		except Exception as e:
			# best effort: any failure of the lookup falls back to a record without location
			logger.error("Error while creating ip with country info, because %s", e)
			ip_address = IpAddress.objects.create(ip=ip)
	return ip_address
259
260
261
class EmailOrUsernameModelBackend(object):
	"""
	This is a ModelBackend that allows authentication with either a username or an email address.
	"""

	def authenticate(self, username=None, password=None):
		"""
		Looks up a profile by email (when the login contains '@') or by username
		and verifies its password.
		:param username: username or email typed by the user
		:param password: raw password to check
		:return: the matching UserProfile, or None if the login is missing, unknown
		or the password is wrong
		"""
		if username is None:
			# the default value would make the "'@' in username" test raise TypeError
			return None
		try:
			if '@' in username:
				user = UserProfile.objects.get(email=username)
			else:
				user = UserProfile.objects.get(username=username)
		except UserProfile.DoesNotExist:
			# the exception class of the model that is actually queried
			return None
		return user if user.check_password(password) else None

	def get_user(self, username):
		"""
		:param username: primary key of the user (the parameter name is kept for compatibility)
		:return: instance of the active user model with that primary key or None if there's no such user
		"""
		user_model = get_user_model()
		try:
			return user_model.objects.get(pk=username)
		except user_model.DoesNotExist:
			return None
282