Completed
Pull Request — master (#905)
by
unknown
04:05
created

zipline.pipeline.loaders.next_earnings_date_frame()   B

Complexity

Conditions 5

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 40
rs 8.0896
1
"""
2
Reference implementation for EarningsCalendar loaders.
3
"""
4
from itertools import repeat
5
6
from numpy import full_like, full
7
import pandas as pd
8
from six import iteritems
9
from six.moves import zip
10
from toolz import merge
11
12
from .base import PipelineLoader
13
from .frame import DataFrameLoader
14
from ..data.earnings import EarningsCalendar
15
from zipline.utils.numpy_utils import np_NaT
16
from zipline.utils.memoize import lazyval
17
18
19
class EarningsCalendarLoader(PipelineLoader):
    """
    Reference PipelineLoader serving the columns of
    :class:`zipline.pipeline.data.earnings.EarningsCalendar`.

    Adjustments to the dates of known earnings are not currently supported.

    Parameters
    ----------
    all_dates : pd.DatetimeIndex
        Index of dates for which we can serve queries.
    announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex]
        Dict mapping sids to the dates on which earnings were announced.
        A ``pd.DatetimeIndex`` value is converted to a ``pd.Series`` whose
        index is the first entry of ``all_dates`` repeated once per
        announcement.  The dict passed in is shallow-copied, so the
        caller's mapping is not modified.
    """
    def __init__(self, all_dates, announcement_dates):
        self._all_dates = all_dates

        by_sid = announcement_dates.copy()
        self._announcement_dates = by_sid

        raw_dates = self._all_dates.values
        for sid, announced in iteritems(by_sid):
            if not isinstance(announced, pd.DatetimeIndex):
                continue
            # A bare DatetimeIndex means the announcements are always
            # known, so index every one of them by the first served date.
            known_from = repeat(raw_dates[0], len(announced))
            by_sid[sid] = pd.Series(announced, index=known_from)

    def get_loader(self, column):
        """Return the loader responsible for ``column``.

        Raises
        ------
        ValueError
            If ``column`` is neither ``EarningsCalendar.next_announcement``
            nor ``EarningsCalendar.previous_announcement``.
        """
        if column is EarningsCalendar.next_announcement:
            return self.next_announcement_loader
        if column is EarningsCalendar.previous_announcement:
            return self.previous_announcement_loader
        raise ValueError("Don't know how to load column '%s'." % column)

    @lazyval
    def next_announcement_loader(self):
        """DataFrameLoader for ``EarningsCalendar.next_announcement``."""
        column = EarningsCalendar.next_announcement
        frame = next_earnings_date_frame(
            self._all_dates,
            self._announcement_dates,
        )
        return DataFrameLoader(column, frame, adjustments=None)

    @lazyval
    def previous_announcement_loader(self):
        """DataFrameLoader for ``EarningsCalendar.previous_announcement``."""
        column = EarningsCalendar.previous_announcement
        frame = previous_earnings_date_frame(
            self._all_dates,
            self._announcement_dates,
        )
        return DataFrameLoader(column, frame, adjustments=None)

    def load_adjusted_array(self, columns, dates, assets, mask):
        """Load each requested column with its own loader and merge the
        per-column results into a single mapping.
        """
        per_column = (
            self.get_loader(col).load_adjusted_array(
                [col], dates, assets, mask
            )
            for col in columns
        )
        return merge(per_column)
88
89
90
def next_earnings_date_frame(dates, announcement_dates):
    """
    Make a DataFrame representing simulated next earnings dates.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The index of the returned DataFrame.
    announcement_dates : dict[int -> pd.Series]
        Dict mapping sids to a Series of announcement dates for that sid.
        The Series index holds the date from which each announcement is
        known; the Series values are the announcement dates themselves.

    Returns
    -------
    next_earnings : pd.DataFrame
        A DataFrame with one column per sid holding, for each (sid, date)
        pair, the earliest entry in `announcement_dates[sid]` that falls on
        or after `date` and is already known on `date`.  Pairs with no
        such entry have `np_NaT` as the result in the output.

    See Also
    --------
    previous_earnings_date_frame
    """
    # Build the output buffers from the raw datetime64 values rather than
    # from the index object, so the buffers are always datetime64 arrays
    # that can be compared against the numpy scalars below.
    raw_dates = dates.values
    cols = {
        equity: full_like(raw_dates, np_NaT) for equity in announcement_dates
    }
    for equity, earnings_dates in iteritems(announcement_dates):
        data = cols[equity]
        if not earnings_dates.index.is_monotonic_increasing:
            earnings_dates = earnings_dates.sort_index()

        # Iterate over the raw Series values, since we're comparing against
        # numpy arrays anyway.
        iterkv = zip(earnings_dates.index.values, earnings_dates.values)
        for timestamp, announce_date in iterkv:
            # Dates on which this announcement is both known and upcoming.
            date_mask = (timestamp <= raw_dates) & (raw_dates <= announce_date)
            # Overwrite a slot only if it is still empty or holds a later
            # announcement.  Empty slots must be detected with isnull():
            # NaT does not compare equal to NaT, so ``data == np_NaT``
            # would never match and nothing would ever be written.
            value_mask = (announce_date <= data) | pd.isnull(data)
            data[date_mask & value_mask] = announce_date

    return pd.DataFrame(index=dates, data=cols)
130
131
132
def previous_earnings_date_frame(dates, announcement_dates):
    """
    Make a DataFrame representing simulated previous earnings dates.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The index of the returned DataFrame.  It is searched with
        ``searchsorted``, so it must be sorted in increasing order.
    announcement_dates : dict[int -> pd.Series]
        Dict mapping sids to a Series whose values are the dates on which
        earnings were announced for that sid.  The Series index is ignored
        here.

    Returns
    -------
    prev_earnings : pd.DataFrame
        A DataFrame with one column per sid holding, for each (sid, date)
        pair, the latest entry in `announcement_dates[sid]` on or before
        `date`.  Pairs preceding the first announcement have `NaT` as the
        result in the output.  Announcements after the last entry of
        `dates` are ignored.

    See Also
    --------
    next_earnings_date_frame
    """
    sids = list(announcement_dates)
    out = full((len(dates), len(sids)), np_NaT, dtype='datetime64[ns]')
    if len(dates) == 0:
        # Nothing to fill; also avoids indexing the last element of an
        # empty index below.
        return pd.DataFrame(out, index=dates, columns=sids)

    # Search the raw datetime64 values so the announcement values (also
    # raw datetime64) are compared like with like.
    raw_dates = dates.values
    dn = raw_dates[-1]
    for col_idx, sid in enumerate(sids):
        # announcement_dates[sid] is Series mapping knowledge_date to actual
        # announcement date.  We don't care about the knowledge date for
        # computing previous earnings.
        values = announcement_dates[sid].values
        # Boolean indexing returns a copy, so the in-place sort below does
        # not touch the caller's data.
        values = values[values <= dn]
        values.sort()

        # Each announcement is written at the first date on or after it.
        rows = raw_dates.searchsorted(values)

        # Several announcements can land on the same row.  Keep only the
        # last (i.e. latest, since values are sorted) one per row so the
        # result does not depend on input order or on how numpy resolves
        # repeated indices in an assignment.
        keep = full(len(rows), True, dtype=bool)
        keep[:-1] = rows[1:] != rows[:-1]
        out[rows[keep], col_idx] = values[keep]

    frame = pd.DataFrame(out, index=dates, columns=sids)
    # Carry each announcement forward until the next one replaces it.
    frame.ffill(inplace=True)
    return frame
170