CEHQ.response()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 8
rs 9.4285
cc 1
1
"""
2
Get flows from the Quebec CEHQ site
3
Sensor.remote_id is the Numero de la station e.g.: 050915 for the Nelson (http://www.cehq.gouv.qc.ca/suivihydro/graphique.asp?NoStation=050915)
4
"""
5
import datetime
6
import logging
7
import requests
8
9
from app.models import Sensor
10
from .base import add_new_sample, RemoteGage
11
12
logger = logging.getLogger(__name__)
13
14
15
def digit_or_period(c):
16
    """Returns True if the character c given is a string or a period"""
17
    return str.isdigit(c) or c is '.'
18
19
20
class CEHQ(RemoteGage):
21
    URLBASE = 'http://www.cehq.gouv.qc.ca/suivihydro/fichier_donnees.asp'
22
23
    def response(self, site_num):
24
        """
25
        Retrieve a requests.Response object for the plain text representation
26
        of a Quebec CEHQ gage
27
        """
28
        url = (self.URLBASE +
29
               '?NoStation=' + str(site_num))
30
        return requests.get(url)
31
32
    def recent_flow(self, site_num):
33
        r = self.response(site_num)
34
        line = r.text.splitlines()[2]
35
        number = line.split('\t')[2].replace(',', '.')
36
        return float(''.join(list(filter(digit_or_period, number))))
37
38
    def get_sample(self, sensor_id):
39
        sensor = self.sensor(sensor_id)
40
        v = self.recent_flow(sensor.remote_id)
41
        dt = datetime.datetime.now()
42
        add_new_sample(sensor.id, dt, v)
43