Completed
Pull Request — master (#509)
by
unknown
03:15
created

OPCUAProtocol.connection_lost()   A

Complexity

Conditions 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 7
ccs 0
cts 7
cp 0
crap 6
rs 9.4285
1
"""
2
Socket server forwarding request to internal server
3
"""
4 1
import logging
5 1
try:
6
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
7 1
    import asyncio
8 1
except ImportError:
9 1
    import trollius as asyncio
10
11
12 1
from opcua import ua
13 1
import opcua.ua.ua_binary as uabin
14 1
from opcua.server.uaprocessor import UaProcessor
15
16 1
logger = logging.getLogger(__name__)
17
18
19 1
class BinaryServer(object):
    """
    Socket server forwarding binary requests to the internal server.

    A new ``OPCUAProtocol`` is instantiated for every accepted connection;
    each one owns a ``UaProcessor`` created from the internal server and
    the connection's transport.
    """

    def __init__(self, internal_server, hostname, port):
        """
        :param internal_server: object providing ``loop`` and
            ``asyncio_transports``; it is handed to every ``UaProcessor``
        :param hostname: host passed to ``loop.create_server``
        :param port: port passed to ``loop.create_server``; when 0 the
            real port is read back from the created socket in ``start``
        """
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None
        self._policies = []
        self.clients = []

    def set_policies(self, policies):
        """
        Store the policies given to the processor of each new connection.

        The value is captured when ``start`` is called.
        """
        self._policies = policies

    def start(self):
        """
        Create the listening server on the loop and record its address.

        When ``port`` is 0 and exactly one socket was created, ``hostname``
        and ``port`` are updated from that socket's name.
        """

        class OPCUAProtocol(asyncio.Protocol):

            """
            instantiated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            # captured once, when start() runs; ``clients`` is the very
            # list object owned by the enclosing BinaryServer
            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies
            clients = self.clients

            def __str__(self):
                return "OPCUAProtocol({}, {})".format(self.peername, self.processor.session)
            __repr__ = __str__

            def connection_made(self, transport):
                """
                Set up the per-connection state and register the connection.
                """
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                # bytes of an incomplete message, kept until more data arrives
                self.data = b""
                self.iserver.asyncio_transports.append(transport)
                self.clients.append(self)

            def connection_lost(self, ex):
                """
                Close the transport and processor and unregister the connection.
                """
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                # guarded like ``clients`` below: a missing entry must not
                # prevent the processor from being closed
                if self.transport in self.iserver.asyncio_transports:
                    self.iserver.asyncio_transports.remove(self.transport)
                self.processor.close()
                if self in self.clients:
                    self.clients.remove(self)

            def data_received(self, data):
                """
                Prepend any pending bytes to ``data`` and process the result.
                """
                self.logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _keep_for_later(self, backup_buf):
                # stash the unparsed bytes so the next data_received call
                # can complete the message
                self.logger.info("We did not receive enough data from client, waiting for more")
                self.data = backup_buf.read(len(backup_buf))

            def _process_data(self, data):
                """
                Process every complete message in ``data``.

                An incomplete trailing message is saved in ``self.data``.
                The transport is closed when the processor returns a false
                value.
                """
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        # copy taken before the header is consumed so the
                        # whole message can be restored if it is incomplete
                        backup_buf = buf.copy()
                        try:
                            hdr = uabin.header_from_binary(buf)
                        except ua.utils.NotEnoughData:
                            self._keep_for_later(backup_buf)
                            return
                        if len(buf) < hdr.body_size:
                            self._keep_for_later(backup_buf)
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            self.logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        # NOTE(review): the message says "closing" but the
                        # transport is left open here; only the rest of the
                        # current data is dropped. Confirm the intent.
                        self.logger.exception("Exception raised while parsing message from client, closing")
                        return

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        # get the port and the hostname from the created server socket
        # only relevant for dynamic port assignment (when self.port == 0)
        if self.port == 0 and len(self._server.sockets) == 1:
            # will work for AF_INET and AF_INET6 socket names
            # these are the only families supported by the create_server call
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        # report through logging instead of writing to stdout from library code
        self.logger.warning('Listening on %s:%s', self.hostname, self.port)

    def stop(self):
        """
        Close every registered transport, then the listening server if any.
        """
        self.logger.info("Closing asyncio socket server")
        # iterate over a copy: connection_lost() removes entries from
        # this list, which would make a live iteration skip transports
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        if self._server:
            self.loop.call_soon(self._server.close)
            self.loop.run_coro_and_wait(self._server.wait_closed())
125