1
|
|
|
# |
2
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one |
3
|
|
|
# or more contributor license agreements. See the NOTICE file |
4
|
|
|
# distributed with this work for additional information |
5
|
|
|
# regarding copyright ownership. The ASF licenses this file |
6
|
|
|
# to you under the Apache License, Version 2.0 (the |
7
|
|
|
# "License"); you may not use this file except in compliance |
8
|
|
|
# with the License. You may obtain a copy of the License at |
9
|
|
|
# |
10
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
11
|
|
|
# |
12
|
|
|
# Unless required by applicable law or agreed to in writing, |
13
|
|
|
# software distributed under the License is distributed on an |
14
|
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15
|
|
|
# KIND, either express or implied. See the License for the |
16
|
|
|
# specific language governing permissions and limitations |
17
|
|
|
# under the License. |
18
|
|
|
# |
19
|
|
|
|
20
|
|
|
from io import StringIO |
21
|
|
|
|
22
|
|
|
from zope.interface import implements, Interface, Attribute |
23
|
|
|
from twisted.internet.protocol import Protocol, ServerFactory, ClientFactory, \ |
24
|
|
|
connectionDone |
25
|
|
|
from twisted.internet import defer |
26
|
|
|
from twisted.protocols import basic |
27
|
|
|
from twisted.python import log |
28
|
|
|
from twisted.web import server, resource, http |
29
|
|
|
|
30
|
|
|
from thrift.transport import TTransport |
31
|
|
|
|
32
|
|
|
|
33
|
|
|
class TMessageSenderTransport(TTransport.TTransportBase): |
34
|
|
|
|
35
|
|
|
def __init__(self): |
36
|
|
|
self.__wbuf = StringIO() |
37
|
|
|
|
38
|
|
|
def write(self, buf): |
39
|
|
|
self.__wbuf.write(buf) |
40
|
|
|
|
41
|
|
|
def flush(self): |
42
|
|
|
msg = self.__wbuf.getvalue() |
43
|
|
|
self.__wbuf = StringIO() |
44
|
|
|
self.sendMessage(msg) |
45
|
|
|
|
46
|
|
|
def sendMessage(self, message): |
47
|
|
|
raise NotImplementedError |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
class TCallbackTransport(TMessageSenderTransport): |
51
|
|
|
|
52
|
|
|
def __init__(self, func): |
53
|
|
|
TMessageSenderTransport.__init__(self) |
54
|
|
|
self.func = func |
55
|
|
|
|
56
|
|
|
def sendMessage(self, message): |
57
|
|
|
self.func(message) |
58
|
|
|
|
59
|
|
|
|
60
|
|
|
class ThriftClientProtocol(basic.Int32StringReceiver): |
61
|
|
|
|
62
|
|
|
MAX_LENGTH = 2 ** 31 - 1 |
63
|
|
|
|
64
|
|
|
def __init__(self, client_class, iprot_factory, oprot_factory=None): |
65
|
|
|
self._client_class = client_class |
66
|
|
|
self._iprot_factory = iprot_factory |
67
|
|
|
if oprot_factory is None: |
68
|
|
|
self._oprot_factory = iprot_factory |
69
|
|
|
else: |
70
|
|
|
self._oprot_factory = oprot_factory |
71
|
|
|
|
72
|
|
|
self.recv_map = {} |
73
|
|
|
self.started = defer.Deferred() |
74
|
|
|
|
75
|
|
|
def dispatch(self, msg): |
76
|
|
|
self.sendString(msg) |
77
|
|
|
|
78
|
|
|
def connectionMade(self): |
79
|
|
|
tmo = TCallbackTransport(self.dispatch) |
80
|
|
|
self.client = self._client_class(tmo, self._oprot_factory) |
81
|
|
|
self.started.callback(self.client) |
82
|
|
|
|
83
|
|
|
def connectionLost(self, reason=connectionDone): |
84
|
|
|
for k, v in self.client._reqs.items(): |
85
|
|
|
tex = TTransport.TTransportException( |
86
|
|
|
type=TTransport.TTransportException.END_OF_FILE, |
87
|
|
|
message='Connection closed') |
88
|
|
|
v.errback(tex) |
89
|
|
|
|
90
|
|
|
def stringReceived(self, frame): |
91
|
|
|
tr = TTransport.TMemoryBuffer(frame) |
92
|
|
|
iprot = self._iprot_factory.getProtocol(tr) |
93
|
|
|
(fname, mtype, rseqid) = iprot.readMessageBegin() |
94
|
|
|
|
95
|
|
|
try: |
96
|
|
|
method = self.recv_map[fname] |
97
|
|
|
except KeyError: |
98
|
|
|
method = getattr(self.client, 'recv_' + fname) |
99
|
|
|
self.recv_map[fname] = method |
100
|
|
|
|
101
|
|
|
method(iprot, mtype, rseqid) |
102
|
|
|
|
103
|
|
|
|
104
|
|
|
class ThriftServerProtocol(basic.Int32StringReceiver): |
105
|
|
|
|
106
|
|
|
MAX_LENGTH = 2 ** 31 - 1 |
107
|
|
|
|
108
|
|
|
def dispatch(self, msg): |
109
|
|
|
self.sendString(msg) |
110
|
|
|
|
111
|
|
|
def processError(self, error): |
112
|
|
|
self.transport.loseConnection() |
113
|
|
|
|
114
|
|
|
def processOk(self, _, tmo): |
115
|
|
|
msg = tmo.getvalue() |
116
|
|
|
|
117
|
|
|
if len(msg) > 0: |
118
|
|
|
self.dispatch(msg) |
119
|
|
|
|
120
|
|
|
def stringReceived(self, frame): |
121
|
|
|
tmi = TTransport.TMemoryBuffer(frame) |
122
|
|
|
tmo = TTransport.TMemoryBuffer() |
123
|
|
|
|
124
|
|
|
iprot = self.factory.iprot_factory.getProtocol(tmi) |
125
|
|
|
oprot = self.factory.oprot_factory.getProtocol(tmo) |
126
|
|
|
|
127
|
|
|
d = self.factory.processor.process(iprot, oprot) |
128
|
|
|
d.addCallbacks(self.processOk, self.processError, |
129
|
|
|
callbackArgs=(tmo,)) |
130
|
|
|
|
131
|
|
|
|
132
|
|
|
class IThriftServerFactory(Interface): |
133
|
|
|
|
134
|
|
|
processor = Attribute("Thrift processor") |
135
|
|
|
|
136
|
|
|
iprot_factory = Attribute("Input protocol factory") |
137
|
|
|
|
138
|
|
|
oprot_factory = Attribute("Output protocol factory") |
139
|
|
|
|
140
|
|
|
|
141
|
|
|
class IThriftClientFactory(Interface): |
142
|
|
|
|
143
|
|
|
client_class = Attribute("Thrift client class") |
144
|
|
|
|
145
|
|
|
iprot_factory = Attribute("Input protocol factory") |
146
|
|
|
|
147
|
|
|
oprot_factory = Attribute("Output protocol factory") |
148
|
|
|
|
149
|
|
|
|
150
|
|
|
class ThriftServerFactory(ServerFactory): |
151
|
|
|
|
152
|
|
|
implements(IThriftServerFactory) |
153
|
|
|
|
154
|
|
|
protocol = ThriftServerProtocol |
155
|
|
|
|
156
|
|
|
def __init__(self, processor, iprot_factory, oprot_factory=None): |
157
|
|
|
self.processor = processor |
158
|
|
|
self.iprot_factory = iprot_factory |
159
|
|
|
if oprot_factory is None: |
160
|
|
|
self.oprot_factory = iprot_factory |
161
|
|
|
else: |
162
|
|
|
self.oprot_factory = oprot_factory |
163
|
|
|
|
164
|
|
|
|
165
|
|
|
class ThriftClientFactory(ClientFactory): |
166
|
|
|
|
167
|
|
|
implements(IThriftClientFactory) |
168
|
|
|
|
169
|
|
|
protocol = ThriftClientProtocol |
170
|
|
|
|
171
|
|
|
def __init__(self, client_class, iprot_factory, oprot_factory=None): |
172
|
|
|
self.client_class = client_class |
173
|
|
|
self.iprot_factory = iprot_factory |
174
|
|
|
if oprot_factory is None: |
175
|
|
|
self.oprot_factory = iprot_factory |
176
|
|
|
else: |
177
|
|
|
self.oprot_factory = oprot_factory |
178
|
|
|
|
179
|
|
|
def buildProtocol(self, addr): |
180
|
|
|
p = self.protocol(self.client_class, self.iprot_factory, |
181
|
|
|
self.oprot_factory) |
182
|
|
|
p.factory = self |
183
|
|
|
return p |
184
|
|
|
|
185
|
|
|
|
186
|
|
|
class ThriftResource(resource.Resource): |
187
|
|
|
|
188
|
|
|
allowedMethods = ('POST',) |
189
|
|
|
|
190
|
|
|
def __init__(self, processor, inputProtocolFactory, |
191
|
|
|
outputProtocolFactory=None): |
192
|
|
|
resource.Resource.__init__(self) |
193
|
|
|
self.inputProtocolFactory = inputProtocolFactory |
194
|
|
|
if outputProtocolFactory is None: |
195
|
|
|
self.outputProtocolFactory = inputProtocolFactory |
196
|
|
|
else: |
197
|
|
|
self.outputProtocolFactory = outputProtocolFactory |
198
|
|
|
self.processor = processor |
199
|
|
|
|
200
|
|
|
def getChild(self, path, request): |
201
|
|
|
return self |
202
|
|
|
|
203
|
|
|
def _cbProcess(self, _, request, tmo): |
204
|
|
|
msg = tmo.getvalue() |
205
|
|
|
request.setResponseCode(http.OK) |
206
|
|
|
request.setHeader("content-type", "application/x-thrift") |
207
|
|
|
request.write(msg) |
208
|
|
|
request.finish() |
209
|
|
|
|
210
|
|
|
def render_POST(self, request): |
211
|
|
|
request.content.seek(0, 0) |
212
|
|
|
data = request.content.read() |
213
|
|
|
tmi = TTransport.TMemoryBuffer(data) |
214
|
|
|
tmo = TTransport.TMemoryBuffer() |
215
|
|
|
|
216
|
|
|
iprot = self.inputProtocolFactory.getProtocol(tmi) |
217
|
|
|
oprot = self.outputProtocolFactory.getProtocol(tmo) |
218
|
|
|
|
219
|
|
|
d = self.processor.process(iprot, oprot) |
220
|
|
|
d.addCallback(self._cbProcess, request, tmo) |
221
|
|
|
return server.NOT_DONE_YET |
222
|
|
|
|