Completed
Push — master ( eecf08...313cfe )
by Paolo
15s queued 12s
created

ExcelTemplateReader.check_countries()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Fri Jul  5 16:31:45 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import xlrd
10
import logging
11
import datetime
12
13
from collections import defaultdict, namedtuple
14
15
from common.constants import ACCURACIES
16
from image_app.helpers import FileDataSourceMixin
17
from image_app.models import DictSex, DictCountry
18
19
from .exceptions import ExcelImportError
20
21
# Get an instance of a logger
22
logger = logging.getLogger(__name__)
23
24
# defining the template columns in need for data import
25
TEMPLATE_COLUMNS = {
26
    'breed': [
27
        'Supplied breed',
28
        # 'Mapped breed',
29
        # 'Mapped breed ontology library',
30
        # 'Mapped breed ontology accession',
31
        'EFABIS Breed country',
32
        'Species',
33
        # 'Species ontology library',
34
        # 'Species ontology accession'
35
    ],
36
    'animal': [
37
        'Animal id in data source',
38
        'Animal description',
39
        'Alternative animal ID',
40
        'Father id in data source',
41
        'Mother id in data source',
42
        'Breed',
43
        'Species',
44
        'Sex',
45
        'Birth date',
46
        'Birth location',
47
        'Birth location longitude',
48
        'Birth location latitude',
49
        'Birth location accuracy'
50
    ],
51
    'sample': [
52
        'Sample id in data source',
53
        'Alternative sample ID',
54
        'Sample description',
55
        'Animal id in data source',
56
        'Specimen collection protocol',
57
        'availability',
58
        'Collection date',
59
        'Collection place latitude',
60
        'Collection place longitude',
61
        'Collection place',
62
        'Collection place accuracy',
63
        'Organism part',
64
        'Developmental stage',
65
        'Physiological stage',
66
        'Animal age at collection',
67
        'Sample storage',
68
        'Sample storage processing',
69
        'Sampling to preparation interval'
70
    ]
71
}
72
73
74
class ExcelTemplateReader(FileDataSourceMixin):
75
    """A class to read template excel files"""
76
77
    def __init__(self):
78
        # read xls file and track it
79
        self.book = None
80
        self.sheet_names = []
81
82
    def read_file(self, filename):
83
        # read xls file and track it
84
        self.book = xlrd.open_workbook(filename)
85
        self.sheet_names = self.book.sheet_names()
86
87
    def check_sheets(self):
88
        """Test for the minimal sheets required to upload data"""
89
90
        not_found = []
91
92
        for sheet_name in TEMPLATE_COLUMNS.keys():
93
            if sheet_name not in self.sheet_names:
94
                not_found.append(sheet_name)
95
                logger.error(
96
                    "required sheet {name} not found in template".format(
97
                        name=sheet_name)
98
                )
99
100
        if len(not_found) > 0:
101
            return False, not_found
102
103
        else:
104
            logger.debug("This seems to be a valid Template file")
105
            return True, not_found
106
107
    def check_columns(self):
108
        """Test for minimal column required for template load"""
109
110
        not_found = defaultdict(list)
111
112
        for sheet_name in TEMPLATE_COLUMNS.keys():
113
            # get a sheet from xls workbook
114
            sheet = self.book.sheet_by_name(sheet_name)
115
116
            # get header from sheet
117
            header = sheet.row_values(0)
118
119
            for column in TEMPLATE_COLUMNS[sheet_name]:
120
                if column not in header:
121
                    not_found[sheet_name].append(column)
122
                    logger.error(
123
                        "required column {column} not found in sheet "
124
                        "{sheet_name}".format(
125
                            sheet_name=sheet_name,
126
                            column=column)
127
                    )
128
129
        if len(not_found) > 0:
130
            return False, not_found
131
132
        else:
133
            logger.debug("This seems to be a valid Template file")
134
            return True, not_found
135
136
    def get_sheet_records(self, sheet_name):
137
        """Generic functions to iterate on excel records"""
138
139
        # this is the sheet I need
140
        sheet = self.book.sheet_by_name(sheet_name)
141
142
        # now get columns to create a collection objects
143
        header = sheet.row_values(0)
144
145
        column_idxs = {}
146
147
        # get the column index I need
148
        for column in TEMPLATE_COLUMNS[sheet_name]:
149
            try:
150
                idx = header.index(column)
151
152
            except ValueError as e:
153
                logger.error(e)
154
                raise ExcelImportError(
155
                    "Column '%s' not found in '%s' sheet" % (
156
                        column, sheet_name))
157
158
            column_idxs[column.lower().replace(" ", "_")] = idx
159
160
        # get new column names
161
        columns = column_idxs.keys()
162
163
        # create a namedtuple object
164
        Record = namedtuple(sheet_name.capitalize(), columns)
165
166
        # iterate over record, mind the header column
167
        for i in range(1, sheet.nrows):
168
            # get a row from excel file
169
            row = sheet.row_values(i)
170
171
            # get the data I need
172
            data = [row[column_idxs[column]] for column in columns]
173
174
            # replace all empty  occurences in a list
175
            data = [None if col in [""]
176
                    else col for col in data]
177
178
            # stripping columns
179
            data = [col.strip() if type(col) is str
180
                    else col for col in data]
181
182
            # treat integers as integers
183
            data = [int(col) if type(col) is float and col.is_integer()
184
                    else col for col in data]
185
186
            # fix date fields. Search for 'date' in column names
187
            date_idxs = [column_idxs[column] for column in columns if
188
                         'date' in column]
189
190
            # fix date objects using datetime, as described here:
191
            # https://stackoverflow.com/a/13962976/4385116
192
            for idx in date_idxs:
193
                if not data[idx]:
194
                    continue
195
196
                data[idx] = datetime.datetime(
197
                    *xlrd.xldate_as_tuple(
198
                        data[idx],
199
                        self.book.datemode
200
                    )
201
                )
202
203
            # get a new object
204
            record = Record._make(data)
205
206
            yield record
207
208
    def get_breed_records(self):
209
        """Iterate among breeds record"""
210
211
        # this is the sheet I need
212
        sheet_name = "breed"
213
        return self.get_sheet_records(sheet_name)
214
215
    def get_animal_records(self):
216
        """Iterate among animal records"""
217
218
        # this is the sheet I need
219
        sheet_name = "animal"
220
        return self.get_sheet_records(sheet_name)
221
222
    def get_sample_records(self):
223
        """Iterate among sample records"""
224
225
        # this is the sheet I need
226
        sheet_name = "sample"
227
        return self.get_sheet_records(sheet_name)
228
229
    def check_species(self, country):
230
        """Check if all species are defined in UID DictSpecies. If not,
231
        create dictionary term"""
232
233
        column = 'species'
234
        item_set = set([breed.species for breed in self.get_breed_records()])
235
236
        # call FileDataSourceMixin.check_species
237
        return super().check_species(column, item_set, country, create=True)
238
239
    def check_species_in_animal_sheet(self):
240
        """Check if all animal species are defined in breed sheet"""
241
242
        check = True
243
        not_found = []
244
245
        reference_set = set(
246
            [breed.species for breed in self.get_breed_records()])
247
248
        test_set = set(
249
            [animal.species for animal in self.get_animal_records()])
250
251
        for specie in test_set:
252
            if specie not in reference_set:
253
                check = False
254
                not_found.append(specie)
255
256
        return check, not_found
257
258
    def check_sex(self):
259
        """Check that all sex records are present in database"""
260
261
        column = 'sex'
262
        item_set = set([animal.sex for animal in self.get_animal_records()])
263
264
        # call FileDataSourceMixin.check_items
265
        return self.check_items(item_set, DictSex, column)
266
267
    def check_countries(self):
268
        """Check that all efabis countries are present in database"""
269
270
        column = "efabis_breed_country"
271
        item_set = set([breed.efabis_breed_country for
272
                        breed in self.get_breed_records()])
273
274
        # call FileDataSourceMixin.check_items
275
        return self.check_items(item_set, DictCountry, column)
276
277
    def __check_accuracy(self, item_set):
278
        """A generic method to test for accuracies"""
279
280
        # a list of not found terms and a status to see if something is missing
281
        # or not
282
        not_found = []
283
        result = True
284
285
        for item in item_set:
286
            try:
287
                ACCURACIES.get_value_by_desc(item)
288
289
            except KeyError:
290
                logger.warning("accuracy level '%s' not found" % (item))
291
                not_found.append(item)
292
293
        if len(not_found) != 0:
294
            result = False
295
296
        return result, not_found
297
298
    def check_accuracies(self):
299
        """Check accuracy specified in table"""
300
301
        item_set = set([animal.birth_location_accuracy
302
                        for animal in self.get_animal_records()])
303
304
        # test for accuracy in animal table
305
        result_animal, not_found_animal = self.__check_accuracy(item_set)
306
307
        item_set = set([sample.collection_place_accuracy
308
                        for sample in self.get_sample_records()])
309
310
        # test for accuracy in sample table
311
        result_sample, not_found_sample = self.__check_accuracy(item_set)
312
313
        # merge two results
314
        check = result_animal and result_sample
315
        not_found = set(not_found_animal + not_found_sample)
316
317
        if check is False:
318
            logger.error(
319
                    "Couldnt' find those accuracies in constants:")
320
            logger.error(not_found)
321
322
        return check, not_found
323