Completed
Push — master ( a3ac06...273580 )
by Olivier
07:20 queued 01:07
created

BinaryServer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 9
ccs 9
cts 9
cp 1
crap 1
rs 9.6666
1
"""
2
Socket server forwarding request to internal server
3
"""
4 1
import logging
5 1
try:
6
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
7 1
    import asyncio
8 1
except ImportError:
9 1
    import trollius as asyncio
10
11
12 1
from opcua import ua
13 1
from opcua.server.uaprocessor import UaProcessor
14
15 1
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
16
17
18 1
class BinaryServer(object):
    """
    Socket server forwarding requests to the internal server.

    A listening socket is created on the internal server's loop; every
    accepted connection gets its own protocol object which feeds the
    received bytes to a UaProcessor.
    """

    def __init__(self, internal_server, hostname, port):
        """
        :param internal_server: object providing the ``loop`` the socket
            server runs on and the ``asyncio_transports`` list in which
            open transports are tracked
        :param hostname: host name or address to listen on
        :param port: port to listen on; when 0, start() reads the real
            address back from the created socket
        """
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None
        self._policies = []
        self.clients = []

    def set_policies(self, policies):
        """
        Set the policies handed to the processor of each new connection.

        start() captures the value current at the time it is called, so
        this has to be called before start() to have any effect.
        """
        self._policies = policies

    def start(self):
        """
        Create the listening socket and wait until it is ready.

        If ``self.port`` is 0 and exactly one socket was created,
        ``self.hostname`` and ``self.port`` are updated from that socket.
        """

        class OPCUAProtocol(asyncio.Protocol):

            """
            instantiated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies
            clients = self.clients

            def __str__(self):
                return "OPCUAProtocol({}, {})".format(self.peername, self.processor.session)
            __repr__ = __str__

            def connection_made(self, transport):
                """Set up the per-connection state and register the connection."""
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                # bytes of an incomplete message, kept until more data arrives
                self.data = b""
                self.iserver.asyncio_transports.append(transport)
                self.clients.append(self)

            def connection_lost(self, ex):
                """Release everything connection_made() registered."""
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                # guarded like the clients list below, so that a transport
                # which is already gone cannot prevent processor.close()
                if self.transport in self.iserver.asyncio_transports:
                    self.iserver.asyncio_transports.remove(self.transport)
                self.processor.close()
                if self in self.clients:
                    self.clients.remove(self)

            def data_received(self, data):
                """Prepend any pending bytes to the new data and process it."""
                self.logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                """
                Process every complete message found in data.

                An incomplete trailing message is stored in self.data and
                handled on the next data_received() call. The connection is
                closed when the processor returns a false value or when an
                unexpected exception is raised.
                """
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        # kept so that the unparsed bytes can be restored
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                            incomplete = len(buf) < hdr.body_size
                        except ua.utils.NotEnoughData:
                            incomplete = True
                        if incomplete:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            self.logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        self.logger.exception("Exception raised while parsing message from client, closing")
                        # do what the message says: the position in the
                        # stream is unknown after a failure, so the
                        # connection cannot be used any further
                        self.transport.close()
                        return

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        # get the port and the hostname from the created server socket
        # only relevant for dynamic port assignment (when self.port == 0)
        if self.port == 0 and len(self._server.sockets) == 1:
            # will work for AF_INET and AF_INET6 socket names
            # these are the only families supported by the create_server call
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        # NOTE(review): kept as print so the visible output is unchanged;
        # consider moving this to the logger
        print('Listening on {0}:{1}'.format(self.hostname, self.port))

    def stop(self):
        """Close all open transports, then the listening server if any."""
        self.logger.info("Closing asyncio socket server")
        # iterate over a snapshot: connection_lost() removes transports from
        # this list, and removing during iteration would skip entries
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        if self._server:
            self.loop.call_soon(self._server.close)
            self.loop.run_coro_and_wait(self._server.wait_closed())
124