Completed: push to master (7eb0af...62a905) by Anca, 20:54

crowdtruth.load.process_file() (rating: B)

Complexity
    Conditions: 7

Size
    Total Lines: 103
    Code Lines: 47

Duplication
    Duplicated Lines: 0
    Ratio: 0 %

Code Coverage
    Tests: 31
    CRAP Score: 7.0109

Importance
    Changes: 0
Metric   Value
eloc     47
dl       0
loc      103
ccs      31
cts      33
cp       0.9394
rs       7.3345
c        0
b        0
f        0
cc       7
nop      3
crap     7.0109
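The reported CRAP Score can be checked from the table. Assuming cc is the method's cyclomatic complexity and cp its statement coverage as a fraction (ccs / cts = 31 / 33 ≈ 0.9394), the standard CRAP formula reproduces the reported value:

# CRAP(m) = comp(m)^2 * (1 - cov(m))^3 + comp(m)
cc = 7        # cyclomatic complexity (cc in the table above)
cp = 31 / 33  # statement coverage as a fraction (ccs / cts)

crap = cc ** 2 * (1 - cp) ** 3 + cc
print(round(crap, 4))  # 7.0109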

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. Moreover, when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method, using the comment as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, sketched below.
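A minimal sketch of Extract Method, using hypothetical names rather than code from this repository:

# Before: a comment labels a block inside a longer method.
def summarize(records):
    # keep only records where every field is filled in
    complete = [r for r in records if all(v is not None for v in r.values())]
    return len(complete)

# After: the commented block is extracted, and the comment becomes the name.
def keep_complete_records(records):
    """ Keep only records where every field is filled in. """
    return [r for r in records if all(v is not None for v in r.values())]

def summarize(records):
    return len(keep_complete_records(records))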

#pylint: disable=W0223
"""
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
"""

import os
import logging
import datetime
from collections import Counter, OrderedDict

import dateparser
import pandas as pd

from crowdtruth.models.worker import Worker
from crowdtruth.models.unit import Unit
from crowdtruth.models.job import Job
from crowdtruth.configuration import DefaultConfig
from crowdtruth.crowd_platform import *

# create an ordered counter so that we can maintain
# the position of tags in the order they were annotated
class OrderedCounter(Counter, OrderedDict):
    """ Instantiates an ordered counter. """
    pass

def create_ordered_counter(ordered_counter, annotation_vector):
    """ Instantiates an ordered counter from a given annotation vector. """
    for relation in annotation_vector:
        if relation not in ordered_counter:
            ordered_counter.update({relation: 0})
    return ordered_counter

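# Illustration (not part of the module): an OrderedCounter counts
# annotations while remembering the order in which they first appeared,
# e.g. OrderedCounter(['causes', 'treats', 'causes']) holds the counts
# causes: 2, treats: 1, in that order. create_ordered_counter() then
# zero-fills any relations from config.annotation_vector that the
# worker did not select.
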
def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files. """

    try:
        date_obj = datetime.datetime.strptime(date_string, date_format)
        print(date_obj)
    except ValueError:
        raise ValueError('Incorrect date format')

def get_file_list(directory):
    """ List the files in the directory given as argument. """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        # if it is a folder scan it
        if os.path.isdir(directory + '/' + file):
            sublist = get_file_list(directory + '/' + file)
            if sublist:
                # keep nested paths relative to the top directory so they can
                # be opened later (appending the raw sublist would nest lists)
                filelist.extend(file + '/' + subfile for subfile in sublist)

        # if it is a csv file open it
        if file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist

def list_files(kwargs, results, config):
    """ Creates a list of files to be processed. """
    files = []
    directory = ""
    if 'data_frame' in kwargs:
        res, config = process_file(kwargs['data_frame'], config)
        for value in res:
            results[value].append(res[value])
        return results
    elif 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found ' + str(len(files)) + ' files')
    else:
        raise ValueError('No input was provided')

    for file in files:
        # directory is always bound here, so checking locals() is unnecessary
        if directory != "":
            logging.info("Processing " + file)
            file = directory + "/" + file

        judgments = pd.read_csv(file)  # , encoding=result['encoding'])
        res, config = process_file(judgments, config, filename=file)
        for value in res:
            results[value].append(res[value])

    return results

def load(**kwargs):
    """ Loads the input files. """

    # placeholder for aggregated results
    results = {
        'jobs' : [],
        'units' : [],
        'workers' : [],
        'judgments' : [],
        'annotations' : []
        }

    if 'config' not in kwargs:
        config = DefaultConfig()
    else:
        logging.info('Config loaded')
        config = kwargs['config']

    results = list_files(kwargs, results, config)
    for value in results:
        results[value] = pd.concat(results[value])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()

    return results, config

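# Illustration only (hypothetical file and directory names): load() accepts
# one of 'file', 'directory' or 'data_frame', plus an optional 'config':
#   results, config = load(file='batch1.csv')
#   results, config = load(directory='batches', config=DefaultConfig())
# The returned results dict maps 'jobs', 'units', 'workers', 'judgments'
# and 'annotations' to pandas DataFrames aggregated across all input files.
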
def remove_empty_rows(config, judgments):
    """ Handle rows where the worker did not give an answer (AMT issue). """

    # if config keeps empty rows, add the NONE placeholder token
    if not config.remove_empty_rows:
        for col in config.outputColumns:
            for idx in range(len(judgments[col])):
                if (pd.isnull(judgments[col][idx]) or
                        judgments[col][idx] is None or
                        judgments[col][idx] == '' or
                        judgments[col][idx] == 'nan'):
                    logging.info('judgments[' + str(idx) + '][' + col + '] is None')
                    judgments.at[idx, col] = config.none_token
    # remove empty rows
    else:
        empty_rows = set()
        for col in config.outputColumns:
            empty_rows = empty_rows.union(judgments[pd.isnull(judgments[col])].index)
            empty_rows = empty_rows.union(judgments[judgments[col] == 'nan'].index)
        for col in config.outputColumns:
            judgments = judgments[pd.notnull(judgments[col])]
            judgments = judgments[judgments[col] != 'nan']
        judgments = judgments.reset_index(drop=True)
        count_empty_rows = len(empty_rows)
        if count_empty_rows > 0:
            if count_empty_rows == 1:
                logging.warning(str(count_empty_rows) + " row with incomplete information in "
                                "output columns was removed.")
            else:
                logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                                "output columns were removed.")
    return judgments

def remove_single_judgment_units(judgments):
    """ remove units with just 1 judgment """
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(units_1work)]
    judgments = judgments.reset_index(drop=True)
    no_units_1work = len(units_1work)
    if no_units_1work > 0:
        if no_units_1work == 1:
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments

def make_output_cols_safe_keys(config, judgments):
    """ make output values safe keys """
    for col in config.output.values():
        if isinstance(judgments[col].iloc[0], dict):
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(x))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter(
                    OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(
                    x.split(config.annotation_separator)))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter(
                    OrderedCounter(x.split(config.annotation_separator)),
                    config.annotation_vector))
    return judgments


def add_missing_values(config, units):
    """ Adds missing vector values if it is a closed task. """
    for col in config.output.values():
        try:
            # openended = config.open_ended_task
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
            return units
        except AttributeError:
            continue
    # if no column held counter values, return the units unchanged
    # rather than falling through and implicitly returning None
    return units

def aggregate_annotations(config, judgments):
    """ Aggregates annotations and adds judgments stats. """
    annotations = pd.DataFrame()
    for col in config.output.values():
        judgments[col + '.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col + '.unique'] = judgments[col].apply(lambda x: len(x))
        res = pd.DataFrame(judgments[col].apply(lambda x:
            pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)
    return annotations, judgments

def process_file(judgments, config, filename=""):
    """ Processes input files with the given configuration """

    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:
            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }

    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # handle empty rows
    judgments = remove_empty_rows(config, judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    all_columns = dict(list(config.input.items()) + list(config.output.items())
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    if filename != "":
        judgments['job'] = filename.split('.csv')[0]
    else:
        judgments['job'] = "pd.DataFrame"

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    # remove units with just 1 judgment
    judgments = remove_single_judgment_units(judgments)

    judgments['started'] = judgments['started'].apply(lambda x: dateparser.parse(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: dateparser.parse(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations, judgments = aggregate_annotations(config, judgments)

    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values
                   if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    units = add_missing_values(config, units)

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
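To connect the Long Method advice above to this method: one candidate extraction is the custom-platform branch at the top of process_file(). A sketch, assuming a hypothetical helper name build_custom_platform(); this is not code from the repository:

def build_custom_platform(config):
    """ Maps the five required custom platform columns to their standard names. """
    if len(config.customPlatformColumns) != 5:
        logging.warning("The following column names are required: judgment id, "
                        "unit id, worker id, start time, submit time")
        raise ValueError('No custom platform configuration was provided')
    standard_names = ['judgment', 'unit', 'worker', 'started', 'submitted']
    return dict(zip(config.customPlatformColumns, standard_names))

# process_file() would then open with:
#   platform = get_platform(judgments)
#   if platform is False:
#       logging.info("Custom crowdsourcing platform!")
#       platform = build_custom_platform(config)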