medusa.asyncore.dispatcher.log_info()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 3
nop 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# -*- Mode: Python; tab-width: 4 -*-
2
#     $Id$
3
#    Author: Sam Rushing <[email protected]>
4
5
# ======================================================================
6
# Copyright 1996 by Sam Rushing
7
#
8
#                         All Rights Reserved
9
#
10
# Permission to use, copy, modify, and distribute this software and
11
# its documentation for any purpose and without fee is hereby
12
# granted, provided that the above copyright notice appear in all
13
# copies and that both that copyright notice and this permission
14
# notice appear in supporting documentation, and that the name of Sam
15
# Rushing not be used in advertising or publicity pertaining to
16
# distribution of the software without specific, written prior
17
# permission.
18
#
19
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26
# ======================================================================
27
28
"""Basic infrastructure for asynchronous socket service clients and servers.
29
30
There are only two ways to have a program on a single processor do "more
31
than one thing at a time".  Multi-threaded programming is the simplest and
32
most popular way to do it, but there is another very different technique,
33
that lets you have nearly all the advantages of multi-threading, without
34
actually using multiple threads. It's really only practical if your program
35
is largely I/O bound. If your program is CPU bound, then pre-emptive
36
scheduled threads are probably what you really need. Network servers are
37
rarely CPU-bound, however.
38
39
If your operating system supports the select() system call in its I/O
40
library (and nearly all do), then you can use it to juggle multiple
41
communication channels at once; doing other work while your I/O is taking
42
place in the "background."  Although this strategy can seem strange and
43
complex, especially at first, it is in many ways easier to understand and
44
control than multi-threaded programming. The module documented here solves
45
many of the difficult problems for you, making the task of building
46
sophisticated high-performance network servers and clients a snap.
47
"""
48
49
import exceptions
50
import select
51
import socket
52
import string
53
import sys
54
55
import os
56
# Error numbers that the dispatcher methods compare socket.error values
# against.  Everywhere except 'nt' they come straight from the errno module.
if os.name != 'nt':
    from errno import (
        EALREADY,
        EINPROGRESS,
        EWOULDBLOCK,
        ECONNRESET,
        ENOTCONN,
        ESHUTDOWN,
    )
else:
    # On 'nt' the numeric values are hard-coded instead.
    EWOULDBLOCK = 10035
    EINPROGRESS = 10036
    EALREADY = 10037
    ECONNRESET = 10054
    ENOTCONN = 10057
    ESHUTDOWN = 10058
65
66
# Default channel map (file descriptor -> dispatcher).  It is only created
# when the name is not already defined -- presumably so that re-executing the
# module (e.g. reload) keeps the channels that are already registered.
if 'socket_map' not in globals():
    socket_map = {}
70
71
class ExitNow(Exception):
    """Exception the polling functions re-raise instead of reporting.

    poll(), poll2() and poll3() let it propagate to their caller rather
    than passing it to the channel's handle_error().
    """
73
74
# When true, poll() prints the ready lists returned by select.select().
DEBUG = 0
75
76
def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    Every channel whose readable()/writable() predicate is true is handed
    to select.select(); handle_read_event() / handle_write_event() is then
    called on each channel that select reports as ready.

    timeout -- seconds passed to select.select().
    map     -- mapping of file descriptor to channel; defaults to the
               module-level socket_map.  Nothing happens if it is empty.

    ExitNow, KeyboardInterrupt and SystemExit raised by a handler propagate
    to the caller; any other exception is routed to the channel's
    handle_error().
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers = []
    writers = []
    for fd, obj in map.items():
        if obj.readable():
            readers.append(fd)
        if obj.writable():
            writers.append(fd)
    readers, writers, errors = select.select(readers, writers, [], timeout)

    if DEBUG:
        sys.stdout.write('%s %s %s\n' % (readers, writers, errors))

    ready_sets = (
        (readers, 'handle_read_event'),
        (writers, 'handle_write_event'),
    )
    for ready, handler_name in ready_sets:
        for fd in ready:
            # A handler that ran earlier in this pass may have removed the
            # channel.  Only the lookup is guarded, so that a KeyError
            # raised by handle_error() is not silently discarded.
            try:
                obj = map[fd]
            except KeyError:
                continue
            try:
                getattr(obj, handler_name)()
            except (ExitNow, KeyboardInterrupt, SystemExit):
                # bare raise keeps the original traceback
                raise
            except:
                # deliberate catch-all: the channel decides what to do
                obj.handle_error()
115
116 View Code Duplication
def poll2(timeout=0.0, map=None):
    """Run one pass over the channels in *map* using the external poll module.

    timeout -- seconds; converted to integer milliseconds for poll.poll().
    map     -- mapping of file descriptor to channel; defaults to the
               module-level socket_map.  Nothing is polled if it is empty.

    Channels are polled for POLLIN when readable() is true and for POLLOUT
    when writable() is true.  ExitNow, KeyboardInterrupt and SystemExit
    raised by a handler propagate to the caller; any other exception is
    routed to the channel's handle_error().
    """
    import poll
    if map is None:
        map = socket_map
    # timeout is in milliseconds
    timeout = int(timeout * 1000)
    if not map:
        return

    targets = []
    for fd, obj in map.items():
        flags = 0
        if obj.readable():
            flags = poll.POLLIN
        if obj.writable():
            flags = flags | poll.POLLOUT
        if flags:
            targets.append((fd, flags))

    for fd, flags in poll.poll(targets, timeout):
        # A handler that ran earlier in this pass may have removed the
        # channel.  Only the lookup is guarded, so that a KeyError raised
        # by handle_error() is not silently discarded.
        try:
            obj = map[fd]
        except KeyError:
            continue
        try:
            if flags & poll.POLLIN:
                obj.handle_read_event()
            if flags & poll.POLLOUT:
                obj.handle_write_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # bare raise keeps the original traceback
            raise
        except:
            # deliberate catch-all: the channel decides what to do
            obj.handle_error()
147
148 View Code Duplication
def poll3(timeout=0.0, map=None):
    """Run one pass over the channels in *map* using select.poll().

    Uses the poll() support added to the select module in Python 2.0.

    timeout -- seconds, converted to integer milliseconds; None is passed
               through unchanged so that the poll object blocks until an
               event arrives.
    map     -- mapping of file descriptor to channel; defaults to the
               module-level socket_map.  Nothing is polled if it is empty.

    Channels are registered for POLLIN when readable() is true and for
    POLLOUT when writable() is true.  ExitNow, KeyboardInterrupt and
    SystemExit raised by a handler propagate to the caller; any other
    exception is routed to the channel's handle_error().
    """
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    if not map:
        return

    pollster = select.poll()
    for fd, obj in map.items():
        flags = 0
        if obj.readable():
            flags = select.POLLIN
        if obj.writable():
            flags = flags | select.POLLOUT
        if flags:
            pollster.register(fd, flags)

    for fd, flags in pollster.poll(timeout):
        # A handler that ran earlier in this pass may have removed the
        # channel.  Only the lookup is guarded, so that a KeyError raised
        # by handle_error() is not silently discarded.
        try:
            obj = map[fd]
        except KeyError:
            continue
        try:
            if flags & select.POLLIN:
                obj.handle_read_event()
            if flags & select.POLLOUT:
                obj.handle_write_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # bare raise keeps the original traceback
            raise
        except:
            # deliberate catch-all: the channel decides what to do
            obj.handle_error()
180
181
def loop(timeout=30.0, use_poll=0, map=None, count=None):
    """Call a polling function repeatedly while *map* has channels.

    timeout  -- seconds handed to the polling function on every pass.
    use_poll -- when true, use poll3() if the select module has poll(),
                otherwise poll2(); when false, use the select()-based poll().
    map      -- mapping of file descriptor to channel; defaults to the
                module-level socket_map.
    count    -- optional maximum number of passes.  None (the default)
                keeps looping until the map is empty.
    """
    if map is None:
        map = socket_map

    if not use_poll:
        poll_fun = poll
    elif hasattr(select, 'poll'):
        poll_fun = poll3
    else:
        poll_fun = poll2

    if count is None:
        while map:
            poll_fun(timeout, map)
    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1
196
197
class dispatcher:
    """Wrapper around a non-blocking socket registered in a channel map.

    The polling functions call readable()/writable() to decide what to wait
    for and handle_read_event()/handle_write_event() when the descriptor is
    ready.  Those in turn call handle_accept(), handle_connect(),
    handle_read(), handle_write() and handle_close(), whose default
    implementations only log a warning; subclasses override them.

    Attribute lookups that fail on the dispatcher are forwarded to the
    underlying socket object (see __getattr__).
    """

    debug = 0
    connected = 0
    accepting = 0
    closing = 0
    addr = None

    def __init__(self, sock=None, map=None):
        """Optionally adopt the existing socket *sock*, registering it in *map*."""
        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = 1

    def __repr__(self):
        """Return '<class status addr at id>'; never raises an Exception."""
        try:
            status = []
            if self.accepting and self.addr:
                status.append('listening')
            elif self.connected:
                status.append('connected')
            if self.addr:
                status.append('%s:%d' % self.addr)
            return '<%s %s at %x>' % (
                self.__class__.__name__,
                ' '.join(status),
                id(self),
                )
        except Exception:
            try:
                ar = repr(self.addr)
            except Exception:
                ar = 'no self.addr!'

            return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self), ar)

    def add_channel(self, map=None):
        """Register this channel in *map* (default: socket_map) under its fd."""
        #self.log_info ('adding channel %s' % self)
        if map is None:
            map = socket_map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel's fd from *map* (default: socket_map) if present."""
        fd = self._fileno
        if map is None:
            map = socket_map
        if fd in map:
            #self.log_info ('closing channel %d:%s' % (fd, self))
            del map[fd]

    def create_socket(self, family, type):
        """Create a non-blocking socket and register it in the default map."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt the existing socket *sock* and register it in *map*."""
        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Best-effort: turn SO_REUSEADDR on for the socket."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            # the option is optional; carry on without it
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        return 1

    if os.name == 'mac':
        # The macintosh will select a listening socket for
        # write if you let it.  What might this mean?
        def writable(self):
            return not self.accepting
    else:
        def writable(self):
            return 1

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*."""
        self.accepting = 1
        if os.name == 'nt' and num > 5:
            # clamp to the limit that is being tested for, instead of
            # dropping an oversized request all the way down to 1
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start connecting to *address*.

        EINPROGRESS, EALREADY and EWOULDBLOCK mean the connect is still in
        progress: the method returns with connected left at 0.  Any other
        socket.error is re-raised.  On immediate success connected is set
        and handle_connect() is called.
        """
        self.connected = 0
        try:
            self.socket.connect(address)
        except socket.error as why:
            if why.args[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
                return
            # bare raise keeps the original traceback
            raise
        self.connected = 1
        self.handle_connect()

    def accept(self):
        """Return (conn, addr), or None when accept() would block."""
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return None
            raise

    def send(self, data):
        """Send *data*; return the number of bytes sent (0 if it would block)."""
        try:
            return self.socket.send(data)
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        Returns '' after calling handle_close() when the peer has closed
        the connection or the socket reports ECONNRESET, ENOTCONN or
        ESHUTDOWN.  Other socket errors are re-raised.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            return data
        except socket.error as why:
            # winsock sometimes throws ENOTCONN
            if why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN):
                self.handle_close()
                return ''
            raise

    def close(self):
        """Remove the channel from the default map and close the socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  If 'socket' itself is the
        # missing attribute (no socket attached yet), delegating would look
        # up self.socket again and recurse without end.
        if attr == 'socket':
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print 'type: message'; 'info' messages only when __debug__ is true."""
        if __debug__ or type != 'info':
            sys.stdout.write('%s: %s\n' % (type, message))

    def handle_read_event(self):
        """Dispatch a read-ready event to accept, connect or read handling."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = 1
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = 1
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        """Dispatch a write-ready event, completing a pending connect first."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()

    def handle_expt_event(self):
        self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except Exception:
            self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt(self):
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        self.log_info('unhandled close event', 'warning')
        self.close()
434
435
# ---------------------------------------------------------------------------
436
# adds simple buffered output capability, useful for simple clients.
437
# [for more sophisticated usage use asynchat.async_chat]
438
# ---------------------------------------------------------------------------
439
440
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple buffer for outgoing data.

    send() appends to out_buffer and tries to push data out immediately;
    whatever could not be sent stays buffered and is retried from
    handle_write().
    """

    def __init__(self, sock=None, map=None):
        """Adopt *sock* (registered in *map*) and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Send at most the first 512 bytes of the buffer and drop what went out."""
        num_sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        # wait for writability while connecting or while data is pending
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to send right away."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
461
462
# ---------------------------------------------------------------------------
463
# used for debugging.
464
# ---------------------------------------------------------------------------
465
466
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((filename, function, line), type, value, info) where the first
    tuple describes the innermost traceback entry, type and value come from
    sys.exc_info(), and info is a one-line rendering of every traceback
    entry in the form '[file|function|line] [file|function|line] ...'.

    Raises AssertionError when there is no traceback to report, i.e. when
    it is called outside of exception handling.
    """
    t, v, tb = sys.exc_info()
    if not tb:
        # fail with a clear message instead of an AttributeError on None
        raise AssertionError('traceback does not exist')

    tbinfo = []
    while tb:
        code = tb.tb_frame.f_code
        tbinfo.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next

    filename, function, line = tbinfo[-1]
    info = ' '.join(['[%s]' % '|'.join(entry) for entry in tbinfo])
    return (filename, function, line), t, v, info
491
492
def close_all(map=None):
    """Close the socket of every channel in *map*, then empty the map.

    map -- mapping of file descriptor to channel; defaults to the
           module-level socket_map.
    """
    channels = map
    if channels is None:
        channels = socket_map
    for channel in channels.values():
        channel.socket.close()
    channels.clear()
498
499
# Asynchronous File I/O:
500
#
501
# After a little research (reading man pages on various unixen, and
502
# digging through the linux kernel), I've determined that select()
503
# isn't meant for doing asynchronous file i/o.
504
# Heartening, though - reading linux/mm/filemap.c shows that linux
505
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
506
# will be sitting in memory for us already when we go to read it.
507
#
508
# What other OS's (besides NT) support async file i/o?  [VMS?]
509
#
510
# Regardless, this is useful for pipes, and stdin/stdout...
511
512
import os
513
if os.name == 'posix':
514
    import fcntl
515
516
    class file_wrapper:
        """Make a file descriptor look enough like a socket for asyncore.

        Only recv/send (also available as read/write), close and fileno
        are provided; all of them operate on the descriptor given to the
        constructor.
        """

        def __init__(self, fd):
            self.fd = fd

        def recv(self, *args):
            """Read from the descriptor; *args* are passed to os.read()."""
            # argument unpacking replaces the deprecated apply() builtin
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; *args* are passed to os.write()."""
            return os.write(self.fd, *args)

        read = recv
        write = send

        def close(self):
            return os.close(self.fd)

        def fileno(self):
            return self.fd
536
537
    class file_dispatcher(dispatcher):
        """dispatcher for a plain file descriptor (pipes, stdin/stdout, ...)."""

        def __init__(self, fd):
            """Switch *fd* to non-blocking mode and register it as a channel."""
            dispatcher.__init__(self)
            self.connected = 1
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            # O_NONBLOCK is taken from os, where the flag is documented,
            # rather than from the fcntl module
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            self.set_file(fd)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and add the channel to the default map."""
            self._fileno = fd
            self.socket = file_wrapper(fd)
            self.add_channel()
551
552