Completed
Branch master (477316)
by Michael
08:56
created

thrift.server.TProcessPoolServer   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 120
Duplicated Lines 75.83 %

Importance

Changes 0
Metric Value
wmc 20
eloc 71
dl 91
loc 120
rs 10
c 0
b 0
f 0

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
20
21
import logging
22
from multiprocessing import  Process, Value, Condition, reduction
23
24
from .TServer import TServer
25
from thrift.transport.TTransport import TTransportException
26
import collections
27
28
29
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    Every worker process calls ``accept()`` on the same listening server
    transport and serves the accepted client until its transport fails.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Initialise the server; ``*args`` are forwarded to ``TServer``."""
        TServer.__init__(self, *args)
        self.numWorkers = 10
        self.workers = []
        # Shared flag ('b' = signed char) polled by the workers' accept loop.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        self.postForkCallback = None

    def setPostForkCallback(self, callback):
        """Register a callable that each worker runs once before serving.

        Raises:
            TypeError: if ``callback`` is not callable.
        """
        # The builtin callable() replaces collections.Callable, an alias that
        # was removed from the collections module in Python 3.10.
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that should be created"""
        self.numWorkers = num

    def workerProcess(self):
        """Accept clients on the server transport and serve them in a loop.

        Runs the post-fork callback first (if one is set), then keeps
        accepting clients while the shared ``isRunning`` flag is true.
        Returns 0 when interrupted by KeyboardInterrupt or SystemExit; any
        other exception is logged and the loop continues.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # Deliberately ignored; presumably signals that the client
            # closed the connection - TODO confirm.
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Close the transports even when KeyboardInterrupt/SystemExit
            # propagates, so the worker does not leak the connection.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start the worker processes and block until stop() is called.

        Returns after the stop condition is notified, or after the wait is
        interrupted by SystemExit/KeyboardInterrupt. ``isRunning`` is reset
        to False before returning.
        """
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # fork the children
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                logging.exception(x)

        # wait until the condition is set by stop()
        while True:
            # The with-block guarantees the condition's lock is released on
            # every exit path; a bare acquire() here was never released.
            with self.stopCondition:
                try:
                    self.stopCondition.wait()
                    break
                except (SystemExit, KeyboardInterrupt):
                    break
                except Exception as x:
                    logging.exception(x)

        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake up the process blocked in serve()."""
        self.isRunning.value = False
        with self.stopCondition:
            self.stopCondition.notify()
120