1
|
|
|
from ..common.events import Event |
2
|
|
|
from .history import HistoryStorageInterface |
3
|
|
|
from motor.motor_asyncio import AsyncIOMotorClient |
4
|
|
|
from asyncua.ua.uatypes import LocalizedText |
5
|
|
|
from asyncua.ua.uatypes import VariantType |
6
|
|
|
import json |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
class HistoryMongoDb(HistoryStorageInterface):
    """
    History storage backend that persists events in a MongoDB collection.

    Only event history is implemented; the node (variable) history hooks
    are no-ops.

    A single history read returns at most ``limits_for_request`` events
    (5000 by default).  When more events match the requested time window,
    the last element of the returned list is not a stored event but a
    marker event whose ``Message`` is the JSON object
    ``{"last_sample": "<ISO-8601 timestamp>"}``.  That timestamp is the
    time of the first event that was left out, so a client can issue a
    follow-up read starting from it.

    Motivation (from the original author): OPC UA servers that store one
    sample per second and must serve at least a year of history (tens of
    millions of samples), consumed by an HTTP server with an internal
    OPC UA client that feeds a graph on an HTML page.  Paging keeps each
    response bounded.
    """

    def __init__(
        self,
        host="10.10.20.100:27017",
        database="MyHistoryEvents",
        collection="myevents",
        limits_for_request=5000,
    ):
        """
        Open the MongoDB client and select the events collection.

        Args:
            host: MongoDB host or URI passed to ``AsyncIOMotorClient``.
            database: Name of the database holding the events.
            collection: Name of the collection holding the events.
            limits_for_request: Maximum number of stored events returned
                by one call to ``read_event_history``.

        The defaults reproduce the values that were previously hard-coded.
        """
        super().__init__()
        self.server_database: AsyncIOMotorClient = AsyncIOMotorClient(host)
        self.database = self.server_database[database]
        self.collection = self.database[collection]
        self.limits_for_request = limits_for_request

    async def init(self):
        """No initialisation is performed; the client is created in ``__init__``."""
        pass

    async def new_historized_node(self, node_id, period, count=0):
        """Not implemented: node history is not stored by this backend."""
        pass

    async def save_node_value(self, node_id, datavalue):
        """Not implemented: node values are not stored by this backend."""
        pass

    async def read_node_history(self, node_id, start, end, nb_values, session):
        """Not implemented: node history is not stored by this backend."""
        pass

    async def new_historized_event(self, source_id, evtypes, period, count=0):
        """No per-source setup is done; every event goes to the same collection."""
        pass

    async def save_event(self, event):
        """
        Store the message text and timestamp of ``event`` as one document.

        The document has the shape
        ``{"message": <Message text>, "time_stamp": <Time value>}``.
        """
        fields = event.get_event_props_as_fields_dict()
        document = {
            "message": fields["Message"].Value.Text,
            "time_stamp": fields["Time"].Value,
        }
        # Await the insert so write errors reach the caller and the
        # document is stored before this coroutine returns.
        await self.collection.insert_one(document)

    def format_event(self, document):
        """
        Build an ``Event`` from a stored document.

        The stored message is JSON-encoded and placed in the event's
        ``Message`` property as a ``LocalizedText``.
        """
        event = Event()
        content = LocalizedText(text=json.dumps(document["message"]))
        event.add_property("Message", content, VariantType.LocalizedText)
        return event

    async def read_event_history(self, source_id, start, end, nb_values, evfilter, session):
        """
        Return stored events whose timestamp lies in ``[start, end]``.

        Args:
            source_id: Unused; all events share one collection.
            start: Lower bound (inclusive) for ``time_stamp``.
            end: Upper bound (inclusive) for ``time_stamp``.
            nb_values: Requested maximum number of events.  A falsy value,
                or a value above ``limits_for_request``, is replaced by
                ``limits_for_request``.
            evfilter: Unused.
            session: Unused.

        Returns:
            A tuple ``(events, None)``.  ``events`` is ordered by ascending
            timestamp.  If more events matched than the effective limit,
            the last element is a marker event carrying
            ``{"last_sample": <ISO timestamp>}`` instead of a stored event.
        """
        limit = self.limits_for_request
        if nb_values and nb_values <= limit:
            limit = nb_values
        # Fetch one document more than the limit: its presence tells us
        # that the result was truncated.
        fetch_count = limit + 1

        query = {'$and': [{"time_stamp": {"$gte": start}}, {"time_stamp": {"$lte": end}}]}
        # Sort on the field that is actually stored so the page (and the
        # continuation timestamp derived from it) is chronological.
        cursor = self.collection.find(query, {"_id": 0}).sort([("time_stamp", 1)]).limit(fetch_count)
        documents = [document async for document in cursor]

        events = [self.format_event(document) for document in documents]

        if len(events) > limit:
            # Replace the extra event with a marker that tells the client
            # where to resume.
            resume_info = {"last_sample": documents[-1]["time_stamp"].isoformat()}
            marker = Event()
            content = LocalizedText(text=json.dumps(resume_info))
            marker.add_property("Message", content, VariantType.LocalizedText)
            events[-1] = marker

        return events, None

    async def stop(self):
        """No shutdown work is performed."""
        pass
94
|
|
|
|