GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 77d3da...d2060a )
by Oana
14:13 queued 10s
created

crowdtruth.load.create_ordered_counter()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
ccs 5
cts 5
cp 1
cc 3
nop 2
crap 3
1
#pylint: disable=W0223
2
3 1
"""
4
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
5
"""
6
7 1
import os
8
9 1
import logging
10 1
import datetime
11
12 1
from collections import Counter, OrderedDict
13
14 1
import pandas as pd
15
16 1
from crowdtruth.models.worker import Worker
17 1
from crowdtruth.models.unit import Unit
18 1
from crowdtruth.models.job import Job
19 1
from crowdtruth.configuration import DefaultConfig
20
21
22
23
24
# Counter subclass that also inherits from OrderedDict so that tags keep
# the position in which they were first annotated.
class OrderedCounter(Counter, OrderedDict):
    """ Counter that remembers the insertion order of its keys. """
29
30 1
def create_ordered_counter(ordered_counter, annotation_vector):
    """ Instantiates an ordered counter from a given annotation vector.

    Every tag of the annotation vector that is not yet present in the
    counter is registered with a count of zero, so that all possible
    annotations appear in the counter in vector order.
    """
    for tag in annotation_vector:
        if tag not in ordered_counter:
            ordered_counter.update({tag: 0})
    return ordered_counter
36
37
38 1
class Found(Exception):
    """ Control-flow exception raised when a varying column is detected. """
41
42 1
def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files.

    Args:
        date_string: the timestamp value read from the input file.
        date_format: the strptime format the value must conform to.

    Raises:
        ValueError: if ``date_string`` does not match ``date_format``.
    """
    try:
        # FIX: removed a leftover debug print of the parsed datetime;
        # validation only needs strptime to succeed.
        datetime.datetime.strptime(date_string, date_format)
    except ValueError as err:
        # chain the original parsing error for easier debugging
        raise ValueError('Incorrect date format') from err
50
51 1
def get_file_list(directory):
    """ List the CSV files in the directory given as argument.

    Sub-folders are scanned recursively; a non-empty sub-folder result is
    appended as a nested list. The file 'groundtruth.csv' is skipped.
    """
    collected = []

    for entry in os.listdir(directory):
        path = directory + '/' + entry

        # recurse into sub-folders and keep non-empty results as nested lists
        if os.path.isdir(path):
            nested = get_file_list(path)
            if len(nested):
                collected.append(nested)

        # keep csv files, except the ground truth file
        if entry.endswith('.csv') and entry != 'groundtruth.csv':
            collected.append(entry)
    return collected
68
69 1
def list_files(kwargs, results, config):
    """ Creates a list of files to be processed and processes each one.

    Args:
        kwargs: keyword arguments passed to load(); either 'file' (a single
            .csv path) or 'directory' (a folder that is scanned recursively).
        results: dict of lists accumulating per-file aggregates.
        config: configuration object, threaded through process_file.

    Returns:
        The updated results dict.

    Raises:
        ValueError: when neither 'file' nor 'directory' was provided.
    """
    files = []
    directory = ""
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found %s files', len(files))
    else:
        raise ValueError('No input was provided')

    for file in files:
        # FIX: the original tested "'directory' in locals()", which is always
        # true because directory is bound above; the meaningful condition is
        # whether the files came from a directory scan and need a path prefix.
        if directory != "":
            logging.info("Processing %s", file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for value in res:
            results[value].append(res[value])

    return results
91
92 1
def load(**kwargs):
    """ Loads the input files and aggregates the CrowdTruth data structures.

    Accepts either 'file' (single .csv) or 'directory' keyword arguments,
    plus an optional 'config'; returns a (results, config) tuple where
    results holds concatenated jobs, units, workers, judgments, annotations.
    """

    # accumulators for the per-file processing results
    results = {key: [] for key in
               ('jobs', 'units', 'workers', 'judgments', 'annotations')}

    if 'config' in kwargs:
        logging.info('Config loaded')
        config = kwargs['config']
    else:
        config = DefaultConfig()

    results = list_files(kwargs, results, config)

    for key in results:
        results[key] = pd.concat(results[key])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    worker_frame = results['workers']
    results['workers'] = worker_frame.groupby(worker_frame.index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    annotation_frame = results['annotations']
    results['annotations'] = annotation_frame.groupby(annotation_frame.index).sum()

    return results, config
128
129 1
def remove_empty_rows(config, judgments):
    """ Remove rows where the worker did not give an answer (AMT issue).

    Args:
        config: configuration object exposing ``outputColumns``.
        judgments: pandas DataFrame with the raw judgments.

    Returns:
        The judgments DataFrame without rows that have a missing value in
        any output column, re-indexed from 0.
    """
    empty_rows = set()
    for col in config.outputColumns:
        # .isnull() replaces the non-idiomatic "pd.isnull(...) == True"
        empty_rows = empty_rows.union(judgments[judgments[col].isnull()].index)
    # a single dropna replaces the original per-column filtering loop
    judgments = judgments.dropna(subset=list(config.outputColumns))
    judgments = judgments.reset_index(drop=True)
    count_empty_rows = len(empty_rows)
    if count_empty_rows > 0:
        if count_empty_rows == 1:
            logging.warning("%d row with incomplete information in "
                            "output columns was removed.", count_empty_rows)
        else:
            logging.warning("%d rows with incomplete information in "
                            "output columns were removed.", count_empty_rows)
    return judgments
146
147 1
def remove_single_judgment_units(judgments):
    """ Drop Media Units that received a single judgment.

    Agreement metrics cannot be computed on a unit annotated by only one
    worker, so those rows are removed and the index is rebuilt.
    """
    singles = judgments.groupby('unit').filter(lambda grp: len(grp) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(singles)].reset_index(drop=True)
    dropped = len(singles)
    if dropped > 0:
        if dropped == 1:
            logging.warning(str(dropped) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(dropped) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments
161
162 1
def make_output_cols_safe_keys(config, judgments):
    """ Turn every output column value into an OrderedCounter of safe keys.

    Cells already stored as dicts are wrapped directly; string cells are
    split on the annotation separator first. For closed tasks the counter
    is completed with zero counts for the whole annotation vector.
    """
    for col in config.output.values():
        stored_as_dict = isinstance(judgments[col].iloc[0], dict)
        if stored_as_dict:
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(OrderedCounter)
            else:
                judgments[col] = judgments[col].apply(
                    lambda x: create_ordered_counter(
                        OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            separator = config.annotation_separator
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(
                    lambda x: OrderedCounter(x.split(separator)))
            else:
                judgments[col] = judgments[col].apply(
                    lambda x: create_ordered_counter(
                        OrderedCounter(x.split(separator)),
                        config.annotation_vector))
    return judgments
182
183
184 1
def add_missing_values(config, units):
    """ Adds missing vector values if is a closed task.

    Every output column whose cells support ``update`` (counters/dicts) gets
    a zero count for each relation of ``config.annotation_vector`` that is
    not present yet. Columns whose cells do not (plain values) are skipped.
    The ``units`` frame is modified in place and returned.
    """
    for col in config.output.values():
        try:
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
        except AttributeError:
            # cells in this column are not counters; nothing to fill in
            continue
    # FIX: the original returned inside the loop after the first successful
    # column (skipping the remaining output columns) and implicitly returned
    # None when every column raised AttributeError.
    return units
196
197 1
def process_file(filename, config):
    """ Processes input files with the given configuration.

    Reads the raw judgments CSV, maps the platform-specific columns to the
    canonical names ('judgment', 'unit', 'worker', 'started', 'submitted'),
    cleans the judgments and aggregates units, workers, annotations and the
    per-job statistics.

    Args:
        filename: path to the CSV file with the raw crowd judgments.
        config: configuration object (see DefaultConfig) describing the
            input/output columns, platform specifics and preprocessing hooks.

    Returns:
        A (results, config) tuple; results is a dict with the keys 'jobs',
        'units', 'workers', 'judgments' and 'annotations'.

    Raises:
        ValueError: when the platform is unknown and
            ``config.customPlatformColumns`` does not list exactly 5 columns.
    """

    judgments = pd.read_csv(filename)#, encoding=result['encoding'])

    # detect FigureEight / AMT from the header; False means custom platform
    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:

            # map the user-declared column names to the canonical ones
            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }


    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    judgments = remove_empty_rows(config, judgments)
    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    # the job identifier is the file path without the .csv extension
    judgments['job'] = filename.split('.csv')[0]

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    # parse timestamps and compute each judgment's duration in seconds
    judgments['started'] = judgments['started'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    # remove units with just 1 judgment (agreement cannot be computed there)
    judgments = remove_single_judgment_units(judgments)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    # per-judgment totals: number of annotations and number of distinct ones
    for col in config.output.values():
        judgments[col+'.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col+'.unique'] = judgments[col].apply(lambda x: len(x))


    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)


    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations = pd.DataFrame()
    for col in config.output.values():
        res = pd.DataFrame(judgments[col].apply(lambda x: \
              pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values \
                    if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    units = add_missing_values(config, units)

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
307
308
309 1
def get_platform(dframe):
    """ Get the crowdsourcing platform this file originates to.

    Detection is based on the first column name of the frame; returns a
    mapping from platform-specific column names to canonical ones, or
    False for an unknown/custom platform.
    """

    first_column = dframe.columns.values[0]

    if first_column == '_unit_id':
        # CrowdFlower / FigureEight export
        return {
            #'_platform'        : 'cf',
            '_id'           : 'judgment',
            '_unit_id'      : 'unit',
            '_worker_id'    : 'worker',
            '_started_at'   : 'started',
            '_created_at'   : 'submitted'
        }

    if first_column == 'HITId':
        # Amazon Mechanical Turk export
        return {
            #'id'       : 'amt',
            'AssignmentId'  : 'judgment',
            'HITId'         : 'unit',
            'WorkerId'      : 'worker',
            'AcceptTime'    : 'started',
            'SubmitTime'    : 'submitted'
        }

    # unknown / custom platform
    return False
333
334 1
def configure_amt_columns(dframe, config):
    """ Configures AMT input and output columns.

    Explicitly configured columns take precedence; otherwise columns are
    selected by their 'Input.' / 'Answer.' prefixes. Returns the
    (input, output) mapping pair and stores it on the config.
    """
    config.input = {}
    config.output = {}

    def _column_map(prefix, safe_prefix, explicit):
        # one helper for both directions: explicit list wins over prefix scan
        if explicit:
            selected = [c for c in dframe.columns.values if c in explicit]
        else:
            selected = [c for c in dframe.columns.values if c.startswith(prefix)]
        return {c: safe_prefix + c.replace(prefix, '') for c in selected}

    config.input = _column_map('Input.', 'input.', config.inputColumns)
    config.output = _column_map('Answer.', 'output.', config.outputColumns)
    return config.input, config.output
354
355 1
def configure_platform_columns(dframe, config):
    """ Configures FigureEight and custom platforms input and output columns.

    Only explicitly configured columns are selected; when a side is not
    configured its mapping stays empty. Returns the (input, output) pair
    and stores it on the config.
    """
    config.input = {}
    config.output = {}

    columns = dframe.columns.values
    if config.inputColumns:
        config.input = {c: 'input.' + c for c in columns
                        if c in config.inputColumns}
    if config.outputColumns:
        config.output = {c: 'output.' + c for c in columns
                         if c in config.outputColumns}
    return config.input, config.output
367
368 1
def configure_with_missing_columns(dframe, config):
    """ Identifies the type of the column based on naming.

    A column whose values are constant inside every unit is treated as
    input; a column that varies inside some unit holds worker output.
    Internal/metadata columns are skipped. Explicitly configured sides
    are left untouched.
    """
    units = dframe.groupby('_unit_id')
    candidates = [
        c for c in dframe.columns.values
        if c != 'clustering' and not c.startswith('_')
        and not c.startswith('e_') and not c.endswith('_gold')
        and not c.endswith('_reason') and not c.endswith('browser')
    ]
    for colname in candidates:
        try:
            # raise Found as soon as the column varies within one unit
            for _, unit in units:
                distinct = unit[colname].nunique()
                if distinct not in (0, 1):
                    raise Found
            if not config.inputColumns:
                config.input[colname] = 'input.' + colname

        except Found:
            if not config.outputColumns:
                config.output[colname] = 'output.' + colname

    return config
388
389 1
def get_column_types(dframe, config):
    """ Return the config with the input and output column maps filled in.

    Each entry of ``config.input`` / ``config.output`` maps
    [original column name] -> [safestring column name].

    Args:
        dframe: pandas DataFrame of raw judgments; its first column name is
            used to detect the platform (AMT 'HITId', FigureEight '_unit_id').
        config: configuration object exposing inputColumns / outputColumns.

    Returns:
        The updated config object.
    """
    config.input = {}
    config.output = {}

    first_column = dframe.columns.values[0]

    if first_column == 'HITId':
        # Mturk: if config is specified, use those columns
        config.input, config.output = configure_amt_columns(dframe, config)
        return config

    if first_column == '_unit_id':
        # FigureEight: if a config is specified, use those columns
        config.input, config.output = configure_platform_columns(dframe, config)
        # if there is a config for both input and output columns, we can return those
        if config.inputColumns and config.outputColumns:
            return config

        # try to identify the input and output columns automatically:
        # a column is treated as input if all its values within a unit are identical.
        # This is not failsafe but should give decent results without settings;
        # it is best to make a settings.py file for a collection.
        return configure_with_missing_columns(dframe, config)

    # unknown platform type: if a config is specified, use those columns
    config.input, config.output = configure_platform_columns(dframe, config)
    # FIX: the original fell through and implicitly returned None when input
    # or output columns were missing from the config; always return config.
    return config
427