exabgp.reactor.api.processes.Processes.message()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 8
dl 0
loc 3
rs 10
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more, or different, data.

There are several approaches to avoid long parameter lists:

1
"""
2
process.py
3
4
Created by Thomas Mangin on 2011-05-02.
5
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
6
License: 3-clause BSD. (See the COPYRIGHT file)
7
"""
8
9
import os
10
import errno
11
import time
12
import subprocess
13
import select
14
import fcntl
15
16
from exabgp.util.errstr import errstr
17
from exabgp.reactor.network.error import error
18
19
from exabgp.configuration.core.format import formated
20
from exabgp.reactor.api.response import Response
21
from exabgp.reactor.api.response.answer import Answer
22
23
from exabgp.bgp.message import Message
24
from exabgp.logger import log
25
26
from exabgp.version import json as json_version
27
from exabgp.version import text as text_version
28
29
from exabgp.environment import getenv
30
from threading import Thread
31
32
33
# pylint: disable=no-self-argument,not-callable,unused-argument,invalid-name
34
35
36
class ProcessError(Exception):
    """Raised by the process manager when a helper program cannot be used.

    In this module it is raised when writing to a helper hits a broken pipe
    and when a helper is respawned more often than allowed.
    """
38
39
40
def preexec_helper():
    """Put the calling process in its own process group.

    Passed as ``preexec_fn`` to ``subprocess.Popen`` so it runs in the child
    just before the helper program is executed.  Creating a new process group
    prevents signals sent to the parent's group from reaching the children.
    """
    # Alternatives that were considered and left disabled:
    #   os.setsid()
    #   signal.signal(signal.SIGINT, signal.SIG_IGN)
    os.setpgrp()
46
47
48
class Processes(object):
    """Manage the helper programs (API processes) forked by the reactor.

    The class starts the configured programs, reads the commands they print
    on their stdout, writes encoded BGP events to their stdin and respawns
    or terminates them when they misbehave.
    """

    # Mask applied to the current timestamp to decide whether two respawns
    # happened "around the same time": the six low bits are cleared, so all
    # timestamps in the same window (around a minute) share one value.
    # '0b111111111111111111000000'
    respawn_timemask = 0xFFFFFF - 0b111111

    # message id -> handler, filled at class creation by @register_process
    _dispatch = {}

    def __init__(self):
        self.clean()
        self.silence = False
        self._buffer = {}
        self._configuration = {}
        self._restart = {}

        # maximum number of respawns tolerated per time window (0 disables respawning)
        self.respawn_number = 5 if getenv().api.respawn else 0
        self.terminate_on_error = getenv().api.terminate
        self.ack = getenv().api.ack

    def number(self):
        """Return the number of helper processes currently tracked."""
        return len(self._process)

    def clean(self):
        """Reset all per-process runtime state (processes, encoders, fds, respawn history)."""
        self.fds = []
        self._process = {}
        self._encoder = {}
        self._broken = []
        self._respawning = {}

    def _mark_broken(self, process):
        """Record `process` as broken, keeping a single entry per process."""
        if process not in self._broken:
            self._broken.append(process)

    def _handle_problem(self, process):
        """Terminate a misbehaving process and restart it when respawning is allowed."""
        if process not in self._process:
            return
        if self.respawn_number and self._restart[process]:
            log.debug('process %s ended, restarting it' % process, 'process')
            self._terminate(process)
            self._start(process)
        else:
            log.debug('process %s ended' % process, 'process')
            self._terminate(process)

    def _terminate(self, process_name):
        """Stop tracking `process_name` and terminate it in a background thread.

        Returns the started thread so callers can join it if they need to wait.
        """
        log.debug('terminating process %s' % process_name, 'process')
        process = self._process[process_name]
        del self._process[process_name]
        self._update_fds()
        thread = Thread(target=self._terminate_run, args=(process,))
        thread.start()
        return thread

    def _terminate_run(self, process):
        """Thread body: terminate the child and reap it."""
        try:
            process.terminate()
            process.wait()
        except (OSError, KeyError):
            # the process is most likely already dead
            pass

    def terminate(self):
        """Send the shutdown message to every helper, then terminate them all."""
        for process in list(self._process):
            if not self.silence:
                try:
                    self.write(process, self._encoder[process].shutdown())
                except ProcessError:
                    pass
        self.silence = True
        # waiting a little to make sure IO is flushed to the pipes
        # we are using unbuffered IO but still ..
        time.sleep(0.1)
        for process in list(self._process):
            try:
                t = self._terminate(process)
                t.join()
            except OSError:
                # we most likely received a SIGTERM signal and our child is already dead
                log.debug('child process %s was already dead' % process, 'process')
        self.clean()

    def _start(self, process):
        """Fork the program configured for `process`.

        Does nothing when the process was marked as not restartable, is
        already running or has no configuration.  Start-up failures are
        logged and the process is marked as broken.

        Raises:
            ProcessError: the process was respawned more than
                `respawn_number` times within the same time window.
        """
        if not self._restart.get(process, True):
            return

        try:
            if process in self._process:
                log.debug('process already running', 'process')
                return

            if process not in self._configuration:
                log.debug('can not start process, no configuration for it', 'process')
                return
            # Prevent some weird termcap data to be created at the start of the PIPE
            # \x1b[?1034h (no-eol) (esc)
            os.environ['TERM'] = 'dumb'

            configuration = self._configuration[process]

            run = configuration.get('run', '')
            if run:
                api = configuration.get('encoder', '')
                self._encoder[process] = Response.Text(text_version) if api == 'text' else Response.JSON(json_version)

                self._process[process] = subprocess.Popen(
                    run,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    preexec_fn=preexec_helper
                    # This flag exists for python 2.7.3 in the documentation but not on my MAC
                    # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
                )
                self._update_fds()
                # reads in received() must never block the reactor
                fcntl.fcntl(self._process[process].stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

                log.debug('forked process %s' % process, 'process')

                self._restart[process] = self._configuration[process]['respawn']
                around_now = int(time.time()) & self.respawn_timemask
                history = self._respawning.get(process)
                if history is not None and around_now in history:
                    history[around_now] += 1
                    # we are respawning too fast
                    if history[around_now] > self.respawn_number:
                        log.critical(
                            'Too many death for %s (%d) terminating program' % (process, self.respawn_number),
                            'process',
                        )
                        raise ProcessError()
                else:
                    # first respawn recorded, or long time since the last one: (re)start counting
                    self._respawning[process] = {around_now: 1}

        except (subprocess.CalledProcessError, OSError, ValueError) as exc:
            self._mark_broken(process)
            log.debug('could not start process %s' % process, 'process')
            log.debug('reason: %s' % str(exc), 'process')

    def start(self, configuration, restart=False):
        """Align the running processes with `configuration`.

        Processes missing from the new configuration are terminated, new
        ones are started, and already running ones are restarted only when
        `restart` is true.
        """
        for process in list(self._process):
            if process not in configuration:
                self._terminate(process)
        self._configuration = configuration
        for process in configuration:
            if process in self._process:
                if restart:
                    self._terminate(process)
                    self._start(process)
                continue
            self._start(process)

    def broken(self, neighbor):
        """Return True when any configured process is marked as broken.

        `neighbor` is accepted for API compatibility and is not used.
        """
        return any(process in self._broken for process in self._configuration)

    def _update_fds(self):
        """Refresh `self.fds` with the stdout descriptor of every tracked process."""
        self.fds = [self._process[process].stdout.fileno() for process in self._process]

    def received(self):
        """Yield `(process, command)` for every complete line printed by the helpers.

        This is a generator.  Each tracked process is polled without
        blocking; incomplete trailing data is kept in `self._buffer` until
        its newline arrives.  Processes which hung up, errored or exited
        are handed to `_handle_problem`.
        """
        consumed_data = False

        for process in list(self._process):
            try:
                proc = self._process[process]
                poll = proc.poll()

                poller = select.poll()
                poller.register(
                    proc.stdout, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR
                )

                ready = False
                for _, event in poller.poll(0):
                    if event & select.POLLIN or event & select.POLLPRI:
                        ready = True
                    elif event & select.POLLHUP or event & select.POLLERR or event & select.POLLNVAL:
                        self._handle_problem(process)

                if not ready:
                    continue

                try:
                    # Calling next() on Linux and OSX works perfectly well
                    # but not on OpenBSD where it always raise StopIteration
                    # and only read() works (not even readline)
                    buf = str(proc.stdout.read(16384), 'ascii')
                    if buf == '' and poll is not None:
                        # if proc.poll() is None then
                        # process is fine, we received an empty line because
                        # we're doing .read() on a non-blocking pipe and
                        # the process maybe has nothing to send yet
                        self._handle_problem(process)
                        continue

                    raw = self._buffer.get(process, '') + buf

                    while '\n' in raw:
                        line, raw = raw.split('\n', 1)
                        line = line.rstrip()
                        consumed_data = True
                        log.debug('command from process %s : %s ' % (process, line), 'process')
                        yield (process, formated(line))

                    # keep the unterminated remainder for the next call
                    self._buffer[process] = raw

                except IOError as exc:
                    if not exc.errno or exc.errno in error.fatal:
                        # if the program exits we can get an IOError with errno code zero !
                        self._handle_problem(process)
                    elif exc.errno in error.block:
                        # we often see errno.EINTR: call interrupted and
                        # we most likely have data, we will try to read them at the next loop iteration
                        pass
                    else:
                        log.debug('unexpected errno received from forked process (%s)' % errstr(exc), 'process')
                    continue
                except StopIteration:
                    if not consumed_data:
                        self._handle_problem(process)
                    continue

                # proc.poll returns None if the process is still fine
                # -[signal], like -15, if the process was terminated
                if poll is not None:
                    self._handle_problem(process)
                    return

            except KeyError:
                pass
            except (subprocess.CalledProcessError, OSError, ValueError):
                self._handle_problem(process)

    def write(self, process, string, neighbor=None):
        """Send `string`, followed by a newline, to the stdin of `process`.

        A `string` of None is ignored.  `neighbor` is accepted for API
        compatibility and is not used.

        Returns:
            True once the data was written (or there was nothing to write).

        Raises:
            ProcessError: the pipe to the helper is broken (EPIPE).
        """
        if string is None:
            return True

        # encode once: the payload does not change between retries
        data = bytes('%s\n' % string, 'ascii')

        # XXX: FIXME: This is potentially blocking
        while True:
            try:
                self._process[process].stdin.write(data)
            except IOError as exc:
                # recorded once per process, however many times the write is retried
                self._mark_broken(process)
                if exc.errno == errno.EPIPE:
                    log.debug('issue while sending data to our helper program', 'process')
                    raise ProcessError()
                # Could it have been caused by a signal ? What to do.
                log.debug(
                    'error received while sending data to helper program, retrying (%s)' % errstr(exc), 'process'
                )
                continue
            break

        try:
            self._process[process].stdin.flush()
        except IOError as exc:
            # AFAIK, the buffer should be flushed at the next attempt.
            log.debug('error received while FLUSHING data to helper program, retrying (%s)' % errstr(exc), 'process')

        return True

    def _answer(self, service, string, force=False):
        """Write `string` to `service` when acknowledgements are enabled or `force` is set."""
        if force or self.ack:
            log.debug('responding to %s : %s' % (service, string.replace('\n', '\\n')), 'process')
            self.write(service, string)

    def answer_done(self, service):
        """Acknowledge a command from `service` as done."""
        self._answer(service, Answer.done)

    def answer_error(self, service):
        """Report to `service` that its command failed."""
        self._answer(service, Answer.error)

    def _notify(self, neighbor, event):
        """Yield the processes which `neighbor.api` lists for `event`."""
        for process in neighbor.api[event]:
            yield process

    # do not do anything if silenced
    # no-self-argument

    def silenced(function):
        """Decorator: turn the method into a no-op once `self.silence` is set."""
        import functools

        @functools.wraps(function)
        def closure(self, *args, **kwargs):
            if self.silence:
                return None
            return function(self, *args, **kwargs)

        return closure

    # invalid-name
    @silenced
    def up(self, neighbor):
        """Send the encoder's `up` message to processes listening to neighbor changes."""
        for process in self._notify(neighbor, 'neighbor-changes'):
            self.write(process, self._encoder[process].up(neighbor), neighbor)

    @silenced
    def connected(self, neighbor):
        """Send the encoder's `connected` message to processes listening to neighbor changes."""
        for process in self._notify(neighbor, 'neighbor-changes'):
            self.write(process, self._encoder[process].connected(neighbor), neighbor)

    @silenced
    def down(self, neighbor, reason):
        """Send the encoder's `down` message, with its `reason`, to processes listening to neighbor changes."""
        for process in self._notify(neighbor, 'neighbor-changes'):
            self.write(process, self._encoder[process].down(neighbor, reason), neighbor)

    @silenced
    def negotiated(self, neighbor, negotiated):
        """Forward the `negotiated` event to the processes subscribed to it."""
        for process in self._notify(neighbor, 'negotiated'):
            self.write(process, self._encoder[process].negotiated(neighbor, negotiated), neighbor)

    @silenced
    def fsm(self, neighbor, fsm):
        """Forward the `fsm` event to the processes subscribed to it."""
        for process in self._notify(neighbor, 'fsm'):
            self.write(process, self._encoder[process].fsm(neighbor, fsm), neighbor)

    @silenced
    def signal(self, neighbor, signal):
        """Forward the `signal` event to the processes subscribed to it."""
        for process in self._notify(neighbor, 'signal'):
            self.write(process, self._encoder[process].signal(neighbor, signal), neighbor)

    @silenced
    def packets(self, neighbor, direction, category, negotiated, header, body):
        """Forward a packet to the processes subscribed to '<direction>-packets'."""
        for process in self._notify(neighbor, '%s-packets' % direction):
            self.write(
                process,
                self._encoder[process].packets(neighbor, direction, category, negotiated, header, body),
                neighbor,
            )

    @silenced
    def notification(self, neighbor, direction, code, subcode, data, header, body):
        """Forward a notification to the processes listening to neighbor changes."""
        for process in self._notify(neighbor, 'neighbor-changes'):
            self.write(
                process,
                self._encoder[process].notification(neighbor, direction, code, subcode, data, header, body),
                neighbor,
            )

    @silenced
    def message(self, message_id, neighbor, direction, message, negotiated, header, *body):
        """Dispatch a message to the handler registered for `message_id`.

        Raises KeyError when no handler was registered for `message_id`.
        """
        self._dispatch[message_id](self, neighbor, direction, message, negotiated, header, *body)

    # registering message functions
    # no-self-argument

    def register_process(message_id, storage=_dispatch):
        """Decorator factory: register the method as the handler for `message_id`.

        `storage` defaults to the class-level `_dispatch` table on purpose:
        it is the shared registry read by `message()`.
        """
        import functools

        def closure(function):
            @functools.wraps(function)
            def wrap(*args):
                function(*args)

            storage[message_id] = wrap
            return wrap

        return closure

    # notifications are handled in the loop as they use different arguments

    @register_process(Message.CODE.OPEN)
    def _open(self, peer, direction, message, negotiated, header, body):
        for process in self._notify(peer, '%s-%s' % (direction, Message.CODE.OPEN.SHORT)):
            self.write(process, self._encoder[process].open(peer, direction, message, negotiated, header, body), peer)

    @register_process(Message.CODE.UPDATE)
    def _update(self, peer, direction, update, negotiated, header, body):
        for process in self._notify(peer, '%s-%s' % (direction, Message.CODE.UPDATE.SHORT)):
            self.write(process, self._encoder[process].update(peer, direction, update, negotiated, header, body), peer)

    @register_process(Message.CODE.NOTIFICATION)
    def _notification(self, peer, direction, message, negotiated, header, body):
        for process in self._notify(peer, '%s-%s' % (direction, Message.CODE.NOTIFICATION.SHORT)):
            self.write(
                process, self._encoder[process].notification(peer, direction, message, negotiated, header, body), peer
            )

    # unused-argument, must keep the API
    @register_process(Message.CODE.KEEPALIVE)
    def _keepalive(self, peer, direction, keepalive, negotiated, header, body):
        for process in self._notify(peer, '%s-%s' % (direction, Message.CODE.KEEPALIVE.SHORT)):
            self.write(process, self._encoder[process].keepalive(peer, direction, negotiated, header, body), peer)

    @register_process(Message.CODE.ROUTE_REFRESH)
    def _refresh(self, peer, direction, refresh, negotiated, header, body):
        for process in self._notify(peer, '%s-%s' % (direction, Message.CODE.ROUTE_REFRESH.SHORT)):
            self.write(
                process, self._encoder[process].refresh(peer, direction, refresh, negotiated, header, body), peer
            )

    @register_process(Message.CODE.OPERATIONAL)
    def _operational(self, peer, direction, operational, negotiated, header, body):
        for process in self._notify(peer, '%s-%s' % (direction, Message.CODE.OPERATIONAL.SHORT)):
            self.write(
                process,
                self._encoder[process].operational(
                    peer, direction, operational.category, operational, negotiated, header, body
                ),
                peer,
            )
446