Completed
Pull Request — master (#947)
by Joe
01:27
created

zipline.pipeline.loaders.normalize_data_query_time()   B

Complexity

Conditions 2

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 29
rs 8.8571
1
import datetime
2
3
import pandas as pd
4
from pytz import utc
5
6
7
def normalize_data_query_time(dt, time, tz):
    """Apply the correct time and timezone to a date.

    Parameters
    ----------
    dt : datetime.datetime
        The original datetime that represents the date. A naive ``dt`` is
        taken to already be expressed in ``tz``; an aware ``dt`` is converted
        to ``tz`` before its date is read.
    time : datetime.time
        The time to query before.
    tz : tzinfo
        The timezone the time applies to. Any ``tzinfo`` implementation is
        accepted, not only pytz timezones.

    Returns
    -------
    query_dt : pd.Timestamp
        The timestamp with the correct time and date in utc.
    """
    # get the date as seen in the target timezone
    if dt.tzinfo is None:
        # Attaching a timezone to a naive datetime never changes its
        # wall-clock date, so the date can be read directly. This avoids the
        # pytz-only ``tz.localize`` call and keeps other tzinfo types working.
        date = dt.date()
    else:
        date = dt.astimezone(tz).date()

    # merge the correct date with the time in the given timezone then convert
    # back to utc
    return pd.Timestamp(
        datetime.datetime.combine(date, time),
        tz=tz,
    ).tz_convert('UTC')
36
37
38
def normalize_timestamp_to_query_time(df,
                                      time,
                                      tz,
                                      inplace=False,
                                      ts_field='timestamp'):
    """Update the timestamp field of a dataframe to normalize dates around
    some data query time/timezone.

    Each timestamp is viewed in ``tz``. A timestamp whose local time of day
    is at or before ``time`` is replaced by its local calendar date; one that
    falls strictly after ``time`` is replaced by the following calendar date.
    The resulting dates are stored as midnight UTC.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to update. This needs a column named ``ts_field``.
        Naive values in that column are interpreted as UTC.
    time : datetime.time
        The time to query before.
    tz : tzinfo
        The timezone the time applies to.
    inplace : bool, optional
        Update the dataframe in place.
    ts_field : str, optional
        The name of the timestamp field in ``df``.

    Returns
    -------
    df : pd.DataFrame
        The dataframe with the timestamp field normalized. If ``inplace`` is
        true, then this will be the same object as ``df`` otherwise this will
        be a copy.
    """
    if not inplace:
        # don't mutate the dataframe in place
        df = df.copy()

    local_index = pd.DatetimeIndex(df[ts_field], tz='UTC').tz_convert(tz)
    # this mask represents the indices where the local time of day is
    # greater than our lookup time
    past_query_time_mask = local_index.time > time

    # Take the calendar date from the same localized view used for the
    # comparison above. Dropping the timezone before truncating and adding
    # the day keeps the arithmetic on wall-clock dates, so a DST transition
    # can never move the result onto a neighbouring date.
    local_dates = local_index.tz_localize(None).normalize()
    rolled_dates = local_dates + datetime.timedelta(days=1)

    # times past the query time roll forward one day; all others keep
    # their own date
    normalized = local_dates.where(~past_query_time_mask, rolled_dates)

    # assign the whole column at once (positional) as midnight utc
    df[ts_field] = normalized.tz_localize('UTC')
    return df
87