Total Complexity | 49 |
Total Lines | 270 |
Duplicated Lines | 85.19 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like thrift.server.TServer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # |
||
2 | # Licensed to the Apache Software Foundation (ASF) under one |
||
3 | # or more contributor license agreements. See the NOTICE file |
||
4 | # distributed with this work for additional information |
||
5 | # regarding copyright ownership. The ASF licenses this file |
||
6 | # to you under the Apache License, Version 2.0 (the |
||
7 | # "License"); you may not use this file except in compliance |
||
8 | # with the License. You may obtain a copy of the License at |
||
9 | # |
||
10 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
11 | # |
||
12 | # Unless required by applicable law or agreed to in writing, |
||
13 | # software distributed under the License is distributed on an |
||
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||
15 | # KIND, either express or implied. See the License for the |
||
16 | # specific language governing permissions and limitations |
||
17 | # under the License. |
||
18 | # |
||
19 | |||
20 | import queue |
||
21 | import logging |
||
22 | import os |
||
23 | import sys |
||
24 | import threading |
||
25 | import traceback |
||
26 | |||
27 | from thrift.Thrift import TProcessor |
||
28 | from thrift.protocol import TBinaryProtocol |
||
29 | from thrift.transport import TTransport |
||
30 | |||
31 | |||
32 | class TServer: |
||
33 | """Base interface for a server, which must have a serve() method. |
||
34 | |||
35 | Three constructors for all servers: |
||
36 | 1) (processor, serverTransport) |
||
37 | 2) (processor, serverTransport, transportFactory, protocolFactory) |
||
38 | 3) (processor, serverTransport, |
||
39 | inputTransportFactory, outputTransportFactory, |
||
40 | inputProtocolFactory, outputProtocolFactory) |
||
41 | """ |
||
42 | def __init__(self, *args): |
||
43 | if (len(args) == 2): |
||
44 | self.__initArgs__(args[0], args[1], |
||
45 | TTransport.TTransportFactoryBase(), |
||
46 | TTransport.TTransportFactoryBase(), |
||
47 | TBinaryProtocol.TBinaryProtocolFactory(), |
||
48 | TBinaryProtocol.TBinaryProtocolFactory()) |
||
49 | elif (len(args) == 4): |
||
50 | self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) |
||
51 | elif (len(args) == 6): |
||
52 | self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) |
||
53 | |||
54 | def __initArgs__(self, processor, serverTransport, |
||
55 | inputTransportFactory, outputTransportFactory, |
||
56 | inputProtocolFactory, outputProtocolFactory): |
||
57 | self.processor = processor |
||
58 | self.serverTransport = serverTransport |
||
59 | self.inputTransportFactory = inputTransportFactory |
||
60 | self.outputTransportFactory = outputTransportFactory |
||
61 | self.inputProtocolFactory = inputProtocolFactory |
||
62 | self.outputProtocolFactory = outputProtocolFactory |
||
63 | |||
64 | def serve(self): |
||
65 | pass |
||
66 | |||
67 | |||
68 | class TSimpleServer(TServer): |
||
69 | """Simple single-threaded server that just pumps around one transport.""" |
||
70 | |||
71 | def __init__(self, *args): |
||
72 | TServer.__init__(self, *args) |
||
73 | |||
74 | def serve(self): |
||
75 | self.serverTransport.listen() |
||
76 | while True: |
||
77 | client = self.serverTransport.accept() |
||
78 | itrans = self.inputTransportFactory.getTransport(client) |
||
79 | otrans = self.outputTransportFactory.getTransport(client) |
||
80 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
81 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
82 | try: |
||
83 | while True: |
||
84 | self.processor.process(iprot, oprot) |
||
85 | except TTransport.TTransportException as tx: |
||
86 | pass |
||
87 | except Exception as x: |
||
88 | logging.exception(x) |
||
89 | |||
90 | itrans.close() |
||
91 | otrans.close() |
||
92 | |||
93 | |||
94 | class TThreadedServer(TServer): |
||
95 | """Threaded server that spawns a new thread per each connection.""" |
||
96 | |||
97 | def __init__(self, *args, **kwargs): |
||
98 | TServer.__init__(self, *args) |
||
99 | self.daemon = kwargs.get("daemon", False) |
||
100 | |||
101 | def serve(self): |
||
102 | self.serverTransport.listen() |
||
103 | while True: |
||
104 | try: |
||
105 | client = self.serverTransport.accept() |
||
106 | t = threading.Thread(target=self.handle, args=(client,)) |
||
107 | t.setDaemon(self.daemon) |
||
108 | t.start() |
||
109 | except KeyboardInterrupt: |
||
110 | raise |
||
111 | except Exception as x: |
||
112 | logging.exception(x) |
||
113 | |||
114 | def handle(self, client): |
||
115 | itrans = self.inputTransportFactory.getTransport(client) |
||
116 | otrans = self.outputTransportFactory.getTransport(client) |
||
117 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
118 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
119 | try: |
||
120 | while True: |
||
121 | self.processor.process(iprot, oprot) |
||
122 | except TTransport.TTransportException as tx: |
||
123 | pass |
||
124 | except Exception as x: |
||
125 | logging.exception(x) |
||
126 | |||
127 | itrans.close() |
||
128 | otrans.close() |
||
129 | |||
130 | |||
131 | class TThreadPoolServer(TServer): |
||
132 | """Server with a fixed size pool of threads which service requests.""" |
||
133 | |||
134 | def __init__(self, *args, **kwargs): |
||
135 | TServer.__init__(self, *args) |
||
136 | self.clients = queue.Queue() |
||
137 | self.threads = 10 |
||
138 | self.daemon = kwargs.get("daemon", False) |
||
139 | |||
140 | def setNumThreads(self, num): |
||
141 | """Set the number of worker threads that should be created""" |
||
142 | self.threads = num |
||
143 | |||
144 | def serveThread(self): |
||
145 | """Loop around getting clients from the shared queue and process them.""" |
||
146 | while True: |
||
147 | try: |
||
148 | client = self.clients.get() |
||
149 | self.serveClient(client) |
||
150 | except Exception as x: |
||
151 | logging.exception(x) |
||
152 | |||
153 | def serveClient(self, client): |
||
154 | """Process input/output from a client for as long as possible""" |
||
155 | itrans = self.inputTransportFactory.getTransport(client) |
||
156 | otrans = self.outputTransportFactory.getTransport(client) |
||
157 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
158 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
159 | try: |
||
160 | while True: |
||
161 | self.processor.process(iprot, oprot) |
||
162 | except TTransport.TTransportException as tx: |
||
163 | pass |
||
164 | except Exception as x: |
||
165 | logging.exception(x) |
||
166 | |||
167 | itrans.close() |
||
168 | otrans.close() |
||
169 | |||
170 | def serve(self): |
||
171 | """Start a fixed number of worker threads and put client into a queue""" |
||
172 | for i in range(self.threads): |
||
173 | try: |
||
174 | t = threading.Thread(target=self.serveThread) |
||
175 | t.setDaemon(self.daemon) |
||
176 | t.start() |
||
177 | except Exception as x: |
||
178 | logging.exception(x) |
||
179 | |||
180 | # Pump the socket for clients |
||
181 | self.serverTransport.listen() |
||
182 | while True: |
||
183 | try: |
||
184 | client = self.serverTransport.accept() |
||
185 | self.clients.put(client) |
||
186 | except Exception as x: |
||
187 | logging.exception(x) |
||
188 | |||
189 | |||
190 | class TForkingServer(TServer): |
||
191 | """A Thrift server that forks a new process for each request |
||
192 | |||
193 | This is more scalable than the threaded server as it does not cause |
||
194 | GIL contention. |
||
195 | |||
196 | Note that this has different semantics from the threading server. |
||
197 | Specifically, updates to shared variables will no longer be shared. |
||
198 | It will also not work on windows. |
||
199 | |||
200 | This code is heavily inspired by SocketServer.ForkingMixIn in the |
||
201 | Python stdlib. |
||
202 | """ |
||
203 | def __init__(self, *args): |
||
204 | TServer.__init__(self, *args) |
||
205 | self.children = [] |
||
206 | |||
207 | def serve(self): |
||
208 | def try_close(file): |
||
209 | try: |
||
210 | file.close() |
||
211 | except IOError as e: |
||
212 | logging.warning(e, exc_info=True) |
||
213 | |||
214 | self.serverTransport.listen() |
||
215 | while True: |
||
216 | client = self.serverTransport.accept() |
||
217 | try: |
||
218 | pid = os.fork() |
||
219 | |||
220 | if pid: # parent |
||
221 | # add before collect, otherwise you race w/ waitpid |
||
222 | self.children.append(pid) |
||
223 | self.collect_children() |
||
224 | |||
225 | # Parent must close socket or the connection may not get |
||
226 | # closed promptly |
||
227 | itrans = self.inputTransportFactory.getTransport(client) |
||
228 | otrans = self.outputTransportFactory.getTransport(client) |
||
229 | try_close(itrans) |
||
230 | try_close(otrans) |
||
231 | else: |
||
232 | itrans = self.inputTransportFactory.getTransport(client) |
||
233 | otrans = self.outputTransportFactory.getTransport(client) |
||
234 | |||
235 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
236 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
237 | |||
238 | ecode = 0 |
||
239 | try: |
||
240 | try: |
||
241 | while True: |
||
242 | self.processor.process(iprot, oprot) |
||
243 | except TTransport.TTransportException as tx: |
||
244 | pass |
||
245 | except Exception as e: |
||
246 | logging.exception(e) |
||
247 | ecode = 1 |
||
248 | finally: |
||
249 | try_close(itrans) |
||
250 | try_close(otrans) |
||
251 | |||
252 | os._exit(ecode) |
||
253 | |||
254 | except TTransport.TTransportException as tx: |
||
255 | pass |
||
256 | except Exception as x: |
||
257 | logging.exception(x) |
||
258 | |||
259 | def collect_children(self): |
||
260 | while self.children: |
||
261 | try: |
||
262 | pid, status = os.waitpid(0, os.WNOHANG) |
||
263 | except os.error: |
||
264 | pid = None |
||
265 | |||
266 | if pid: |
||
267 | self.children.remove(pid) |
||
268 | else: |
||
269 | break |
||
270 |