Completed
Pull Request — master (#947)
by Joe
01:19
created

zipline.pipeline.loaders.normalize_data_query_time()   A

Complexity

Conditions 1

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 23
rs 9.0857
1
import datetime
2
3
import numpy as np
4
import pandas as pd
5
from six import iteritems
6
from six.moves import zip
7
8
from zipline.utils.numpy_utils import np_NaT
9
10
11
def next_date_frame(dates, events_by_sid):
    """
    Make a DataFrame representing the simulated next known date for an event.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The index of the returned DataFrame.
    events_by_sid : dict[int -> pd.Series]
        Dict mapping sids to a series of dates. Each k:v pair of the series
        represents the date we learned of the event mapping to the date the
        event will occur.

    Returns
    -------
    next_events : pd.DataFrame
        A DataFrame where each column is a security from `events_by_sid` where
        the values are the dates of the next known event with the knowledge we
        had on the date of the index. Entries falling after the last date will
        have `NaT` as the result in the output.

    See Also
    --------
    previous_date_frame
    """
    # One all-NaT column per sid, filled in below as events become known.
    cols = {
        equity: np.full(len(dates), np.datetime64('NaT'),
                        dtype='datetime64[ns]')
        for equity in events_by_sid
    }
    raw_dates = dates.values
    # Plain dict.items()/builtin zip behave identically to the six helpers
    # here on both py2 and py3.
    for equity, event_dates in events_by_sid.items():
        data = cols[equity]
        if not event_dates.index.is_monotonic_increasing:
            event_dates = event_dates.sort_index()

        # Iterate over the raw Series values, since we're comparing against
        # numpy arrays anyway.
        iterkv = zip(event_dates.index.values, event_dates.values)
        for knowledge_date, event_date in iterkv:
            # Rows on which this event is both already known (on or after the
            # knowledge date) and not yet past (on or before the event date).
            date_mask = (
                (knowledge_date <= raw_dates) &
                (raw_dates <= event_date)
            )
            # Overwrite a cell only if it is still unknown (NaT) or if this
            # event occurs no later than the one currently recorded, so the
            # *next* (earliest upcoming) event wins.
            # Bug fix: the original tested ``data == np_NaT``, but NaT never
            # compares equal under modern numpy (>= 1.13), so NaT cells were
            # never treated as empty and nothing was ever written;
            # ``np.isnat`` is the correct emptiness test.
            value_mask = (event_date <= data) | np.isnat(data)
            data[date_mask & value_mask] = event_date

    return pd.DataFrame(index=dates, data=cols)
57
58
59
def previous_date_frame(date_index, events_by_sid):
    """
    Make a DataFrame representing simulated previous event dates.

    Parameters
    ----------
    date_index : DatetimeIndex
        The index of the returned DataFrame.
    events_by_sid : dict[int -> DatetimeIndex]
        Dict mapping sids to a series of dates. Each k:v pair of the series
        represents the date we learned of the event mapping to the date the
        event will occur.

    Returns
    -------
    previous_events : pd.DataFrame
        A DataFrame where each column is a security from `events_by_sid`
        where the values are the dates of the previous event that occurred on
        the date of the index. Entries falling before the first date will
        have `NaT` as the result in the output.

    See Also
    --------
    next_date_frame
    """
    sids = list(events_by_sid)
    # Start fully unknown; np.datetime64('NaT') is the NaT sentinel (the same
    # value as np_NaT from zipline.utils.numpy_utils).
    out = np.full(
        (len(date_index), len(sids)),
        np.datetime64('NaT'),
        dtype='datetime64[ns]',
    )
    last_date = date_index[-1].asm8
    for col, sid in enumerate(sids):
        # events_by_sid[sid] is a Series mapping knowledge_date to the actual
        # event_date; only the event dates matter for previous events.
        event_dates = events_by_sid[sid].values
        in_range = event_dates[event_dates <= last_date]
        # Stamp each event onto the first index date at or after it ...
        out[date_index.searchsorted(in_range), col] = in_range

    frame = pd.DataFrame(out, index=date_index, columns=sids)
    # ... and carry it forward until a newer event supersedes it.
    return frame.ffill()
98
99
100
def normalize_data_query_time(dt, time, tz):
    """Apply the correct time and timezone to a date.

    Parameters
    ----------
    dt : pd.Timestamp
        The original datetime that represents the date.
    time : datetime.time
        The time to query before.
    tz : tzinfo
        The timezone the time applies to.

    Returns
    -------
    query_dt : pd.Timestamp
        The timestamp with the correct time and date in utc.
    """
    # Stamp the query time onto dt's calendar date (a naive datetime), ...
    naive = datetime.datetime.combine(dt.date(), time)
    # ... interpret it in the query timezone, then express it in UTC.
    return pd.Timestamp(naive, tz=tz).tz_convert('utc')
123
124
125
def normalize_timestamp_to_query_time(df,
                                      time,
                                      tz,
                                      inplace=False,
                                      ts_field='timestamp'):
    """Update the timestamp field of a dataframe to normalize dates around
    some data query time/timezone.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to update. This needs a column named ``ts_field``.
    time : datetime.time
        The time to query before.
    tz : tzinfo
        The timezone the time applies to.
    inplace : bool, optional
        Update the dataframe in place.
    ts_field : str, optional
        The name of the timestamp field in ``df``.

    Returns
    -------
    df : pd.DataFrame
        The dataframe with the timestamp field normalized. If ``inplace`` is
        true, then this will be the same object as ``df`` otherwise this will
        be a copy.
    """
    if not inplace:
        # don't mutate the dataframe in place
        df = df.copy()

    # Interpret the stored timestamps as UTC, then view them in the data
    # query timezone to decide which side of the query time each one falls.
    # NOTE(review): assumes df[ts_field] holds naive-UTC (or UTC-aware)
    # datetimes -- confirm against the callers.
    dtidx = pd.DatetimeIndex(df.loc[:, ts_field], tz='utc')
    dtidx_local_time = dtidx.tz_convert(tz)
    # Elementwise comparison of each row's local wall-clock time to the
    # query cutoff.
    to_roll_forward = dtidx_local_time.time > time
    # for all of the times that are greater than our query time add 1
    # day and truncate to the date
    df.loc[to_roll_forward, ts_field] = (
        dtidx_local_time[to_roll_forward] + datetime.timedelta(days=1)
    ).normalize().tz_localize(None).tz_localize('utc')  # cast back to utc
    # Timestamps at or before the cutoff are truncated to midnight.
    # NOTE(review): this branch normalizes on the *UTC* date while the
    # roll-forward branch above normalizes on the *local* date -- the
    # asymmetry matters near timezone day boundaries; confirm it is
    # intentional before touching this logic.
    df.loc[~to_roll_forward, ts_field] = dtidx[~to_roll_forward].normalize()
    return df
167