Total Complexity | 60 |
Total Lines | 334 |
Duplicated Lines | 66.47 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like thrift.transport.TTransport often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # |
||
2 | # Licensed to the Apache Software Foundation (ASF) under one |
||
3 | # or more contributor license agreements. See the NOTICE file |
||
4 | # distributed with this work for additional information |
||
5 | # regarding copyright ownership. The ASF licenses this file |
||
6 | # to you under the Apache License, Version 2.0 (the |
||
7 | # "License"); you may not use this file except in compliance |
||
8 | # with the License. You may obtain a copy of the License at |
||
9 | # |
||
10 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
11 | # |
||
12 | # Unless required by applicable law or agreed to in writing, |
||
13 | # software distributed under the License is distributed on an |
||
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||
15 | # KIND, either express or implied. See the License for the |
||
16 | # specific language governing permissions and limitations |
||
17 | # under the License. |
||
18 | # |
||
19 | |||
20 | from six import BytesIO |
||
21 | from struct import pack, unpack |
||
22 | from thrift.Thrift import TException |
||
23 | |||
24 | |||
class TTransportException(TException):
    """Transport exception that carries a numeric category code.

    The code is stored on the instance as ``type`` and is expected to be
    one of the class-level constants below.
    """

    # Category codes for the ``type`` attribute.
    UNKNOWN = 0
    NOT_OPEN = 1
    ALREADY_OPEN = 2
    TIMED_OUT = 3
    END_OF_FILE = 4

    def __init__(self, type=UNKNOWN, message=None):
        """Initialise the TException base with message, then store type.

        type -- category code; defaults to UNKNOWN
        message -- value forwarded unchanged to TException.__init__
        """
        TException.__init__(self, message)
        self.type = type
||
37 | |||
38 | |||
class TTransportBase:
    """Base class for Thrift transport layer.

    Every method except readAll() is a placeholder that does nothing and
    returns None; readAll() is built on top of read().
    """

    def isOpen(self):
        """Placeholder; does nothing and returns None here."""
        pass

    def open(self):
        """Placeholder; does nothing and returns None here."""
        pass

    def close(self):
        """Placeholder; does nothing and returns None here."""
        pass

    def read(self, sz):
        """Placeholder; does nothing and returns None here.

        readAll() calls len() on the result, so it only works once this
        method returns a sized value.
        """
        pass

    def readAll(self, sz):
        """Read exactly sz bytes by calling read() repeatedly.

        Each call asks read() for the number of bytes still missing.

        Returns the collected bytes; b'' when sz is not positive.
        Raises EOFError if read() returns an empty chunk before sz bytes
        have been collected.
        """
        chunks = []
        have = 0
        while have < sz:
            chunk = self.read(sz - have)
            if len(chunk) == 0:
                raise EOFError()
            chunks.append(chunk)
            have += len(chunk)

        # Join once at the end; appending to a bytes object on every
        # iteration would copy the accumulated data each time.
        return b''.join(chunks)

    def write(self, buf):
        """Placeholder; does nothing and returns None here."""
        pass

    def flush(self):
        """Placeholder; does nothing and returns None here."""
        pass
||
72 | |||
73 | |||
# This class should be thought of as an interface.
class CReadableTransport:
    """Base class for transports that are readable from C."""

    # TODO(dreiss): Think about changing this interface to allow us to use
    #               a (Python, not c) StringIO instead, because it allows
    #               you to write after reading.

    # NOTE: This is a classic class, so properties will NOT work
    #       correctly for setting.
    @property
    def cstringio_buf(self):
        """A cStringIO buffer that contains the current chunk we are reading.

        This interface-level definition yields None.
        """
        return None

    def cstringio_refill(self, partialread, reqlen):
        """Refills cstringio_buf.

        Returns the currently used buffer (which can but need not be the
        same as the old cstringio_buf). partialread is what the C code has
        read from the buffer, and should be inserted into the buffer before
        any more reads. The return value must be a new, not borrowed
        reference. Something along the lines of self._buf should be fine.

        If reqlen bytes can't be read, throw EOFError.

        This interface-level definition does nothing and returns None.
        """
        return None
||
101 | |||
102 | |||
class TServerTransportBase:
    """Base class for Thrift server transports.

    All three methods are placeholders that do nothing and return None.
    """

    def listen(self):
        """Placeholder; returns None."""
        return None

    def accept(self):
        """Placeholder; returns None."""
        return None

    def close(self):
        """Placeholder; returns None."""
        return None
||
114 | |||
115 | |||
class TTransportFactoryBase:
    """Base class for a Transport Factory."""

    def getTransport(self, trans):
        """Return the given transport unchanged (no wrapping is applied)."""
        unwrapped = trans
        return unwrapped
||
121 | |||
122 | |||
class TBufferedTransportFactory:
    """Factory transport that builds buffered transports."""

    def getTransport(self, trans):
        """Wrap trans in a new TBufferedTransport and return the wrapper."""
        return TBufferedTransport(trans)
||
129 | |||
130 | |||
class TBufferedTransport(TTransportBase, CReadableTransport):
    """Class that wraps another transport and buffers its I/O.

    The implementation uses a (configurable) fixed-size read buffer
    but buffers all writes until a flush is performed.
    """
    DEFAULT_BUFFER = 4096

    def __init__(self, trans, rbuf_size=DEFAULT_BUFFER):
        """Wrap trans with empty read and write buffers.

        trans -- the underlying transport that I/O is delegated to
        rbuf_size -- minimum number of bytes requested from trans when the
                     read buffer is refilled
        """
        self.__trans = trans
        self.__wbuf = BytesIO()
        # The initial value must be bytes: io.BytesIO rejects a text
        # string such as "" with TypeError on Python 3.
        self.__rbuf = BytesIO(b'')
        self.__rbuf_size = rbuf_size

    def isOpen(self):
        """Return whatever the wrapped transport's isOpen() returns."""
        return self.__trans.isOpen()

    def open(self):
        """Open the wrapped transport and return its result."""
        return self.__trans.open()

    def close(self):
        """Close the wrapped transport and return its result."""
        return self.__trans.close()

    def read(self, sz):
        """Return up to sz bytes, refilling the read buffer when it is empty.

        Data still held in the read buffer is served first. Only when the
        buffer yields nothing is the wrapped transport read, asking for
        max(sz, rbuf_size) bytes in a single call.
        """
        ret = self.__rbuf.read(sz)
        if len(ret) != 0:
            return ret

        self.__rbuf = BytesIO(self.__trans.read(max(sz, self.__rbuf_size)))
        return self.__rbuf.read(sz)

    def write(self, buf):
        """Append buf to the write buffer; nothing is sent until flush()."""
        self.__wbuf.write(buf)

    def flush(self):
        """Send everything buffered by write() and flush the wrapped transport."""
        out = self.__wbuf.getvalue()
        # reset wbuf before write/flush to preserve state on underlying failure
        self.__wbuf = BytesIO()
        self.__trans.write(out)
        self.__trans.flush()

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        """The current read buffer object."""
        return self.__rbuf

    def cstringio_refill(self, partialread, reqlen):
        """Replace the read buffer with partialread plus newly read data.

        partialread -- bytes placed at the front of the new buffer
        reqlen -- minimum number of bytes the new buffer must hold

        Returns the new read buffer. The wrapped transport's readAll() is
        used to guarantee reqlen bytes, so whatever it raises on a short
        read propagates to the caller.
        """
        retstring = partialread
        if reqlen < self.__rbuf_size:
            # try to make a read of as much as we can.
            retstring += self.__trans.read(self.__rbuf_size)

        # but make sure we do read reqlen bytes.
        if len(retstring) < reqlen:
            retstring += self.__trans.readAll(reqlen - len(retstring))

        self.__rbuf = BytesIO(retstring)
        return self.__rbuf
||
189 | |||
190 | |||
class TMemoryBuffer(TTransportBase, CReadableTransport):
    """Wraps an in-memory BytesIO buffer as a TTransport.

    NOTE: Unlike the C++ version of this class, you cannot write to it
          then immediately read from it.  If you want to read from a
          TMemoryBuffer, you must pass a string to the constructor.
    TODO(dreiss): Make this work like the C++ version.
    """

    def __init__(self, value=None):
        """value -- a value to read from for stringio

        If value is set, this will be a transport for reading,
        otherwise, it is for writing"""
        self._buffer = BytesIO(value) if value is not None else BytesIO()

    def isOpen(self):
        """Return True until the underlying buffer has been closed."""
        is_closed = self._buffer.closed
        return not is_closed

    def open(self):
        """No-op; there is nothing to open for an in-memory buffer."""
        return None

    def close(self):
        """Close the underlying buffer."""
        self._buffer.close()

    def read(self, sz):
        """Return up to sz bytes from the buffer's current position."""
        return self._buffer.read(sz)

    def write(self, buf):
        """Write buf to the buffer.

        If the buffer rejects buf with TypeError, buf is encoded with
        cp437 and the write is retried once.
        """
        try:
            self._buffer.write(buf)
        except TypeError:
            encoded = buf.encode('cp437')
            self._buffer.write(encoded)

    def flush(self):
        """No-op; writes go straight into the in-memory buffer."""
        return None

    def getvalue(self):
        """Return the entire contents of the underlying buffer."""
        return self._buffer.getvalue()

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        """The underlying buffer object."""
        return self._buffer

    def cstringio_refill(self, partialread, reqlen):
        """Always raise EOFError; this transport cannot be refilled."""
        # only one shot at reading...
        raise EOFError()
||
242 | |||
243 | |||
class TFramedTransportFactory:
    """Factory transport that builds framed transports."""

    def getTransport(self, trans):
        """Wrap trans in a new TFramedTransport and return the wrapper."""
        return TFramedTransport(trans)
||
250 | |||
251 | |||
class TFramedTransport(TTransportBase, CReadableTransport):
    """Class that wraps another transport and frames its I/O when writing.

    A frame is a 4-byte length packed with struct format '!i', followed
    by that many payload bytes.
    """

    def __init__(self, trans):
        """Wrap trans with empty read and write buffers."""
        self.__trans = trans
        self.__rbuf = BytesIO()
        self.__wbuf = BytesIO()

    def isOpen(self):
        """Return whatever the wrapped transport's isOpen() returns."""
        return self.__trans.isOpen()

    def open(self):
        """Open the wrapped transport and return its result."""
        return self.__trans.open()

    def close(self):
        """Close the wrapped transport and return its result."""
        return self.__trans.close()

    def read(self, sz):
        """Return up to sz bytes of frame payload.

        Bytes left over from the current frame are served first; when the
        buffer yields nothing, the next frame is read and served instead.
        """
        data = self.__rbuf.read(sz)
        if len(data) == 0:
            self.readFrame()
            data = self.__rbuf.read(sz)
        return data

    def readFrame(self):
        """Read one frame from the wrapped transport into the read buffer."""
        header = self.__trans.readAll(4)
        frame_len = unpack('!i', header)[0]
        payload = self.__trans.readAll(frame_len)
        self.__rbuf = BytesIO(payload)

    def write(self, buf):
        """Append buf to the pending frame; nothing is sent until flush()."""
        self.__wbuf.write(buf)

    def flush(self):
        """Send the buffered writes as one frame and flush the wrapped transport."""
        payload = self.__wbuf.getvalue()
        payload_len = len(payload)
        # reset wbuf before write/flush to preserve state on underlying failure
        self.__wbuf = BytesIO()
        # N.B.: Doing this string concatenation is WAY cheaper than making
        # two separate calls to the underlying socket object. Socket writes in
        # Python turn out to be REALLY expensive, but it seems to do a pretty
        # good job of managing string buffer operations without excessive copies
        frame = pack("!i", payload_len) + payload
        self.__trans.write(frame)
        self.__trans.flush()

    # Implement the CReadableTransport interface.
    @property
    def cstringio_buf(self):
        """The current read buffer object."""
        return self.__rbuf

    def cstringio_refill(self, prefix, reqlen):
        """Read frames until at least reqlen bytes follow prefix.

        prefix -- bytes placed at the front of the new buffer
        reqlen -- minimum number of bytes the new buffer must hold

        Returns the new read buffer.
        """
        # self.__rbuf will already be empty here because fastbinary doesn't
        # ask for a refill until the previous buffer is empty.  Therefore,
        # we can start reading new frames immediately.
        while len(prefix) < reqlen:
            self.readFrame()
            prefix += self.__rbuf.getvalue()
        refilled = BytesIO(prefix)
        self.__rbuf = refilled
        return refilled
||
312 | |||
313 | |||
class TFileObjectTransport(TTransportBase):
    """Wraps a file-like object to make it work as a Thrift transport."""

    def __init__(self, fileobj):
        """Store fileobj; all I/O is delegated to it."""
        self.fileobj = fileobj

    def isOpen(self):
        """Return False once the wrapped file object reports it is closed.

        Objects without a ``closed`` attribute are treated as open, which
        matches the previous behaviour of always returning True.
        """
        return not getattr(self.fileobj, 'closed', False)

    def close(self):
        """Close the wrapped file object."""
        self.fileobj.close()

    def read(self, sz):
        """Return the result of reading up to sz from the file object."""
        return self.fileobj.read(sz)

    def write(self, buf):
        """Write buf to the wrapped file object."""
        self.fileobj.write(buf)

    def flush(self):
        """Flush the wrapped file object."""
        self.fileobj.flush()
||
334 |