Total Complexity | 53 |
Total Lines | 347 |
Duplicated Lines | 79.54 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like thrift.server.TNonblockingServer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # |
||
2 | # Licensed to the Apache Software Foundation (ASF) under one |
||
3 | # or more contributor license agreements. See the NOTICE file |
||
4 | # distributed with this work for additional information |
||
5 | # regarding copyright ownership. The ASF licenses this file |
||
6 | # to you under the Apache License, Version 2.0 (the |
||
7 | # "License"); you may not use this file except in compliance |
||
8 | # with the License. You may obtain a copy of the License at |
||
9 | # |
||
10 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
11 | # |
||
12 | # Unless required by applicable law or agreed to in writing, |
||
13 | # software distributed under the License is distributed on an |
||
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||
15 | # KIND, either express or implied. See the License for the |
||
16 | # specific language governing permissions and limitations |
||
17 | # under the License. |
||
18 | # |
||
19 | """Implementation of non-blocking server. |
||
20 | |||
21 | The main idea of the server is to receive and send requests |
||
22 | only from the main thread. |
||
23 | |||
24 | The thread poool should be sized for concurrent tasks, not |
||
25 | maximum connections |
||
26 | """ |
||
27 | import threading |
||
28 | import socket |
||
29 | import queue |
||
30 | import select |
||
31 | import struct |
||
32 | import logging |
||
33 | |||
34 | from thrift.transport import TTransport |
||
35 | from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory |
||
36 | |||
37 | __all__ = ['TNonblockingServer'] |
||
38 | |||
39 | |||
40 | class Worker(threading.Thread): |
||
41 | """Worker is a small helper to process incoming connection.""" |
||
42 | |||
43 | def __init__(self, queue): |
||
44 | threading.Thread.__init__(self) |
||
45 | self.queue = queue |
||
46 | |||
47 | def run(self): |
||
48 | """Process queries from task queue, stop if processor is None.""" |
||
49 | while True: |
||
50 | try: |
||
51 | processor, iprot, oprot, otrans, callback = self.queue.get() |
||
52 | if processor is None: |
||
53 | break |
||
54 | processor.process(iprot, oprot) |
||
55 | callback(True, otrans.getvalue()) |
||
56 | except Exception: |
||
57 | logging.exception("Exception while processing request") |
||
58 | callback(False, '') |
||
59 | |||
60 | WAIT_LEN = 0 |
||
61 | WAIT_MESSAGE = 1 |
||
62 | WAIT_PROCESS = 2 |
||
63 | SEND_ANSWER = 3 |
||
64 | CLOSED = 4 |
||
65 | |||
66 | |||
67 | def locked(func): |
||
68 | """Decorator which locks self.lock.""" |
||
69 | def nested(self, *args, **kwargs): |
||
70 | self.lock.acquire() |
||
71 | try: |
||
72 | return func(self, *args, **kwargs) |
||
73 | finally: |
||
74 | self.lock.release() |
||
75 | return nested |
||
76 | |||
77 | |||
78 | def socket_exception(func): |
||
79 | """Decorator close object on socket.error.""" |
||
80 | def read(self, *args, **kwargs): |
||
81 | try: |
||
82 | return func(self, *args, **kwargs) |
||
83 | except socket.error: |
||
84 | self.close() |
||
85 | return read |
||
86 | |||
87 | |||
88 | class Connection: |
||
89 | """Basic class is represented connection. |
||
90 | |||
91 | It can be in state: |
||
92 | WAIT_LEN --- connection is reading request len. |
||
93 | WAIT_MESSAGE --- connection is reading request. |
||
94 | WAIT_PROCESS --- connection has just read whole request and |
||
95 | waits for call ready routine. |
||
96 | SEND_ANSWER --- connection is sending answer string (including length |
||
97 | of answer). |
||
98 | CLOSED --- socket was closed and connection should be deleted. |
||
99 | """ |
||
100 | def __init__(self, new_socket, wake_up): |
||
101 | self.socket = new_socket |
||
102 | self.socket.setblocking(False) |
||
103 | self.status = WAIT_LEN |
||
104 | self.len = 0 |
||
105 | self.message = '' |
||
106 | self.lock = threading.Lock() |
||
107 | self.wake_up = wake_up |
||
108 | |||
109 | def _read_len(self): |
||
110 | """Reads length of request. |
||
111 | |||
112 | It's a safer alternative to self.socket.recv(4) |
||
113 | """ |
||
114 | read = self.socket.recv(4 - len(self.message)) |
||
115 | if len(read) == 0: |
||
116 | # if we read 0 bytes and self.message is empty, then |
||
117 | # the client closed the connection |
||
118 | if len(self.message) != 0: |
||
119 | logging.error("can't read frame size from socket") |
||
120 | self.close() |
||
121 | return |
||
122 | self.message += read |
||
123 | if len(self.message) == 4: |
||
124 | self.len, = struct.unpack('!i', self.message) |
||
125 | if self.len < 0: |
||
126 | logging.error("negative frame size, it seems client " |
||
127 | "doesn't use FramedTransport") |
||
128 | self.close() |
||
129 | elif self.len == 0: |
||
130 | logging.error("empty frame, it's really strange") |
||
131 | self.close() |
||
132 | else: |
||
133 | self.message = '' |
||
134 | self.status = WAIT_MESSAGE |
||
135 | |||
136 | @socket_exception |
||
137 | def read(self): |
||
138 | """Reads data from stream and switch state.""" |
||
139 | assert self.status in (WAIT_LEN, WAIT_MESSAGE) |
||
140 | if self.status == WAIT_LEN: |
||
141 | self._read_len() |
||
142 | # go back to the main loop here for simplicity instead of |
||
143 | # falling through, even though there is a good chance that |
||
144 | # the message is already available |
||
145 | elif self.status == WAIT_MESSAGE: |
||
146 | read = self.socket.recv(self.len - len(self.message)) |
||
147 | if len(read) == 0: |
||
148 | logging.error("can't read frame from socket (get %d of " |
||
149 | "%d bytes)" % (len(self.message), self.len)) |
||
150 | self.close() |
||
151 | return |
||
152 | self.message += read |
||
153 | if len(self.message) == self.len: |
||
154 | self.status = WAIT_PROCESS |
||
155 | |||
156 | @socket_exception |
||
157 | def write(self): |
||
158 | """Writes data from socket and switch state.""" |
||
159 | assert self.status == SEND_ANSWER |
||
160 | sent = self.socket.send(self.message) |
||
161 | if sent == len(self.message): |
||
162 | self.status = WAIT_LEN |
||
163 | self.message = '' |
||
164 | self.len = 0 |
||
165 | else: |
||
166 | self.message = self.message[sent:] |
||
167 | |||
168 | @locked |
||
169 | def ready(self, all_ok, message): |
||
170 | """Callback function for switching state and waking up main thread. |
||
171 | |||
172 | This function is the only function witch can be called asynchronous. |
||
173 | |||
174 | The ready can switch Connection to three states: |
||
175 | WAIT_LEN if request was oneway. |
||
176 | SEND_ANSWER if request was processed in normal way. |
||
177 | CLOSED if request throws unexpected exception. |
||
178 | |||
179 | The one wakes up main thread. |
||
180 | """ |
||
181 | assert self.status == WAIT_PROCESS |
||
182 | if not all_ok: |
||
183 | self.close() |
||
184 | self.wake_up() |
||
185 | return |
||
186 | self.len = '' |
||
187 | if len(message) == 0: |
||
188 | # it was a oneway request, do not write answer |
||
189 | self.message = '' |
||
190 | self.status = WAIT_LEN |
||
191 | else: |
||
192 | self.message = struct.pack('!i', len(message)) + message |
||
193 | self.status = SEND_ANSWER |
||
194 | self.wake_up() |
||
195 | |||
196 | @locked |
||
197 | def is_writeable(self): |
||
198 | """Return True if connection should be added to write list of select""" |
||
199 | return self.status == SEND_ANSWER |
||
200 | |||
201 | # it's not necessary, but... |
||
202 | @locked |
||
203 | def is_readable(self): |
||
204 | """Return True if connection should be added to read list of select""" |
||
205 | return self.status in (WAIT_LEN, WAIT_MESSAGE) |
||
206 | |||
207 | @locked |
||
208 | def is_closed(self): |
||
209 | """Returns True if connection is closed.""" |
||
210 | return self.status == CLOSED |
||
211 | |||
212 | def fileno(self): |
||
213 | """Returns the file descriptor of the associated socket.""" |
||
214 | return self.socket.fileno() |
||
215 | |||
216 | def close(self): |
||
217 | """Closes connection""" |
||
218 | self.status = CLOSED |
||
219 | self.socket.close() |
||
220 | |||
221 | |||
222 | class TNonblockingServer: |
||
223 | """Non-blocking server.""" |
||
224 | |||
225 | def __init__(self, |
||
226 | processor, |
||
227 | lsocket, |
||
228 | inputProtocolFactory=None, |
||
229 | outputProtocolFactory=None, |
||
230 | threads=10): |
||
231 | self.processor = processor |
||
232 | self.socket = lsocket |
||
233 | self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() |
||
234 | self.out_protocol = outputProtocolFactory or self.in_protocol |
||
235 | self.threads = int(threads) |
||
236 | self.clients = {} |
||
237 | self.tasks = queue.Queue() |
||
238 | self._read, self._write = socket.socketpair() |
||
239 | self.prepared = False |
||
240 | self._stop = False |
||
241 | |||
242 | def setNumThreads(self, num): |
||
243 | """Set the number of worker threads that should be created.""" |
||
244 | # implement ThreadPool interface |
||
245 | assert not self.prepared, "Can't change number of threads after start" |
||
246 | self.threads = num |
||
247 | |||
248 | def prepare(self): |
||
249 | """Prepares server for serve requests.""" |
||
250 | if self.prepared: |
||
251 | return |
||
252 | self.socket.listen() |
||
253 | for _ in range(self.threads): |
||
254 | thread = Worker(self.tasks) |
||
255 | thread.setDaemon(True) |
||
256 | thread.start() |
||
257 | self.prepared = True |
||
258 | |||
259 | def wake_up(self): |
||
260 | """Wake up main thread. |
||
261 | |||
262 | The server usualy waits in select call in we should terminate one. |
||
263 | The simplest way is using socketpair. |
||
264 | |||
265 | Select always wait to read from the first socket of socketpair. |
||
266 | |||
267 | In this case, we can just write anything to the second socket from |
||
268 | socketpair. |
||
269 | """ |
||
270 | self._write.send('1') |
||
271 | |||
272 | def stop(self): |
||
273 | """Stop the server. |
||
274 | |||
275 | This method causes the serve() method to return. stop() may be invoked |
||
276 | from within your handler, or from another thread. |
||
277 | |||
278 | After stop() is called, serve() will return but the server will still |
||
279 | be listening on the socket. serve() may then be called again to resume |
||
280 | processing requests. Alternatively, close() may be called after |
||
281 | serve() returns to close the server socket and shutdown all worker |
||
282 | threads. |
||
283 | """ |
||
284 | self._stop = True |
||
285 | self.wake_up() |
||
286 | |||
287 | def _select(self): |
||
288 | """Does select on open connections.""" |
||
289 | readable = [self.socket.handle.fileno(), self._read.fileno()] |
||
290 | writable = [] |
||
291 | for i, connection in list(self.clients.items()): |
||
292 | if connection.is_readable(): |
||
293 | readable.append(connection.fileno()) |
||
294 | if connection.is_writeable(): |
||
295 | writable.append(connection.fileno()) |
||
296 | if connection.is_closed(): |
||
297 | del self.clients[i] |
||
298 | return select.select(readable, writable, readable) |
||
299 | |||
300 | def handle(self): |
||
301 | """Handle requests. |
||
302 | |||
303 | WARNING! You must call prepare() BEFORE calling handle() |
||
304 | """ |
||
305 | assert self.prepared, "You have to call prepare before handle" |
||
306 | rset, wset, xset = self._select() |
||
307 | for readable in rset: |
||
308 | if readable == self._read.fileno(): |
||
309 | # don't care i just need to clean readable flag |
||
310 | self._read.recv(1024) |
||
311 | elif readable == self.socket.handle.fileno(): |
||
312 | client = self.socket.accept().handle |
||
313 | self.clients[client.fileno()] = Connection(client, |
||
314 | self.wake_up) |
||
315 | else: |
||
316 | connection = self.clients[readable] |
||
317 | connection.read() |
||
318 | if connection.status == WAIT_PROCESS: |
||
319 | itransport = TTransport.TMemoryBuffer(connection.message) |
||
320 | otransport = TTransport.TMemoryBuffer() |
||
321 | iprot = self.in_protocol.getProtocol(itransport) |
||
322 | oprot = self.out_protocol.getProtocol(otransport) |
||
323 | self.tasks.put([self.processor, iprot, oprot, |
||
324 | otransport, connection.ready]) |
||
325 | for writeable in wset: |
||
326 | self.clients[writeable].write() |
||
327 | for oob in xset: |
||
328 | self.clients[oob].close() |
||
329 | del self.clients[oob] |
||
330 | |||
331 | def close(self): |
||
332 | """Closes the server.""" |
||
333 | for _ in range(self.threads): |
||
334 | self.tasks.put([None, None, None, None, None]) |
||
335 | self.socket.close() |
||
336 | self.prepared = False |
||
337 | |||
338 | def serve(self): |
||
339 | """Serve requests. |
||
340 | |||
341 | Serve requests forever, or until stop() is called. |
||
342 | """ |
||
343 | self._stop = False |
||
344 | self.prepare() |
||
345 | while not self._stop: |
||
346 | self.handle() |
||
347 |