Completed
Pull Request — master (#947)
by Joe
01:20
created

zipline.pipeline.loaders.normalize_data_query_time()   B

Complexity

Conditions 1

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 26
rs 8.8571
1
import datetime
2
3
import numpy as np
4
import pandas as pd
5
from six import iteritems
6
from six.moves import zip
7
8
from zipline.utils.numpy_utils import np_NaT
9
10
11
def next_date_frame(dates, events_by_sid):
    """
    Make a DataFrame representing the simulated next known date for an event.

    Parameters
    ----------
    dates : pd.DatetimeIndex.
        The index of the returned DataFrame.
    events_by_sid : dict[int -> pd.Series]
        Dict mapping sids to a series of dates. Each k:v pair of the series
        represents the date we learned of the event mapping to the date the
        event will occur.

    Returns
    -------
    next_events: pd.DataFrame
        A DataFrame where each column is a security from `events_by_sid` where
        the values are the dates of the next known event with the knowledge we
        had on the date of the index. Entries falling after the last date will
        have `NaT` as the result in the output.

    See Also
    --------
    previous_date_frame
    """
    nat = np.datetime64('NaT')
    # One all-NaT column per sid, matching the dtype of the date index.
    cols = {
        equity: np.full(len(dates), nat, dtype='datetime64[ns]')
        for equity in events_by_sid
    }
    raw_dates = dates.values
    for equity, event_dates in events_by_sid.items():
        data = cols[equity]
        if not event_dates.index.is_monotonic_increasing:
            event_dates = event_dates.sort_index()

        # Iterate over the raw Series values, since we're comparing against
        # numpy arrays anyway.
        for knowledge_date, event_date in zip(event_dates.index.values,
                                              event_dates.values):
            # Dates on which the event is already known but has not yet
            # occurred.
            date_mask = (
                (knowledge_date <= raw_dates) &
                (raw_dates <= event_date)
            )
            # Only overwrite cells that are still empty or that hold a
            # *later* event than this one.  NOTE: ``data == NaT`` is not a
            # reliable emptiness test -- NaT never compares equal to
            # itself -- so use np.isnat to detect unfilled cells.
            value_mask = (event_date <= data) | np.isnat(data)
            data[date_mask & value_mask] = event_date

    return pd.DataFrame(index=dates, data=cols)
57
58
59
def previous_date_frame(date_index, events_by_sid):
    """
    Make a DataFrame representing the previous known event date for each sid.

    Parameters
    ----------
    date_index : DatetimeIndex.
        The index of the returned DataFrame.
    events_by_sid : dict[int -> DatetimeIndex]
        Dict mapping sids to a series of dates. Each k:v pair of the series
        represents the date we learned of the event mapping to the date the
        event will occur.

    Returns
    -------
    previous_events: pd.DataFrame
        A DataFrame where each column is a security from `events_by_sid` where
        the values are the dates of the previous event that occurred on the
        date of the index. Entries falling before the first date will have
        `NaT` as the result in the output.

    See Also
    --------
    next_date_frame
    """
    sids = list(events_by_sid)
    out = np.full(
        (len(date_index), len(sids)),
        np.datetime64('NaT'),
        dtype='datetime64[ns]',
    )
    dn = date_index[-1].asm8
    for col_idx, sid in enumerate(sids):
        # events_by_sid[sid] is Series mapping knowledge_date to actual
        # event_date.  We don't care about the knowledge date for
        # computing previous earnings.
        values = events_by_sid[sid].values
        # Events after the end of the index can never be a "previous"
        # event for any date we report on.
        values = values[values <= dn]
        # Sort so that when several events collapse onto the same index
        # slot, the *latest* one wins (numpy fancy assignment keeps the
        # last write).  The original implicitly assumed sorted input.
        values.sort()
        out[date_index.searchsorted(values), col_idx] = values

    frame = pd.DataFrame(out, index=date_index, columns=sids)
    # Carry each known event date forward until the next one appears.
    frame.ffill(inplace=True)
    return frame
98
99
100
def normalize_data_query_time(dt, time, tz):
    """Apply the correct time and timezone to a date.

    Parameters
    ----------
    dt : pd.Timestamp
        The original datetime that represents the date.
    time : datetime.time
        The time of day to use as the cutoff point for new data. Data points
        that you learn about after this time will become available to your
        algorithm on the next trading day.
    tz : tzinfo
        The timezone to normalize your dates to before comparing against
        `time`.

    Returns
    -------
    query_dt : pd.Timestamp
        The timestamp with the correct time and date in utc.
    """
    # Attach the cutoff time-of-day to dt's calendar date, interpreted in
    # the given timezone...
    naive = datetime.datetime.combine(dt.date(), time)
    local_dt = pd.Timestamp(naive, tz=tz)
    # ...then express that same instant in utc.
    return local_dt.tz_convert('utc')
126
127
128
def normalize_timestamp_to_query_time(df,
                                      time,
                                      tz,
                                      inplace=False,
                                      ts_field='timestamp'):
    """Update the timestamp field of a dataframe to normalize dates around
    some data query time/timezone.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to update. This needs a column named ``ts_field``.
    time : datetime.time
        The time of day to use as the cutoff point for new data. Data points
        that you learn about after this time will become available to your
        algorithm on the next trading day.
    tz : tzinfo
        The timezone to normalize your dates to before comparing against
        `time`.
    inplace : bool, optional
        Update the dataframe in place.
    ts_field : str, optional
        The name of the timestamp field in ``df``.

    Returns
    -------
    df : pd.DataFrame
        The dataframe with the timestamp field normalized. If ``inplace`` is
        true, then this will be the same object as ``df`` otherwise this will
        be a copy.
    """
    if not inplace:
        # don't mutate the dataframe in place
        df = df.copy()

    # Interpret the stored timestamps as utc and view them in local time to
    # compare the local time-of-day against the query cutoff.
    dtidx = pd.DatetimeIndex(df.loc[:, ts_field], tz='utc')
    dtidx_local_time = dtidx.tz_convert(tz)
    to_roll_forward = dtidx_local_time.time > time
    # for all of the times that are greater than our query time add 1
    # day and truncate to the date
    df.loc[to_roll_forward, ts_field] = (
        dtidx_local_time[to_roll_forward] + datetime.timedelta(days=1)
    ).normalize().tz_localize(None).tz_localize('utc')  # cast back to utc
    # Times at or before the cutoff become available on the same *local*
    # day.  BUG FIX: the original normalized the utc time here
    # (``dtidx[...].normalize()``), which picks the wrong date whenever the
    # local calendar date differs from the utc one (e.g. timezones east of
    # utc in the early local morning).  Use the local date in both branches.
    df.loc[~to_roll_forward, ts_field] = dtidx_local_time[
        ~to_roll_forward
    ].normalize().tz_localize(None).tz_localize('utc')
    return df
173