GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( fcabfa...fec086 )
by Oana
19:59
created

crowdtruth.load.process_file()   B

Complexity

Conditions 6

Size

Total Lines 99
Code Lines 46

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 6.0087

Importance

Changes 0
Metric Value
cc 6
eloc 46
nop 2
dl 0
loc 99
ccs 30
cts 32
cp 0.9375
crap 6.0087
rs 7.8339
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#pylint: disable=W0223
2
3 1
"""
4
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
5
"""
6
7 1
import os
8
9 1
import logging
10 1
import datetime
11
12 1
from collections import Counter, OrderedDict
13
14 1
import pandas as pd
15
16 1
from crowdtruth.models.worker import Worker
17 1
from crowdtruth.models.unit import Unit
18 1
from crowdtruth.models.job import Job
19 1
from crowdtruth.configuration import DefaultConfig
20 1
from crowdtruth.crowd_platform import *
21
22
23
24
25
# create an ordered counter so that we can maintain
26
# the position of tags in the order they were annotated
27 1
class OrderedCounter(Counter, OrderedDict):
    """ Counter that remembers the order in which elements were first seen.

    Combining Counter with OrderedDict makes iteration order follow the
    order of first insertion, so annotation tags keep the order in which
    they were annotated.
    """
30
31 1
def create_ordered_counter(ordered_counter, annotation_vector):
    """ Ensure every annotation of the vector is present in the counter.

    Annotations from annotation_vector that are missing from the given
    counter are inserted with a count of zero, after the existing keys,
    preserving insertion order. The (mutated) counter is returned.
    """
    for annotation in annotation_vector:
        # setdefault inserts the key with 0 only when absent,
        # leaving existing counts untouched
        ordered_counter.setdefault(annotation, 0)
    return ordered_counter
37
38 1
def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files.

    Args:
        date_string: the timestamp value read from the input file.
        date_format: a strptime-style format string the timestamp must match.

    Returns:
        The parsed datetime.datetime object.

    Raises:
        ValueError: if date_string does not match date_format.
    """
    try:
        # strptime both validates and parses in a single step
        date_obj = datetime.datetime.strptime(date_string, date_format)
    except ValueError as err:
        # chain the original error so the offending value/format stays visible
        raise ValueError('Incorrect date format') from err
    # fixed: the parsed value used to be printed to stdout (debug leftover)
    # and then discarded; return it instead so callers can use it
    return date_obj
46
47 1
def get_file_list(directory):
    """ Recursively list the CSV files in the directory given as argument.

    Returns file names relative to `directory`, using '/' as the
    separator for files found in subdirectories. 'groundtruth.csv'
    files are excluded.
    """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        path = directory + '/' + file
        # if it is a folder scan it
        if os.path.isdir(path):
            # fixed: the sublist used to be appended as a nested list,
            # which broke downstream iteration; also prefix entries with
            # the subdirectory name so joining with `directory` yields a
            # valid path
            for sub in get_file_list(path):
                filelist.append(file + '/' + sub)

        # if it is a csv file record it
        if file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist
64
65 1
def list_files(kwargs, results, config):
    """ Creates a list of files to be processed and processes each of them.

    Args:
        kwargs: dict with either 'file' (a single .csv path) or
            'directory' (a folder scanned with get_file_list).
        results: dict of lists; the per-file results are appended in place.
        config: configuration object passed through to process_file.

    Returns:
        The (mutated) results dict.

    Raises:
        ValueError: when neither a .csv 'file' nor a 'directory' is given.
    """
    files = []
    directory = ""
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        # lazy %-style args avoid building the message when INFO is off
        logging.info('Found %d files', len(files))
    else:
        raise ValueError('No input was provided')

    for file in files:
        # fixed: the old check `'directory' in locals()` was always True
        # because directory is initialized above; only the emptiness test
        # matters (it distinguishes the single-file case)
        if directory:
            logging.info('Processing %s', file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for value in res:
            results[value].append(res[value])

    return results
87
88 1
def load(**kwargs):
    """ Loads the input files and aggregates them into result DataFrames.

    Accepts either a 'file' or 'directory' keyword (forwarded to
    list_files) and an optional 'config'; without one, DefaultConfig()
    is used. Returns a (results, config) tuple where results maps
    'jobs', 'units', 'workers', 'judgments' and 'annotations' to
    concatenated DataFrames.
    """

    # placeholder for aggregated results
    results = {name: [] for name in ['jobs', 'units', 'workers',
                                     'judgments', 'annotations']}

    if 'config' in kwargs:
        logging.info('Config loaded')
        config = kwargs['config']
    else:
        config = DefaultConfig()

    results = list_files(kwargs, results, config)

    # collapse each list of per-file frames into a single frame
    results = {name: pd.concat(frames) for name, frames in results.items()}

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    worker_frame = results['workers']
    results['workers'] = worker_frame.groupby(worker_frame.index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    annotation_frame = results['annotations']
    results['annotations'] = annotation_frame.groupby(annotation_frame.index).sum()

    return results, config
124
125 1
def remove_empty_rows(config, judgments):
    """ Remove rows where the worker did not give an answer (AMT issue).

    Rows missing a value in any of the configured output columns are
    dropped; a warning with the number of removed rows is logged.

    Args:
        config: configuration object exposing outputColumns.
        judgments: pandas DataFrame of raw judgments.

    Returns:
        The filtered DataFrame with a fresh 0..n-1 index.
    """
    # collect index labels of incomplete rows first so we can report how
    # many distinct rows were dropped (a row may miss several columns)
    empty_rows = set()
    for col in config.outputColumns:
        # idiomatic .isnull()/.notnull() instead of `pd.isnull(x) == True`
        empty_rows = empty_rows.union(judgments[judgments[col].isnull()].index)
    for col in config.outputColumns:
        judgments = judgments[judgments[col].notnull()]
    judgments = judgments.reset_index(drop=True)
    count_empty_rows = len(empty_rows)
    if count_empty_rows > 0:
        if count_empty_rows == 1:
            logging.warning(str(count_empty_rows) + " row with incomplete information in "
                            "output columns was removed.")
        else:
            logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                            "output columns were removed.")
    return judgments
142
143 1
def remove_single_judgment_units(judgments):
    """ Drop Media Units that received exactly one judgment.

    Agreement metrics cannot be computed on a single judgment, so those
    units are removed; a warning with the number of omitted units is
    logged. Returns the filtered DataFrame with a fresh 0..n-1 index.
    """
    singletons = judgments.groupby('unit').filter(lambda grp: len(grp) == 1)['unit']
    judgments = judgments[~judgments['unit'].isin(singletons)].reset_index(drop=True)
    removed = len(singletons)
    if removed > 0:
        if removed == 1:
            logging.warning(str(removed) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(removed) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments
157
158 1
def make_output_cols_safe_keys(config, judgments):
    """ Turn the raw output-column values into (ordered) counters.

    Each configured output column is converted cell by cell: dictionary
    cells become OrderedCounters directly, while string cells are first
    split on the configured annotation separator. For closed tasks
    (open_ended_task is False) every annotation from
    config.annotation_vector is additionally inserted with a zero count.
    """
    for col in config.output.values():
        sample = judgments[col].iloc[0]
        separator = config.annotation_separator
        # pick the per-cell converter once, then apply it to the column
        if isinstance(sample, dict):
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                convert = lambda cell: OrderedCounter(cell)
            else:
                convert = lambda cell: create_ordered_counter(
                    OrderedCounter(cell), config.annotation_vector)
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                convert = lambda cell: OrderedCounter(cell.split(separator))
            else:
                convert = lambda cell: create_ordered_counter(
                    OrderedCounter(cell.split(separator)), config.annotation_vector)
        judgments[col] = judgments[col].apply(convert)
    return judgments
178
179
180 1
def add_missing_values(config, units):
    """ Adds missing annotation-vector values if it is a closed task.

    For every configured output column, each unit's counter gains a zero
    entry for every annotation of config.annotation_vector it does not
    already contain. Columns whose cells do not support counter/dict
    access raise AttributeError and are skipped.

    Returns:
        The (mutated) units DataFrame. Fixed: the function previously
        returned from inside the loop, so only the first usable column
        was processed, and it implicitly returned None when every column
        raised — clobbering `units` at the call site. It now processes
        all columns and always returns units.
    """
    for col in config.output.values():
        try:
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
        except AttributeError:
            # cells in this column are not counters/dicts; skip it
            continue
    return units
192 1
def aggregate_annotations(config, judgments):
    """ Aggregates annotations and adds judgment statistics.

    For each output column, adds a '<col>.count' column (total annotation
    count per judgment) and a '<col>.unique' column (number of distinct
    annotations per judgment), and accumulates per-annotation frequencies
    across all judgments into a single DataFrame.

    Returns:
        A tuple (annotations, judgments) with the frequency table and
        the augmented judgments DataFrame.
    """
    annotations = pd.DataFrame()
    for col in config.output.values():
        counters = judgments[col]
        judgments[col + '.count'] = counters.apply(lambda x: sum(x.values()))
        judgments[col + '.unique'] = counters.apply(len)
        # count, per annotation key, in how many judgments it appears
        key_freq = counters.apply(
            lambda x: pd.Series(list(x.keys())).value_counts()).sum()
        annotations = pd.concat([annotations, pd.DataFrame(key_freq, columns=[col])], axis=0)
    return annotations, judgments
202
203 1
def process_file(filename, config):
    """ Processes input files with the given configuration.

    Reads a CSV of crowdsourcing judgments, maps its columns onto the
    canonical names (judgment/unit/worker/started/submitted), cleans the
    judgments, and aggregates them into units, annotations, workers and
    a job summary.

    Args:
        filename: path of the CSV file to process.
        config: configuration object (input/output column mappings,
            customPlatformColumns, processJudgments hook, ...).

    Returns:
        A tuple (results, config) where results is a dict with keys
        'jobs', 'units', 'workers', 'judgments' and 'annotations'.

    Raises:
        ValueError: when no known platform is detected and fewer/more
            than 5 custom platform columns are configured.
    """

    judgments = pd.read_csv(filename)#, encoding=result['encoding'])

    # NOTE(review): get_platform comes from the crowd_platform star import;
    # it appears to return False when the CSV matches no known platform —
    # confirm in crowdtruth.crowd_platform.
    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:

            # map the user-supplied column names onto the canonical ones
            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }


    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    judgments = remove_empty_rows(config, judgments)
    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    # single rename map: input columns + output columns + platform columns
    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    # the file name (minus the .csv extension) identifies the job
    judgments['job'] = filename.split('.csv')[0]

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    # remove units with just 1 judgment
    judgments = remove_single_judgment_units(judgments)

    # parse timestamps and compute the per-judgment duration in seconds
    judgments['started'] = judgments['started'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations, judgments = aggregate_annotations(config, judgments)

    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values \
                    if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    units = add_missing_values(config, units)

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
302