Passed
Pull Request — master (#128)
by
unknown
02:06
created

asyncua.server.history_mongo.HistoryMongoDb.init()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
from ..common.events import Event
2
from .history import HistoryStorageInterface
3
from motor.motor_asyncio import AsyncIOMotorClient
4
from asyncua.ua.uatypes import LocalizedText
5
from asyncua.ua.uatypes import VariantType
6
import json
7
from asyncua.server.internal_session import InternalSession
8
9
10
class HistoryMongoDb(HistoryStorageInterface):
11
    """
12
    Create an interface for storing and tracking event and variable history with the
13
    mongodb database server.
14
15
    In this example I am using a maximum of 5000 results limit and storing the last result
16
    as an ack of the date of the last event to be fetched.
17
    The biggest reason was the need that I am having to deploy opcua servers that store
18
    data every 1 second and can request the track record of at least one year which will add
19
    up to a total of 31.104.000 samples.
20
21
    Limiting this value to stream an http server with an internal opcua client that will
22
    fetch the samples from the opcua server and feed a graph to an html page.
23
24
    """
25
    async def init(self):
26
        pass
27
28
    async def new_historized_node(self, node_id, period, count=0):
29
        pass
30
31
    async def save_node_value(self, node_id, datavalue):
32
        pass
33
34
    async def read_node_history(self, node_id, start, end, nb_values, session):
35
        pass
36
37
    async def new_historized_event(self, source_id, evtypes, period, count=0):
38
        pass
39
40
    async def save_event(self, event):
41
        message_event = event.get_event_props_as_fields_dict()
42
        message = message_event["Message"].Value.Text
43
        time_stamp = message_event["Time"].Value
44
45
        _event = dict()
46
        _event["message"] = message
47
        _event["time_stamp"] = time_stamp
48
49
        self.collection.insert_one(_event)
50
51
    def format_event(self, document):
52
        event = Event()
53
        content = LocalizedText(text=json.dumps(document["message"]))
54
        event.add_property("Message", content, VariantType(21))
55
        return event
56
57
    async def read_event_history(self, source_id, start, end, nb_values, evfilter, session: InternalSession):
58
        # create the variables
59
        _session_id = str(session.session_id)
60
        events_query = list()
61
        query = {'$and': [{"time_stamp": {"$gte": start}}, {"time_stamp": {"$lte": end}}]}
62
63
        # limits the number of requests per query
64
        if (not nb_values) or nb_values > self.limits_for_request:
65
            _max_value = self.limits_for_request + 1
66
        else:
67
            _max_value = nb_values + 1
68
69
        # check if the session exists, otherwise create a data key for it
70
        if _session_id not in self.clients_session_packages:
71
            self.clients_session_packages[_session_id] = dict()
72
            self.clients_session_packages[_session_id]["isFirstRequest"] = True
73
            self.clients_session_packages[_session_id]["length"] = 0
74
            self.clients_session_packages[_session_id]["count_pages"] = 0
75
            self.clients_session_packages[_session_id]["pages"] = 0
76
77
        # if the session already exists it means it's a next appointment
78
        else:
79
            self.clients_session_packages[_session_id]["count_pages"] += 1
80
81
        # create initial query data, return number of documents, calculate number of required pages.
82
        if self.clients_session_packages[_session_id]["isFirstRequest"]:
83
            length_request = await self.collection.count_documents(query)
84
            pages = round(length_request / (_max_value - 1))
85
            missing = length_request % (_max_value - 1)
86
87
            if missing > 0:
88
                pages += 1
89
90
            self.clients_session_packages[_session_id]["length"] = length_request
91
            self.clients_session_packages[_session_id]["pages"] = pages
92
            self.clients_session_packages[_session_id]["isFirstRequest"] = False
93
94
        # execute the query by sorting by date and limiting the cursor to only the number requested by the client
95
        cursor = self.collection.find(query, {"_id": 0}).sort([('sample_datetime', 1)]).limit(_max_value)
96
        async for document in cursor:
97
            events_query.append(document)
98
99
        events = list(map(self.format_event, events_query))
100
        length = len(events)
101
102
        # if the query returns one more format document and event to display pagination data
103
        if length > (_max_value - 1):
104
            last_event = events_query[length - 1]
105
            last_time_stamp = last_event["time_stamp"]
106
107
            last_sample = dict()
108
            last_sample["pages"] = self.clients_session_packages[_session_id]["pages"]
109
            last_sample["count"] = self.clients_session_packages[_session_id]["count_pages"]
110
            last_sample["length"] = self.clients_session_packages[_session_id]["length"]
111
            last_sample["last_sample"] = last_time_stamp.isoformat()
112
113
            event = Event()
114
            content = LocalizedText(text=json.dumps(last_sample))
115
            event.add_property("Message", content, VariantType(21))
116
117
            events.pop(length - 1)
118
            events.append(event)
119
120
        pages = self.clients_session_packages[_session_id]["pages"]
121
        count_pages = self.clients_session_packages[_session_id]["count_pages"]
122
123
        # check if the query number is equal to the number of pages, if yes delete the session
124
        if count_pages >= (pages - 1):
125
            if _session_id in self.clients_session_packages:
126
                del self.clients_session_packages[_session_id]
127
128
        return events, None
129
130
    async def stop(self):
131
        pass
132
133
    def __init__(self):
134
        self.server_database: AsyncIOMotorClient = AsyncIOMotorClient("10.10.20.100:27017")
135
        self.database = self.server_database["MyHistoryEvents"]
136
        self.collection = self.database["myevents"]
137
        self.limits_for_request = 5000
138
        self.clients_session_packages = dict()
139