Completed
Push — master ( ebb4fb...323695 )
by
unknown
01:25
created

zipline.pipeline.loaders.next_earnings_date_frame()   B

Complexity

Conditions 5

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 40
rs 8.0896
1
"""
2
Reference implementation for EarningsCalendar loaders.
3
"""
4
from itertools import repeat
5
6
from numpy import full_like, full
7
import pandas as pd
8
from six import iteritems
9
from six.moves import zip
10
from toolz import merge
11
12
from .base import PipelineLoader
13
from .frame import DataFrameLoader
14
from ..data.earnings import EarningsCalendar
15
from zipline.utils.numpy_utils import np_NaT
16
from zipline.utils.memoize import lazyval
17
18
19
class EarningsCalendarLoader(PipelineLoader):
    """
    Reference loader for
    :class:`zipline.pipeline.data.earnings.EarningsCalendar`.

    Does not currently support adjustments to the dates of known earnings.

    Parameters
    ----------
    all_dates : pd.DatetimeIndex
        Index of dates for which we can serve queries.
    announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex]
        Dict mapping sids to objects representing dates on which earnings
        occurred.

        If a dict value is a Series, it's interpreted as a mapping from the
        date on which we learned an announcement was coming to the date on
        which the announcement was made.

        If a dict value is a DatetimeIndex, it's interpreted as just containing
        the dates that announcements were made, and we assume we knew about the
        announcement on all prior dates.  This mode is only supported if
        ``infer_timestamps`` is explicitly passed as a truthy value.

    infer_timestamps : bool, optional
        Whether to allow passing ``DatetimeIndex`` values in
        ``announcement_dates``.

    Raises
    ------
    ValueError
        If a value of ``announcement_dates`` is a ``DatetimeIndex`` and
        ``infer_timestamps`` is falsy.
    """
    def __init__(self, all_dates, announcement_dates, infer_timestamps=False):
        self.all_dates = all_dates

        # Work on a shallow copy so that normalizing DatetimeIndex values
        # below does not modify the caller's dict.
        self.announcement_dates = announcement_dates = (
            announcement_dates.copy()
        )
        dates = self.all_dates.values
        # Only existing keys are reassigned inside the loop, so the dict's
        # size never changes while it is being iterated.
        for k, v in iteritems(announcement_dates):
            if isinstance(v, pd.DatetimeIndex):
                if not infer_timestamps:
                    raise ValueError(
                        "Got DatetimeIndex of announcement dates for sid %d.\n"
                        "Pass `infer_timestamps=True` to use the first date in"
                        " `all_dates` as implicit timestamp." % (k,)
                    )
                # If we are passed a DatetimeIndex, we always have
                # knowledge of the announcements: every announcement gets
                # the first queryable date as its knowledge timestamp.
                announcement_dates[k] = pd.Series(
                    v, index=repeat(dates[0], len(v)),
                )

    def get_loader(self, column):
        """Dispatch to the loader for ``column``.

        Raises
        ------
        ValueError
            If ``column`` is neither ``EarningsCalendar.next_announcement``
            nor ``EarningsCalendar.previous_announcement``.
        """
        if column is EarningsCalendar.next_announcement:
            return self.next_announcement_loader
        elif column is EarningsCalendar.previous_announcement:
            return self.previous_announcement_loader
        else:
            raise ValueError("Don't know how to load column '%s'." % column)

    @lazyval
    def next_announcement_loader(self):
        """DataFrameLoader serving ``EarningsCalendar.next_announcement``.

        Backed by the frame from ``next_earnings_date_frame``; no
        adjustments are applied.
        """
        return DataFrameLoader(
            EarningsCalendar.next_announcement,
            next_earnings_date_frame(
                self.all_dates,
                self.announcement_dates,
            ),
            adjustments=None,
        )

    @lazyval
    def previous_announcement_loader(self):
        """DataFrameLoader serving ``EarningsCalendar.previous_announcement``.

        Backed by the frame from ``previous_earnings_date_frame``; no
        adjustments are applied.
        """
        return DataFrameLoader(
            EarningsCalendar.previous_announcement,
            previous_earnings_date_frame(
                self.all_dates,
                self.announcement_dates,
            ),
            adjustments=None,
        )

    def load_adjusted_array(self, columns, dates, assets, mask):
        """Load each requested column with its own loader and merge results.

        Every column is passed, on its own, to the ``load_adjusted_array``
        method of the loader chosen by ``get_loader``; the per-column
        results are combined with ``toolz.merge``.
        """
        return merge(
            self.get_loader(column).load_adjusted_array(
                [column], dates, assets, mask
            )
            for column in columns
        )
107
108
109
def next_earnings_date_frame(dates, announcement_dates):
    """
    Make a DataFrame representing simulated next earnings dates.

    Parameters
    ----------
    dates : pd.DatetimeIndex.
        The index of the returned DataFrame.
    announcement_dates : dict[int -> pd.Series]
        Dict mapping sids to a Series whose index holds the date on which an
        announcement became known and whose values hold the date of the
        announcement itself.

    Returns
    -------
    next_earnings: pd.DataFrame
        A DataFrame with one column per key of `announcement_dates`.  For
        each (label, date) pair the cell holds the earliest announcement in
        `announcement_dates[label]` that falls on or after `date` and was
        already known on `date`.  Cells with no such announcement hold
        `NaT`.

    See Also
    --------
    previous_earnings_date_frame
    """
    # Function-scope import: these names are not part of the module-level
    # ``from numpy import ...`` line.
    from numpy import datetime64, isnat

    raw_dates = dates.values
    not_a_time = datetime64('NaT', 'ns')

    cols = {}
    for equity, earnings_dates in announcement_dates.items():
        # Allocate from the raw datetime64 values so the buffer is a plain
        # ndarray whatever kind of DatetimeIndex was passed in.
        data = cols[equity] = full_like(raw_dates, not_a_time)
        if not earnings_dates.index.is_monotonic_increasing:
            earnings_dates = earnings_dates.sort_index()

        # Iterate over the raw Series values, since we're comparing against
        # numpy arrays anyway.
        iterkv = zip(earnings_dates.index.values, earnings_dates.values)
        for timestamp, announce_date in iterkv:
            # Rows on which this announcement is both known and upcoming.
            date_mask = (timestamp <= raw_dates) & (raw_dates <= announce_date)
            # Only write where nothing is recorded yet or where this
            # announcement is no later than the recorded one.  Unset cells
            # must be detected with ``isnat``: on recent NumPy releases NaT
            # compares unequal to everything, itself included, so an
            # ``==`` test would never match and no cell would be filled.
            value_mask = (announce_date <= data) | isnat(data)
            data[date_mask & value_mask] = announce_date

    return pd.DataFrame(index=dates, data=cols)
149
150
151
def previous_earnings_date_frame(dates, announcement_dates):
    """
    Make a DataFrame representing simulated previous earnings dates.

    Parameters
    ----------
    dates : DatetimeIndex.
        The index of the returned DataFrame.  Assumed to be sorted in
        increasing order, as required by the binary search used below.
    announcement_dates : dict[int -> pd.Series]
        Dict mapping sids to a Series whose values are the dates on which
        earnings were announced for that sid.  The Series index (the date on
        which the announcement became known) is ignored here.

    Returns
    -------
    prev_earnings: pd.DataFrame
        A DataFrame representing, for each (label, date) pair, the most
        recent entry in `announcement_dates[label]` on or before `date`.
        Entries falling before the first announcement will have `NaT` as the
        result in the output.

    See Also
    --------
    next_earnings_date_frame
    """
    # Function-scope import: ``datetime64`` is not part of the module-level
    # ``from numpy import ...`` line.
    from numpy import datetime64

    sids = list(announcement_dates)
    out = full(
        (len(dates), len(sids)),
        datetime64('NaT', 'ns'),
        dtype='datetime64[ns]',
    )

    # With no dates there is no last date to compare against; the empty
    # frame built below is already the correct answer.
    if len(dates):
        # Search the raw datetime64 values so the announcement values are
        # compared against plain numpy data of the same kind.
        raw_dates = dates.values
        last_date = raw_dates[-1]
        for col_idx, sid in enumerate(sids):
            # announcement_dates[sid] is Series mapping knowledge_date to
            # actual announcement date.  We don't care about the knowledge
            # date for computing previous earnings.
            values = announcement_dates[sid].values
            # Boolean indexing returns a copy, so the in-place sort below
            # never touches the caller's data.
            values = values[values <= last_date]
            values.sort()

            # Row of the first date on or after each announcement.
            slots = raw_dates.searchsorted(values)

            # Several announcements can land on the same row (e.g. across a
            # gap in ``dates``).  Keep only the last, i.e. latest, one per
            # row rather than relying on the write order of an assignment
            # with repeated indices.
            keep = full(len(slots), True)
            keep[:-1] = slots[1:] != slots[:-1]
            out[slots[keep], col_idx] = values[keep]

    frame = pd.DataFrame(out, index=dates, columns=sids)
    # Carry each announcement forward until the next one replaces it.
    return frame.ffill()
189