Inspection completed: push to master (a2486a...0c98d4) by Oana, 15:29.

crowdtruth.load.load() (rated B)

Complexity
  Conditions: 7

Size
  Total Lines: 44
  Code Lines: 28

Duplication
  Lines: 0
  Ratio: 0%

Code Coverage
  Tests: 16
  CRAP Score: 7.1929

Importance
  Changes: 0
Metric   Value
eloc     28
dl       0
loc      44
ccs      16
cts      19
cp       0.8421
rs       7.808
c        0
b        0
f        0
cc       7
nop      1
crap     7.1929
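
For orientation before the listing: load() accepts either a file= keyword pointing at a single .csv or a directory= keyword pointing at a folder of .csv files, plus an optional config= object, and returns the aggregated data frames together with the (possibly updated) configuration. A minimal usage sketch follows; the file name, column names, and annotation vector are invented for illustration, and the config attributes are assumptions inferred from how this module reads them (inputColumns, outputColumns, open_ended_task, annotation_vector, processJudgments), not values taken from this report:

from crowdtruth.load import load
from crowdtruth.configuration import DefaultConfig

class ExampleConfig(DefaultConfig):
    # hypothetical task description; none of these values come from the report
    inputColumns = ["sentence"]
    outputColumns = ["relations"]
    open_ended_task = False
    annotation_vector = ["cause", "treat", "none"]

    def processJudgments(self, judgments):
        # no custom preprocessing in this sketch
        return judgments

data, config = load(file="results.csv", config=ExampleConfig())
print(data["units"].head())

The full source of the module, as inspected, follows.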
#pylint: disable=W0223
"""
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
"""

import os
import logging
import datetime

from collections import Counter, OrderedDict

import pandas as pd

from crowdtruth.models.worker import Worker
from crowdtruth.models.unit import Unit
from crowdtruth.models.job import Job
from crowdtruth.configuration import DefaultConfig

# create an ordered counter so that we can maintain
# the position of tags in the order they were annotated
class OrderedCounter(Counter, OrderedDict):
    """ Instantiates an ordered counter. """
    pass

def create_ordered_counter(ordered_counter, annotation_vector):
    """ Instantiates an ordered counter from a given annotation vector. """
    for relation in annotation_vector:
        if relation not in ordered_counter:
            ordered_counter.update({relation: 0})
    return ordered_counter
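
# Illustrative example (not part of the original module): for a closed task
# with annotation_vector = ['yes', 'no'], a judgment containing only 'yes'
# is padded with the missing option at count 0:
#   create_ordered_counter(OrderedCounter({'yes': 1}), ['yes', 'no'])
#   -> OrderedCounter({'yes': 1, 'no': 0})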

class Found(Exception):
    """ Exception used to break out of the nested loops in get_column_types. """
    pass

def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files. """

    try:
        date_obj = datetime.datetime.strptime(date_string, date_format)
        print(date_obj)
    except ValueError:
        raise ValueError('Incorrect date format')
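
# Illustrative example: validate_timestamp_field('2018-03-21 15:29:00',
# '%Y-%m-%d %H:%M:%S') parses and prints the datetime; a non-matching string
# raises ValueError('Incorrect date format'). Note the function is not called
# anywhere in this module, and the coverage report marks its body as unhit.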

def get_file_list(directory):
    """ List the files in the directory given as argument. """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        # if it is a folder scan it
        if os.path.isdir(directory+'/'+file):
            sublist = get_file_list(directory+'/'+file)
            sublist_length = len(sublist)
            if sublist_length:
                filelist.append(sublist)

        # if it is a csv file open it
        if file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist

def list_files(kwargs):
    """ Creates a list of files to be processed. """
    files = []
    directory = ""
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found ' + str(len(files)) + ' files')
    else:
        raise ValueError('No input was provided')
    return files, directory
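
# Illustrative example: list_files({'file': 'results.csv'}) returns
# (['results.csv'], ''); list_files({'directory': 'input'}) scans the folder
# through get_file_list(); with neither keyword it raises
# ValueError('No input was provided').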

def load(**kwargs):
    """ Loads the input files. """

    # placeholder for aggregated results
    results = {
        'jobs' : [],
        'units' : [],
        'workers' : [],
        'judgments' : [],
        'annotations' : []
        }

    if 'config' not in kwargs:
        config = DefaultConfig()
    else:
        logging.info('Config loaded')
        config = kwargs['config']

    files, directory = list_files(kwargs)

    for file in files:
        if 'directory' in locals() and directory != "":
            logging.info("Processing " + file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for value in res:
            results[value].append(res[value])

    for value in results:
        results[value] = pd.concat(results[value])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()

    return results, config

def remove_empty_rows(config, judgments):
    """ remove rows where the worker did not give an answer (AMT issue) """
    empty_rows = set()
    for col in config.outputColumns:
        empty_rows = empty_rows.union(judgments[pd.isnull(judgments[col]) == True].index)
    for col in config.outputColumns:
        judgments = judgments[pd.isnull(judgments[col]) == False]
    judgments = judgments.reset_index(drop=True)
    count_empty_rows = len(empty_rows)
    if count_empty_rows > 0:
        if count_empty_rows == 1:
            logging.warning(str(count_empty_rows) + " row with incomplete information in "
                            "output columns was removed.")
        else:
            logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                            "output columns were removed.")
    return judgments

def remove_single_judgment_units(judgments):
    """ remove units with just 1 judgment """
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(units_1work)]
    judgments = judgments.reset_index(drop=True)
    no_units_1work = len(units_1work)
    if no_units_1work > 0:
        if no_units_1work == 1:
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments
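
# Illustrative note: the two filters above are conservative cleanup steps.
# remove_empty_rows() drops any row with a NaN in an output column, and
# remove_single_judgment_units() drops units that received exactly one
# judgment, since agreement metrics need at least two judgments per unit.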

def make_output_cols_safe_keys(config, judgments):
    """ make output values safe keys """
    for col in config.output.values():
        if isinstance(judgments[col].iloc[0], dict):
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(x))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter( \
                                                      x.split(config.annotation_separator)))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x.split(config.annotation_separator)), \
                                 config.annotation_vector))
    return judgments
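
# Illustrative note: with annotation_separator = ',' in an open-ended task,
# the raw answer 'cause,treat' becomes OrderedCounter({'cause': 1, 'treat': 1});
# in a closed task the counter is additionally padded with the remaining
# annotation_vector options at count 0 via create_ordered_counter().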

def process_file(filename, config):
    """ process input files with the given configuration """

    judgments = pd.read_csv(filename)#, encoding=result['encoding'])

    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:
            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }

    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    judgments = remove_empty_rows(config, judgments)
    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    judgments['job'] = filename.split('.csv')[0]

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    judgments['started'] = judgments['started'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    # remove units with just 1 judgment
    judgments = remove_single_judgment_units(judgments)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    for col in config.output.values():
        judgments[col+'.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col+'.unique'] = judgments[col].apply(lambda x: len(x))

    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)

    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations = pd.DataFrame()
    for col in config.output.values():
        res = pd.DataFrame(judgments[col].apply(lambda x: \
              pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values \
                   if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    for col in config.output.values():
        try:
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
        except AttributeError:
            continue

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
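
# Illustrative note on the duration computation above: timedelta.seconds is
# the seconds component only (0-86399), so judgments spanning more than a day
# would wrap around; timedelta.total_seconds() would measure the full span.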

def get_platform(dframe):
    """ Get the crowdsourcing platform this file originates from. """

    if dframe.columns.values[0] == '_unit_id':
        # CrowdFlower
        return {
            #'_platform'        : 'cf',
            '_id'           : 'judgment',
            '_unit_id'      : 'unit',
            '_worker_id'    : 'worker',
            '_started_at'   : 'started',
            '_created_at'   : 'submitted'
        }
    elif dframe.columns.values[0] == 'HITId':
        # Mturk
        return {
            #'id'       : 'amt',
            'AssignmentId'  : 'judgment',
            'HITId'         : 'unit',
            'WorkerId'      : 'worker',
            'AcceptTime'    : 'started',
            'SubmitTime'    : 'submitted'
        }
    return False
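
# Illustrative note: platform detection relies solely on the first column
# name of the file: '_unit_id' means CrowdFlower/FigureEight, 'HITId' means
# Amazon Mechanical Turk, and anything else returns False, which sends
# process_file() down the config.customPlatformColumns path.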

def configure_amt_columns(dframe, config):
    """ Configures AMT input and output columns. """
    config.input = {}
    config.output = {}

    if config.inputColumns:
        config.input = {c: 'input.'+c.replace('Input.', '') \
                        for c in dframe.columns.values if c in config.inputColumns}
    else:
        config.input = {c: 'input.'+c.replace('Input.', '') \
                        for c in dframe.columns.values if c.startswith('Input.')}

    # if config is specified, use those columns
    if config.outputColumns:
        config.output = {c: 'output.'+c.replace('Answer.', '') \
                         for c in dframe.columns.values if c in config.outputColumns}
    else:
        config.output = {c: 'output.'+c.replace('Answer.', '') \
                         for c in dframe.columns.values if c.startswith('Answer.')}
    return config.input, config.output

def configure_platform_columns(dframe, config):
    """ Configures FigureEight and custom platforms input and output columns. """
    config.input = {}
    config.output = {}

    if config.inputColumns:
        config.input = {c: 'input.'+c for c in dframe.columns.values \
                        if c in config.inputColumns}
    if config.outputColumns:
        config.output = {c: 'output.'+c for c in dframe.columns.values \
                         if c in config.outputColumns}
    return config.input, config.output

def get_column_types(dframe, config):
    """ return input and output columns """
    # returns a list of columns that contain input content
    config.input = {}
    config.output = {}

    # get a dict of the columns with input content and the columns with output judgments
    # each entry matches [original column name]:[safestring column name]
    if dframe.columns.values[0] == 'HITId':
        # Mturk
        # if config is specified, use those columns
        config.input, config.output = configure_amt_columns(dframe, config)

        return config

    elif dframe.columns.values[0] == '_unit_id':

        # if a config is specified, use those columns
        config.input, config.output = configure_platform_columns(dframe, config)
        # if there is a config for both input and output columns, we can return those
        if config.inputColumns and config.outputColumns:
            return config

        # try to identify the input and output columns
        # this is the case if all the values in the column are identical
        # this is not failsafe but should give decent results without settings
        # it is best to make a settings.py file for a collection

        units = dframe.groupby('_unit_id')
        columns = [c for c in dframe.columns.values if c != 'clustering' and not c.startswith('_')
                   and not c.startswith('e_') and not c.endswith('_gold')
                   and not c.endswith('_reason') and not c.endswith('browser')]
        for colname in columns:
            try:
                for _, unit in units:
                    unique = unit[colname].nunique()
                    if unique != 1 and unique != 0:
                        raise Found
                if not config.inputColumns:
                    config.input[colname] = 'input.'+colname

            except Found:
                if not config.outputColumns:
                    config.output[colname] = 'output.'+colname

        return config
    else:
        # unknown platform type

        # if a config is specified, use those columns
        config.input, config.output = configure_platform_columns(dframe, config)
        # if there is a config for both input and output columns, we can return those
        if config.inputColumns and config.outputColumns:
            return config
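
When a file comes from neither FigureEight/CrowdFlower nor AMT, get_platform() returns False and process_file() requires exactly five entries in config.customPlatformColumns, in the order judgment id, unit id, worker id, start time, submit time. A hedged sketch of such a configuration, with invented column names; note that both inputColumns and outputColumns must be set here, since get_column_types() cannot auto-detect columns for an unknown platform:

from crowdtruth.load import load
from crowdtruth.configuration import DefaultConfig

class CustomPlatformConfig(DefaultConfig):
    # invented column names, in the required order:
    # judgment id, unit id, worker id, start time, submit time
    customPlatformColumns = ["judgment_id", "unit_id", "worker_id",
                             "started_at", "submitted_at"]
    inputColumns = ["question"]    # placeholder input column
    outputColumns = ["answer"]     # placeholder output column

data, config = load(file="custom_results.csv", config=CustomPlatformConfig())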