GitHub access token became invalid

The GitHub access token used to retrieve details about this repository from GitHub has become invalid. This may prevent certain types of inspections from running (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed: Push — master (288929...90f372) by Anca, 29:14 (queued 07:50)

crowdtruth.load (overall rating: C)

Complexity:       Total Complexity 57
Size/Duplication: Total Lines 304, Duplicated Lines 0%
Test Coverage:    Coverage 83.44%
Importance:       Changes 0
Metric  Value
wmc     57
eloc    180
dl      0
loc     304
ccs     126
cts     151
cp      0.8344
rs      5.04
c       0
b       0
f       0
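Assuming ccs and cts are the covered and total statement counts, the reported coverage follows directly from the raw metrics: cp = ccs / cts = 126 / 151 ≈ 0.8344, i.e. the 83.44% shown under Test Coverage.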

11 Functions

Rating  Name                            Duplication  Size  Complexity
B       add_missing_values()            0            12    6
A       validate_timestamp_field()      0            8     2
A       create_ordered_counter()        0            6     3
A       load()                          0            36    3
B       list_files()                    0            22    8
A       remove_empty_rows()             0            17    5
A       remove_single_judgment_units()  0            14    4
A       aggregate_annotations()         0            10    5
B       process_file()                  0            100   6
C       make_output_cols_safe_keys()    0            20    9
B       get_file_list()                 0            17    6

How to fix: Complexity

Complex classes like crowdtruth.load often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
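For illustration only (a sketch, not a refactoring this report prescribes): in crowdtruth.load, the recursive file discovery in get_file_list() and the directory handling in list_files() share the file-discovery concern and could be extracted together. The class and method names below are hypothetical:

import os

class InputFileCollector:
    """ Hypothetical extracted class: isolates the file-discovery logic
    that load.py currently mixes with judgment preprocessing. """

    def __init__(self, directory):
        self.directory = directory

    def collect(self):
        """ Recursively collect .csv input files, skipping groundtruth.csv. """
        files = []
        for entry in os.listdir(self.directory):
            path = os.path.join(self.directory, entry)
            if os.path.isdir(path):
                # recurse into sub-folders and flatten the result
                files.extend(InputFileCollector(path).collect())
            elif entry.endswith('.csv') and entry != 'groundtruth.csv':
                files.append(path)
        return files

list_files() could then delegate to InputFileCollector and keep only the per-file processing loop, which would cut into its complexity of 8 reported above.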

#pylint: disable=W0223

"""
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
"""

import os

import logging
import datetime
import dateparser

from collections import Counter, OrderedDict

import pandas as pd

from crowdtruth.models.worker import Worker
from crowdtruth.models.unit import Unit
from crowdtruth.models.job import Job
from crowdtruth.configuration import DefaultConfig
from crowdtruth.crowd_platform import *


# create an ordered counter so that we can maintain
# the position of tags in the order they were annotated
class OrderedCounter(Counter, OrderedDict):
    """ Instantiates an ordered counter. """
    pass

def create_ordered_counter(ordered_counter, annotation_vector):
    """ Instantiates an ordered counter from a given annotation vector. """
    for relation in annotation_vector:
        if relation not in ordered_counter:
            ordered_counter.update({relation: 0})
    return ordered_counter

def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files. """
    try:
        date_obj = datetime.datetime.strptime(date_string, date_format)
        print(date_obj)
    except ValueError:
        raise ValueError('Incorrect date format')

def get_file_list(directory):
    """ List the files in the directory given as argument. """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        # if it is a folder scan it
        if os.path.isdir(directory+'/'+file):
            sublist = get_file_list(directory+'/'+file)
            sublist_length = len(sublist)
            if sublist_length:
                filelist.append(sublist)

        # if it is a csv file open it
        if file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist

def list_files(kwargs, results, config):
    """ Creates a list of files to be processed. """
    files = []
    directory = ""
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found ' + str(len(files)) + ' files')
    else:
        raise ValueError('No input was provided')

    for file in files:
        if 'directory' in locals() and directory != "":
            logging.info("Processing " + file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for value in res:
            results[value].append(res[value])

    return results

def load(**kwargs):
    """ Loads the input files. """

    # placeholder for aggregated results
    results = {
        'jobs' : [],
        'units' : [],
        'workers' : [],
        'judgments' : [],
        'annotations' : []
        }

    if 'config' not in kwargs:
        config = DefaultConfig()
    else:
        logging.info('Config loaded')
        config = kwargs['config']

    results = list_files(kwargs, results, config)

    for value in results:
        results[value] = pd.concat(results[value])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()

    return results, config

def remove_empty_rows(config, judgments):
    """ remove rows where the worker did not give an answer (AMT issue) """
    empty_rows = set()
    for col in config.outputColumns:
        empty_rows = empty_rows.union(judgments[pd.isnull(judgments[col]) == True].index)
    for col in config.outputColumns:
        judgments = judgments[pd.isnull(judgments[col]) == False]
    judgments = judgments.reset_index(drop=True)
    count_empty_rows = len(empty_rows)
    if count_empty_rows > 0:
        if count_empty_rows == 1:
            logging.warning(str(count_empty_rows) + " row with incomplete information in "
                            "output columns was removed.")
        else:
            logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                            "output columns were removed.")
    return judgments

def remove_single_judgment_units(judgments):
    """ remove units with just 1 judgment """
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(units_1work)]
    judgments = judgments.reset_index(drop=True)
    no_units_1work = len(units_1work)
    if no_units_1work > 0:
        if no_units_1work == 1:
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments

def make_output_cols_safe_keys(config, judgments):
    """ make output values safe keys """
    for col in config.output.values():
        if isinstance(judgments[col].iloc[0], dict):
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(x))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter( \
                                                      x.split(config.annotation_separator)))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x.split(config.annotation_separator)), \
                                 config.annotation_vector))
    return judgments

def add_missing_values(config, units):
    """ Adds missing vector values if it is a closed task. """
    for col in config.output.values():
        try:
            # openended = config.open_ended_task
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
            return units
        except AttributeError:
            continue

def aggregate_annotations(config, judgments):
    """ Aggregates annotations and adds judgments stats. """
    annotations = pd.DataFrame()
    for col in config.output.values():
        judgments[col+'.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col+'.unique'] = judgments[col].apply(lambda x: len(x))
        res = pd.DataFrame(judgments[col].apply(lambda x: \
              pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)
    return annotations, judgments

def process_file(filename, config):
    """ Processes input files with the given configuration """

    judgments = pd.read_csv(filename)#, encoding=result['encoding'])

    platform = get_platform(judgments)
    # Scrutinizer issue (Comprehensibility Best Practice): "The variable
    # get_platform does not seem to be defined." It is likely provided by the
    # wildcard import from crowdtruth.crowd_platform above.

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:
            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }

    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)
    # Scrutinizer issue (Comprehensibility Best Practice): "The variable
    # get_column_types does not seem to be defined." As above, it is likely
    # provided by the wildcard import from crowdtruth.crowd_platform.

    judgments = remove_empty_rows(config, judgments)
    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    judgments['job'] = filename.split('.csv')[0]

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    # remove units with just 1 judgment
    judgments = remove_single_judgment_units(judgments)

    judgments['started'] = judgments['started'].apply(lambda x: dateparser.parse(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: dateparser.parse(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations, judgments = aggregate_annotations(config, judgments)

    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values \
                    if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    units = add_missing_values(config, units)

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
304