Completed
Branch master (477316)
by Michael
08:56
created

thrift.server.TNonblockingServer.TNonblockingServer.handle()   B

Complexity

Conditions 7

Size

Total Lines 30
Code Lines 24

Duplication

Lines 30
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 24
dl 30
loc 30
rs 7.904
c 0
b 0
f 0
cc 7
nop 1
1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
"""Implementation of non-blocking server.
20
21
The main idea of the server is to receive and send requests
22
only from the main thread.
23
24
The thread poool should be sized for concurrent tasks, not
25
maximum connections
26
"""
27
import threading
28
import socket
29
import queue
30
import select
31
import struct
32
import logging
33
34
from thrift.transport import TTransport
35
from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
36
37
__all__ = ['TNonblockingServer']
38
39
40
class Worker(threading.Thread):
41
    """Worker is a small helper to process incoming connection."""
42
43
    def __init__(self, queue):
44
        threading.Thread.__init__(self)
45
        self.queue = queue
46
47
    def run(self):
48
        """Process queries from task queue, stop if processor is None."""
49
        while True:
50
            try:
51
                processor, iprot, oprot, otrans, callback = self.queue.get()
52
                if processor is None:
53
                    break
54
                processor.process(iprot, oprot)
55
                callback(True, otrans.getvalue())
56
            except Exception:
57
                logging.exception("Exception while processing request")
58
                callback(False, '')
59
60
WAIT_LEN = 0
61
WAIT_MESSAGE = 1
62
WAIT_PROCESS = 2
63
SEND_ANSWER = 3
64
CLOSED = 4
65
66
67
def locked(func):
68
    """Decorator which locks self.lock."""
69
    def nested(self, *args, **kwargs):
70
        self.lock.acquire()
71
        try:
72
            return func(self, *args, **kwargs)
73
        finally:
74
            self.lock.release()
75
    return nested
76
77
78
def socket_exception(func):
79
    """Decorator close object on socket.error."""
80
    def read(self, *args, **kwargs):
81
        try:
82
            return func(self, *args, **kwargs)
83
        except socket.error:
84
            self.close()
85
    return read
86
87
88
class Connection:
89
    """Basic class is represented connection.
90
91
    It can be in state:
92
        WAIT_LEN --- connection is reading request len.
93
        WAIT_MESSAGE --- connection is reading request.
94
        WAIT_PROCESS --- connection has just read whole request and
95
                         waits for call ready routine.
96
        SEND_ANSWER --- connection is sending answer string (including length
97
                        of answer).
98
        CLOSED --- socket was closed and connection should be deleted.
99
    """
100
    def __init__(self, new_socket, wake_up):
101
        self.socket = new_socket
102
        self.socket.setblocking(False)
103
        self.status = WAIT_LEN
104
        self.len = 0
105
        self.message = ''
106
        self.lock = threading.Lock()
107
        self.wake_up = wake_up
108
109
    def _read_len(self):
110
        """Reads length of request.
111
112
        It's a safer alternative to self.socket.recv(4)
113
        """
114
        read = self.socket.recv(4 - len(self.message))
115
        if len(read) == 0:
116
            # if we read 0 bytes and self.message is empty, then
117
            # the client closed the connection
118
            if len(self.message) != 0:
119
                logging.error("can't read frame size from socket")
120
            self.close()
121
            return
122
        self.message += read
123
        if len(self.message) == 4:
124
            self.len, = struct.unpack('!i', self.message)
125
            if self.len < 0:
126
                logging.error("negative frame size, it seems client "
127
                              "doesn't use FramedTransport")
128
                self.close()
129
            elif self.len == 0:
130
                logging.error("empty frame, it's really strange")
131
                self.close()
132
            else:
133
                self.message = ''
134
                self.status = WAIT_MESSAGE
135
136
    @socket_exception
137
    def read(self):
138
        """Reads data from stream and switch state."""
139
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
140
        if self.status == WAIT_LEN:
141
            self._read_len()
142
            # go back to the main loop here for simplicity instead of
143
            # falling through, even though there is a good chance that
144
            # the message is already available
145
        elif self.status == WAIT_MESSAGE:
146
            read = self.socket.recv(self.len - len(self.message))
147
            if len(read) == 0:
148
                logging.error("can't read frame from socket (get %d of "
149
                              "%d bytes)" % (len(self.message), self.len))
150
                self.close()
151
                return
152
            self.message += read
153
            if len(self.message) == self.len:
154
                self.status = WAIT_PROCESS
155
156
    @socket_exception
157
    def write(self):
158
        """Writes data from socket and switch state."""
159
        assert self.status == SEND_ANSWER
160
        sent = self.socket.send(self.message)
161
        if sent == len(self.message):
162
            self.status = WAIT_LEN
163
            self.message = ''
164
            self.len = 0
165
        else:
166
            self.message = self.message[sent:]
167
168
    @locked
169
    def ready(self, all_ok, message):
170
        """Callback function for switching state and waking up main thread.
171
172
        This function is the only function witch can be called asynchronous.
173
174
        The ready can switch Connection to three states:
175
            WAIT_LEN if request was oneway.
176
            SEND_ANSWER if request was processed in normal way.
177
            CLOSED if request throws unexpected exception.
178
179
        The one wakes up main thread.
180
        """
181
        assert self.status == WAIT_PROCESS
182
        if not all_ok:
183
            self.close()
184
            self.wake_up()
185
            return
186
        self.len = ''
187
        if len(message) == 0:
188
            # it was a oneway request, do not write answer
189
            self.message = ''
190
            self.status = WAIT_LEN
191
        else:
192
            self.message = struct.pack('!i', len(message)) + message
193
            self.status = SEND_ANSWER
194
        self.wake_up()
195
196
    @locked
197
    def is_writeable(self):
198
        """Return True if connection should be added to write list of select"""
199
        return self.status == SEND_ANSWER
200
201
    # it's not necessary, but...
202
    @locked
203
    def is_readable(self):
204
        """Return True if connection should be added to read list of select"""
205
        return self.status in (WAIT_LEN, WAIT_MESSAGE)
206
207
    @locked
208
    def is_closed(self):
209
        """Returns True if connection is closed."""
210
        return self.status == CLOSED
211
212
    def fileno(self):
213
        """Returns the file descriptor of the associated socket."""
214
        return self.socket.fileno()
215
216
    def close(self):
217
        """Closes connection"""
218
        self.status = CLOSED
219
        self.socket.close()
220
221
222
class TNonblockingServer:
223
    """Non-blocking server."""
224
225
    def __init__(self,
226
                 processor,
227
                 lsocket,
228
                 inputProtocolFactory=None,
229
                 outputProtocolFactory=None,
230
                 threads=10):
231
        self.processor = processor
232
        self.socket = lsocket
233
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
234
        self.out_protocol = outputProtocolFactory or self.in_protocol
235
        self.threads = int(threads)
236
        self.clients = {}
237
        self.tasks = queue.Queue()
238
        self._read, self._write = socket.socketpair()
239
        self.prepared = False
240
        self._stop = False
241
242
    def setNumThreads(self, num):
243
        """Set the number of worker threads that should be created."""
244
        # implement ThreadPool interface
245
        assert not self.prepared, "Can't change number of threads after start"
246
        self.threads = num
247
248
    def prepare(self):
249
        """Prepares server for serve requests."""
250
        if self.prepared:
251
            return
252
        self.socket.listen()
253
        for _ in range(self.threads):
254
            thread = Worker(self.tasks)
255
            thread.setDaemon(True)
256
            thread.start()
257
        self.prepared = True
258
259
    def wake_up(self):
260
        """Wake up main thread.
261
262
        The server usualy waits in select call in we should terminate one.
263
        The simplest way is using socketpair.
264
265
        Select always wait to read from the first socket of socketpair.
266
267
        In this case, we can just write anything to the second socket from
268
        socketpair.
269
        """
270
        self._write.send('1')
271
272
    def stop(self):
273
        """Stop the server.
274
275
        This method causes the serve() method to return.  stop() may be invoked
276
        from within your handler, or from another thread.
277
278
        After stop() is called, serve() will return but the server will still
279
        be listening on the socket.  serve() may then be called again to resume
280
        processing requests.  Alternatively, close() may be called after
281
        serve() returns to close the server socket and shutdown all worker
282
        threads.
283
        """
284
        self._stop = True
285
        self.wake_up()
286
287
    def _select(self):
288
        """Does select on open connections."""
289
        readable = [self.socket.handle.fileno(), self._read.fileno()]
290
        writable = []
291
        for i, connection in list(self.clients.items()):
292
            if connection.is_readable():
293
                readable.append(connection.fileno())
294
            if connection.is_writeable():
295
                writable.append(connection.fileno())
296
            if connection.is_closed():
297
                del self.clients[i]
298
        return select.select(readable, writable, readable)
299
300
    def handle(self):
301
        """Handle requests.
302
303
        WARNING! You must call prepare() BEFORE calling handle()
304
        """
305
        assert self.prepared, "You have to call prepare before handle"
306
        rset, wset, xset = self._select()
307
        for readable in rset:
308
            if readable == self._read.fileno():
309
                # don't care i just need to clean readable flag
310
                self._read.recv(1024)
311
            elif readable == self.socket.handle.fileno():
312
                client = self.socket.accept().handle
313
                self.clients[client.fileno()] = Connection(client,
314
                                                           self.wake_up)
315
            else:
316
                connection = self.clients[readable]
317
                connection.read()
318
                if connection.status == WAIT_PROCESS:
319
                    itransport = TTransport.TMemoryBuffer(connection.message)
320
                    otransport = TTransport.TMemoryBuffer()
321
                    iprot = self.in_protocol.getProtocol(itransport)
322
                    oprot = self.out_protocol.getProtocol(otransport)
323
                    self.tasks.put([self.processor, iprot, oprot,
324
                                    otransport, connection.ready])
325
        for writeable in wset:
326
            self.clients[writeable].write()
327
        for oob in xset:
328
            self.clients[oob].close()
329
            del self.clients[oob]
330
331
    def close(self):
332
        """Closes the server."""
333
        for _ in range(self.threads):
334
            self.tasks.put([None, None, None, None, None])
335
        self.socket.close()
336
        self.prepared = False
337
338
    def serve(self):
339
        """Serve requests.
340
341
        Serve requests forever, or until stop() is called.
342
        """
343
        self._stop = False
344
        self.prepare()
345
        while not self._stop:
346
            self.handle()
347