| Metric | Value |
| --- | --- |
| Conditions | 54 |
| Total Lines | 193 |
| Code Lines | 130 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 |
Small methods make your code easier to understand, particularly when combined with a good name. And if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.
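A minimal sketch of that idea, using a hypothetical `Order`/`report` example that is unrelated to exabgp:

```python
from dataclasses import dataclass

@dataclass
class Order:
    price: float
    shipped: bool

# Before: a comment announces what the loop is doing.
def report(orders):
    total = 0.0
    # sum the price of every shipped order
    for order in orders:
        if order.shipped:
            total += order.price
    return 'shipped total: %.2f' % total

# After: the commented block is extracted, and the comment becomes the method name.
def shipped_total(orders):
    return sum(order.price for order in orders if order.shipped)

def report_refactored(orders):
    return 'shipped total: %.2f' % shipped_total(orders)

print(report_refactored([Order(10.0, True), Order(5.0, False)]))  # shipped total: 10.00
```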
Commonly applied refactorings include:

- Extract Method

If many parameters or temporary variables are present:

- Replace Temp with Query
- Introduce Parameter Object
Complex units like exabgp.reactor.loop.Reactor.run() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within its class. A common way to find such a component is to look for fields and methods that share the same prefix or suffix.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster. The sketch below illustrates the idea.
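As a concrete but purely hypothetical illustration: in the listing below, the neighbor fields md5_ip, md5_password and md5_base64 share a prefix, so an Extract Class pass could bundle them into a small value object. The `Md5Settings` and the simplified `Neighbor` here are sketches, not exabgp's actual classes:

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical value object grouping the md5_* fields that share a prefix.
@dataclass
class Md5Settings:
    ip: Optional[str] = None
    password: Optional[str] = None
    base64: bool = False

# Hypothetical, simplified neighbor that owns the extracted component.
@dataclass
class Neighbor:
    peer_address: str
    listen: int
    md5: Md5Settings

# Call sites then receive one cohesive object instead of three loose fields.
def listen_on(neighbor: Neighbor) -> None:
    print('listen %s:%d (md5 %s)' % (
        neighbor.peer_address,
        neighbor.listen,
        'set' if neighbor.md5.password else 'unset',
    ))

listen_on(Neighbor('192.0.2.1', 179, Md5Settings(password='secret')))
```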
```python
# encoding: utf-8

# ...

def run (self, validate, root):
    self.daemon.daemonise()

    # Make sure we create processes once we have closed the file descriptors
    # unfortunately, this must be done before reading the configuration file
    # so we can not do it with dropped privileges
    self.processes = Processes()

    # we have to read the configuration possibly with root privileges
    # as we need the MD5 information when we bind, and root is needed
    # to bind to a port < 1024

    # this is undesirable as:
    # - handling user generated data as root should be avoided
    # - we may not be able to reload the configuration once the privileges are dropped

    # but I can not see any way to avoid it
    for ip in self._ips:
        if not self.listener.listen_on(ip, None, self._port, None, False, None):
            return self.Exit.listening

    if not self.load():
        return self.Exit.configuration

    if validate:  # only validate configuration
        self.logger.warning('','configuration')
        self.logger.warning('parsed Neighbors, un-templated','configuration')
        self.logger.warning('------------------------------','configuration')
        self.logger.warning('','configuration')
        for key in self._peers:
            self.logger.warning(str(self._peers[key].neighbor),'configuration')
            self.logger.warning('','configuration')
        return self.Exit.validate

    for neighbor in self.configuration.neighbors.values():
        if neighbor.listen:
            if not self.listener.listen_on(neighbor.md5_ip, neighbor.peer_address, neighbor.listen, neighbor.md5_password, neighbor.md5_base64, neighbor.ttl_in):
                return self.Exit.listening

    if not self.early_drop:
        self.processes.start(self.configuration.processes)

    if not self.daemon.drop_privileges():
        self.logger.critical('could not drop privileges to \'%s\' refusing to run as root' % self.daemon.user,'reactor')
        self.logger.critical('set the environment value exabgp.daemon.user to change the unprivileged user','reactor')
        return self.Exit.privileges

    if self.early_drop:
        self.processes.start(self.configuration.processes)

    # This is required to make sure we can write in the log location as we now have dropped root privileges
    if not self.logger.restart():
        self.logger.critical('could not setup the logger, aborting','reactor')
        return self.Exit.log

    if not self.daemon.savepid():
        return self.Exit.pid

    # did we complete the run of updates caused by the last SIGUSR1/SIGUSR2?
    reload_completed = False

    wait = environment.settings().tcp.delay
    if wait:
        sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
        self.logger.debug('waiting for %d seconds before connecting' % sleeptime,'reactor')
        time.sleep(float(sleeptime))

    workers = {}
    peers = set()
    api_fds = []

    while True:
        try:
            if self.signal.received:
                for key in self._peers:
                    if self._peers[key].neighbor.api['signal']:
                        self._peers[key].reactor.processes.signal(self._peers[key].neighbor,self.signal.number)

                signaled = self.signal.received
                self.signal.rearm()

                if signaled == Signal.SHUTDOWN:
                    self.shutdown()
                    break

                if signaled == Signal.RESTART:
                    self.restart()
                    continue

                if not reload_completed:
                    continue

                if signaled == Signal.FULL_RELOAD:
                    self._reload_processes = True

                if signaled in (Signal.RELOAD, Signal.FULL_RELOAD):
                    self.load()
                    self.processes.start(self.configuration.processes,self._reload_processes)
                    self._reload_processes = False
                    continue

            if self.listener.incoming():
                # check all incoming connections
                self.asynchronous.schedule(str(uuid.uuid1()),'checking for new connection(s)',self.listener.new_connections())

            peers = self.active_peers()
            if self._completed(peers):
                reload_completed = True

            sleep = self._sleep_time

            # do not attempt to listen on closed sockets even if the peer is still here
            for io in list(workers.keys()):
                if io == -1:
                    self._poller.unregister(io)
                    del workers[io]

            # give a turn to all the peers
            for key in list(peers):
                peer = self._peers[key]

                # limit the number of message handling per second
                if self._rate_limited(key,peer.neighbor.rate_limit):
                    peers.discard(key)
                    continue

                # handle the peer
                action = peer.run()

                # .run() returns an ACTION enum:
                # * immediate if it wants to be called again
                # * later if it should be called again but has no work atm
                # * close if it is finished and is closing down, or restarting
                if action == ACTION.CLOSE:
                    if key in self._peers:
                        del self._peers[key]
                    peers.discard(key)
                # we are losing this peer, no point in scheduling more process work
                elif action == ACTION.LATER:
                    io = peer.socket()
                    if io != -1:
                        self._poller.register(io, self.READ_ONLY)
                        workers[io] = key
                    # no need to come back to it before a full cycle
                    peers.discard(key)
                elif action == ACTION.NOW:
                    sleep = 0

                if not peers:
                    break

            # read at least one message per process if there are any, and parse it
            for service,command in self.processes.received():
                self.api.text(self,service,command)
                sleep = 0

            self.asynchronous.run()

            if api_fds != self.processes.fds:
                for fd in api_fds:
                    if fd == -1:
                        continue
                    if fd not in self.processes.fds:
                        self._poller.unregister(fd)
                for fd in self.processes.fds:
                    if fd == -1:
                        continue
                    if fd not in api_fds:
                        self._poller.register(fd, self.READ_ONLY)
                api_fds = self.processes.fds

            for io in self._wait_for_io(sleep):
                if io not in api_fds:
                    peers.add(workers[io])

            if self._stopping and not self._peers.keys():
                self._termination('exiting on peer termination',self.Exit.normal)

        except KeyboardInterrupt:
            self._termination('^C received',self.Exit.normal)
        except SystemExit:
            self._termination('exiting', self.Exit.normal)
        # socket.error is a subclass of IOError (so catch it first)
        except socket.error:
            self._termination('socket error received',self.Exit.socket)
        except IOError:
            self._termination('I/O Error received, most likely ^C during IO',self.Exit.io_error)
        except ProcessError:
            self._termination('Problem when sending message(s) to helper program, stopping',self.Exit.process)
        except select.error:
            self._termination('problem using select, stopping',self.Exit.select)

    return self.exit_code
```