Passed
Push — master ( a120eb...a5037a )
by Olivier
03:30
created

BinaryServer.start()   A

Complexity

Conditions 3

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.243

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 21
ccs 7
cts 10
cp 0.7
crap 3.243
rs 9.3142
1
"""
2
Socket server forwarding request to internal server
3
"""
4 1
import logging
5 1
try:
6
    # we prefer to use the bundled asyncio version, otherwise fall back to trollius
7 1
    import asyncio
8 1
except ImportError:
9 1
    import trollius as asyncio
10
11
12 1
from opcua import ua
13 1
import opcua.ua.ua_binary as uabin
14 1
from opcua.server.uaprocessor import UaProcessor
15
16 1
logger = logging.getLogger(__name__)
17
18
19
20 1
class OPCUAProtocol(asyncio.Protocol):
    """asyncio protocol handling one OPC UA binary client connection.

    Incoming bytes are buffered until a complete message (header plus body)
    is available and are then handed to a ``UaProcessor`` bound to the
    internal server.

    The ``iserver``, ``loop``, ``logger``, ``policies`` and ``clients``
    class attributes are expected to be filled in by whoever creates the
    protocol factory (``BinaryServer.start`` builds a subclass carrying
    them).
    """

    # Shared configuration, injected through a dynamically created subclass.
    iserver = None
    loop = None
    logger = None
    policies = None
    clients = None

    # Per-connection state. Defaults let __str__/__repr__ work on an
    # instance that has not been connected yet.
    peername = None
    transport = None
    processor = None
    data = b""

    def __str__(self):
        # The processor only exists once connection_made() has run.
        session = getattr(self.processor, "session", None)
        return "OPCUAProtocol({}, {})".format(self.peername, session)
    __repr__ = __str__

    def connection_made(self, transport):
        """Set up per-connection state and register the new client."""
        self.peername = transport.get_extra_info('peername')
        self.logger.info('New connection from %s', self.peername)
        self.transport = transport
        self.processor = UaProcessor(self.iserver, self.transport)
        self.processor.set_policies(self.policies)
        self.data = b""
        self.iserver.asyncio_transports.append(transport)
        self.clients.append(self)

    def connection_lost(self, ex):
        """Release everything registered in ``connection_made``.

        :param ex: exception describing why the connection dropped, or None
        """
        self.logger.info('Lost connection from %s, %s', self.peername, ex)
        self.transport.close()
        # Guarded like the clients list below: a transport that is already
        # unregistered must not prevent the processor from being closed.
        if self.transport in self.iserver.asyncio_transports:
            self.iserver.asyncio_transports.remove(self.transport)
        self.processor.close()
        if self in self.clients:
            self.clients.remove(self)

    def data_received(self, data):
        """Prepend any previously buffered partial message and process."""
        logger.debug("received %s bytes from socket", len(data))
        if self.data:
            data = self.data + data
            self.data = b""
        self._process_data(data)

    def _process_data(self, data):
        """Process every complete message contained in ``data``.

        An incomplete trailing message is stored in ``self.data`` so the next
        ``data_received`` call can complete it. The transport is closed when
        the processor returns a false value or when processing raises.
        """
        buf = ua.utils.Buffer(data)
        while True:
            try:
                # Keep a copy so the header bytes can be restored when the
                # message turns out to be incomplete.
                backup_buf = buf.copy()
                try:
                    hdr = uabin.header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    logger.info("We did not receive enough data from client, waiting for more")
                    self.data = backup_buf.read(len(backup_buf))
                    return
                if len(buf) < hdr.body_size:
                    logger.info("We did not receive enough data from client, waiting for more")
                    self.data = backup_buf.read(len(backup_buf))
                    return
                ret = self.processor.process(hdr, buf)
                if not ret:
                    logger.info("processor returned False, we close connection from %s", self.peername)
                    self.transport.close()
                    return
                if len(buf) == 0:
                    return
            except Exception:
                logger.exception("Exception raised while parsing message from client, closing")
                # The rest of the buffer is discarded, so the byte stream can
                # no longer be trusted: actually close, as the log says.
                self.transport.close()
                return
85
86
87 1
class BinaryServer(object):
    """Socket server forwarding OPC UA binary requests to an internal server.

    :param internal_server: object providing ``loop`` and
        ``asyncio_transports``; it is handed to every protocol instance
    :param hostname: host name or address to listen on
    :param port: TCP port to listen on; 0 requests a dynamically assigned
        port, which is read back from the socket after ``start``
    """

    def __init__(self, internal_server, hostname, port):
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None
        self._policies = []
        self.clients = []

    def set_policies(self, policies):
        """Store the security policies passed on to each new connection."""
        self._policies = policies

    def start(self):
        """Create the listening server and record the bound address."""
        # create_server() instantiates the protocol without arguments, so
        # the shared state is attached as class attributes of a subclass.
        prop = dict(
            iserver=self.iserver,
            loop=self.loop,
            logger=self.logger,
            policies=self._policies,
            clients=self.clients,
        )
        protocol_factory = type('OPCUAProtocol', (OPCUAProtocol,), prop)

        coro = self.loop.create_server(protocol_factory, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        # get the port and the hostname from the created server socket
        # only relevant for dynamic port assignment (when self.port == 0)
        if self.port == 0 and len(self._server.sockets) == 1:
            # will work for AF_INET and AF_INET6 socket names
            # these are the only families supported by the create_server call
            sockname = self._server.sockets[0].getsockname()
            self.hostname = sockname[0]
            self.port = sockname[1]
        self.logger.warning('Listening on %s:%s', self.hostname, self.port)

    def stop(self):
        """Close every client transport, then the listening server."""
        self.logger.info("Closing asyncio socket server")
        # Iterate over a snapshot: closing a transport leads to
        # connection_lost(), which removes entries from this very list.
        for transport in list(self.iserver.asyncio_transports):
            transport.close()
        if self._server:
            self.loop.call_soon(self._server.close)
            self.loop.run_coro_and_wait(self._server.wait_closed())
131