1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
import os |
17
|
|
|
import sys |
18
|
|
|
import time |
19
|
|
|
import json |
20
|
|
|
import subprocess |
21
|
|
|
|
22
|
|
|
from collections import defaultdict |
23
|
|
|
|
24
|
|
|
import eventlet |
25
|
|
|
from eventlet.support import greenlets as greenlet |
26
|
|
|
from oslo_config import cfg |
27
|
|
|
|
28
|
|
|
from st2common import log as logging |
29
|
|
|
from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST |
30
|
|
|
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME |
31
|
|
|
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME |
32
|
|
|
from st2common.constants.triggers import (SENSOR_SPAWN_TRIGGER, SENSOR_EXIT_TRIGGER) |
33
|
|
|
from st2common.constants.exit_codes import SUCCESS_EXIT_CODE |
34
|
|
|
from st2common.constants.exit_codes import FAILURE_EXIT_CODE |
35
|
|
|
from st2common.models.system.common import ResourceReference |
36
|
|
|
from st2common.persistence.pack import Pack |
37
|
|
|
from st2common.services.access import create_token |
38
|
|
|
from st2common.transport.reactor import TriggerDispatcher |
39
|
|
|
from st2common.util.api import get_full_public_api_url |
40
|
|
|
from st2common.util.pack import get_pack_common_libs_path |
41
|
|
|
from st2common.util.shell import on_parent_exit |
42
|
|
|
from st2common.util.sandboxing import get_sandbox_python_path |
43
|
|
|
from st2common.util.sandboxing import get_sandbox_python_binary_path |
44
|
|
|
from st2common.util.sandboxing import get_sandbox_virtualenv_path |
45
|
|
|
|
46
|
|
|
__all__ = [ |
47
|
|
|
'ProcessSensorContainer' |
48
|
|
|
] |
49
|
|
|
|
50
|
|
|
LOG = logging.getLogger('st2reactor.process_sensor_container')

# Absolute path to the wrapper script which is passed to the Python binary of every sensor
# process
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WRAPPER_SCRIPT_NAME = 'sensor_wrapper.py'
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)

# Number of consecutive respawn attempts after a non-zero exit before we give up on a sensor
SENSOR_MAX_RESPAWN_COUNTS = 2

# Number of seconds a sensor process needs to stay alive after it has been started before it
# is considered as started and running successfully
SENSOR_SUCCESSFUL_START_THRESHOLD = 10

# Base delay (in seconds) to wait before respawning a dead process
SENSOR_RESPAWN_DELAY = 2.5

# Number of seconds to wait for a process to exit after it has been sent SIGTERM. If the
# process is still alive after that, SIGKILL is sent to it.
PROCESS_EXIT_TIMEOUT = 5
69
|
|
|
|
70
|
|
|
# TODO: Allow multiple instances of the same sensor with different configuration
# options - we need to update sensors for that and add "get_id" or similar
# method to the sensor class
73
|
|
|
|
74
|
|
|
|
75
|
|
|
class ProcessSensorContainer(object):
    """
    Sensor container which runs sensors in a separate process.

    Each sensor is started through the sensor wrapper script in its own subprocess. The
    container periodically polls those processes, dispatches a trigger when a sensor process
    is spawned or exits and tries to respawn sensors which exited with a non-zero status code.
    """

    def __init__(self, sensors, poll_interval=5, dispatcher=None):
        """
        :param sensors: A list of sensor dicts.
        :type sensors: ``list`` of ``dict``

        :param poll_interval: How long to sleep between each poll for running / dead sensors.
        :type poll_interval: ``float``

        :param dispatcher: Dispatcher used for the sensor spawn / exit triggers. If not
                           provided, a new ``TriggerDispatcher`` is created.
        """
        self._poll_interval = poll_interval

        self._sensors = {}  # maps sensor_id -> sensor object
        self._processes = {}  # maps sensor_id -> sensor process

        if not dispatcher:
            dispatcher = TriggerDispatcher(LOG)
        self._dispatcher = dispatcher

        self._stopped = False

        sensors = sensors or []
        for sensor_obj in sensors:
            sensor_id = self._get_sensor_id(sensor=sensor_obj)
            self._sensors[sensor_id] = sensor_obj

        # Stores information needed for respawning dead sensors
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns

        # A list of all the instance variables which hold internal state information about a
        # particular sensor
        # Note: We don't clear respawn counts since we want to track this through the whole life
        # cycle of the container manager
        # Note: The list holds references to the dict objects themselves, so those dicts must
        # only ever be mutated in place and never rebound to a new dict
        self._internal_sensor_state_variables = [
            self._processes,
            self._sensors,
            self._sensor_start_times,
        ]

        self._enable_common_pack_libs = cfg.CONF.packs.enable_common_libs or False

    def run(self):
        """
        Spawn all the registered sensors and poll their processes until the container is stopped.

        :return: ``SUCCESS_EXIT_CODE`` if the loop has ended or the greenlet running it has been
                 killed, ``FAILURE_EXIT_CODE`` if any other error has occurred.
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Poll for all running processes
                # Note: We take a snapshot of the ids since polling deletes dead sensors from
                # self._sensors and a live dict view can't be iterated while the dict changes
                sensor_ids = list(self._sensors)

                if sensor_ids:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except BaseException:
            # Deliberate catch-all (equivalent to the bare "except" which was used here before)
            # so any failure is logged and reported through the exit code
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE

    def _poll_sensors_for_results(self, sensor_ids):
        """
        Poll the processes of the provided sensors and handle the ones which have exited.

        For an exited process the sensor state is deleted, the sensor exit trigger is dispatched
        and a respawn is scheduled. For a process which has been running for at least
        ``SENSOR_SUCCESSFUL_START_THRESHOLD`` seconds the respawn counter is reset.

        :param sensor_ids: IDs of the sensors to poll.
        :type sensor_ids: iterable of ``str``
        """
        # Iterate over a copy - the caller may pass in a live view of self._sensors and dead
        # sensors are deleted from that dict inside the loop
        for sensor_id in list(sensor_ids):
            now = int(time.time())

            process = self._processes.get(sensor_id)

            if process is None:
                # No process is tracked for this sensor (e.g. it has been removed in the
                # meantime), so there is nothing to poll
                LOG.debug('No process found for sensor %s, skipping poll', sensor_id)
                continue

            status = process.poll()

            if status is not None:
                # Dead process detected
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)

                sensor = self._sensors[sensor_id]
                self._delete_sensor(sensor_id)

                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
                                                       exit_code=status)

                # Try to respawn a dead process (maybe it was a simple failure which can be
                # resolved with a restart)
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
                                 exit_code=status)
            else:
                running_for = now - self._sensor_start_times[sensor_id]
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
                successfully_started = running_for >= SENSOR_SUCCESSFUL_START_THRESHOLD

                if successfully_started and sensor_respawn_count >= 1:
                    # Sensor has been successfully running more than threshold seconds, clear the
                    # respawn counter so we can try to restart the sensor if it dies later on
                    self._sensor_respawn_counts[sensor_id] = 0

    def running(self):
        """
        Return the number of sensor processes which are currently tracked by the container.

        :rtype: ``int``
        """
        return len(self._processes)

    def stopped(self):
        """
        Return True if the container has been stopped, False otherwise.

        :rtype: ``bool``
        """
        return self._stopped

    def shutdown(self, force=False):
        """
        Stop the container and all the sensor processes.

        :param force: True to not wait for the processes to exit after they have been sent
                      SIGTERM (exit timeout of 0 is used instead of ``PROCESS_EXIT_TIMEOUT``).
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        exit_timeout = 0 if force else PROCESS_EXIT_TIMEOUT

        # Iterate over a snapshot since stopping a sensor deletes it from self._sensors
        for sensor_id in list(self._sensors):
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear the dicts in place instead of rebinding the attributes, otherwise
        # self._internal_sensor_state_variables would keep pointing to the old dicts
        self._sensors.clear()
        self._processes.clear()

    def add_sensor(self, sensor):
        """
        Add a new sensor to the container.

        :type sensor: ``dict``

        :return: True if the sensor has been started, False if a sensor with the same id is
                 already registered.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id in self._sensors:
            LOG.warning('Sensor %s already exists and running.', sensor_id)
            return False

        self._spawn_sensor_process(sensor=sensor)
        LOG.debug('Sensor %s started.', sensor_id)
        self._sensors[sensor_id] = sensor
        return True

    def remove_sensor(self, sensor):
        """
        Remove an existing sensor from the container.

        :type sensor: ``dict``

        :return: True if the sensor has been stopped, False if it isn't registered in this
                 container.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id not in self._sensors:
            LOG.warning('Sensor %s isn\'t running in this container.', sensor_id)
            return False

        self._stop_sensor_process(sensor_id=sensor_id)
        LOG.debug('Sensor %s stopped.', sensor_id)
        return True

    def _run_all_sensors(self):
        """
        Spawn a process for every registered sensor.

        Sensors which can't be started are logged and removed from the container.
        """
        # Iterate over a snapshot since sensors which fail to start are deleted inside the loop
        for sensor_id in list(self._sensors):
            sensor_obj = self._sensors[sensor_id]
            LOG.info('Running sensor %s', sensor_id)

            try:
                self._spawn_sensor_process(sensor=sensor_obj)
            except Exception as e:
                # Note: Exceptions have no "message" attribute on Python 3
                LOG.warning(str(e), exc_info=True)

                # Disable sensor which we are unable to start
                self._delete_sensor(sensor_id)
                continue

            LOG.info('Sensor %s started', sensor_id)

    def _spawn_sensor_process(self, sensor):
        """
        Spawn a new process for the provided sensor.

        New process uses isolated Python binary from a virtual environment
        belonging to the sensor pack.

        :param sensor: Sensor dict. The ``ref``, ``pack``, ``file_path``, ``class_name``,
                       ``trigger_types`` and ``poll_interval`` keys are read.
        :type sensor: ``dict``

        :raises Exception: If the pack virtual environment directory doesn't exist or if the
                           process can't be spawned.

        :return: Spawned process.
        :rtype: ``subprocess.Popen``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        pack_ref = sensor['pack']
        pack_db = Pack.get_by_ref(pack_ref)

        virtualenv_path = get_sandbox_virtualenv_path(pack=pack_ref)
        python_path = get_sandbox_python_binary_path(pack=pack_ref)

        if virtualenv_path and not os.path.isdir(virtualenv_path):
            format_values = {'pack': sensor['pack'], 'virtualenv_path': virtualenv_path}
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
            raise Exception(msg)

        trigger_type_refs = sensor['trigger_types'] or []
        trigger_type_refs = ','.join(trigger_type_refs)

        # Arguments of the parent process are passed to the wrapper as a JSON serialized list
        parent_args = json.dumps(sys.argv[1:])

        args = [
            python_path,
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (sensor['pack']),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ]

        if sensor['poll_interval']:
            args.append('--poll-interval=%s' % (sensor['poll_interval']))

        sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True,
                                                      inherit_parent_virtualenv=True)
        pack_common_libs_path = get_pack_common_libs_path(pack_db=pack_db)

        env = os.environ.copy()

        if self._enable_common_pack_libs and pack_common_libs_path:
            env['PYTHONPATH'] = pack_common_libs_path + ':' + sandbox_python_path
        else:
            env['PYTHONPATH'] = sandbox_python_path

        # Include full api URL and API token specific to that sensor
        ttl = cfg.CONF.auth.service_token_ttl
        metadata = {
            'service': 'sensors_container',
            'sensor_path': sensor['file_path'],
            'sensor_class': sensor['class_name']
        }
        temporary_token = create_token(username='sensors_container', ttl=ttl, metadata=metadata,
                                       service=True)

        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        cmd = ' '.join(args)
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None,
                                       stderr=None, shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            message = ('Failed to spawn process for sensor %s ("%s"): %s' %
                       (sensor_id, cmd, str(e)))
            raise Exception(message)

        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process

    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
        """
        Stop a sensor process for the provided sensor.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param exit_timeout: How long to wait for process to exit after
                             sending SIGTERM signal. If the process doesn't
                             exit in this amount of seconds, SIGKILL signal
                             will be sent to the process.
        :type exit_timeout: ``int``
        """
        process = self._processes.get(sensor_id)

        # Delete sensor before terminating process so that it will not be
        # respawned during termination
        self._delete_sensor(sensor_id)

        if process is None:
            # Sensor is registered, but no process is tracked for it, nothing to terminate
            return

        # Terminate the process and wait for up to exit_timeout seconds for the
        # process to exit
        process.terminate()

        # Poll once up front so "status" is always bound - with exit_timeout of 0 (forced
        # shutdown) the wait loop below doesn't run at all
        status = process.poll()
        waited = 0
        sleep_delay = 1

        while status is None and waited < exit_timeout:
            time.sleep(sleep_delay)
            waited += sleep_delay
            status = process.poll()

        if status is None:
            # Process hasn't exited yet, forcefully kill it
            process.kill()

    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Respawn a sensor whose process has exited.

        Nothing is done if the container is stopped or if ``_should_respawn_sensor`` returns
        False. Otherwise the respawn counter is increased and the sensor is spawned again after
        a delay which grows with the number of respawns.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param sensor: Sensor dict.
        :type sensor: ``dict``

        :param exit_code: Exit code of the dead sensor process.
        :type exit_code: ``int``
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        if self._stopped:
            # Container has been shut down while we were waiting, don't start a new process
            LOG.debug('Stopped while waiting, not respawning a dead sensor', extra=extra)
            return

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            # Note: Exceptions have no "message" attribute on Python 3
            LOG.warning(str(e), exc_info=True)

            # Disable sensor which we are unable to start
            # Note: The sensor has already been deleted when the dead process was detected and
            # it's only registered again by a successful spawn, so a plain "del" would raise
            # KeyError here
            self._delete_sensor(sensor_id)

    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Return True if the provided sensor should be respawned, False otherwise.

        A sensor is respawned only if it exited with a non-zero status code and it hasn't been
        respawned ``SENSOR_MAX_RESPAWN_COUNTS`` times yet.
        """
        if exit_code == 0:
            # We only try to respawn sensors which exited with non-zero status code
            return False

        respawn_count = self._sensor_respawn_counts[sensor_id]
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
            LOG.debug('Sensor has already been respawned max times, giving up')
            return False

        return True

    def _get_sensor_id(self, sensor):
        """
        Return unique identifier for the provided sensor dict.

        :type sensor: ``dict``

        :return: Value of the sensor ``ref`` key.
        """
        sensor_id = sensor['ref']
        return sensor_id

    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch the sensor spawn trigger for the provided sensor and process.

        :param sensor: Sensor dict.
        :type sensor: ``dict``

        :param process: Spawned sensor process.

        :param cmd: Command line which was used to spawn the process.
        :type cmd: ``str``
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_SPAWN_TRIGGER['name'],
            pack=SENSOR_SPAWN_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'pid': process.pid,
            'cmd': cmd
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch the sensor exit trigger for the provided sensor.

        :param sensor: Sensor dict.
        :type sensor: ``dict``

        :param exit_code: Exit code of the sensor process.
        :type exit_code: ``int``
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_EXIT_TRIGGER['name'],
            pack=SENSOR_EXIT_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'exit_code': exit_code
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _delete_sensor(self, sensor_id):
        """
        Delete / reset all the internal state about a particular sensor.

        Unknown sensor ids are ignored.
        """
        for var in self._internal_sensor_state_variables:
            var.pop(sensor_id, None)
471
|
|
|
|