Completed: Push to master (59002d...d6d549) by Oana at 17:31

crowdtruth.load.getPlatform()   (rating: A)

Complexity
    Conditions    3

Size
    Total Lines   26
    Code Lines    16

Duplication
    Lines         0
    Ratio         0 %

Code Coverage
    Tests         3
    CRAP Score    4.125

Importance
    Changes       0
Metric    Value
eloc      16       (executable lines of code)
dl        0        (duplicated lines)
loc       26       (total lines of code)
ccs       3
cts       6
cp        0.5      (coverage fraction)
rs        9.6
c         0
b         0
f         0
cc        3        (cyclomatic complexity)
nop       1        (number of parameters)
crap      4.125    (CRAP score)
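
The CRAP Score above is consistent with the standard CRAP formula, cc^2 * (1 - coverage)^3 + cc. A quick check (a sketch, not part of the repository; crap_score is our name, and cp is read as the covered fraction of the function):

def crap_score(cc, coverage):
    """ Change Risk Anti-Patterns: cyclomatic complexity penalized by untested code. """
    return cc ** 2 * (1 - coverage) ** 3 + cc

print(crap_score(cc=3, coverage=0.5))  # 3**2 * 0.5**3 + 3 = 4.125, matching the report
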
#pylint: disable=W0223

import os
import logging
import datetime

from collections import Counter, OrderedDict

import pandas as pd

from crowdtruth.models.worker import Worker
from crowdtruth.models.unit import Unit
from crowdtruth.models.job import Job
from crowdtruth.configuration import DefaultConfig


# create an ordered counter so that we can maintain
# the position of tags in the order they were annotated
class OrderedCounter(Counter, OrderedDict):
    """ Counter that remembers the order in which elements were first added """
    pass

def create_ordered_counter(ordered_counter, annotation_vector):
    """ ensure every annotation in the vector is present in the counter (count 0 if unseen) """
    for relation in annotation_vector:
        if relation not in ordered_counter:
            ordered_counter.update({relation: 0})
    return ordered_counter
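
# Example (hypothetical labels; a sketch, not library output):
# create_ordered_counter(OrderedCounter({'treats': 2}), ['treats', 'causes'])
# yields a counter holding 'treats': 2 and 'causes': 0, so a closed task always
# carries the full annotation vector in a stable order.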

class Found(Exception):
    """ marker exception used to break out of the nested loops in get_column_types """
    pass

def validate_timestamp_field(date_string, date_format):
    """ validate time columns in input files """
    try:
        date_obj = datetime.datetime.strptime(date_string, date_format)
        print(date_obj)
    except ValueError:
        raise ValueError('Incorrect date format')
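
# Example (hypothetical values): validate_timestamp_field('2018-03-07 17:31', '%Y-%m-%d %H:%M')
# prints the parsed datetime, while a string that does not match the format
# raises ValueError('Incorrect date format').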

def get_file_list(directory):
    """ return the list of csv documents in a folder, recursing into subfolders """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        # if it is a folder, scan it recursively
        if os.path.isdir(directory + '/' + file):
            sublist = get_file_list(directory + '/' + file)
            if len(sublist):
                filelist.append(sublist)

        # if it is a csv file, collect it (ground truth files are skipped)
        elif file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist
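
# Example (hypothetical layout): for a directory holding job1.csv, job2.csv and a
# subfolder containing job3.csv, this returns something like
# ['job1.csv', 'job2.csv', ['job3.csv']] -- subfolder contents are appended as a
# nested list, in whatever order os.listdir produces.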

def load(**kwargs):
    """ Load judgment files """
    # placeholder for aggregated results
    results = {
        'jobs' : [],
        'units' : [],
        'workers' : [],
        'judgments' : [],
        'annotations' : []
        }

    if 'config' not in kwargs:
        config = DefaultConfig()
    else:
        logging.info('Config loaded')
        config = kwargs['config']

    # check whether the input is a single file or a folder
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found ' + str(len(files)) + ' files')
    else:
        raise ValueError('No input was provided')

    for file in files:
        # only prefix the directory when loading from a folder
        if 'directory' in locals():
            logging.info("Processing " + file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for value in res:
            results[value].append(res[value])

    for value in results:
        results[value] = pd.concat(results[value])

    # workers and annotations can appear across jobs, so they need an extra aggregation step
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()

    return results, config
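
# Typical usage (the path is hypothetical; a sketch assuming judgment files in a
# recognized platform layout):
#   from crowdtruth.load import load
#   results, config = load(directory="my_job_results")
#   results['workers'].head()   # per-worker aggregates across all loaded jobs
# results is a dict of DataFrames keyed 'jobs', 'units', 'workers', 'judgments'
# and 'annotations'.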

def remove_empty_rows(config, judgments):
    """ remove rows where the worker did not give an answer (AMT issue) """
    empty_rows = set()
    for col in config.outputColumns:
        empty_rows = empty_rows.union(judgments[judgments[col].isnull()].index)
    for col in config.outputColumns:
        judgments = judgments[judgments[col].notnull()]
    judgments = judgments.reset_index(drop=True)
    count_empty_rows = len(empty_rows)
    if count_empty_rows > 0:
        if count_empty_rows == 1:
            logging.warning(str(count_empty_rows) + " row with incomplete information in "
                            "output columns was removed.")
        else:
            logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                            "output columns were removed.")
    return judgments
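
# Example (hypothetical column name): with config.outputColumns = ['Answer.relation'],
# every row whose Answer.relation cell is NaN is dropped, and a warning reports
# how many rows were removed.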

def remove_single_judgment_units(judgments):
    """ remove units with just 1 judgment """
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(units_1work)]
    judgments = judgments.reset_index(drop=True)
    no_units_1work = len(units_1work)
    if no_units_1work > 0:
        if no_units_1work == 1:
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments
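
# Example: a unit that received exactly one judgment is dropped entirely, since
# inter-annotator agreement cannot be computed from a single worker.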

def make_output_cols_safe_keys(config, judgments):
    """ turn output column values into (ordered) counters with safe keys """
    for col in config.output.values():
        if isinstance(judgments[col].iloc[0], dict):
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(OrderedCounter)
            else:
                judgments[col] = judgments[col].apply(
                    lambda x: create_ordered_counter(
                        OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(
                    lambda x: OrderedCounter(x.split(config.annotation_separator)))
            else:
                judgments[col] = judgments[col].apply(
                    lambda x: create_ordered_counter(
                        OrderedCounter(x.split(config.annotation_separator)),
                        config.annotation_vector))
    return judgments
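
# Example (hypothetical separator and labels): with annotation_separator = ' ',
# the cell 'causes treats' becomes a counter {'causes': 1, 'treats': 1}; in a
# closed task the remaining labels from config.annotation_vector are then added
# with count 0.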

def process_file(filename, config):
    """ process an input file with the given configuration """

    judgments = pd.read_csv(filename)

    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:
            platform = {
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }

    # establish which fields were part of the input data and which are output judgments;
    # if the config defines which fields to use, take those, else select them automatically
    config = get_column_types(judgments, config)

    judgments = remove_empty_rows(config, judgments)
    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    all_columns = dict(list(config.input.items()) + list(config.output.items())
                       + list(platform.items()))
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    judgments['job'] = filename.split('.csv')[0]

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    judgments['started'] = judgments['started'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['duration'] = judgments.apply(
        lambda row: (row['submitted'] - row['started']).seconds, axis=1)

    # remove units with just 1 judgment
    judgments = remove_single_judgment_units(judgments)

    # aggregate units
    units = Unit.aggregate(judgments, config)

    for col in config.output.values():
        judgments[col + '.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col + '.unique'] = judgments[col].apply(len)

    # aggregate workers
    workers = Worker.aggregate(judgments, config)

    # aggregate annotations, i.e. the output columns
    annotations = pd.DataFrame()
    for col in config.output.values():
        res = pd.DataFrame(judgments[col].apply(
            lambda x: pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)

    # aggregate the job
    job = Job.aggregate(units, judgments, config)

    # clean up judgments: keep only output, metric and platform columns
    output_cols = [col for col in judgments.columns.values
                   if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task; open-ended configs have no
    # annotation_vector, which raises AttributeError and skips the column
    for col in config.output.values():
        try:
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
        except AttributeError:
            continue

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
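
# End-to-end sketch (column names are hypothetical): for an AMT export with
# columns HITId, AssignmentId, WorkerId, AcceptTime, SubmitTime, Input.sentence
# and Answer.relation, the columns are renamed to unit, judgment, worker,
# started, submitted, input.sentence and output.relation before units, workers,
# annotations and the job itself are aggregated.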

def get_platform(dframe):
    """ get the crowdsourcing platform this file originates from """

    if dframe.columns.values[0] == '_unit_id':
        # CrowdFlower
        return {
            '_id'           : 'judgment',
            '_unit_id'      : 'unit',
            '_worker_id'    : 'worker',
            '_started_at'   : 'started',
            '_created_at'   : 'submitted'
        }
    elif dframe.columns.values[0] == 'HITId':
        # Mturk
        return {
            'AssignmentId'  : 'judgment',
            'HITId'         : 'unit',
            'WorkerId'      : 'worker',
            'AcceptTime'    : 'started',
            'SubmitTime'    : 'submitted'
        }
    return False
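
# Detection relies on the first column alone: '_unit_id' marks a CrowdFlower
# export and 'HITId' an Amazon Mechanical Turk one; any other layout returns
# False and has to be described through config.customPlatformColumns.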

def get_column_types(dframe, config):
    """ return the input and output columns """
    config.input = {}
    config.output = {}

    # build a dict of the columns with input content and the columns with output
    # judgments; each entry maps [original column name] : [safestring column name]
    if dframe.columns.values[0] == 'HITId':
        # Mturk
        # if config is specified, use those columns
        if config.inputColumns:
            config.input = {c: 'input.' + c.replace('Input.', '')
                            for c in dframe.columns.values if c in config.inputColumns}
        else:
            config.input = {c: 'input.' + c.replace('Input.', '')
                            for c in dframe.columns.values if c.startswith('Input.')}

        # if config is specified, use those columns
        if config.outputColumns:
            config.output = {c: 'output.' + c.replace('Answer.', '')
                             for c in dframe.columns.values if c in config.outputColumns}
        else:
            config.output = {c: 'output.' + c.replace('Answer.', '')
                             for c in dframe.columns.values if c.startswith('Answer.')}
        return config

    elif dframe.columns.values[0] == '_unit_id':
        # CrowdFlower
        # if a config is specified, use those columns
        if config.inputColumns:
            config.input = {c: 'input.' + c for c in dframe.columns.values
                            if c in config.inputColumns}
        if config.outputColumns:
            config.output = {c: 'output.' + c for c in dframe.columns.values
                             if c in config.outputColumns}
        # if there is a config for both input and output columns, we can return those
        if config.inputColumns and config.outputColumns:
            return config

        # otherwise try to identify them: a column is treated as input if its
        # values are identical within every unit. This heuristic is not foolproof
        # but gives decent results without settings; it is best to make a
        # settings.py file for a collection.
        units = dframe.groupby('_unit_id')
        columns = [c for c in dframe.columns.values if c != 'clustering' and not c.startswith('_')
                   and not c.startswith('e_') and not c.endswith('_gold')
                   and not c.endswith('_reason') and not c.endswith('browser')]
        for colname in columns:
            try:
                for _, unit in units:
                    unique = unit[colname].nunique()
                    if unique != 1 and unique != 0:
                        raise Found
                if not config.inputColumns:
                    config.input[colname] = 'input.' + colname
            except Found:
                if not config.outputColumns:
                    config.output[colname] = 'output.' + colname

        return config
    else:
        # unknown platform type

        # if a config is specified, use those columns
        if config.inputColumns:
            config.input = {c: 'input.' + c for c in dframe.columns.values
                            if c in config.inputColumns}
        if config.outputColumns:
            config.output = {c: 'output.' + c for c in dframe.columns.values
                             if c in config.outputColumns}
        # always return the config, even when only one of the input/output
        # definitions is present, so callers never receive an implicit None
        return config
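
# Example (hypothetical AMT header): given columns ['HITId', 'AssignmentId',
# 'WorkerId', 'AcceptTime', 'SubmitTime', 'Input.sentence', 'Answer.relation']
# and no explicit configuration, config.input becomes
# {'Input.sentence': 'input.sentence'} and config.output becomes
# {'Answer.relation': 'output.relation'}.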