GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 4984d7...2915fd )
by Oana
16:42
created

crowdtruth.load.get_platform()   A

Complexity

Conditions 3

Size

Total Lines 24
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 4.125

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 24
rs 9.6
c 0
b 0
f 0
ccs 3
cts 6
cp 0.5
cc 3
nop 1
crap 4.125
1 1
import os
2 1
import logging
3
4 1
import re
5 1
import string
6 1
import datetime
7
8 1
from collections import Counter
9 1
from collections import OrderedDict
10
11 1
import chardet
12
13 1
import pandas as pd
14 1
import numpy as np
15
16
17 1
from crowdtruth.models.worker import Worker
18 1
from crowdtruth.models.unit import Unit
19 1
from crowdtruth.models.job import Job
20 1
from crowdtruth.configuration import DefaultConfig
21
22
23
24
25
# create an ordered counter so that we can maintain the position
26
# of tags in the order they were annotated
27 1
class OrderedCounter(Counter, OrderedDict):
    """Counter that remembers the order in which keys were first seen.

    Mixing ``Counter`` with ``OrderedDict`` keeps multiset semantics while
    preserving the position of tags in the order they were annotated.
    """
29
30 1
def create_ordered_counter(ordered_counter, annotation_vector):
    """Ensure every label of *annotation_vector* is present in the counter.

    Labels that are missing from *ordered_counter* are added with a count
    of 0, in the order they appear in *annotation_vector*; existing counts
    are left untouched. The (mutated) counter is returned.
    """
    missing = (label for label in annotation_vector if label not in ordered_counter)
    for label in missing:
        ordered_counter[label] = 0
    return ordered_counter
35
36
37 1
class Found(Exception):
    """Control-flow signal used to break out of nested loops.

    Raised while scanning columns to flag that a column's values vary
    within a unit (i.e. it holds judgments, not input content).
    """
39
40 1
def validate_timestamp_field(date_string, date_format):
    """Validate that *date_string* parses under *date_format*.

    Args:
        date_string: the timestamp text to check.
        date_format: a ``datetime.strptime`` format string.

    Returns:
        The parsed ``datetime.datetime`` (the original returned None, so
        returning the value is backward-compatible for callers that ignore it).

    Raises:
        ValueError: with the uniform message 'Incorrect date format' when
        the string does not match the format.

    Fixes: removed a leftover debug ``print`` of the parsed value and
    chained the re-raised ValueError to its cause.
    """
    try:
        return datetime.datetime.strptime(date_string, date_format)
    except ValueError as err:
        # Uniform message for callers, but keep the original cause attached.
        raise ValueError('Incorrect date format') from err
46
47 1
def get_file_list(directory):
    """Recursively collect the CSV result files under *directory*.

    Args:
        directory: path of the folder to scan.

    Returns:
        A flat list of '.csv' file paths *relative to* directory. The file
        'groundtruth.csv' is skipped at every level.

    Bug fix: the original appended each sub-folder's result as a nested
    list and dropped the sub-folder name, so the ``directory + "/" + file``
    join in ``load()`` broke on nested folders; sub-folder files are now
    flattened with their relative path prefix.
    """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        path = directory + '/' + file
        # if it is a folder, scan it recursively
        if os.path.isdir(path):
            sublist = get_file_list(path)
            # keep the sub-folder prefix so callers can re-join full paths
            filelist.extend(file + '/' + sub for sub in sublist)

        # if it is a csv file, keep it (the ground truth file is handled separately)
        elif file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist
63
64 1
def load(**kwargs):
    """Load crowdsourcing results and aggregate them per resource type.

    Keyword Args:
        config: configuration object; defaults to ``DefaultConfig()``.
        file: path to a single '.csv' results file.
        directory: path to a folder of '.csv' results files.

    Returns:
        ``(results, config)`` where results is a dict with keys 'jobs',
        'units', 'workers', 'judgments' and 'annotations', each holding a
        pandas DataFrame concatenated across all processed files.

    Raises:
        ValueError: when neither a '.csv' 'file' nor a 'directory' is given.

    Fix: the per-file loop previously tested ``'directory' in locals()``
    to decide whether to prefix paths — fragile introspection, replaced by
    an explicit ``directory`` flag.
    """

    # placeholder for aggregated results
    results = {
        'jobs' : [],
        'units' : [],
        'workers' : [],
        'judgments' : [],
        'annotations' : []
        }

    if 'config' not in kwargs:
        config = DefaultConfig()
    else:
        logging.info('Config loaded')
        config = kwargs['config']

    # check if the input is a single file or a folder;
    # `directory` doubles as the mode flag (None == single-file mode)
    directory = None
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found ' + str(len(files)) + ' files')
    else:
        raise ValueError('No input was provided')

    for file in files:
        if directory is not None:
            logging.info("Processing " + file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for resource in res:
            results[resource].append(res[resource])

    # concatenate the per-file frames for every resource type
    for resource in results:
        results[resource] = pd.concat(results[resource])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()

    return results, config
117
118
119 1
def process_file(filename, config):
    """Process a single crowdsourcing results CSV into aggregated resources.

    Reads *filename* with pandas, detects the platform, normalizes column
    names, cleans incomplete judgments, and aggregates units, workers,
    annotations and the job.

    Returns:
        ``(resources, config)`` where resources is a dict with the keys
        'jobs', 'units', 'workers', 'judgments' and 'annotations'.

    Raises:
        ValueError: when the platform is unknown and
        ``config.customPlatformColumns`` does not list exactly 5 columns.
    """

    # job identifier: the file path with the '.csv' suffix stripped
    job = filename.split('.csv')[0]

    judgments = pd.read_csv(filename)#, encoding=result['encoding'])

    # platform is a column-rename mapping, or False when unrecognized
    platform = get_platform(judgments)

    if platform == False:
        # unknown platform: fall back to the user-supplied column mapping,
        # which must name exactly: judgment id, unit id, worker id,
        # start time, submit time (in that order)
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:

            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }


    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    # remove rows where the worker did not give an answer (AMT issue)
    empty_rows = set()
    for col in config.outputColumns:
        empty_rows = empty_rows.union(judgments[pd.isnull(judgments[col]) == True].index)
    for col in config.outputColumns:
        judgments = judgments[pd.isnull(judgments[col]) == False]
    judgments = judgments.reset_index(drop=True)
    count_empty_rows = len(empty_rows)
    if count_empty_rows > 0:
        if count_empty_rows == 1:
            logging.warning(str(count_empty_rows) + " row with incomplete information in "
                            "output columns was removed.")
        else:
            logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                            "output columns were removed.")

    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    # merged rename map: input columns, output columns and platform columns
    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    judgments['job'] = job

    # make output values safe keys: turn each output cell into an
    # OrderedCounter of annotations (from a dict or a separator-split string);
    # for closed tasks, pad with the full annotation vector at count 0
    for col in config.output.values():
        if type(judgments[col].iloc[0]) is dict:
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(x))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter( \
                                                      x.split(config.annotation_separator)))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x.split(config.annotation_separator)), \
                                 config.annotation_vector))

    # parse timestamps and derive per-judgment duration
    # NOTE(review): Timedelta.seconds is only the seconds *component*
    # (days are dropped) — confirm durations never span a day boundary
    judgments['started'] = judgments['started'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: pd.to_datetime(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    # remove units with just 1 judgment
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(units_1work)]
    judgments = judgments.reset_index(drop=True)
    no_units_1work = len(units_1work)
    if no_units_1work > 0:
        if no_units_1work == 1:
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    # per-judgment annotation statistics: total count and distinct count
    for col in config.output.values():
        judgments[col+'.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col+'.unique'] = judgments[col].apply(lambda x: len(x))


    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)


    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations = pd.DataFrame()
    for col in config.output.values():
        res = pd.DataFrame(judgments[col].apply(lambda x: \
              pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values \
                    if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    # (open-ended tasks have no annotation_vector; the AttributeError
    # from the missing attribute skips the column)
    for col in config.output.values():
        try:
            # openended = config.open_ended_task
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
        except AttributeError:
            continue

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
279
280
281 1
def get_platform(dframe):
    """Detect which crowdsourcing platform a results frame came from.

    The name of the first column identifies the platform: '_unit_id'
    means CrowdFlower, 'HITId' means Amazon Mechanical Turk.

    Returns:
        A dict mapping the platform's column names onto the canonical
        names ('judgment', 'unit', 'worker', 'started', 'submitted'),
        or False when the platform is not recognized.
    """
    first_column = dframe.columns.values[0]

    # CrowdFlower export
    if first_column == '_unit_id':
        return {
            '_id'           : 'judgment',
            '_unit_id'      : 'unit',
            '_worker_id'    : 'worker',
            '_started_at'   : 'started',
            '_created_at'   : 'submitted'
        }

    # Amazon Mechanical Turk export
    if first_column == 'HITId':
        return {
            'AssignmentId'  : 'judgment',
            'HITId'         : 'unit',
            'WorkerId'      : 'worker',
            'AcceptTime'    : 'started',
            'SubmitTime'    : 'submitted'
        }

    # unknown / custom platform
    return False
305
306
307 1
def get_column_types(dframe, config):
    """Split the columns of *dframe* into input content and output judgments.

    Fills ``config.input`` and ``config.output`` with mappings of
    [original column name] -> [safe column name] ('input.*' / 'output.*'),
    chosen per platform (first column name: 'HITId' = MTurk, '_unit_id' =
    CrowdFlower, anything else = custom) and honoring any explicitly
    configured ``config.inputColumns`` / ``config.outputColumns``.

    Returns:
        The updated config.

    Bug fix: the unknown-platform branch used to fall through and return
    None implicitly whenever input and output columns were not *both*
    configured, which made ``config = get_column_types(...)`` in
    process_file() clobber the config; it now always returns config.
    """
    # reset any mapping left over from a previous file
    config.input = {}
    config.output = {}

    # get a dict of the columns with input content and the columns with output judgments
    # each entry matches [original column name]:[safestring column name]
    if dframe.columns.values[0] == 'HITId':
        # Mturk
        # if config is specified, use those columns
        if config.inputColumns:
            config.input = {c: 'input.'+c.replace('Input.', '')
                            for c in dframe.columns.values if c in config.inputColumns}
        else:
            config.input = {c: 'input.'+c.replace('Input.', '')
                            for c in dframe.columns.values if c.startswith('Input.')}

        # if config is specified, use those columns
        if config.outputColumns:
            config.output = {c: 'output.'+c.replace('Answer.', '')
                             for c in dframe.columns.values if c in config.outputColumns}
        else:
            config.output = {c: 'output.'+c.replace('Answer.', '')
                             for c in dframe.columns.values if c.startswith('Answer.')}
        return config

    elif dframe.columns.values[0] == '_unit_id':
        # CrowdFlower

        # if a config is specified, use those columns
        if config.inputColumns:
            config.input = {c: 'input.'+c for c in dframe.columns.values \
                            if c in config.inputColumns}
        if config.outputColumns:
            config.output = {c: 'output.'+c for c in dframe.columns.values \
                             if c in config.outputColumns}
        # if there is a config for both input and output columns, we can return those
        if config.inputColumns and config.outputColumns:
            return config

        # try to identify the input and output columns:
        # a column whose value is identical within every unit is input
        # this is not failsafe but should give decent results without settings
        # it is best to make a settings.py file for a collection
        units = dframe.groupby('_unit_id')
        columns = [c for c in dframe.columns.values if c != 'clustering' and not c.startswith('_')
                   and not c.startswith('e_') and not c.endswith('_gold')
                   and not c.endswith('_reason') and not c.endswith('browser')]
        for colname in columns:
            try:
                for i, unit in units:
                    unique = unit[colname].nunique()
                    if unique != 1 and unique != 0:
                        # value varies inside a unit -> judgment (output) column
                        raise Found
                if not config.inputColumns:
                    config.input[colname] = 'input.'+colname

            except Found:
                if not config.outputColumns:
                    config.output[colname] = 'output.'+colname

        return config
    else:
        # unknown platform type

        # if a config is specified, use those columns
        if config.inputColumns:
            config.input = {c: 'input.'+c for c in dframe.columns.values \
                            if c in config.inputColumns}
        if config.outputColumns:
            config.output = {c: 'output.'+c for c in dframe.columns.values \
                             if c in config.outputColumns}
        # always return config (the original implicitly returned None
        # when input/output columns were not both configured)
        return config
383