Completed
Push — master ( 5850a4...5772bc )
by Andrew
01:00
created

get_or_create_ip()   B

Complexity

Conditions 5

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 31
rs 8.0894
c 0
b 0
f 0
1
import base64
2
import json
3
import logging
4
import re
5
import sys
6
from io import BytesIO
7
8
import requests
9
from django.contrib.auth import get_user_model
10
from django.core.exceptions import ValidationError
11
from django.core.files.uploadedfile import InMemoryUploadedFile
12
from django.core.mail import send_mail
13
from django.core.validators import validate_email
14
15
from chat import local
16
from chat import settings
17
from chat.models import User, UserProfile, Verification, RoomUsers, IpAddress
18
from chat.settings import ISSUES_REPORT_LINK, SITE_PROTOCOL, ALL_ROOM_ID
19
20
try:  # py2
21
	from urllib import urlopen
22
except ImportError:  # py3
23
	from urllib.request import urlopen
24
25
# Usernames: 1..MAX_USERNAME_LENGTH characters out of latin letters, digits, '-' and '_'
USERNAME_REGEX = '^[a-zA-Z-_0-9]{1,' + str(settings.MAX_USERNAME_LENGTH) + '}$'

# Optional integration settings, every one of them is None when it's absent in settings
API_URL = getattr(settings, "IP_API_URL", None)
RECAPTCHA_SECRET_KEY = getattr(settings, "RECAPTCHA_SECRET_KEY", None)
GOOGLE_OAUTH_2_CLIENT_ID = getattr(settings, "GOOGLE_OAUTH_2_CLIENT_ID", None)
GOOGLE_OAUTH_2_HOST = getattr(settings, "GOOGLE_OAUTH_2_HOST", None)
FACEBOOK_ACCESS_TOKEN = getattr(settings, "FACEBOOK_ACCESS_TOKEN", None)

logger = logging.getLogger(__name__)
33
34
35
def is_blank(check_str):
	"""
	:param check_str: string to check, None is allowed
	:return: True if check_str is None, empty or consists of whitespace only, False otherwise
	"""
	return not (check_str and check_str.strip())
40
41
42
def hide_fields(post, fields, huge=False, fill_with='****'):
	"""
	Returns a copy of post where values of the specified fields are masked.
	The passed object itself is never modified.

	:param post: Object that will be copied
	:type post: QueryDict
	:param fields: names of the fields whose values will be hidden
	:param fill_with: value that is stored instead of every hidden field
	:param huge: if false post.copy() is taken and hidden fields are overwritten in it,
		if true a new plain dict is built key by key instead of copying the whole object
	:return: a copy of post with hidden values for the specified fields
	"""
	if huge:
		# copy everything, substituting the values of *fields on the way
		return {key: fill_with if key in fields else post[key] for key in post}
	# hide *fields in a copy
	masked = post.copy()
	for field in fields:
		if field in post:  # don't add a field that was absent
			masked[field] = fill_with
	return masked
64
65
66
def check_password(password):
	"""
	Checks if password is secure
	:param password: raw password to validate
	:raises ValidationError exception if password is blank or doesn't match ^\\S.+\\S$
	(three or more symbols that start and end with a non-whitespace one)
	"""
	if is_blank(password):
		raise ValidationError("password can't be empty")
	# raw literal: '\S' inside a regular string literal is an invalid escape sequence
	if not re.match(r'^\S.+\S$', password):
		raise ValidationError("password should be at least 3 symbols")
75
76
77
def check_email(email):
	"""
	Checks that email is well-formed and is not used by any profile yet.
	An empty email is accepted without any checks.

	:param email: address to validate
	:raises ValidationError if specified email is registered or not valid
	"""
	if not email:
		return
	validate_email(email)
	# exists() instead of get(): no DoesNotExist to catch and duplicated rows
	# are reported as 'already used' instead of raising MultipleObjectsReturned
	if UserProfile.objects.filter(email=email).exists():
		raise ValidationError('Email {} is already used'.format(email))
90
91
92
def check_user(username):
	"""
	Checks if specified username is free to register
	:type username str
	:raises ValidationError exception if username is blank, doesn't match USERNAME_REGEX
	or is already registered
	"""
	# Skip javascript validation, only summary message
	if is_blank(username):
		raise ValidationError("Username can't be empty")
	if not re.match(USERNAME_REGEX, username):
		raise ValidationError("Username {} doesn't match regex {}".format(username, USERNAME_REGEX))
	# exists() instead of get(): no DoesNotExist to catch and duplicated rows
	# are reported as 'already used' instead of raising MultipleObjectsReturned
	if User.objects.filter(username=username).exists():
		raise ValidationError("Username {} is already used. Please select another one".format(username))
109
110
111
def get_client_ip(request):
	"""
	Resolves the address of the client that sent the request.

	:param request: object that exposes request headers as META dictionary
	:return: last entry of X-Forwarded-For header if the header is present and not empty,
	REMOTE_ADDR value (None when it's absent) otherwise
	"""
	forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
	if not forwarded:
		return request.META.get('REMOTE_ADDR')
	hops = forwarded.split(',')
	return hops[-1].strip()
114
115
116
def check_captcha(request):
	"""
	Validates 'g-recaptcha-response' of the request with google siteverify api.
	If RECAPTCHA_SECRET_KEY is not set in settings the validation is skipped.

	:type request: WSGIRequest
	:raises ValidationError: if captcha is not valid, not set, or if it can't be verified
	"""
	if not RECAPTCHA_SECRET_KEY:
		logger.debug('Skipping captcha validation')
		return
	try:
		captcha_rs = request.POST.get('g-recaptcha-response')
		url = "https://www.google.com/recaptcha/api/siteverify"
		params = {
			'secret': RECAPTCHA_SECRET_KEY,
			'response': captcha_rs,
			'remoteip': local.client_ip
		}
		# data= sends the secret in the POST body instead of the query string;
		# timeout keeps a stalled verification service from blocking the request forever
		raw_response = requests.post(url, data=params, verify=True, timeout=10)
		response = raw_response.json()
		if not response.get('success', False):
			logger.debug('Captcha is NOT valid, response: %s', response)
			raise ValidationError(response.get('error-codes') or 'This captcha already used')
		logger.debug('Captcha is valid, response: %s', response)
	except ValidationError:
		# rejected captcha: report it as is, it's not a failure of the check itself
		raise
	except Exception as e:
		raise ValidationError('Unable to check captcha because {}'.format(e))
142
143
144
def send_email_verification(user, site_address):
	"""
	Creates a registration Verification for the user, links it to the user
	and mails the confirmation url. Does nothing if user.email is None.

	:param user: user whose email should be confirmed
	:param site_address: address that is used in the mailed links, it's also passed
	to send_mail as the sender
	"""
	if user.email is None:
		return

	verification = Verification(user=user, type_enum=Verification.TypeChoices.register)
	verification.save()
	user.email_verification = verification
	user.save(update_fields=['email_verification'])

	template = (
		'Hi {}, you have registered pychat'
		'\nTo complete your registration click on the url bellow: {}://{}/confirm_email?token={}'
		'\n\nIf you find any bugs or propositions you can post them {}/report_issue or {}'
	)
	text = template.format(
		user.username,
		SITE_PROTOCOL,
		site_address,
		verification.token,
		site_address,
		ISSUES_REPORT_LINK
	)
	recipients = [user.email]

	logger.info('Sending verification email to userId %s (email %s)', user.id, user.email)
	send_mail("Confirm chat registration", text, site_address, recipients)
	logger.info('Email %s has been sent', user.email)
159
160
161
def extract_photo(image_base64, filename=None):
	"""
	Converts a base64 data url (data:<type>/<subtype>;base64,<payload>) to an uploaded file.

	:param image_base64: data url of the image
	:param filename: name of the created file, ".<subtype>" is used if it's not specified
	:return: InMemoryUploadedFile with the decoded payload for the field 'photo'
	:raises ValueError: if image_base64 is not a base64 data url
	"""
	base64_type_data = re.search(r'data:(\w+/(\w+));base64,(.*)$', image_base64)
	if base64_type_data is None:
		# fail with a clear message instead of AttributeError on None below
		raise ValueError('Image is not a valid base64 data url')
	logger.debug('Parsing base64 image')
	content_type, extension, image_data = base64_type_data.groups()
	decoded = base64.b64decode(image_data)
	name = filename or ".{}".format(extension)
	logger.debug('Base64 filename extension %s, content_type %s', name, content_type)
	return InMemoryUploadedFile(
		BytesIO(decoded),
		field_name='photo',
		name=name,
		content_type=content_type,
		# length of the payload; sys.getsizeof() would give the memory footprint of the BytesIO object
		size=len(decoded),
		charset=None)
177
178
179
def create_user_model(user):
	"""
	Saves the user and subscribes it to the room with id ALL_ROOM_ID.

	:param user: user model to save
	:return: the same user instance
	"""
	user.save()
	membership = RoomUsers(user_id=user.id, room_id=ALL_ROOM_ID)
	membership.save()
	logger.info('Signed up new user %s, subscribed for channels with id %d', user, ALL_ROOM_ID)
	return user
184
185
186
def get_or_create_ip(ip, logger):
	"""
	Returns the IpAddress record of the ip, the record is created if it's not stored yet.
	For a new record isp/country/region/city info is requested from API_URL;
	if that fails for any reason the record is created with the ip only.

	@param ip: ip to fetch info from
	@param logger: initialized logger
	@rtype: IpAddress
	"""
	try:
		return IpAddress.objects.get(ip=ip)
	except IpAddress.DoesNotExist:
		pass
	try:
		if not API_URL:
			raise Exception('api url is absent')
		logger.debug("Creating ip record %s", ip)
		f = urlopen(API_URL % ip)
		try:
			raw_response = f.read().decode("utf-8")
		finally:
			# release the connection even if reading or decoding fails
			f.close()
		response = json.loads(raw_response)
		if response['status'] != "success":
			raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
		return IpAddress.objects.create(
			ip=ip,
			isp=response['isp'],
			country=response['country'],
			region=response['regionName'],
			city=response['city'],
			country_code=response['countryCode']
		)
	except Exception as e:
		# best effort: the ip is stored without country info
		logger.error("Error while creating ip with country info, because %s", e)
		return IpAddress.objects.create(ip=ip)
217
218
class EmailOrUsernameModelBackend(object):
	"""
	This is a ModelBackend that allows authentication with either a username or an email address.
	"""

	def authenticate(self, username=None, password=None):
		"""
		:param username: username of the user, or his email if the value contains '@'
		:param password: raw password to check
		:return: matching UserProfile if the password is correct, None otherwise
		"""
		if username is None:
			# no login supplied; '@' in None would raise TypeError
			return None
		try:
			if '@' in username:
				user = UserProfile.objects.get(email=username)
			else:
				user = UserProfile.objects.get(username=username)
		except UserProfile.DoesNotExist:
			# exception class of the model that is actually queried
			return None
		return user if user.check_password(password) else None

	def get_user(self, username):
		"""
		:param username: primary key of the user, the value is used for the pk lookup
		:return: user with this primary key, None if it doesn't exist
		"""
		user_model = get_user_model()
		try:
			return user_model.objects.get(pk=username)
		except user_model.DoesNotExist:
			return None
239