messaging.sensormessages   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 42
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 29
dl 0
loc 42
rs 10
c 0
b 0
f 0
wmc 4

4 Methods

Rating   Name   Duplication   Size   Complexity  
A SensorSchema.make_sensor() 0 4 1
A SensorValueSchema.make_sensorvalue() 0 4 1
A SensorValueMessage.__init__() 0 4 1
A SensorMessage.__init__() 0 4 1
1
'''sensor messages and schema'''
2
from marshmallow import Schema, fields, post_load
3
from domain.sensors import Sensor, SensorValue
4
5
class SensorValueMessage(object):
6
    """Sensor data in a message"""
7
    def __init__(self, sensorid, value, sensor_type):
8
        self.sensorid = sensorid
9
        self.value = value
10
        self.type = sensor_type
11
12
class SensorMessage(object):
13
    """Sensor message"""
14
    def __init__(self, sensorid, sensor_type, location):
15
        self.sensorid = sensorid
16
        self.type = sensor_type
17
        self.location = location
18
19
class SensorSchema(Schema):
20
    '''schema for sensor'''
21
    sensorid = fields.Str()
22
    sensortype = fields.Str()
23
    location = fields.Str()
24
25
    @post_load
26
    def make_sensor(self, data):
27
        '''reconstitute sensor'''
28
        return Sensor(**data)
29
30
class SensorValueSchema(Schema):
31
    '''serialized version of sensor value'''
32
    sensorid = fields.Str()
33
    value = fields.Str()
34
    valuetype = fields.Str()
35
    sensor = fields.Nested(SensorSchema, allow_none=True)
36
    sensortime = fields.DateTime()
37
38
    @post_load
39
    def make_sensorvalue(self, data):
40
        '''reconstitute sensorvalue'''
41
        return SensorValue(**data)
42