Completed
Branch master (477316)
by Michael
08:56
created

thrift.transport.TTransport   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 334
Duplicated Lines 66.47 %

Importance

Changes 0
Metric Value
wmc 60
eloc 176
dl 222
loc 334
rs 3.6
c 0
b 0
f 0

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like thrift.transport.TTransport often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
20
from six import BytesIO
21
from struct import pack, unpack
22
from thrift.Thrift import TException
23
24
25
class TTransportException(TException):
  """Exception type raised for transport-level failures.

  The class attributes below are integer codes; the code given to the
  constructor is kept on the instance as ``type``.
  """

  # Integer codes intended for the ``type`` attribute.
  UNKNOWN = 0
  NOT_OPEN = 1
  ALREADY_OPEN = 2
  TIMED_OUT = 3
  END_OF_FILE = 4

  def __init__(self, type=UNKNOWN, message=None):
    """Build the exception.

    type    -- error code stored as self.type; defaults to UNKNOWN
    message -- optional text forwarded to TException.__init__
    """
    TException.__init__(self, message)
    self.type = type
37
38
39
class TTransportBase:
  """Base class for Thrift transport layer.

  Every method except readAll() is an empty placeholder that returns None;
  readAll() is implemented on top of read().
  """

  def isOpen(self):
    """Placeholder; does nothing and returns None in this base class."""
    pass

  def open(self):
    """Placeholder; does nothing and returns None in this base class."""
    pass

  def close(self):
    """Placeholder; does nothing and returns None in this base class."""
    pass

  def read(self, sz):
    """Placeholder; does nothing and returns None in this base class.

    readAll() calls this with the number of bytes still missing and takes
    len() of the result.
    """
    pass

  def readAll(self, sz):
    """Read exactly sz bytes by calling read() until enough data arrived.

    sz -- number of bytes wanted; when sz <= 0 nothing is read and b'' is
          returned.

    Returns the concatenation of all chunks returned by read().
    Raises EOFError as soon as read() returns an empty chunk while bytes
    are still missing.
    """
    # Collect the chunks and join once at the end instead of repeatedly
    # concatenating, which copies the accumulated data on every iteration.
    chunks = []
    have = 0
    while have < sz:
      chunk = self.read(sz - have)
      chunk_len = len(chunk)
      if chunk_len == 0:
        raise EOFError()
      have += chunk_len
      chunks.append(chunk)

    return b''.join(chunks)

  def write(self, buf):
    """Placeholder; does nothing and returns None in this base class."""
    pass

  def flush(self):
    """Placeholder; does nothing and returns None in this base class."""
    pass
72
73
74
# This class should be thought of as an interface.
75
class CReadableTransport:
  """Interface for transports whose read buffer can be consumed from C.

  Both members are stubs that return None in this class.
  """

  # TODO(dreiss): Think about changing this interface to allow us to use
  #               a (Python, not c) StringIO instead, because it allows
  #               you to write after reading.

  # NOTE: This is a classic class, so properties will NOT work
  #       correctly for setting.
  @property
  def cstringio_buf(self):
    """The cStringIO buffer holding the chunk currently being read."""
    return None

  def cstringio_refill(self, partialread, reqlen):
    """Refill cstringio_buf and return the buffer now in use.

    partialread -- what the C code has already read from the buffer; it
                   should be inserted into the buffer before any more
                   reads.
    reqlen      -- number of bytes that must be readable afterwards.

    The returned buffer can, but need not, be the same object as the old
    cstringio_buf.  The return value must be a new, not borrowed
    reference; something along the lines of self._buf should be fine.

    If reqlen bytes can't be read, throw EOFError.
    """
    return None
101
102
103
class TServerTransportBase:
  """Base class for Thrift server transports.

  All three methods are empty placeholders that return None.
  """

  def listen(self):
    """Placeholder; does nothing in this base class."""
    return None

  def accept(self):
    """Placeholder; does nothing in this base class."""
    return None

  def close(self):
    """Placeholder; does nothing in this base class."""
    return None
114
115
116
class TTransportFactoryBase:
  """Base class for a Transport Factory"""

  def getTransport(self, trans):
    """Hand back trans itself, unwrapped and unmodified."""
    result = trans
    return result
121
122
123
class TBufferedTransportFactory:
  """Factory transport that builds buffered transports"""

  def getTransport(self, trans):
    """Return a new TBufferedTransport wrapping trans."""
    return TBufferedTransport(trans)
129
130
131
class TBufferedTransport(TTransportBase, CReadableTransport):
  """Class that wraps another transport and buffers its I/O.

  The implementation uses a (configurable) fixed-size read buffer
  but buffers all writes until a flush is performed.
  """
  DEFAULT_BUFFER = 4096

  def __init__(self, trans, rbuf_size=DEFAULT_BUFFER):
    """Wrap trans.

    trans     -- underlying transport that all I/O is delegated to
    rbuf_size -- minimum number of bytes requested from trans whenever
                 the read buffer is refilled
    """
    self.__trans = trans
    self.__wbuf = BytesIO()
    # The initial value must be bytes: io.BytesIO rejects a str argument
    # on Python 3, so BytesIO("") would raise TypeError here.
    self.__rbuf = BytesIO(b'')
    self.__rbuf_size = rbuf_size

  def isOpen(self):
    """Return whatever the wrapped transport's isOpen() returns."""
    return self.__trans.isOpen()

  def open(self):
    """Open the wrapped transport and return its result."""
    return self.__trans.open()

  def close(self):
    """Close the wrapped transport and return its result."""
    return self.__trans.close()

  def read(self, sz):
    """Return up to sz bytes, refilling the read buffer when it is empty.

    Data still held in the read buffer is served first.  Only when the
    buffer yields nothing is the wrapped transport asked for
    max(sz, rbuf_size) bytes, which become the new read buffer.
    """
    ret = self.__rbuf.read(sz)
    if len(ret) != 0:
      return ret

    self.__rbuf = BytesIO(self.__trans.read(max(sz, self.__rbuf_size)))
    return self.__rbuf.read(sz)

  def write(self, buf):
    """Append buf to the write buffer; nothing is sent until flush()."""
    self.__wbuf.write(buf)

  def flush(self):
    """Write everything buffered so far to the wrapped transport and flush it."""
    out = self.__wbuf.getvalue()
    # reset wbuf before write/flush to preserve state on underlying failure
    self.__wbuf = BytesIO()
    self.__trans.write(out)
    self.__trans.flush()

  # Implement the CReadableTransport interface.
  @property
  def cstringio_buf(self):
    """The current read buffer object."""
    return self.__rbuf

  def cstringio_refill(self, partialread, reqlen):
    """Replace the read buffer with one holding at least reqlen bytes.

    partialread -- data placed at the front of the new buffer
    reqlen      -- minimum length of the new buffer's contents,
                   partialread included

    Returns the new read buffer.  The wrapped transport's readAll() is
    used for any bytes still missing, so its errors propagate.
    """
    retstring = partialread
    if reqlen < self.__rbuf_size:
      # try to make a read of as much as we can.
      retstring += self.__trans.read(self.__rbuf_size)

    # but make sure we do read reqlen bytes.
    if len(retstring) < reqlen:
      retstring += self.__trans.readAll(reqlen - len(retstring))

    self.__rbuf = BytesIO(retstring)
    return self.__rbuf
189
190
191
class TMemoryBuffer(TTransportBase, CReadableTransport):
  """Wraps a cStringIO object as a TTransport.

  NOTE: Unlike the C++ version of this class, you cannot write to it
        then immediately read from it.  If you want to read from a
        TMemoryBuffer, you must pass the data to the constructor.
  TODO(dreiss): Make this work like the C++ version.
  """

  def __init__(self, value=None):
    """Create the backing buffer.

    value -- initial contents to read from; when it is None an empty
             buffer is created, which is meant for writing.
    """
    self._buffer = BytesIO() if value is None else BytesIO(value)

  def isOpen(self):
    """Return True as long as the backing buffer has not been closed."""
    is_closed = self._buffer.closed
    return not is_closed

  def open(self):
    """Nothing to open; does nothing."""
    return None

  def close(self):
    """Close the backing buffer."""
    self._buffer.close()

  def read(self, sz):
    """Return up to sz bytes from the backing buffer."""
    data = self._buffer.read(sz)
    return data

  def write(self, buf):
    """Write buf to the backing buffer.

    If the buffer rejects buf with a TypeError, buf is encoded with the
    cp437 codec and the encoded result is written instead.
    """
    try:
      self._buffer.write(buf)
    except TypeError:
      encoded = buf.encode('cp437')
      self._buffer.write(encoded)

  def flush(self):
    """Nothing to flush; does nothing."""
    return None

  def getvalue(self):
    """Return the entire contents of the backing buffer."""
    contents = self._buffer.getvalue()
    return contents

  # Implement the CReadableTransport interface.
  @property
  def cstringio_buf(self):
    """The backing buffer object itself."""
    return self._buffer

  def cstringio_refill(self, partialread, reqlen):
    """Always raise EOFError: there is no further data source to refill from."""
    # only one shot at reading...
    raise EOFError()
242
243
244
class TFramedTransportFactory:
  """Factory transport that builds framed transports"""

  def getTransport(self, trans):
    """Return a new TFramedTransport wrapping trans."""
    return TFramedTransport(trans)
250
251
252
class TFramedTransport(TTransportBase, CReadableTransport):
  """Class that wraps another transport and frames its I/O when writing.

  On the wire each frame is a 4-byte length packed with struct format
  '!i', followed by that many bytes of payload.
  """

  def __init__(self, trans):
    """Wrap trans and start with empty read and write buffers."""
    self.__trans = trans
    self.__rbuf = BytesIO()
    self.__wbuf = BytesIO()

  def isOpen(self):
    """Return whatever the wrapped transport's isOpen() returns."""
    return self.__trans.isOpen()

  def open(self):
    """Open the wrapped transport and return its result."""
    return self.__trans.open()

  def close(self):
    """Close the wrapped transport and return its result."""
    return self.__trans.close()

  def read(self, sz):
    """Return up to sz bytes from the current frame.

    When the current frame yields nothing, the next frame is read from
    the wrapped transport first.
    """
    data = self.__rbuf.read(sz)
    if len(data) == 0:
      self.readFrame()
      data = self.__rbuf.read(sz)
    return data

  def readFrame(self):
    """Read one complete frame and make it the new read buffer."""
    header = self.__trans.readAll(4)
    (frame_len,) = unpack('!i', header)
    payload = self.__trans.readAll(frame_len)
    self.__rbuf = BytesIO(payload)

  def write(self, buf):
    """Append buf to the pending frame; nothing is sent until flush()."""
    self.__wbuf.write(buf)

  def flush(self):
    """Send everything written since the last flush as a single frame."""
    payload = self.__wbuf.getvalue()
    # reset wbuf before write/flush to preserve state on underlying failure
    self.__wbuf = BytesIO()
    # N.B.: Doing this string concatenation is WAY cheaper than making
    # two separate calls to the underlying socket object. Socket writes in
    # Python turn out to be REALLY expensive, but it seems to do a pretty
    # good job of managing string buffer operations without excessive copies
    frame = pack("!i", len(payload)) + payload
    self.__trans.write(frame)
    self.__trans.flush()

  # Implement the CReadableTransport interface.
  @property
  def cstringio_buf(self):
    """The current read buffer object."""
    return self.__rbuf

  def cstringio_refill(self, prefix, reqlen):
    """Read whole frames until at least reqlen bytes are available.

    prefix -- data placed at the front of the new buffer
    reqlen -- minimum length of the new buffer's contents, prefix included

    Returns the new read buffer.
    """
    # self.__rbuf will already be empty here because fastbinary doesn't
    # ask for a refill until the previous buffer is empty.  Therefore,
    # we can start reading new frames immediately.
    while reqlen > len(prefix):
      self.readFrame()
      prefix += self.__rbuf.getvalue()
    refilled = BytesIO(prefix)
    self.__rbuf = refilled
    return refilled
312
313
314
class TFileObjectTransport(TTransportBase):
  """Wraps a file-like object to make it work as a Thrift transport.

  read, write, flush and close are forwarded to the wrapped object.
  """

  def __init__(self, fileobj):
    """Keep fileobj as self.fileobj; nothing is opened here."""
    self.fileobj = fileobj

  def isOpen(self):
    """Always report True; the state of the wrapped object is not checked."""
    return True

  def close(self):
    """Close the wrapped file object."""
    target = self.fileobj
    target.close()

  def read(self, sz):
    """Return the result of reading sz from the wrapped file object."""
    data = self.fileobj.read(sz)
    return data

  def write(self, buf):
    """Write buf to the wrapped file object."""
    target = self.fileobj
    target.write(buf)

  def flush(self):
    """Flush the wrapped file object."""
    target = self.fileobj
    target.flush()
334