1
|
|
|
from ..common.events import Event |
2
|
|
|
from .history import HistoryStorageInterface |
3
|
|
|
from motor.motor_asyncio import AsyncIOMotorClient |
4
|
|
|
from asyncua.ua.uatypes import LocalizedText |
5
|
|
|
from asyncua.ua.uatypes import VariantType |
6
|
|
|
import json |
7
|
|
|
from asyncua.server.internal_session import InternalSession |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
class HistoryMongoDb(HistoryStorageInterface): |
11
|
|
|
""" |
12
|
|
|
Create an interface for storing and tracking event and variable history with the |
13
|
|
|
mongodb database server. |
14
|
|
|
|
15
|
|
|
In this example I am using a maximum of 5000 results limit and storing the last result |
16
|
|
|
as an ack of the date of the last event to be fetched. |
17
|
|
|
The biggest reason was the need that I am having to deploy opcua servers that store |
18
|
|
|
data every 1 second and can request the track record of at least one year which will add |
19
|
|
|
up to a total of 31.104.000 samples. |
20
|
|
|
|
21
|
|
|
Limiting this value to stream an http server with an internal opcua client that will |
22
|
|
|
fetch the samples from the opcua server and feed a graph to an html page. |
23
|
|
|
|
24
|
|
|
""" |
25
|
|
|
async def init(self): |
26
|
|
|
pass |
27
|
|
|
|
28
|
|
|
async def new_historized_node(self, node_id, period, count=0): |
29
|
|
|
pass |
30
|
|
|
|
31
|
|
|
async def save_node_value(self, node_id, datavalue): |
32
|
|
|
pass |
33
|
|
|
|
34
|
|
|
async def read_node_history(self, node_id, start, end, nb_values, session): |
35
|
|
|
pass |
36
|
|
|
|
37
|
|
|
async def new_historized_event(self, source_id, evtypes, period, count=0): |
38
|
|
|
pass |
39
|
|
|
|
40
|
|
|
async def save_event(self, event): |
41
|
|
|
message_event = event.get_event_props_as_fields_dict() |
42
|
|
|
message = message_event["Message"].Value.Text |
43
|
|
|
time_stamp = message_event["Time"].Value |
44
|
|
|
|
45
|
|
|
_event = dict() |
46
|
|
|
_event["message"] = message |
47
|
|
|
_event["time_stamp"] = time_stamp |
48
|
|
|
|
49
|
|
|
self.collection.insert_one(_event) |
50
|
|
|
|
51
|
|
|
def format_event(self, document): |
52
|
|
|
event = Event() |
53
|
|
|
content = LocalizedText(text=json.dumps(document["message"])) |
54
|
|
|
event.add_property("Message", content, VariantType(21)) |
55
|
|
|
return event |
56
|
|
|
|
57
|
|
|
async def read_event_history(self, source_id, start, end, nb_values, evfilter, session: InternalSession): |
58
|
|
|
# create the variables |
59
|
|
|
_session_id = str(session.session_id) |
60
|
|
|
events_query = list() |
61
|
|
|
query = {'$and': [{"time_stamp": {"$gte": start}}, {"time_stamp": {"$lte": end}}]} |
62
|
|
|
|
63
|
|
|
# limits the number of requests per query |
64
|
|
|
if (not nb_values) or nb_values > self.limits_for_request: |
65
|
|
|
_max_value = self.limits_for_request + 1 |
66
|
|
|
else: |
67
|
|
|
_max_value = nb_values + 1 |
68
|
|
|
|
69
|
|
|
# check if the session exists, otherwise create a data key for it |
70
|
|
|
if _session_id not in self.clients_session_packages: |
71
|
|
|
self.clients_session_packages[_session_id] = dict() |
72
|
|
|
self.clients_session_packages[_session_id]["isFirstRequest"] = True |
73
|
|
|
self.clients_session_packages[_session_id]["length"] = 0 |
74
|
|
|
self.clients_session_packages[_session_id]["count_pages"] = 0 |
75
|
|
|
self.clients_session_packages[_session_id]["pages"] = 0 |
76
|
|
|
|
77
|
|
|
# if the session already exists it means it's a next appointment |
78
|
|
|
else: |
79
|
|
|
self.clients_session_packages[_session_id]["count_pages"] += 1 |
80
|
|
|
|
81
|
|
|
# create initial query data, return number of documents, calculate number of required pages. |
82
|
|
|
if self.clients_session_packages[_session_id]["isFirstRequest"]: |
83
|
|
|
length_request = await self.collection.count_documents(query) |
84
|
|
|
pages = round(length_request / (_max_value - 1)) |
85
|
|
|
missing = length_request % (_max_value - 1) |
86
|
|
|
|
87
|
|
|
if missing > 0: |
88
|
|
|
pages += 1 |
89
|
|
|
|
90
|
|
|
self.clients_session_packages[_session_id]["length"] = length_request |
91
|
|
|
self.clients_session_packages[_session_id]["pages"] = pages |
92
|
|
|
self.clients_session_packages[_session_id]["isFirstRequest"] = False |
93
|
|
|
|
94
|
|
|
# execute the query by sorting by date and limiting the cursor to only the number requested by the client |
95
|
|
|
cursor = self.collection.find(query, {"_id": 0}).sort([('sample_datetime', 1)]).limit(_max_value) |
96
|
|
|
async for document in cursor: |
97
|
|
|
events_query.append(document) |
98
|
|
|
|
99
|
|
|
events = list(map(self.format_event, events_query)) |
100
|
|
|
length = len(events) |
101
|
|
|
|
102
|
|
|
# if the query returns one more format document and event to display pagination data |
103
|
|
|
if length > (_max_value - 1): |
104
|
|
|
last_event = events_query[length - 1] |
105
|
|
|
last_time_stamp = last_event["time_stamp"] |
106
|
|
|
|
107
|
|
|
last_sample = dict() |
108
|
|
|
last_sample["pages"] = self.clients_session_packages[_session_id]["pages"] |
109
|
|
|
last_sample["count"] = self.clients_session_packages[_session_id]["count_pages"] |
110
|
|
|
last_sample["length"] = self.clients_session_packages[_session_id]["length"] |
111
|
|
|
last_sample["last_sample"] = last_time_stamp.isoformat() |
112
|
|
|
|
113
|
|
|
event = Event() |
114
|
|
|
content = LocalizedText(text=json.dumps(last_sample)) |
115
|
|
|
event.add_property("Message", content, VariantType(21)) |
116
|
|
|
|
117
|
|
|
events.pop(length - 1) |
118
|
|
|
events.append(event) |
119
|
|
|
|
120
|
|
|
pages = self.clients_session_packages[_session_id]["pages"] |
121
|
|
|
count_pages = self.clients_session_packages[_session_id]["count_pages"] |
122
|
|
|
|
123
|
|
|
# check if the query number is equal to the number of pages, if yes delete the session |
124
|
|
|
if count_pages >= (pages - 1): |
125
|
|
|
if _session_id in self.clients_session_packages: |
126
|
|
|
del self.clients_session_packages[_session_id] |
127
|
|
|
|
128
|
|
|
return events, None |
129
|
|
|
|
130
|
|
|
async def stop(self): |
131
|
|
|
pass |
132
|
|
|
|
133
|
|
|
def __init__(self): |
134
|
|
|
self.server_database: AsyncIOMotorClient = AsyncIOMotorClient("10.10.20.100:27017") |
135
|
|
|
self.database = self.server_database["MyHistoryEvents"] |
136
|
|
|
self.collection = self.database["myevents"] |
137
|
|
|
self.limits_for_request = 5000 |
138
|
|
|
self.clients_session_packages = dict() |
139
|
|
|
|