UnknownSource (rating: A)

Complexity

Total Complexity 2

Size/Duplication

Total Lines 9
Duplicated Lines 0%

Importance

Changes 2
Bugs 0
Features 0
Metric                        Value
wmc (weighted method count)       2
c (changes)                       2
b (bugs)                          0
f (features)                      0
dl (duplicated lines)             0
loc (lines of code)               9
rs                               10

2 Methods

Rating   Name         Duplication   Size   Complexity
A        __str__()    0             2      1
A        __init__()   0             2      1
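For a rough local cross-check of figures like these, the radon library can be pointed at the module source. This is only an illustrative sketch: radon is an assumption here (the report does not say which analyzer produced it), and 'tasks.py' is a placeholder path for wherever the module below lives.

# Illustrative only: cross-checking complexity/size metrics with radon
# (assumed dependency; not necessarily the analyzer behind this report).
from radon.complexity import cc_visit
from radon.raw import analyze

with open('tasks.py') as handle:       # placeholder path to the module below
    source = handle.read()

for block in cc_visit(source):         # per-function/class cyclomatic complexity
    print(block.name, block.complexity)

print('loc:', analyze(source).loc)     # raw line count for the whole module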
"""
Celery tasks for fetching remote samples
"""
import logging

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from sqlalchemy import func

from app.database import db
from app.models import Sensor
from app.remote import h2oline, usgs, cehq, cawateroffice, corps, base
from app.celery import celery

logger = logging.getLogger(__name__)

# Number of sensors handled by a single fetch task
SAMPLES_PER_CHUNK = 25

# One scraper instance per supported remote data source
H2Oline = h2oline.H2Oline()
USGS = usgs.USGS()
CEHQ = cehq.CEHQ()
CAWaterOffice = cawateroffice.WaterOffice()
CORPS = corps.Corps()
Failure = base.RemoteGage()


# Maps a sensor's remote_type to the bulk fetch method of its scraper
sources = {
    'h2oline': H2Oline.get_multiple_samples,
    'usgs': USGS.get_multiple_samples,
    'cehq': CEHQ.get_multiple_samples,
    'cawater': CAWaterOffice.get_multiple_samples,
    'corps': CORPS.get_multiple_samples
}


class UnknownSource(Exception):
    """
    Raised when an unknown remote_type is used
    """
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


def missing_multiple_samples(sensor_ids):
    """
    If a remote_type is missing from sources, raise an error
    listing the sensor ids that use that remote_type
    """
    raise UnknownSource('Unknown remote_type for %s' %
                        ', '.join(str(sensor_id) for sensor_id in sensor_ids))


@celery.task
def fetch_samples(sensor_ids, remote_type, remote_parameter):
    """
    Fetch samples for a group of sensor_ids that share the same
    remote_type and remote_parameter
    """
    logger.info('Fetching a chunk of %s samples for sensors %s', remote_type, sensor_ids)
    sources.get(remote_type, missing_multiple_samples)(sensor_ids)


def chunk_sensor_ids(sensor_ids, remote_type, remote_parameter, delay):
    """
    Break the work into chunks of at most SAMPLES_PER_CHUNK sensors,
    then execute fetch_samples for each chunk (normally as a Celery task)
    """
    logger.info('Chunking %s sensors into fetch tasks: %s', remote_type, sensor_ids)
    try:
        x_range = xrange  # Python 2
    except NameError:
        x_range = range   # Python 3
    for chunk in [sensor_ids[x:x+SAMPLES_PER_CHUNK] for x in
                  x_range(0, len(sensor_ids), SAMPLES_PER_CHUNK)]:
        if delay:
            fetch_samples.delay(chunk, remote_type, remote_parameter)
        else:
            fetch_samples(chunk, remote_type, remote_parameter)


@periodic_task(run_every=(crontab(minute='*/15')),
               name='fetch_remote_samples',
               ignore_result=True)
def fetch_remote_samples(delay=True):
    """
    Create tasks for all remote sensors to be updated
    """
    # Group all non-local sensors by remote_type and remote_parameter
    logger.info('Fetching remote samples')
    remote_sensors = db.session.query(func.array_agg(Sensor.id),
                                      Sensor.remote_type,
                                      Sensor.remote_parameter)\
                               .group_by(Sensor.remote_type,
                                         Sensor.remote_parameter)\
                               .filter(Sensor.local == False).all()
    for group in remote_sensors:
        chunk_sensor_ids(group[0], group[1], group[2], delay)
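For illustration, here is a minimal standalone sketch of the chunk-and-dispatch pattern used above. The sensor ids and fake_get_multiple_samples / unknown_remote_type names are hypothetical stand-ins for real database rows, the scraper methods registered in sources, and missing_multiple_samples; the slicing expression is the same one used in chunk_sensor_ids.

SAMPLES_PER_CHUNK = 25

def fake_get_multiple_samples(sensor_ids):
    # Hypothetical stand-in for a scraper method such as USGS.get_multiple_samples
    print('fetching samples for %d sensors' % len(sensor_ids))

def unknown_remote_type(sensor_ids):
    # Fallback for remote_type values missing from the dispatch table
    raise ValueError('Unknown remote_type for %s' % ', '.join(map(str, sensor_ids)))

sources = {'usgs': fake_get_multiple_samples}

sensor_ids = list(range(1, 61))  # 60 hypothetical sensor ids
# The same slicing used in chunk_sensor_ids: 60 ids -> chunks of 25, 25 and 10
for chunk in [sensor_ids[x:x + SAMPLES_PER_CHUNK]
              for x in range(0, len(sensor_ids), SAMPLES_PER_CHUNK)]:
    sources.get('usgs', unknown_remote_type)(chunk)

In the real module the chunks are normally handed to Celery via fetch_samples.delay, but calling fetch_remote_samples(delay=False) runs the same grouping, chunking and fetching synchronously, which is how the non-delay branch in chunk_sensor_ids is reached when no worker is available.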