Completed
Pull Request — master (#940)
by Joe
01:26
created

zipline.pipeline.loaders.next_earnings_date_frame()   B

Complexity

Conditions 5

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 40
rs 8.0896
1
"""
2
Reference implementation for EarningsCalendar loaders.
3
"""
4
from itertools import repeat
5
6
import pandas as pd
7
from six import iteritems
8
from toolz import merge
9
10
from .base import PipelineLoader
11
from .frame import DataFrameLoader
12
from .utils import next_date_frame, previous_date_frame
13
from ..data.earnings import EarningsCalendar
14
from zipline.utils.memoize import lazyval
15
16
17
class EarningsCalendarLoader(PipelineLoader):
18
    """
19
    Reference loader for
20
    :class:`zipline.pipeline.data.earnings.EarningsCalendar`.
21
22
    Does not currently support adjustments to the dates of known earnings.
23
24
    Parameters
25
    ----------
26
    all_dates : pd.DatetimeIndex
27
        Index of dates for which we can serve queries.
28
    announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex]
29
        Dict mapping sids to objects representing dates on which earnings
30
        occurred.
31
32
        If a dict value is a Series, it's interpreted as a mapping from the
33
        date on which we learned an announcement was coming to the date on
34
        which the announcement was made.
35
36
        If a dict value is a DatetimeIndex, it's interpreted as just containing
37
        the dates that announcements were made, and we assume we knew about the
38
        announcement on all prior dates.  This mode is only supported if
39
        ``infer_timestamp`` is explicitly passed as a truthy value.
40
41
    infer_timestamps : bool, optional
42
        Whether to allow passing ``DatetimeIndex`` values in
43
        ``announcement_dates``.
44
    """
45
    def __init__(self,
46
                 all_dates,
47
                 announcement_dates,
48
                 infer_timestamps=False,
49
                 dataset=EarningsCalendar):
50
        self.all_dates = all_dates
51
        self.announcement_dates = announcement_dates = (
52
            announcement_dates.copy()
53
        )
54
        dates = self.all_dates.values
55
        for k, v in iteritems(announcement_dates):
56
            if isinstance(v, pd.DatetimeIndex):
57
                if not infer_timestamps:
58
                    raise ValueError(
59
                        "Got DatetimeIndex of announcement dates for sid %d.\n"
60
                        "Pass `infer_timestamps=True` to use the first date in"
61
                        " `all_dates` as implicit timestamp."
62
                    )
63
                # If we are passed a DatetimeIndex, we always have
64
                # knowledge of the announcements.
65
                announcement_dates[k] = pd.Series(
66
                    v, index=repeat(dates[0], len(v)),
67
                )
68
        self.dataset = dataset
69
70
    def get_loader(self, column):
71
        """Dispatch to the loader for ``column``.
72
        """
73
        if column is self.dataset.next_announcement:
74
            return self.next_announcement_loader
75
        elif column is self.dataset.previous_announcement:
76
            return self.previous_announcement_loader
77
        else:
78
            raise ValueError("Don't know how to load column '%s'." % column)
79
80
    @lazyval
81
    def next_announcement_loader(self):
82
        return DataFrameLoader(
83
            self.dataset.next_announcement,
84
            next_date_frame(
85
                self.all_dates,
86
                self.announcement_dates,
87
            ),
88
            adjustments=None,
89
        )
90
91
    @lazyval
92
    def previous_announcement_loader(self):
93
        return DataFrameLoader(
94
            self.dataset.previous_announcement,
95
            previous_date_frame(
96
                self.all_dates,
97
                self.announcement_dates,
98
            ),
99
            adjustments=None,
100
        )
101
102
    def load_adjusted_array(self, columns, dates, assets, mask):
103
        return merge(
104
            self.get_loader(column).load_adjusted_array(
105
                [column], dates, assets, mask
106
            )
107
            for column in columns
108
        )
109