GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( d2060a...fcabfa )
by Oana
18:37
created

crowdtruth.load.add_missing_values()   B

Complexity

Conditions 6

Size

Total Lines 12
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 6.288

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 12
ccs 8
cts 10
cp 0.8
rs 8.6666
c 0
b 0
f 0
cc 6
nop 2
crap 6.288
1
#pylint: disable=W0223
2
3 1
"""
4
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
5
"""
6
7 1
import os
8
9 1
import logging
10 1
import datetime
11
12 1
from collections import Counter, OrderedDict
13
14 1
import pandas as pd
15
16 1
from crowdtruth.models.worker import Worker
17 1
from crowdtruth.models.unit import Unit
18 1
from crowdtruth.models.job import Job
19 1
from crowdtruth.configuration import DefaultConfig
20 1
from crowdtruth.crowd_platform import *
21
22
23
24
25
# create an ordered counter so that we can maintain
26
# the position of tags in the order they were annotated
27 1
class OrderedCounter(Counter, OrderedDict):
28
    """ Instantiates an ordered counter. """
29 1
    pass
30
31 1
def create_ordered_counter(ordered_counter, annotation_vector):
32
    """ Instantiates an ordered counter from a given annotation vector. """
33 1
    for relation in annotation_vector:
34 1
        if relation not in ordered_counter:
35 1
            ordered_counter.update({relation: 0})
36 1
    return ordered_counter
37
38 1
def validate_timestamp_field(date_string, date_format):
39
    """ Validates the time columns (started time and submitted time) in input files. """
40
41
    try:
42
        date_obj = datetime.datetime.strptime(date_string, date_format)
43
        print(date_obj)
44
    except ValueError:
45
        raise ValueError('Incorrect date format')
46
47 1
def get_file_list(directory):
48
    """ List the files in the directry given as argument. """
49
    filelist = []
50
51
    # go through all files in this folder
52
    for file in os.listdir(directory):
53
        # if it is a folder scan it
54
        if os.path.isdir(directory+'/'+file):
55
            sublist = get_file_list(directory+'/'+file)
56
            sublist_length = len(sublist)
57
            if sublist_length:
58
                filelist.append(sublist)
59
60
        # if it is a csv file open it
61
        if file.endswith('.csv') and file != 'groundtruth.csv':
62
            filelist.append(file)
63
    return filelist
64
65 1
def list_files(kwargs, results, config):
66
    """ Creates a list of files to be processed. """
67 1
    files = []
68 1
    directory = ""
69 1
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
70 1
        files = [kwargs['file']]
71
    elif 'directory' in kwargs:
72
        directory = kwargs['directory']
73
        files = get_file_list(directory)
74
        logging.info('Found ' + str(len(files)) + ' files')
75
    else:
76
        raise ValueError('No input was provided')
77
78 1
    for file in files:
79 1
        if 'directory' in locals() and directory != "":
80
            logging.info("Processing " + file)
81
            file = directory + "/" + file
82 1
        res, config = process_file(file, config)
83 1
        for value in res:
84 1
            results[value].append(res[value])
85
86 1
    return results
87
88 1
def load(**kwargs):
89
    """ Loads the input files. """
90
91
    # placeholder for aggregated results
92 1
    results = {
93
        'jobs' : [],
94
        'units' : [],
95
        'workers' : [],
96
        'judgments' : [],
97
        'annotations' : []
98
        }
99
100 1
    if 'config' not in kwargs:
101
        config = DefaultConfig()
102
    else:
103 1
        logging.info('Config loaded')
104 1
        config = kwargs['config']
105
106 1
    results = list_files(kwargs, results, config)
107
108 1
    for value in results:
109 1
        results[value] = pd.concat(results[value])
110
111
112
    # workers and annotations can appear across jobs, so we have to aggregate those extra
113 1
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
114
        'unit' : 'sum',
115
        'judgment' : 'sum',
116
        'job' : 'count',
117
        'duration' : 'mean'
118
        })
119
120
    # aggregate annotations
121 1
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()
122
123 1
    return results, config
124
125 1
def remove_empty_rows(config, judgments):
126
    """ remove rows where the worker did not give an answer (AMT issue) """
127 1
    empty_rows = set()
128 1
    for col in config.outputColumns:
129 1
        empty_rows = empty_rows.union(judgments[pd.isnull(judgments[col]) == True].index)
130 1
    for col in config.outputColumns:
131 1
        judgments = judgments[pd.isnull(judgments[col]) == False]
132 1
    judgments = judgments.reset_index(drop=True)
133 1
    count_empty_rows = len(empty_rows)
134 1
    if count_empty_rows > 0:
135
        if count_empty_rows == 1:
136
            logging.warning(str(count_empty_rows) + " row with incomplete information in "
137
                            "output columns was removed.")
138
        else:
139
            logging.warning(str(count_empty_rows) + " rows with incomplete information in "
140
                            "output columns were removed.")
141 1
    return judgments
142
143 1
def remove_single_judgment_units(judgments):
144
    """ remove units with just 1 judgment """
145 1
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
146 1
    judgments = judgments[~judgments['unit'].isin(units_1work)]
147 1
    judgments = judgments.reset_index(drop=True)
148 1
    no_units_1work = len(units_1work)
149 1
    if no_units_1work > 0:
150
        if no_units_1work == 1:
151
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
152
                            " 1 Worker was omitted, since agreement cannot be calculated.")
153
        else:
154
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
155
                            " 1 Worker were omitted, since agreement cannot be calculated.")
156 1
    return judgments
157
158 1
def make_output_cols_safe_keys(config, judgments):
159
    """ make output values safe keys """
160 1
    for col in config.output.values():
161 1
        if isinstance(judgments[col].iloc[0], dict):
162
            logging.info("Values stored as dictionary")
163
            if config.open_ended_task:
164
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(x))
165
            else:
166
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
167
                                 OrderedCounter(x), config.annotation_vector))
168
        else:
169 1
            logging.info("Values not stored as dictionary")
170 1
            if config.open_ended_task:
171 1
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter( \
172
                                                      x.split(config.annotation_separator)))
173
            else:
174 1
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
175
                                 OrderedCounter(x.split(config.annotation_separator)), \
176
                                 config.annotation_vector))
177 1
    judgments['started'] = judgments['started'].apply(lambda x: pd.to_datetime(str(x)))
178 1
    judgments['submitted'] = judgments['submitted'].apply(lambda x: pd.to_datetime(str(x)))
179 1
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
180
                                            axis=1)
181 1
    return judgments
182
183
184 1
def add_missing_values(config, units):
185
    """ Adds missing vector values if is a closed task """
186 1
    for col in config.output.values():
187 1
        try:
188
            # openended = config.open_ended_task
189 1
            for idx in list(units.index):
190 1
                for relation in config.annotation_vector:
191 1
                    if relation not in units[col][idx]:
192 1
                        units[col][idx].update({relation : 0})
193 1
            return units
194
        except AttributeError:
195
            continue
196 1
def aggregate_annotations(config, judgments):
197
    """ Aggregates annotations and adds judgments stats. """
198 1
    annotations = pd.DataFrame()
199 1
    for col in config.output.values():
200 1
        judgments[col+'.count'] = judgments[col].apply(lambda x: sum(x.values()))
201 1
        judgments[col+'.unique'] = judgments[col].apply(lambda x: len(x))
202 1
        res = pd.DataFrame(judgments[col].apply(lambda x: \
203
              pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
204 1
        annotations = pd.concat([annotations, res], axis=0)
205 1
    return annotations, judgments
206
207 1
def process_file(filename, config):
208
    """ Processes input files with the given configuration """
209
210 1
    judgments = pd.read_csv(filename)#, encoding=result['encoding'])
211
212 1
    platform = get_platform(judgments)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable get_platform does not seem to be defined.
Loading history...
213
214 1
    if platform is False:
215 1
        logging.info("Custom crowdsourcing platform!")
216 1
        no_of_columns = len(config.customPlatformColumns)
217 1
        if no_of_columns != 5:
218
            logging.warning("The following column names are required: judgment id, "
219
                            "unit id, worker id, start time, submit time")
220
            raise ValueError('No custom platform configuration was provided')
221
        else:
222
223 1
            platform = {
224
                #'id'       : 'custom',
225
                config.customPlatformColumns[0] : 'judgment',
226
                config.customPlatformColumns[1] : 'unit',
227
                config.customPlatformColumns[2] : 'worker',
228
                config.customPlatformColumns[3] : 'started',
229
                config.customPlatformColumns[4] : 'submitted'
230
            }
231
232
233
    # we must establish which fields were part of the input data and which are output judgments
234
    # if there is a config, check if there is a definition of which fields to use
235
    #config = []
236
    # else use the default and select them automatically
237 1
    config = get_column_types(judgments, config)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable get_column_types does not seem to be defined.
Loading history...
238
239 1
    judgments = remove_empty_rows(config, judgments)
240
    # allow customization of the judgments
241 1
    judgments = config.processJudgments(judgments)
242
243
    # update the config after the preprocessing of judgments
244 1
    config = get_column_types(judgments, config)
245
246 1
    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
247
                       + list(platform.items()))
248
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
249 1
    judgments = judgments.rename(columns=all_columns)
250
251
    # remove columns we don't care about
252 1
    judgments = judgments[list(all_columns.values())]
253
254 1
    judgments['job'] = filename.split('.csv')[0]
255
256
    # make output values safe keys
257 1
    judgments = make_output_cols_safe_keys(config, judgments)
258
259
    # remove units with just 1 judgment
260 1
    judgments = remove_single_judgment_units(judgments)
261
262
    #
263
    # aggregate units
264
    #
265 1
    units = Unit.aggregate(judgments, config)
266
267
    #
268
    # aggregate annotations
269
    # i.e. output columns
270
    #
271 1
    annotations, judgments = aggregate_annotations(config, judgments)
272
273
    #
274
    # aggregate workers
275
    #
276 1
    workers = Worker.aggregate(judgments, config)
277
278
    #
279
    # aggregate job
280
    #
281 1
    job = Job.aggregate(units, judgments, config)
282
283
    # Clean up judgments
284
    # remove input columns from judgments
285 1
    output_cols = [col for col in judgments.columns.values \
286
                    if col.startswith('output') or col.startswith('metric')]
287 1
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]
288
289
    # set judgment id as index
290 1
    judgments.set_index('judgment', inplace=True)
291
292
    # add missing vector values if closed task
293 1
    units = add_missing_values(config, units)
294
295 1
    return {
296
        'jobs' : job,
297
        'units' : units,
298
        'workers' : workers,
299
        'judgments' : judgments,
300
        'annotations' : annotations,
301
        }, config
302