Passed
Pull Request — master (#127)
by
unknown
04:13
created

HistoryMongoDb.__init__()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
from ..common.events import Event
2
from .history import HistoryStorageInterface
3
from motor.motor_asyncio import AsyncIOMotorClient
4
from asyncua.ua.uatypes import LocalizedText
5
from asyncua.ua.uatypes import VariantType
6
import json
7
8
9
class HistoryMongoDb(HistoryStorageInterface):
    """
    History storage backend that persists OPC UA events in a MongoDB collection.

    Only event history is implemented; the node-value hooks are no-ops.

    Reads are paginated: a single request returns at most
    ``limits_for_request`` events (5000 by default). When more events match
    the requested time range, the last element of the returned list is a
    marker event whose "Message" is the JSON object
    ``{"last_sample": "<ISO timestamp>"}``. The timestamp is that of the first
    event that was *not* returned, so a client can resume by issuing a new
    request starting at that timestamp.

    The motivation for the limit is a deployment where servers store one
    sample per second and clients may ask for at least a year of history
    (about 31 million samples), e.g. an HTTP server with an internal OPC UA
    client that fetches the samples and feeds a graph on an HTML page.
    """

    def __init__(
        self,
        host="10.10.20.100:27017",
        database_name="MyHistoryEvents",
        collection_name="myevents",
        limits_for_request=5000,
    ):
        """
        Open the MongoDB client and select the event collection.

        :param host: MongoDB address passed to ``AsyncIOMotorClient``.
        :param database_name: name of the database holding the events.
        :param collection_name: name of the collection holding the events.
        :param limits_for_request: maximum number of events returned by one
            ``read_event_history`` call.
        """
        self.server_database: AsyncIOMotorClient = AsyncIOMotorClient(host)
        self.database = self.server_database[database_name]
        self.collection = self.database[collection_name]
        self.limits_for_request = limits_for_request

    async def init(self):
        """No-op: the client is already created in ``__init__``."""
        pass

    async def new_historized_node(self, node_id, period, count=0):
        """No-op: node value history is not stored by this backend."""
        pass

    async def save_node_value(self, node_id, datavalue):
        """No-op: node value history is not stored by this backend."""
        pass

    async def read_node_history(self, node_id, start, end, nb_values, session):
        """No-op: node value history is not stored by this backend."""
        # NOTE(review): this returns None; confirm callers do not expect a
        # (values, continuation) pair as read_event_history returns.
        pass

    async def new_historized_event(self, source_id, evtypes, period, count=0):
        """No-op: no per-source setup is performed before saving events."""
        pass

    async def save_event(self, event):
        """
        Store the message text and the time of ``event`` as one document.

        The document has exactly two keys: ``message`` and ``time_stamp``.
        """
        fields = event.get_event_props_as_fields_dict()
        document = {
            "message": fields["Message"].Value.Text,
            "time_stamp": fields["Time"].Value,
        }
        # insert_one is a coroutine in Motor: it must be awaited so that the
        # write has completed (and any error is raised) before returning.
        await self.collection.insert_one(document)

    @staticmethod
    def _message_event(payload):
        """Build an Event whose "Message" is ``payload`` serialized as JSON."""
        event = Event()
        content = LocalizedText(text=json.dumps(payload))
        event.add_property("Message", content, VariantType.LocalizedText)
        return event

    def format_event(self, document):
        """Convert a stored document back into an Event carrying its message."""
        return self._message_event(document["message"])

    async def read_event_history(self, source_id, start, end, nb_values, evfilter, session):
        """
        Return the stored events whose time stamp lies in ``[start, end]``.

        At most ``nb_values`` events are returned, capped at
        ``self.limits_for_request``; a falsy ``nb_values`` means "use the cap".
        ``source_id``, ``evfilter`` and ``session`` are not used.

        :return: ``(events, None)``. When more events match than fit in one
            page, the last list element is a marker event (see class docstring).
        """
        if (not nb_values) or nb_values > self.limits_for_request:
            page_size = self.limits_for_request
        else:
            page_size = nb_values

        query = {'$and': [{"time_stamp": {"$gte": start}}, {"time_stamp": {"$lte": end}}]}
        # Fetch one document more than the page size: its presence tells us
        # the result was truncated. Sort on the stored field so that the
        # extra document really is the next one in time.
        cursor = (
            self.collection.find(query, {"_id": 0})
            .sort([("time_stamp", 1)])
            .limit(page_size + 1)
        )
        documents = [document async for document in cursor]
        events = [self.format_event(document) for document in documents]

        if len(documents) > page_size:
            # Replace the extra event by a marker telling the client where to
            # resume reading.
            resume_from = documents[-1]["time_stamp"]
            events[-1] = self._message_event({"last_sample": resume_from.isoformat()})

        return events, None

    async def stop(self):
        """No-op: nothing is released when the history backend stops."""
        pass
94