Completed
Push — master ( 941704...f167fe )
by Roy
13s
created

TestXMLRPCServer.test_1()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
#   Copyright (c) 2006-2007 Open Source Applications Foundation
2
#
3
#   Licensed under the Apache License, Version 2.0 (the "License");
4
#   you may not use this file except in compliance with the License.
5
#   You may obtain a copy of the License at
6
#
7
#       http://www.apache.org/licenses/LICENSE-2.0
8
#
9
#   Unless required by applicable law or agreed to in writing, software
10
#   distributed under the License is distributed on an "AS IS" BASIS,
11
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
#   See the License for the specific language governing permissions and
13
#   limitations under the License.
14
#
15
#   Origin: https://code.google.com/p/wsgi-xmlrpc/
16
17
import unittest2 as unittest
18
import tornado.wsgi
19
import tornado.ioloop
20
import tornado.httpserver
21
from pyspider.libs import utils
22
23
class TestXMLRPCServer(unittest.TestCase):
24
    @classmethod
25
    def setUpClass(self):
26
        from pyspider.libs import wsgi_xmlrpc
27
        
28
        def test_1():
29
            return 'test_1'
30
            
31
        class Test2(object):
32
            def test_3(self, obj):
33
                return obj
34
                
35
        test = Test2()
36
        
37
        application = wsgi_xmlrpc.WSGIXMLRPCApplication()
38
        application.register_instance(Test2())
39
        application.register_function(test_1)
40
41
        container = tornado.wsgi.WSGIContainer(application)
42
        self.io_loop = tornado.ioloop.IOLoop.current()
43
        http_server = tornado.httpserver.HTTPServer(container, io_loop=self.io_loop)
44
        http_server.listen(3423)
45
        self.thread = utils.run_in_thread(self.io_loop.start)
46
47
    @classmethod
48
    def tearDownClass(self):
49
        self.io_loop.add_callback(self.io_loop.stop)
50
        self.thread.join()
51
    
52
    def test_xmlrpc_server(self, uri='http://127.0.0.1:3423'):
53
        from six.moves.xmlrpc_client import ServerProxy
54
        
55
        client = ServerProxy(uri)
56
        
57
        assert client.test_1() == 'test_1'
58
        assert client.test_3({'asdf':4}) == {'asdf':4}
59