| Total Complexity | 60 |
| Total Lines | 334 |
| Duplicated Lines | 66.47 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like thrift.transport.TTransport often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # |
||
| 2 | # Licensed to the Apache Software Foundation (ASF) under one |
||
| 3 | # or more contributor license agreements. See the NOTICE file |
||
| 4 | # distributed with this work for additional information |
||
| 5 | # regarding copyright ownership. The ASF licenses this file |
||
| 6 | # to you under the Apache License, Version 2.0 (the |
||
| 7 | # "License"); you may not use this file except in compliance |
||
| 8 | # with the License. You may obtain a copy of the License at |
||
| 9 | # |
||
| 10 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
| 11 | # |
||
| 12 | # Unless required by applicable law or agreed to in writing, |
||
| 13 | # software distributed under the License is distributed on an |
||
| 14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||
| 15 | # KIND, either express or implied. See the License for the |
||
| 16 | # specific language governing permissions and limitations |
||
| 17 | # under the License. |
||
| 18 | # |
||
| 19 | |||
| 20 | from six import BytesIO |
||
| 21 | from struct import pack, unpack |
||
| 22 | from thrift.Thrift import TException |
||
| 23 | |||
| 24 | |||
class TTransportException(TException):
    """Custom Transport Exception class.

    ``type`` records which of the integer codes below caused the failure;
    ``message`` is forwarded to TException.
    """

    # Integer error codes, numbered 0..4 in this order.
    UNKNOWN, NOT_OPEN, ALREADY_OPEN, TIMED_OUT, END_OF_FILE = range(5)

    def __init__(self, type=UNKNOWN, message=None):
        # The parameter name ``type`` shadows the builtin but is part of the
        # public signature, so it is kept.
        TException.__init__(self, message)
        self.type = type
||
| 37 | |||
| 38 | |||
class TTransportBase:
    """Base class for Thrift transport layer.

    Subclasses override the primitive operations (isOpen, open, close, read,
    write, flush); the defaults here do nothing and return None. readAll is
    implemented in terms of read.
    """

    def isOpen(self):
        pass

    def open(self):
        pass

    def close(self):
        pass

    def read(self, sz):
        pass

    def readAll(self, sz):
        """Read exactly ``sz`` bytes by calling ``read`` until satisfied.

        Returns the concatenated bytes (b'' when ``sz`` <= 0).
        Raises EOFError if ``read`` returns an empty chunk before ``sz``
        bytes have been collected.
        """
        # Collect chunks and join once: repeated ``bytes +=`` copies the whole
        # accumulated buffer on every short read (quadratic in chunk count).
        chunks = []
        have = 0
        while have < sz:
            chunk = self.read(sz - have)
            if len(chunk) == 0:
                raise EOFError()
            chunks.append(chunk)
            have += len(chunk)
        return b''.join(chunks)

    def write(self, buf):
        pass

    def flush(self):
        pass
||
| 72 | |||
| 73 | |||
# This class should be thought of as an interface.
class CReadableTransport:
    """Interface for transports whose read buffer can be consumed from C.

    Both members are stubs returning None; concrete transports override them.
    """

    # TODO(dreiss): Think about changing this interface to allow us to use
    #               a (Python, not c) StringIO instead, because it allows
    #               you to write after reading.

    # NOTE: This is a classic class, so properties will NOT work
    #       correctly for setting.
    @property
    def cstringio_buf(self):
        """A cStringIO buffer that contains the current chunk we are reading."""

    def cstringio_refill(self, partialread, reqlen):
        """Refills cstringio_buf.

        Returns the currently used buffer (which can but need not be the same as
        the old cstringio_buf). partialread is what the C code has read from the
        buffer, and should be inserted into the buffer before any more reads.  The
        return value must be a new, not borrowed reference.  Something along the
        lines of self._buf should be fine.

        If reqlen bytes can't be read, throw EOFError.
        """
||
| 101 | |||
| 102 | |||
class TServerTransportBase:
    """Base class for Thrift server transports.

    Every method is a stub that returns None; concrete servers override them.
    """

    def listen(self):
        """Start listening for connections (no-op here)."""

    def accept(self):
        """Accept a connection (no-op here; returns None)."""

    def close(self):
        """Stop listening (no-op here)."""
||
| 114 | |||
| 115 | |||
class TTransportFactoryBase:
    """Base class for a Transport Factory.

    The default factory performs no wrapping.
    """

    def getTransport(self, trans):
        """Return ``trans`` itself, unwrapped."""
        unwrapped = trans
        return unwrapped
||
| 121 | |||
| 122 | |||
class TBufferedTransportFactory:
    """Factory transport that builds buffered transports."""

    def getTransport(self, trans):
        """Wrap ``trans`` in a new TBufferedTransport and return it."""
        return TBufferedTransport(trans)
||
| 129 | |||
| 130 | |||
class TBufferedTransport(TTransportBase, CReadableTransport):
    """Class that wraps another transport and buffers its I/O.

    The implementation uses a (configurable) fixed-size read buffer
    but buffers all writes until a flush is performed.
    """
    DEFAULT_BUFFER = 4096

    def __init__(self, trans, rbuf_size=DEFAULT_BUFFER):
        """trans -- the transport to wrap.
        rbuf_size -- minimum number of bytes requested from ``trans`` when
        the read buffer runs dry.
        """
        self.__trans = trans
        self.__wbuf = BytesIO()
        # BytesIO needs a bytes-like initial value: BytesIO("") raises
        # TypeError on Python 3, which made this constructor unusable there.
        self.__rbuf = BytesIO(b'')
        self.__rbuf_size = rbuf_size

    def isOpen(self):
        return self.__trans.isOpen()

    def open(self):
        return self.__trans.open()

    def close(self):
        return self.__trans.close()

    def read(self, sz):
        """Return up to ``sz`` bytes, refilling the buffer once if empty."""
        ret = self.__rbuf.read(sz)
        if len(ret) != 0:
            return ret

        # Buffer exhausted: fetch at least rbuf_size bytes in one call.
        self.__rbuf = BytesIO(self.__trans.read(max(sz, self.__rbuf_size)))
        return self.__rbuf.read(sz)

    def write(self, buf):
        """Append ``buf`` to the write buffer; nothing is sent until flush."""
        self.__wbuf.write(buf)

    def flush(self):
        """Send all buffered writes to the wrapped transport and flush it."""
        out = self.__wbuf.getvalue()
        # reset wbuf before write/flush to preserve state on underlying failure
        self.__wbuf = BytesIO()
        self.__trans.write(out)
        self.__trans.flush()

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        return self.__rbuf

    def cstringio_refill(self, partialread, reqlen):
        """Rebuild the read buffer as ``partialread`` plus fresh data.

        Guarantees at least ``reqlen`` bytes in the new buffer (readAll on the
        wrapped transport raises if it cannot supply them).
        """
        retstring = partialread
        if reqlen < self.__rbuf_size:
            # try to make a read of as much as we can.
            retstring += self.__trans.read(self.__rbuf_size)

        # but make sure we do read reqlen bytes.
        if len(retstring) < reqlen:
            retstring += self.__trans.readAll(reqlen - len(retstring))

        self.__rbuf = BytesIO(retstring)
        return self.__rbuf
| 188 | return self.__rbuf |
||
| 189 | |||
| 190 | |||
class TMemoryBuffer(TTransportBase, CReadableTransport):
    """Wraps an in-memory BytesIO object as a TTransport.

    NOTE: Unlike the C++ version of this class, you cannot write to it
          then immediately read from it.  If you want to read from a
          TMemoryBuffer, you must pass the bytes to read to the constructor.
    TODO(dreiss): Make this work like the C++ version.
    """

    def __init__(self, value=None, offset=0):
        """value -- a value to read from for stringio
        offset -- initial position within the buffer (default 0); non-zero
                  values skip that many leading bytes of ``value``.

        If value is set, this will be a transport for reading,
        otherwise, it is for writing"""
        if value is not None:
            self._buffer = BytesIO(value)
        else:
            self._buffer = BytesIO()
        if offset:
            self._buffer.seek(offset)

    def isOpen(self):
        return not self._buffer.closed

    def open(self):
        pass

    def close(self):
        self._buffer.close()

    def read(self, sz):
        return self._buffer.read(sz)

    def write(self, buf):
        try:
            self._buffer.write(buf)
        except TypeError:
            # BytesIO rejected the argument (presumably text rather than
            # bytes); encode it with cp437 and retry.
            self._buffer.write(buf.encode('cp437'))

    def flush(self):
        pass

    def getvalue(self):
        """Return the entire buffer contents, regardless of read position."""
        return self._buffer.getvalue()

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        return self._buffer

    def cstringio_refill(self, partialread, reqlen):
        # only one shot at reading...
        raise EOFError()
||
| 242 | |||
| 243 | |||
class TFramedTransportFactory:
    """Factory transport that builds framed transports."""

    def getTransport(self, trans):
        """Wrap ``trans`` in a new TFramedTransport and return it."""
        return TFramedTransport(trans)
||
| 250 | |||
| 251 | |||
class TFramedTransport(TTransportBase, CReadableTransport):
    """Class that wraps another transport and frames its I/O when writing.

    Each flush sends one frame: a 4-byte big-endian signed length ('!i')
    followed by the buffered payload. Reads pull whole frames from the
    wrapped transport into an in-memory buffer.
    """

    def __init__(self, trans):
        self.__trans = trans
        self.__rbuf = BytesIO()
        self.__wbuf = BytesIO()

    def isOpen(self):
        return self.__trans.isOpen()

    def open(self):
        return self.__trans.open()

    def close(self):
        return self.__trans.close()

    def read(self, sz):
        """Return up to ``sz`` bytes, loading the next frame if needed."""
        data = self.__rbuf.read(sz)
        if len(data) == 0:
            self.readFrame()
            data = self.__rbuf.read(sz)
        return data

    def readFrame(self):
        """Read one length-prefixed frame into the read buffer."""
        header = self.__trans.readAll(4)
        (frame_len,) = unpack('!i', header)
        self.__rbuf = BytesIO(self.__trans.readAll(frame_len))

    def write(self, buf):
        """Queue ``buf`` for the next frame."""
        self.__wbuf.write(buf)

    def flush(self):
        """Emit the queued bytes as a single frame and flush the transport."""
        payload = self.__wbuf.getvalue()
        # Swap in a fresh write buffer before touching the underlying
        # transport, so a failure there does not leave stale data queued.
        self.__wbuf = BytesIO()
        # One concatenated write is far cheaper than separate socket writes
        # for the header and the payload.
        frame = pack("!i", len(payload)) + payload
        self.__trans.write(frame)
        self.__trans.flush()

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        return self.__rbuf

    def cstringio_refill(self, prefix, reqlen):
        # The read buffer is already drained when fastbinary asks for a
        # refill, so whole new frames can be appended straight away.
        collected = prefix
        while len(collected) < reqlen:
            self.readFrame()
            collected += self.__rbuf.getvalue()
        self.__rbuf = BytesIO(collected)
        return self.__rbuf
||
| 312 | |||
| 313 | |||
class TFileObjectTransport(TTransportBase):
    """Adapt a file-like object so it can act as a Thrift transport.

    Operations are forwarded directly to ``fileobj``; ``open`` is inherited
    unchanged from TTransportBase.
    """

    def __init__(self, fileobj):
        self.fileobj = fileobj

    def isOpen(self):
        """Always report the transport as open."""
        return True

    def close(self):
        """Close the wrapped file object."""
        self.fileobj.close()

    def read(self, sz):
        """Return whatever ``fileobj.read(sz)`` returns."""
        return self.fileobj.read(sz)

    def write(self, buf):
        """Pass ``buf`` straight to ``fileobj.write``."""
        self.fileobj.write(buf)

    def flush(self):
        """Flush the wrapped file object."""
        self.fileobj.flush()
||
| 334 |