| Metric      | Value |
| ----------- | ----- |
| Conditions  | 53    |
| Total Lines | 213   |
| Code Lines  | 141   |
| Lines       | 0     |
| Ratio       | 0 %   |
| Changes     | 0     |
Small methods make your code easier to understand, especially when combined with a good name. And if a method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method; the comment is a good starting point for naming it.
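As an illustration, the connect-delay block in the run() listing below computes a sleep time and waits; lifted out, the intent stated in its log message becomes the method name (a hypothetical sketch, _sleep_before_connecting is not an exabgp method):

```python
import time

def _sleep_before_connecting(wait):
    # sleep until the next 'wait'-minute boundary, as run() does inline today
    sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
    time.sleep(float(sleeptime))
```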
Commonly applied refactorings include:

- Extract Method

If many parameters/temporary variables are present (see the sketch after this list):

- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
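For instance, Introduce Parameter Object bundles values that always travel together into one object. The run() listing below passes six related arguments to listener.listen_on(); a sketch of the idea (ListenSpec is a made-up name, and exabgp's real listen_on keeps its positional signature):

```python
from dataclasses import dataclass

@dataclass
class ListenSpec:
    """Groups the six values run() currently passes to listen_on() one by one."""
    md5_ip: str
    peer_address: str
    port: int
    md5_password: str
    md5_base64: bool
    ttl_in: int

def listen_on(spec: ListenSpec) -> bool:
    # stand-in body; only the narrowed signature matters for this example
    return spec.port > 0
```

Call sites then read listen_on(ListenSpec(...)), and adding a new listening option no longer means touching every caller.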
Complex methods like exabgp.reactor.loop.Reactor.run() often do a lot of different things. To break the surrounding class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often the faster change.
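Applied to run() below, the poller bookkeeping is one such cohesive group: the workers map, the api_fds list, and two register calls that duplicate the same event mask. A minimal sketch of extracting it, assuming a hypothetical PeerPoller class that does not exist in exabgp:

```python
import select

# the event mask run() currently repeats in both register calls
READ_EVENTS = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR

class PeerPoller:
    """Owns the poller and the fd -> peer-key bookkeeping inlined in run()."""

    def __init__(self):
        self._poller = select.poll()
        self._workers = {}  # fd -> peer key

    def watch(self, fd, key):
        if fd == -1:  # never register closed sockets
            return
        self._poller.register(fd, READ_EVENTS)
        self._workers[fd] = key

    def unwatch(self, fd):
        if fd in self._workers:
            self._poller.unregister(fd)
            del self._workers[fd]

    def ready_keys(self, timeout_ms):
        # map ready file descriptors back to the peers that own them
        return [self._workers[fd] for fd, _ in self._poller.poll(timeout_ms) if fd in self._workers]
```

run() would then call watch()/unwatch()/ready_keys(), and the event-mask expression would live in one place.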
```python
# encoding: utf-8

# ...

def run(self, validate, root):
    self.daemon.daemonise()

    # Make sure we create processes once we have closed the file descriptors
    # unfortunately, this must be done before reading the configuration file
    # so we can not do it with dropped privileges
    self.processes = Processes()

    # we have to read the configuration possibly with root privileges
    # as we need the MD5 information when we bind, and root is needed
    # to bind to a port < 1024

    # this is undesirable as:
    # - handling user generated data as root should be avoided
    # - we may not be able to reload the configuration once the privileges are dropped

    # but I can not see any way to avoid it
    for ip in self._ips:
        if not self.listener.listen_on(ip, None, self._port, None, False, None):
            return self.Exit.listening

    if not self.reload():
        return self.Exit.configuration

    if validate:  # only validate configuration
        log.warning('', 'configuration')
        log.warning('parsed Neighbors, un-templated', 'configuration')
        log.warning('------------------------------', 'configuration')
        log.warning('', 'configuration')
        for key in self._peers:
            log.warning(str(self._peers[key].neighbor), 'configuration')
            log.warning('', 'configuration')
        return self.Exit.validate

    for neighbor in self.configuration.neighbors.values():
        if neighbor.listen:
            if not self.listener.listen_on(
                neighbor.md5_ip,
                neighbor.peer_address,
                neighbor.listen,
                neighbor.md5_password,
                neighbor.md5_base64,
                neighbor.ttl_in,
            ):
                return self.Exit.listening

    if not self.early_drop:
        self.processes.start(self.configuration.processes)

    if not self.daemon.drop_privileges():
        log.critical(
            'could not drop privileges to \'%s\' refusing to run as root' % self.daemon.user, 'reactor'
        )
        log.critical(
            'set the environment value exabgp.daemon.user to change the unprivileged user', 'reactor'
        )
        return self.Exit.privileges

    if self.early_drop:
        self.processes.start(self.configuration.processes)

    # This is required to make sure we can write in the log location as we now have dropped root privileges
    if not log.restart():
        log.critical('could not setup the logger, aborting', 'reactor')
        return self.Exit.log

    if not self.daemon.savepid():
        return self.Exit.pid

    wait = getenv().tcp.delay
    if wait:
        sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
        log.debug('waiting for %d seconds before connecting' % sleeptime, 'reactor')
        time.sleep(float(sleeptime))

    workers = {}
    peers = set()
    api_fds = []
    ms_sleep = int(self._sleep_time * 1000)

    while True:
        try:
            if self.signal.received:
                signaled = self.signal.received

                # report that we received a signal
                for key in self._peers:
                    if self._peers[key].neighbor.api['signal']:
                        self._peers[key].reactor.processes.signal(self._peers[key].neighbor, self.signal.number)

                self.signal.rearm()

                # we always want to exit
                if signaled == Signal.SHUTDOWN:
                    self.exit_code = self.Exit.normal
                    self.shutdown()
                    break

                # it does not matter what we did if we are restarting,
                # as the peers and network stack are replaced by new ones
                if signaled == Signal.RESTART:
                    self.restart()
                    continue

                # did we complete the run of updates caused by the last SIGUSR1/SIGUSR2?
                if self._pending_adjribout():
                    continue

                if signaled == Signal.RELOAD:
                    self.reload()
                    self.processes.start(self.configuration.processes, False)
                    continue

                if signaled == Signal.FULL_RELOAD:
                    self.reload()
                    self.processes.start(self.configuration.processes, True)
                    continue

            if self.listener.incoming():
                # check all incoming connections
                self.asynchronous.schedule(
                    str(uuid.uuid1()), 'checking for new connection(s)', self.listener.new_connections()
                )

            sleep = ms_sleep

            # do not attempt to listen on closed sockets even if the peer is still here
            for io in list(workers.keys()):
                if io == -1:
                    self._poller.unregister(io)
                    del workers[io]

            peers = self.active_peers()
            # give a turn to all the peers
            for key in list(peers):
                peer = self._peers[key]

                # limit the number of messages handled per second
                if self._rate_limited(key, peer.neighbor.rate_limit):
                    peers.discard(key)
                    continue

                # handle the peer
                action = peer.run()

                # .run() returns an ACTION enum:
                # * immediate if it wants to be called again
                # * later if it should be called again but has no work atm
                # * close if it is finished and is closing down, or restarting
                if action == ACTION.CLOSE:
                    if key in self._peers:
                        del self._peers[key]
                    peers.discard(key)
                    # we are losing this peer, no point scheduling more process work
                elif action == ACTION.LATER:
                    io = peer.socket()
                    if io != -1:
                        self._poller.register(
                            io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR
                        )
                        workers[io] = key
                    # no need to come back to it before a full cycle
                    peers.discard(key)
                elif action == ACTION.NOW:
                    sleep = 0

                if not peers:
                    break

            # read at least one message per process if there are any, and parse it
            for service, command in self.processes.received():
                self.api.text(self, service, command)
                sleep = 0

            self.asynchronous.run()

            if api_fds != self.processes.fds:
                for fd in api_fds:
                    if fd == -1:
                        continue
                    if fd not in self.processes.fds:
                        self._poller.unregister(fd)
                for fd in self.processes.fds:
                    if fd == -1:
                        continue
                    if fd not in api_fds:
                        self._poller.register(
                            fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR
                        )
                api_fds = self.processes.fds

            for io in self._wait_for_io(sleep):
                if io not in api_fds:
                    peers.add(workers[io])

            if self._stopping and not self._peers.keys():
                self._termination('exiting on peer termination', self.Exit.normal)

        except KeyboardInterrupt:
            self._termination('^C received', self.Exit.normal)
        except SystemExit:
            self._termination('exiting', self.Exit.normal)
        # socket.error is a subclass of IOError (so catch it first)
        except socket.error:
            self._termination('socket error received', self.Exit.socket)
        except IOError:
            self._termination('I/O Error received, most likely ^C during IO', self.Exit.io_error)
        except ProcessError:
            self._termination('Problem when sending message(s) to helper program, stopping', self.Exit.process)
        except select.error:
            self._termination('problem using select, stopping', self.Exit.select)

    return self.exit_code
```