| Total Complexity | 49 |
| Total Lines | 270 |
| Duplicated Lines | 85.19 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like thrift.server.TServer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # |
||
| 2 | # Licensed to the Apache Software Foundation (ASF) under one |
||
| 3 | # or more contributor license agreements. See the NOTICE file |
||
| 4 | # distributed with this work for additional information |
||
| 5 | # regarding copyright ownership. The ASF licenses this file |
||
| 6 | # to you under the Apache License, Version 2.0 (the |
||
| 7 | # "License"); you may not use this file except in compliance |
||
| 8 | # with the License. You may obtain a copy of the License at |
||
| 9 | # |
||
| 10 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
| 11 | # |
||
| 12 | # Unless required by applicable law or agreed to in writing, |
||
| 13 | # software distributed under the License is distributed on an |
||
| 14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||
| 15 | # KIND, either express or implied. See the License for the |
||
| 16 | # specific language governing permissions and limitations |
||
| 17 | # under the License. |
||
| 18 | # |
||
| 19 | |||
| 20 | import queue |
||
| 21 | import logging |
||
| 22 | import os |
||
| 23 | import sys |
||
| 24 | import threading |
||
| 25 | import traceback |
||
| 26 | |||
| 27 | from thrift.Thrift import TProcessor |
||
| 28 | from thrift.protocol import TBinaryProtocol |
||
| 29 | from thrift.transport import TTransport |
||
| 30 | |||
| 31 | |||
| 32 | class TServer: |
||
| 33 | """Base interface for a server, which must have a serve() method. |
||
| 34 | |||
| 35 | Three constructors for all servers: |
||
| 36 | 1) (processor, serverTransport) |
||
| 37 | 2) (processor, serverTransport, transportFactory, protocolFactory) |
||
| 38 | 3) (processor, serverTransport, |
||
| 39 | inputTransportFactory, outputTransportFactory, |
||
| 40 | inputProtocolFactory, outputProtocolFactory) |
||
| 41 | """ |
||
| 42 | def __init__(self, *args): |
||
| 43 | if (len(args) == 2): |
||
| 44 | self.__initArgs__(args[0], args[1], |
||
| 45 | TTransport.TTransportFactoryBase(), |
||
| 46 | TTransport.TTransportFactoryBase(), |
||
| 47 | TBinaryProtocol.TBinaryProtocolFactory(), |
||
| 48 | TBinaryProtocol.TBinaryProtocolFactory()) |
||
| 49 | elif (len(args) == 4): |
||
| 50 | self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) |
||
| 51 | elif (len(args) == 6): |
||
| 52 | self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) |
||
| 53 | |||
| 54 | def __initArgs__(self, processor, serverTransport, |
||
| 55 | inputTransportFactory, outputTransportFactory, |
||
| 56 | inputProtocolFactory, outputProtocolFactory): |
||
| 57 | self.processor = processor |
||
| 58 | self.serverTransport = serverTransport |
||
| 59 | self.inputTransportFactory = inputTransportFactory |
||
| 60 | self.outputTransportFactory = outputTransportFactory |
||
| 61 | self.inputProtocolFactory = inputProtocolFactory |
||
| 62 | self.outputProtocolFactory = outputProtocolFactory |
||
| 63 | |||
| 64 | def serve(self): |
||
| 65 | pass |
||
| 66 | |||
| 67 | |||
| 68 | class TSimpleServer(TServer): |
||
| 69 | """Simple single-threaded server that just pumps around one transport.""" |
||
| 70 | |||
| 71 | def __init__(self, *args): |
||
| 72 | TServer.__init__(self, *args) |
||
| 73 | |||
| 74 | def serve(self): |
||
| 75 | self.serverTransport.listen() |
||
| 76 | while True: |
||
| 77 | client = self.serverTransport.accept() |
||
| 78 | itrans = self.inputTransportFactory.getTransport(client) |
||
| 79 | otrans = self.outputTransportFactory.getTransport(client) |
||
| 80 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
| 81 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
| 82 | try: |
||
| 83 | while True: |
||
| 84 | self.processor.process(iprot, oprot) |
||
| 85 | except TTransport.TTransportException as tx: |
||
| 86 | pass |
||
| 87 | except Exception as x: |
||
| 88 | logging.exception(x) |
||
| 89 | |||
| 90 | itrans.close() |
||
| 91 | otrans.close() |
||
| 92 | |||
| 93 | |||
| 94 | class TThreadedServer(TServer): |
||
| 95 | """Threaded server that spawns a new thread per each connection.""" |
||
| 96 | |||
| 97 | def __init__(self, *args, **kwargs): |
||
| 98 | TServer.__init__(self, *args) |
||
| 99 | self.daemon = kwargs.get("daemon", False) |
||
| 100 | |||
| 101 | def serve(self): |
||
| 102 | self.serverTransport.listen() |
||
| 103 | while True: |
||
| 104 | try: |
||
| 105 | client = self.serverTransport.accept() |
||
| 106 | t = threading.Thread(target=self.handle, args=(client,)) |
||
| 107 | t.setDaemon(self.daemon) |
||
| 108 | t.start() |
||
| 109 | except KeyboardInterrupt: |
||
| 110 | raise |
||
| 111 | except Exception as x: |
||
| 112 | logging.exception(x) |
||
| 113 | |||
| 114 | def handle(self, client): |
||
| 115 | itrans = self.inputTransportFactory.getTransport(client) |
||
| 116 | otrans = self.outputTransportFactory.getTransport(client) |
||
| 117 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
| 118 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
| 119 | try: |
||
| 120 | while True: |
||
| 121 | self.processor.process(iprot, oprot) |
||
| 122 | except TTransport.TTransportException as tx: |
||
| 123 | pass |
||
| 124 | except Exception as x: |
||
| 125 | logging.exception(x) |
||
| 126 | |||
| 127 | itrans.close() |
||
| 128 | otrans.close() |
||
| 129 | |||
| 130 | |||
| 131 | class TThreadPoolServer(TServer): |
||
| 132 | """Server with a fixed size pool of threads which service requests.""" |
||
| 133 | |||
| 134 | def __init__(self, *args, **kwargs): |
||
| 135 | TServer.__init__(self, *args) |
||
| 136 | self.clients = queue.Queue() |
||
| 137 | self.threads = 10 |
||
| 138 | self.daemon = kwargs.get("daemon", False) |
||
| 139 | |||
| 140 | def setNumThreads(self, num): |
||
| 141 | """Set the number of worker threads that should be created""" |
||
| 142 | self.threads = num |
||
| 143 | |||
| 144 | def serveThread(self): |
||
| 145 | """Loop around getting clients from the shared queue and process them.""" |
||
| 146 | while True: |
||
| 147 | try: |
||
| 148 | client = self.clients.get() |
||
| 149 | self.serveClient(client) |
||
| 150 | except Exception as x: |
||
| 151 | logging.exception(x) |
||
| 152 | |||
| 153 | def serveClient(self, client): |
||
| 154 | """Process input/output from a client for as long as possible""" |
||
| 155 | itrans = self.inputTransportFactory.getTransport(client) |
||
| 156 | otrans = self.outputTransportFactory.getTransport(client) |
||
| 157 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
| 158 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
| 159 | try: |
||
| 160 | while True: |
||
| 161 | self.processor.process(iprot, oprot) |
||
| 162 | except TTransport.TTransportException as tx: |
||
| 163 | pass |
||
| 164 | except Exception as x: |
||
| 165 | logging.exception(x) |
||
| 166 | |||
| 167 | itrans.close() |
||
| 168 | otrans.close() |
||
| 169 | |||
| 170 | def serve(self): |
||
| 171 | """Start a fixed number of worker threads and put client into a queue""" |
||
| 172 | for i in range(self.threads): |
||
| 173 | try: |
||
| 174 | t = threading.Thread(target=self.serveThread) |
||
| 175 | t.setDaemon(self.daemon) |
||
| 176 | t.start() |
||
| 177 | except Exception as x: |
||
| 178 | logging.exception(x) |
||
| 179 | |||
| 180 | # Pump the socket for clients |
||
| 181 | self.serverTransport.listen() |
||
| 182 | while True: |
||
| 183 | try: |
||
| 184 | client = self.serverTransport.accept() |
||
| 185 | self.clients.put(client) |
||
| 186 | except Exception as x: |
||
| 187 | logging.exception(x) |
||
| 188 | |||
| 189 | |||
| 190 | class TForkingServer(TServer): |
||
| 191 | """A Thrift server that forks a new process for each request |
||
| 192 | |||
| 193 | This is more scalable than the threaded server as it does not cause |
||
| 194 | GIL contention. |
||
| 195 | |||
| 196 | Note that this has different semantics from the threading server. |
||
| 197 | Specifically, updates to shared variables will no longer be shared. |
||
| 198 | It will also not work on windows. |
||
| 199 | |||
| 200 | This code is heavily inspired by SocketServer.ForkingMixIn in the |
||
| 201 | Python stdlib. |
||
| 202 | """ |
||
| 203 | def __init__(self, *args): |
||
| 204 | TServer.__init__(self, *args) |
||
| 205 | self.children = [] |
||
| 206 | |||
| 207 | def serve(self): |
||
| 208 | def try_close(file): |
||
| 209 | try: |
||
| 210 | file.close() |
||
| 211 | except IOError as e: |
||
| 212 | logging.warning(e, exc_info=True) |
||
| 213 | |||
| 214 | self.serverTransport.listen() |
||
| 215 | while True: |
||
| 216 | client = self.serverTransport.accept() |
||
| 217 | try: |
||
| 218 | pid = os.fork() |
||
| 219 | |||
| 220 | if pid: # parent |
||
| 221 | # add before collect, otherwise you race w/ waitpid |
||
| 222 | self.children.append(pid) |
||
| 223 | self.collect_children() |
||
| 224 | |||
| 225 | # Parent must close socket or the connection may not get |
||
| 226 | # closed promptly |
||
| 227 | itrans = self.inputTransportFactory.getTransport(client) |
||
| 228 | otrans = self.outputTransportFactory.getTransport(client) |
||
| 229 | try_close(itrans) |
||
| 230 | try_close(otrans) |
||
| 231 | else: |
||
| 232 | itrans = self.inputTransportFactory.getTransport(client) |
||
| 233 | otrans = self.outputTransportFactory.getTransport(client) |
||
| 234 | |||
| 235 | iprot = self.inputProtocolFactory.getProtocol(itrans) |
||
| 236 | oprot = self.outputProtocolFactory.getProtocol(otrans) |
||
| 237 | |||
| 238 | ecode = 0 |
||
| 239 | try: |
||
| 240 | try: |
||
| 241 | while True: |
||
| 242 | self.processor.process(iprot, oprot) |
||
| 243 | except TTransport.TTransportException as tx: |
||
| 244 | pass |
||
| 245 | except Exception as e: |
||
| 246 | logging.exception(e) |
||
| 247 | ecode = 1 |
||
| 248 | finally: |
||
| 249 | try_close(itrans) |
||
| 250 | try_close(otrans) |
||
| 251 | |||
| 252 | os._exit(ecode) |
||
| 253 | |||
| 254 | except TTransport.TTransportException as tx: |
||
| 255 | pass |
||
| 256 | except Exception as x: |
||
| 257 | logging.exception(x) |
||
| 258 | |||
| 259 | def collect_children(self): |
||
| 260 | while self.children: |
||
| 261 | try: |
||
| 262 | pid, status = os.waitpid(0, os.WNOHANG) |
||
| 263 | except os.error: |
||
| 264 | pid = None |
||
| 265 | |||
| 266 | if pid: |
||
| 267 | self.children.remove(pid) |
||
| 268 | else: |
||
| 269 | break |
||
| 270 |