submissions.forms   A
last analyzed

Complexity

Total Complexity 24

Size/Duplication

Total Lines 263
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 24
eloc 146
dl 0
loc 263
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A UniqueSubmissionMixin.check_submission_exists() 0 35 5
B SubmissionFormMixin.clean() 0 21 8
A UpdateSubmissionForm.clean() 0 3 1
A SubmissionFormMixin.check_crbanim_columns() 0 19 2
A SubmissionFormMixin.check_file_encoding() 0 24 2
B SubmissionFormMixin.check_template_file() 0 40 6
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 24 15:51:05 2018
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import magic
10
import logging
11
import tempfile
12
13
from django import forms
14
from django.conf import settings
15
16
from common.constants import CRB_ANIM_TYPE, TEMPLATE_TYPE
17
from common.forms import RequestFormMixin
18
from common.helpers import get_admin_emails
19
from uid.models import Submission
20
from crbanim.helpers import CRBAnimReader
21
from excel.helpers import ExcelTemplateReader
22
23
# Get an instance of a logger
24
logger = logging.getLogger(__name__)
25
26
27
class UniqueSubmissionMixin():
    """Form mixin rejecting data which duplicates an existing submission.

    The concrete form class has to provide ``cleaned_data``,
    ``changed_data`` and ``request``: all of them are read by
    :meth:`check_submission_exists`. If the form also exposes an
    ``instance`` which was already saved, that object is left out of the
    duplicate lookup.
    """

    # a custom attribute in order to determine if I'm reloading or not
    is_reload = False

    def check_submission_exists(self):
        """Test if I already have a submission with the same data.

        The lookup is done on the first ``unique_together`` set declared by
        ``Submission``, using the values found in ``cleaned_data`` (missing
        or ``None`` values are skipped) and the requesting user as owner.

        Raises:
            forms.ValidationError: when another submission matches the
                collected attributes.
        """

        logger.info(self.changed_data)

        if (self.is_reload and (
                'datasource_type' not in self.changed_data and
                'datasource_version' not in self.changed_data)):

            # lazy logging arguments: the message is rendered only if needed
            logger.info("Replacing a datasource: %s", self.cleaned_data)

            # I'm replacing data, no need to test if a submission with the
            # same data exists
            return

        # get unique attributes
        unique_together = Submission._meta.unique_together[0]

        # get submitted attributes, ignoring the ones without a value
        data = {}

        for key in unique_together:
            value = self.cleaned_data.get(key)

            if value is not None:
                data[key] = value

        # override owner attribute
        data['owner'] = self.request.user

        # search for submission objects with the same attributes
        queryset = Submission.objects.filter(**data)

        # while editing an object which is already stored, the object itself
        # has the same attributes when the unique fields are not changed:
        # it must not be reported as a duplicate of itself
        instance = getattr(self, "instance", None)
        instance_pk = getattr(instance, "pk", None)

        if instance_pk is not None:
            queryset = queryset.exclude(pk=instance_pk)

        if queryset.exists():
            msg = (
                "Error: There is already a submission with the same "
                "attributes. Please change one of the following: "
                "Gene bank name, Gene bank country, Data source type and "
                "Data source version")

            # raising an exception:
            raise forms.ValidationError(msg, code='invalid')
66
67
68
class SubmissionFormMixin(UniqueSubmissionMixin):
    """Form mixin validating the uploaded data source file.

    On top of the duplicate test inherited from ``UniqueSubmissionMixin``,
    :meth:`clean` inspects ``uploaded_file`` according to the selected
    ``datasource_type``.
    """

    def clean(self):
        """Run the duplicate test and the checks on the uploaded file.

        Raises:
            forms.ValidationError: raised by any of the called checks.
        """

        # test if I have a submission with the provided data
        self.check_submission_exists()

        # I can call this method without providing a 'uploaded file'
        # (for instance, when omitting uploaded file). The key could also
        # be present without a usable file (None, False or a file object
        # with no name): there is nothing to inspect in such cases
        if not self.cleaned_data.get("uploaded_file"):
            return

        # without a valid datasource type I can't choose any check
        if "datasource_type" not in self.cleaned_data:
            return

        datasource_type = self.cleaned_data["datasource_type"]

        # avoid file type for excel types (is not a text file)
        if datasource_type != TEMPLATE_TYPE:
            self.check_file_encoding()

        # check crbanim files only if provided
        if datasource_type == CRB_ANIM_TYPE:
            self.check_crbanim_columns()

        # check template files only if provided
        if datasource_type == TEMPLATE_TYPE:
            self.check_template_file()

    def _read_first_chunk(self):
        """Return the first chunk of the uploaded file (b"" if no data)."""

        uploaded_file = self.cleaned_data['uploaded_file']

        # a default value avoids a StopIteration when no chunk is yielded
        return next(uploaded_file.chunks(), b"")

    def check_file_encoding(self):
        """Check that libmagic detects the uploaded file as CSV/ASCII/UTF-8.

        Raises:
            forms.ValidationError: if the detected type is not accepted.
        """

        # read one chunk of such file
        chunk = self._read_first_chunk()
        magic_line = magic.from_buffer(chunk)
        file_type = magic_line.split(",")[0]

        # changed cause the different behavior of libmagic1
        accepted_types = (
            "CSV text",
            "ASCII",
            "UTF-8",
            "UTF-8 Unicode text",
        )

        if file_type not in accepted_types:
            # create message and add error
            msg = (
                "Error: file is not a CSV in UTF-8 nor ASCII format: "
                "format was '%s'" % file_type)
            logger.error(msg)

            # raising an exception:
            raise forms.ValidationError(msg, code='invalid')

    def check_crbanim_columns(self):
        """Check if a CRBanim file has mandatory columns.

        Raises:
            forms.ValidationError: if ``CRBAnimReader.is_valid`` reports
                missing columns.
        """

        import codecs

        # read one chunk of such file
        chunk = self._read_first_chunk()

        # now determine if CRBanim file is valid. chunk is in binary format
        # and needs to be converted to a string. A chunk could end in the
        # middle of a multi-byte character: an incremental decoder keeps
        # such an incomplete tail aside instead of raising an error, while
        # bytes which are not valid UTF-8 are still rejected
        decoder = codecs.getincrementaldecoder("utf-8")()
        check, not_found = CRBAnimReader.is_valid(decoder.decode(chunk))

        if check is False:
            msg = "Error: file lacks of CRBanim mandatory columns: %s" % (
                not_found)

            # raising an exception:
            raise forms.ValidationError(msg, code='invalid')

    def check_template_file(self):
        """Check if template file has columns and sheets.

        Raises:
            forms.ValidationError: if libmagic does not describe the file
                as a Microsoft document, or if the reader reports missing
                sheets or columns.
        """

        uploaded_file = self.cleaned_data['uploaded_file']

        magic_line = magic.from_buffer(self._read_first_chunk())

        if 'Microsoft' not in magic_line:
            msg = "The file you provided is not a Template file"
            raise forms.ValidationError(msg, code='invalid')

        # the reader is given a file name (original note: xlrd can manage
        # only files). Write a temporary file
        with tempfile.NamedTemporaryFile(delete=True) as tmpfile:
            for chunk in uploaded_file.chunks():
                tmpfile.write(chunk)

            # the file is opened again by name: push buffered data to disk
            # before, otherwise the reader could see a truncated file
            tmpfile.flush()

            # open the file with proper model
            reader = ExcelTemplateReader()
            reader.read_file(tmpfile.name)

            # check that template has at least breed, animal, sample sheets
            check, not_found = reader.check_sheets()

            if check is False:
                msg = "Error: file lacks of Template mandatory sheets: %s" % (
                    not_found)

                # raising an exception:
                raise forms.ValidationError(msg, code='invalid')

            # check that template sheets have the mandatory columns
            check, not_found = reader.check_columns()

            if check is False:
                msg = "Error: file lacks of Template mandatory columns: %s" % (
                    not_found)

                # raising an exception:
                raise forms.ValidationError(msg, code='invalid')
176
177
178
class SubmissionForm(SubmissionFormMixin, RequestFormMixin, forms.ModelForm):
    """Model form over ``Submission`` with descriptive fields and data file.

    Validation of the submitted data comes from ``SubmissionFormMixin``.
    """

    class Meta:
        model = Submission

        fields = (
            'title', 'description',
            'gene_bank_name', 'gene_bank_country',
            'organization',
            'datasource_type', 'datasource_version',
            'uploaded_file',
        )

        # HTML snippets shown next to the corresponding fields
        help_texts = dict(
            uploaded_file='Need to be in UTF-8 or ASCII format',
            organization=(
                'Who owns the data. Not listed? please '
                '<a href="mailto:{0}?subject=please add my organization">'
                'contact us</a>'
            ).format(get_admin_emails()[0]),
            datasource_type=(
                'example: CryoWeb. Need an empty template file? '
                'download it from <a href="%s%s">here</a>'
            ) % (
                settings.MEDIA_URL,
                "Image_sample_empty_template_20191002_v2.1.xlsx",
            ),
        )
206
207
208
# NOTE: an earlier remark here stated that forms.Form was used in order to
# pass the primary key as a field; the class below derives from
# forms.ModelForm, so that remark looks outdated (TODO: confirm)
210
class ReloadForm(SubmissionFormMixin, RequestFormMixin, forms.ModelForm):
    """Model form over ``Submission`` limited to the data source fields.

    ``is_reload`` is switched on, and an explicit confirmation checkbox
    (``agree_reload``) is added to the model fields.
    """

    # tells the inherited duplicate test that data are being reloaded
    is_reload = True

    # confirmation checkbox, declared on the form itself
    agree_reload = forms.BooleanField(
        help_text="You have to check this box to reload your data",
        label="That's fine. Replace my submission data with this file",
    )

    class Meta:
        model = Submission

        fields = ('datasource_type', 'datasource_version', 'uploaded_file')

        # HTML snippets shown next to the corresponding fields
        help_texts = dict(
            uploaded_file='Need to be in UTF-8 or ASCII format',
            datasource_type=(
                'example: CryoWeb. Need an empty template file? '
                'download it from <a href="%s%s">here</a>'
            ) % (
                settings.MEDIA_URL,
                "Image_sample_empty_template_20191002_v2.1.xlsx",
            ),
        )
236
237
238
class UpdateSubmissionForm(
        UniqueSubmissionMixin, RequestFormMixin, forms.ModelForm):
    """Model form over ``Submission`` without the uploaded file field.

    The only form-level validation is the duplicate test inherited from
    ``UniqueSubmissionMixin``.
    """

    class Meta:
        model = Submission

        fields = (
            'title', 'description',
            'gene_bank_name', 'gene_bank_country',
            'organization',
            "datasource_type", "datasource_version",
        )

        # HTML snippet shown next to the organization field
        help_texts = dict(
            organization=(
                'Who owns the data. Not listed? please '
                '<a href="mailto:{0}?subject=please add my organization">'
                'contact us</a>'
            ).format(get_admin_emails()[0]),
        )

    def clean(self):
        """Test if a submission with the provided data already exists."""

        self.check_submission_exists()
263