Completed
Branch master (477316)
by Michael
08:56
created

thrift.server.TServer   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 270
Duplicated Lines 85.19 %

Importance

Changes 0
Metric Value
wmc 49
eloc 179
dl 230
loc 270
rs 8.48
c 0
b 0
f 0

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like thrift.server.TServer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
20
import queue
21
import logging
22
import os
23
import sys
24
import threading
25
import traceback
26
27
from thrift.Thrift import TProcessor
28
from thrift.protocol import TBinaryProtocol
29
from thrift.transport import TTransport
30
31
32
class TServer:
  """Base interface for a server, which must have a serve() method.

  Three constructors for all servers:
  1) (processor, serverTransport)
  2) (processor, serverTransport, transportFactory, protocolFactory)
  3) (processor, serverTransport,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory)

  NOTE: any other number of positional arguments is accepted silently and
  leaves the instance without the attributes assigned by __initArgs__.
  """

  def __init__(self, *args):
    """Select one of the three constructor forms from the argument count."""
    argc = len(args)
    if argc == 2:
      # Form 1: default (pass-through) transport and binary protocol
      # factories, one fresh instance for each direction.
      self.__initArgs__(args[0], args[1],
                        TTransport.TTransportFactoryBase(),
                        TTransport.TTransportFactoryBase(),
                        TBinaryProtocol.TBinaryProtocolFactory(),
                        TBinaryProtocol.TBinaryProtocolFactory())
    elif argc == 4:
      # Form 2: the same factory object serves both input and output.
      processor, serverTransport, transportFactory, protocolFactory = args
      self.__initArgs__(processor, serverTransport,
                        transportFactory, transportFactory,
                        protocolFactory, protocolFactory)
    elif argc == 6:
      # Form 3: everything is given explicitly, already in the right order.
      self.__initArgs__(*args)

  def __initArgs__(self, processor, serverTransport,
                   inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory):
    """Store the processor, the server transport and the four factories."""
    self.processor = processor
    self.serverTransport = serverTransport
    self.inputTransportFactory = inputTransportFactory
    self.outputTransportFactory = outputTransportFactory
    self.inputProtocolFactory = inputProtocolFactory
    self.outputProtocolFactory = outputProtocolFactory

  def serve(self):
    """Run the server. The base implementation does nothing."""
66
67
68
class TSimpleServer(TServer):
  """Simple single-threaded server that just pumps around one transport."""

  def __init__(self, *args):
    """Accept the same positional argument forms as TServer."""
    TServer.__init__(self, *args)

  def serve(self):
    """Listen, then serve accepted clients one after another, forever.

    Each client is processed until the processor raises. A
    TTransportException ends the session silently (presumably the peer
    disconnected -- confirm against the transport in use); any other
    Exception is logged. Both transports are always closed before the next
    accept(), even when a non-Exception error such as KeyboardInterrupt
    propagates.
    """
    self.serverTransport.listen()
    while True:
      client = self.serverTransport.accept()
      itrans = self.inputTransportFactory.getTransport(client)
      otrans = self.outputTransportFactory.getTransport(client)
      iprot = self.inputProtocolFactory.getProtocol(itrans)
      oprot = self.outputProtocolFactory.getProtocol(otrans)
      try:
        while True:
          self.processor.process(iprot, oprot)
      except TTransport.TTransportException:
        pass
      except Exception as x:
        logging.exception(x)
      finally:
        # Nested finally: a failing itrans.close() must not leak otrans.
        try:
          itrans.close()
        finally:
          otrans.close()
92
93
94
class TThreadedServer(TServer):
  """Threaded server that spawns a new thread per each connection."""

  def __init__(self, *args, **kwargs):
    """Accept the TServer positional forms plus an optional daemon flag.

    Keyword arguments:
      daemon -- value assigned to each connection thread's daemon attribute
                (default False).
    """
    TServer.__init__(self, *args)
    self.daemon = kwargs.get("daemon", False)

  def serve(self):
    """Listen and hand every accepted client to a new thread, forever.

    KeyboardInterrupt is re-raised so the loop can be interrupted; any other
    Exception raised while accepting or starting the thread is logged and
    the loop continues.
    """
    self.serverTransport.listen()
    while True:
      try:
        client = self.serverTransport.accept()
        t = threading.Thread(target=self.handle, args=(client,))
        # Thread.setDaemon() is deprecated; assign the attribute instead.
        t.daemon = self.daemon
        t.start()
      except KeyboardInterrupt:
        raise
      except Exception as x:
        logging.exception(x)

  def handle(self, client):
    """Process one client until the processor raises, then close it.

    A TTransportException ends the session silently; any other Exception is
    logged. Both transports are closed in all cases.
    """
    itrans = self.inputTransportFactory.getTransport(client)
    otrans = self.outputTransportFactory.getTransport(client)
    iprot = self.inputProtocolFactory.getProtocol(itrans)
    oprot = self.outputProtocolFactory.getProtocol(otrans)
    try:
      while True:
        self.processor.process(iprot, oprot)
    except TTransport.TTransportException:
      pass
    except Exception as x:
      logging.exception(x)
    finally:
      # Nested finally: a failing itrans.close() must not leak otrans.
      try:
        itrans.close()
      finally:
        otrans.close()
129
130
131
class TThreadPoolServer(TServer):
  """Server with a fixed size pool of threads which service requests."""

  def __init__(self, *args, **kwargs):
    """Accept the TServer positional forms plus an optional daemon flag.

    Keyword arguments:
      daemon -- value assigned to each worker thread's daemon attribute
                (default False).
    """
    TServer.__init__(self, *args)
    self.clients = queue.Queue()  # accepted clients waiting for a worker
    self.threads = 10             # number of workers started by serve()
    self.daemon = kwargs.get("daemon", False)

  def setNumThreads(self, num):
    """Set the number of worker threads that should be created"""
    self.threads = num

  def serveThread(self):
    """Loop around getting clients from the shared queue and process them."""
    while True:
      try:
        client = self.clients.get()
        self.serveClient(client)
      except Exception as x:
        # Keep the worker alive whatever a single client did.
        logging.exception(x)

  def serveClient(self, client):
    """Process input/output from a client for as long as possible

    A TTransportException ends the session silently; any other Exception is
    logged. Both transports are closed in all cases.
    """
    itrans = self.inputTransportFactory.getTransport(client)
    otrans = self.outputTransportFactory.getTransport(client)
    iprot = self.inputProtocolFactory.getProtocol(itrans)
    oprot = self.outputProtocolFactory.getProtocol(otrans)
    try:
      while True:
        self.processor.process(iprot, oprot)
    except TTransport.TTransportException:
      pass
    except Exception as x:
      logging.exception(x)
    finally:
      # Nested finally: a failing itrans.close() must not leak otrans.
      try:
        itrans.close()
      finally:
        otrans.close()

  def serve(self):
    """Start a fixed number of worker threads and put client into a queue"""
    for _ in range(self.threads):
      try:
        t = threading.Thread(target=self.serveThread)
        # Thread.setDaemon() is deprecated; assign the attribute instead.
        t.daemon = self.daemon
        t.start()
      except Exception as x:
        logging.exception(x)

    # Pump the socket for clients
    self.serverTransport.listen()
    while True:
      try:
        client = self.serverTransport.accept()
        self.clients.put(client)
      except Exception as x:
        logging.exception(x)
188
189
190
class TForkingServer(TServer):
  """A Thrift server that forks a new process for each request

  This is more scalable than the threaded server as it does not cause
  GIL contention.

  Note that this has different semantics from the threading server.
  Specifically, updates to shared variables will no longer be shared.
  It will also not work on Windows.

  This code is heavily inspired by SocketServer.ForkingMixIn in the
  Python stdlib.
  """

  def __init__(self, *args):
    """Accept the same positional argument forms as TServer."""
    TServer.__init__(self, *args)
    self.children = []  # pids of forked children not yet reaped

  def serve(self):
    """Listen and fork one child process per accepted client, forever.

    The parent records the child pid, reaps finished children and closes its
    own copy of the client connection. The child processes the client until
    the processor raises and then always terminates through os._exit():
    status 0 after a TTransportException, status 1 after any other failure,
    including a failure while building the transports or protocols.
    """
    def try_close(transport):
      # Best-effort close: an I/O error is logged, never raised.
      try:
        transport.close()
      except OSError as e:
        logging.warning(e, exc_info=True)

    self.serverTransport.listen()
    while True:
      client = self.serverTransport.accept()
      try:
        pid = os.fork()

        if pid:  # parent
          # add before collect, otherwise you race w/ waitpid
          self.children.append(pid)
          self.collect_children()

          # Parent must close socket or the connection may not get
          # closed promptly
          itrans = self.inputTransportFactory.getTransport(client)
          otrans = self.outputTransportFactory.getTransport(client)
          try_close(itrans)
          try_close(otrans)
        else:  # child
          # Assume failure until the session ends with a transport
          # exception. The outer finally guarantees that the child can never
          # fall back into the accept loop above and act as a second server.
          ecode = 1
          try:
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            try:
              iprot = self.inputProtocolFactory.getProtocol(itrans)
              oprot = self.outputProtocolFactory.getProtocol(otrans)
              while True:
                self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
              ecode = 0
            except Exception as e:
              logging.exception(e)
            finally:
              try_close(itrans)
              try_close(otrans)
          except Exception as e:
            logging.exception(e)
          finally:
            os._exit(ecode)

      except TTransport.TTransportException:
        pass
      except Exception as x:
        logging.exception(x)

  def collect_children(self):
    """Reap finished child processes without blocking.

    Stops as soon as no child is waiting to be reaped or waitpid fails.
    """
    while self.children:
      try:
        pid, status = os.waitpid(0, os.WNOHANG)
      except OSError:
        pid = None

      if not pid:
        break

      # waitpid(0, ...) reports any child in the process group, so the pid
      # may belong to a process this server did not fork; ignore those.
      try:
        self.children.remove(pid)
      except ValueError:
        pass
270