Completed
Pull Request — master (#406)
by Denis
03:42
created

BinaryServer.stop()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 6
ccs 6
cts 6
cp 1
crap 2
rs 9.4285
1
"""
2
Socket server forwarding request to internal server
3
"""
4 1
import logging
5 1
try:
6
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
7 1
    import asyncio
8 1
except ImportError:
9 1
    import trollius as asyncio
10
11
12 1
from opcua import ua
13 1
from opcua.server.uaprocessor import UaProcessor
14
15 1
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
16
17
18 1
class BinaryServer(object):
    """
    Socket server accepting client connections on hostname:port and
    forwarding the received binary data to the internal server.

    The internal server object must provide a ``loop`` attribute (offering
    ``create_server``, ``run_coro_and_wait`` and ``call_soon``) and an
    ``asyncio_transports`` list in which open transports are tracked.
    """

    def __init__(self, internal_server, hostname, port):
        """
        Store the configuration; nothing is opened until start() is called.

        :param internal_server: internal server object requests are forwarded to
        :param hostname: host name or address to listen on
        :param port: port to listen on; 0 lets start() pick up the port
                     actually assigned to the listening socket
        """
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        # listening server object, set by start(); None until then
        self._server = None
        self._policies = []

    def set_policies(self, policies):
        """
        Set the policies handed to the processor of every new connection.
        Only connections accepted by a subsequent start() see the new value.
        """
        self._policies = policies

    def start(self):
        """
        Create the listening socket server and wait until it is ready.

        When port 0 was requested and exactly one socket was created,
        self.hostname and self.port are updated from that socket.
        """

        class OPCUAProtocol(asyncio.Protocol):

            """
            instantiated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            # values captured from the enclosing BinaryServer at start() time
            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies

            def connection_made(self, transport):
                """Set up the per-connection processor and register the transport."""
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                # bytes of an incomplete message kept until more data arrives
                self.data = b""
                self.iserver.asyncio_transports.append(transport)

            def connection_lost(self, ex):
                """Close the transport, unregister it and close the processor."""
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                self.iserver.asyncio_transports.remove(self.transport)
                self.processor.close()

            def data_received(self, data):
                """Prepend any leftover bytes to the new data and process the result."""
                self.logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                """
                Process every complete message contained in data.

                Bytes belonging to an incomplete trailing message are stored
                in self.data so that data_received() can retry once more
                bytes have arrived.
                """
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        # copy taken before the header is read, so the untouched
                        # bytes can be saved if the message turns out incomplete
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                        except ua.utils.NotEnoughData:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        if len(buf) < hdr.body_size:
                            self.logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            self.logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        # NOTE(review): the message says "closing" but the
                        # transport is left open here; only processing of the
                        # current data stops. Confirm whether that is intended.
                        self.logger.exception("Exception raised while parsing message from client, closing")
                        return

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        # get the port and the hostname from the created server socket
        # only relevant for dynamic port assignment (when self.port == 0)
        if self.port == 0 and len(self._server.sockets) == 1:
            # will work for AF_INET and AF_INET6 socket names
            # these are the only families supported by the create_server call
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        print('Listening on {0}:{1}'.format(self.hostname, self.port))

    def stop(self):
        """
        Close all tracked client transports and shut the listening server down.

        Safe to call when start() was never called: the transports are still
        closed, and the server shutdown is skipped.
        """
        self.logger.info("Closing asyncio socket server")
        # iterate over a snapshot: connection_lost() removes transports from
        # this very list, which would make a direct iteration skip entries
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        if self._server is None:
            # no listening server was created, nothing left to shut down
            return
        self.loop.call_soon(self._server.close)
        self.loop.run_coro_and_wait(self._server.wait_closed())
114