Total Complexity | 64 |
Total Lines | 269 |
Duplicated Lines | 0 % |
Complex classes like csvScriptBuilder often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefix or suffix.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import requests |
||
class csvScriptBuilder:
    """Export constraint templates from Wikidata property talk pages to CSV.

    For every property number from 1 to ``MAX_PROPERTY_NUMBER`` the edit view
    of ``Property_talk:P<number>`` is downloaded, every ``{{Constraint:...}}``
    template found on it is parsed, and one CSV row per constraint is written:
    ``(uuid4, property number, constraint name, JSON blob of parameters)``.

    Instance state:
        parameters      -- dict of parameters collected for the constraint
                           that is currently being parsed.
        constraint_name -- name of the constraint currently being parsed.
        list_parameter  -- raw value of a ``list`` parameter, or the sentinel
                           string ``'NULL'`` when the constraint has none.
        csv_writer      -- object with a ``writerow`` method; set by ``run``.
    """

    MAX_PROPERTY_NUMBER = 4000

    CSV_FILE_NAME = "constraints.csv"

    CONSTRAINT_BEGIN_STRING = "{{Constraint:"

    # Seconds to wait for the wiki before giving up on a request; without a
    # timeout a single stalled connection would block the whole run.
    REQUEST_TIMEOUT_SECONDS = 30

    # Markup stripped from the serialized parameters, in raw and
    # HTML-escaped form.
    NOWIKI_MARKERS = ("<nowiki>", "</nowiki>",
                      "&lt;nowiki&gt;", "&lt;/nowiki&gt;")

    def __init__(self):
        self.parameters = {}
        self.constraint_name = ""
        # Initialised here so the write_* methods work on a fresh instance.
        self.list_parameter = 'NULL'
        self.csv_writer = None

    def find_next_seperator(self, constraint_parameters, equal_sign):
        """Return the index at which the next ``name=value`` pair starts.

        The separator is the last ``|`` between ``equal_sign`` and the next
        ``=``; the returned index is one past that ``|``.  If there is no
        further ``=`` or no ``|`` in between, ``len(constraint_parameters)``
        is returned.
        """
        end_of_text = len(constraint_parameters)
        next_equal_sign = constraint_parameters.find('=', equal_sign + 1)
        if next_equal_sign == -1:
            return end_of_text
        pipe_pos = constraint_parameters.rfind('|', equal_sign, next_equal_sign)
        if pipe_pos == -1:
            return end_of_text
        return pipe_pos + 1

    def to_comma_seperated_string(self, values):
        """Return ``values`` with braces, brackets, pipes and spaces removed."""
        for unwanted in ("{", "}", "|", " ", "[", "]"):
            values = values.replace(unwanted, "")
        return values.strip()

    def add_property(self, values):
        """Store the stripped value as the ``property`` parameter."""
        self.parameters['property'] = values.strip()

    def add_classes(self, values):
        """Store the cleaned value as the ``class`` parameter."""
        self.parameters['class'] = self.to_comma_seperated_string(values)

    def add_exceptions(self, values):
        """Store the cleaned value as ``known_exception``, using ``,`` instead of ``;``."""
        cleaned = self.to_comma_seperated_string(values)
        self.parameters['known_exception'] = cleaned.replace(";", ",")

    def add_group_by(self, values):
        """Store the stripped value as the ``group_by`` parameter."""
        self.parameters['group_by'] = values.strip()

    def add_items(self, values):
        """Split a value list into ``item`` (Q-ids) and ``snak`` parameters.

        Elements starting with ``Q`` go to ``item``; ``somevalue`` /
        ``novalue`` (case-insensitive) go to ``snak``.  Anything else is
        dropped.  A key is only set when at least one element matched.
        """
        items = []
        snaks = []
        for element in self.to_comma_seperated_string(values).split(","):
            if element.startswith("Q"):
                items.append(element)
            elif element.lower() in ("somevalue", "novalue"):
                snaks.append(element)
        if items:
            self.parameters['item'] = ",".join(items)
        if snaks:
            self.parameters['snak'] = ",".join(snaks)

    def add_list(self, values):
        """Handle a ``list`` parameter.

        For the (mandatory) qualifiers constraints the list is stored as the
        ``property`` parameter; otherwise it is kept in ``list_parameter`` and
        later expanded into one CSV row per list entry.
        """
        cleaned = self.to_comma_seperated_string(values)
        if self.constraint_name in ("Qualifiers", "Mandatory qualifiers"):
            self.parameters['property'] = cleaned
        else:
            self.list_parameter = cleaned

    def set_constraint_name(self, values):
        """Rename the constraint to ``Mandatory qualifiers`` when the value is ``true``."""
        # Surrounding whitespace in the template must not defeat the match.
        if values.strip() == 'true':
            self.constraint_name = 'Mandatory qualifiers'

    def add_status(self, values):
        """Mark the constraint as mandatory; the parameter value is ignored."""
        self.parameters['constraint_status'] = 'mandatory'

    def add_max(self, values):
        """Store the stripped value as ``maximum_quantity``."""
        self.parameters['maximum_quantity'] = values.strip()

    def add_min(self, values):
        """Store the stripped value as ``minimum_quantity``."""
        self.parameters['minimum_quantity'] = values.strip()

    def add_namespace(self, values):
        """Store the stripped value as ``namespace``."""
        self.parameters['namespace'] = values.strip()

    def add_pattern(self, values):
        """Store the stripped value as ``pattern``."""
        self.parameters['pattern'] = values.strip()

    def add_relation(self, values):
        """Store the stripped value as ``relation``."""
        self.parameters['relation'] = values.strip()

    def write_one_line(self, property_number, constraint_name):
        """Write the collected parameters as a single row, then reset state."""
        self.write_element_into_csv(property_number, constraint_name)
        self.reset_parameter()

    def write_multiple_lines(self, property_number, constraint_name):
        """Write one row per ``;``-separated entry of ``list_parameter``, then reset state."""
        for line in self.list_parameter.split(';'):
            self.split_list_parameter(line)
            self.write_element_into_csv(property_number, constraint_name)
            # An item belongs to one list entry only; do not leak it into
            # the next row.
            self.parameters.pop('item', None)
        self.reset_parameter()

    def write_into_csv_file(self, property_number, constraint_name):
        """Write the current constraint, expanding ``list_parameter`` if one was set."""
        if self.list_parameter != 'NULL':
            self.write_multiple_lines(property_number, constraint_name)
        else:
            self.write_one_line(property_number, constraint_name)

    def write_element_into_csv(self, property_number, constraint_name):
        """Write one row: (new uuid4, property number, constraint name, JSON parameters).

        ``nowiki`` tags, raw or HTML-escaped, are removed from the JSON text.
        """
        json_blob_string = json.dumps(self.parameters)
        for marker in self.NOWIKI_MARKERS:
            json_blob_string = json_blob_string.replace(marker, "")
        self.csv_writer.writerow((str(uuid.uuid4()),
                                  str(property_number),
                                  constraint_name.strip(),
                                  json_blob_string))

    def split_list_parameter(self, line):
        """Split a ``property:item`` list entry into the two parameters.

        Without a ``:`` the whole entry is the property.
        """
        if ':' in line:
            colon_pos = line.index(':')
            self.parameters['item'] = line[colon_pos + 1:]
            self.parameters['property'] = line[:colon_pos]
        else:
            self.parameters['property'] = line

    def reset_parameter(self):
        """Clear the per-constraint state."""
        self.parameters = {}
        self.list_parameter = 'NULL'

    def get_constraint_part(self, property_talk_page):
        """Return the page section that holds the constraint templates.

        The section runs from the first ``{{Constraint:`` up to the next
        ``==`` (or the end of the page).  ``<!-- -->`` comments are removed.
        Returns ``""`` when the page contains no constraint template.
        """
        start = property_talk_page.find(self.CONSTRAINT_BEGIN_STRING)
        if start == -1:
            # Without this guard the -1 would be used as a slice start and
            # return the last character of the page.
            return ""

        end = property_talk_page.find("==", start)
        if end != -1:
            property_talk_page = property_talk_page[start:end]
        else:
            property_talk_page = property_talk_page[start:]

        # delete <!-- --> comments from site; an unterminated comment is
        # left in place
        open_index = property_talk_page.find("<!--")
        while open_index != -1:
            close_index = property_talk_page.find("-->", open_index)
            if close_index == -1:
                break
            property_talk_page = (property_talk_page[:open_index] +
                                  property_talk_page[close_index + 3:])
            open_index = property_talk_page.find("<!--")

        return property_talk_page

    def progress_print(self, number, maxNumber):
        """Print ``number/maxNumber`` for every tenth number."""
        if number % 10 == 0:
            print(str(number) + "/" + str(maxNumber))

    def property_exists(self, propertyTalkPage):
        """Return True when the page has a ``<title>`` that is not a "Creating ..." page.

        Pages without a ``<title>`` element count as non-existent.
        """
        match = re.search(r'<title>(.*)</title>', propertyTalkPage)
        if match is None:
            return False
        return "Creating Property talk" not in match.group(0)

    def get_constraint_end_index(self, constraintPart):
        """Return the index of the first ``}`` of the template's closing ``}}``.

        ``constraintPart`` must start right after ``{{Constraint:``; the two
        opening braces already consumed are why counting starts at 2.
        Returns ``None`` when the braces never balance.
        """
        # match brackets to find end of constraint
        open_braces = 2
        for i, c in enumerate(constraintPart):
            if c == '{':
                open_braces += 1
            elif c == '}':
                open_braces -= 1
                if open_braces == 0:
                    return i - 1
        return None

    def split_constraint_block(self, constraint_part):
        """Split off the first constraint template.

        Returns ``(constraint_string, remaining)`` where ``constraint_string``
        is the template content between ``{{Constraint:`` and ``}}`` and
        ``remaining`` starts at the closing ``}}``.  Returns ``("", "")`` when
        no template is found.
        """
        start_index = constraint_part.find(self.CONSTRAINT_BEGIN_STRING)
        if start_index == -1:
            return "", ""

        constraint_part = constraint_part[start_index + len(self.CONSTRAINT_BEGIN_STRING):]

        # NOTE(review): for unbalanced braces end_index is None, so both
        # slices below yield the whole remaining text.  Kept as-is; confirm
        # whether malformed templates should rather be skipped.
        end_index = self.get_constraint_end_index(constraint_part)
        return constraint_part[:end_index], constraint_part[end_index:]

    # Maps template parameter names to the handler that stores them.  The
    # values are plain functions, so they are called as handler(self, value).
    call_method = {
        'base_property': add_property,
        'class': add_classes,
        'classes': add_classes,
        'exceptions': add_exceptions,
        'group by': add_group_by,
        'group property': add_group_by,
        'item': add_items,
        'items': add_items,
        'list': add_list,
        'mandatory': add_status,
        'max': add_max,
        'min': add_min,
        'namespace': add_namespace,
        'pattern': add_pattern,
        'property': add_property,
        'relation': add_relation,
        'required': set_constraint_name,
        'value': add_items,
        'values': add_items
    }

    def split_parameters(self, constraint_parameters):
        """Split off the first ``name=value`` pair.

        Returns ``(name, value, remaining)``.  ``name`` is stripped, ``value``
        is returned verbatim without its trailing ``|`` separator, and
        ``remaining`` is the text after that separator.
        """
        equal_sign_pos = constraint_parameters.find('=')
        next_seperator = self.find_next_seperator(constraint_parameters, equal_sign_pos)

        parameter_name = constraint_parameters[:equal_sign_pos].strip()
        parameter_value = constraint_parameters[equal_sign_pos + 1:next_seperator]
        # Only drop the separator when there is one; unconditionally cutting
        # one character truncated the value of the last parameter.
        if parameter_value.endswith('|'):
            parameter_value = parameter_value[:-1]
        remaining_constraint_parameters = constraint_parameters[next_seperator:]

        return parameter_name, parameter_value, remaining_constraint_parameters

    def add_all_parameters(self, constraint_parameters):
        """Parse every ``name=value`` pair and dispatch it to its handler.

        Parameters whose name is not in ``call_method`` are ignored.
        """
        while constraint_parameters is not None and '=' in constraint_parameters:
            p_name, p_value, constraint_parameters = self.split_parameters(constraint_parameters)
            handler = self.call_method.get(p_name)
            if handler is not None:
                # Looked up outside the call so that an error raised inside
                # a handler is never mistaken for an unknown parameter.
                handler(self, p_value)

    def process_constraint_part(self, constraint_part, property_number):
        """Parse every constraint template in ``constraint_part`` and write it to the CSV."""
        constraint_string, remaining_constraint = self.split_constraint_block(constraint_part)
        while constraint_string != "":
            self.list_parameter = 'NULL'

            # The text before the first '|' is the constraint name, the rest
            # are its parameters.
            delimiter_index = constraint_string.find('|')
            if delimiter_index == -1:
                self.constraint_name = constraint_string
            else:
                self.constraint_name = constraint_string[:delimiter_index]
                self.add_all_parameters(constraint_string[delimiter_index + 1:])

            self.write_into_csv_file(property_number, self.constraint_name)

            constraint_string, remaining_constraint = self.split_constraint_block(remaining_constraint)

    def get_property_talk_page(self, property_number):
        """Download and return the edit view of ``Property_talk:P<property_number>``.

        Raises whatever ``requests.get`` raises, including a timeout after
        ``REQUEST_TIMEOUT_SECONDS``.
        """
        url = "http://www.wikidata.org/w/index.php?title=Property_talk:P" + \
            str(property_number) + "&action=edit"
        return requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS).text

    def process_property_talk_page(self, property_number):
        """Fetch one property talk page and export its constraints, if the page exists."""
        property_talk_page = self.get_property_talk_page(property_number)
        if self.property_exists(property_talk_page):
            constraintPart = self.get_constraint_part(property_talk_page)
            self.process_constraint_part(constraintPart, property_number)

    def _open_csv_file(self):
        """Open ``CSV_FILE_NAME`` for writing in the mode the csv module expects."""
        import sys
        if sys.version_info[0] >= 3:
            # Python 3: text mode, newline translation left to the csv module.
            return open(self.CSV_FILE_NAME, 'w', newline='', encoding='utf-8')
        # Python 2: the csv module writes bytes.
        return open(self.CSV_FILE_NAME, 'wb')

    # only purpose: Build csv-Statement to fill table with constraints
    # fetches constraints from property talk pages
    # nonetheless: use table layout that will suit the new way of storing
    # constraints as statements on properties

    def run(self):
        """Export the constraints of properties 1..MAX_PROPERTY_NUMBER to the CSV file."""
        with self._open_csv_file() as csv_file:
            self.csv_writer = csv.writer(csv_file)
            for property_number in range(1, self.MAX_PROPERTY_NUMBER + 1):
                self.progress_print(property_number, self.MAX_PROPERTY_NUMBER)
                self.process_property_talk_page(property_number)
||
278 | |||
285 |