Completed
Pull Request — master (#940)
by Joe
01:26
created

tests.pipeline.num_days_in_range()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 6
rs 9.4286
1
"""
2
Tests for the reference loader for EarningsCalendar.
3
"""
4
from unittest import TestCase
5
6
import blaze as bz
7
from blaze.compute.core import swap_resources_into_scope
8
from contextlib2 import ExitStack
9
from nose_parameterized import parameterized
10
import pandas as pd
11
import numpy as np
12
from pandas.util.testing import assert_series_equal
13
from six import iteritems
14
15
from zipline.pipeline import Pipeline
16
from zipline.pipeline.data import EarningsCalendar
17
from zipline.pipeline.engine import SimplePipelineEngine
18
from zipline.pipeline.factors.events import (
19
    BusinessDaysUntilNextEarnings,
20
    BusinessDaysSincePreviousEarnings,
21
)
22
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
23
from zipline.pipeline.loaders.blaze import (
24
    ANNOUNCEMENT_FIELD_NAME,
25
    BlazeEarningsCalendarLoader,
26
    SID_FIELD_NAME,
27
    TS_FIELD_NAME,
28
)
29
from zipline.utils.numpy_utils import make_datetime64D, np_NaT
30
from zipline.utils.test_utils import (
31
    make_simple_equity_info,
32
    tmp_asset_finder,
33
    gen_calendars,
34
    to_series,
35
    num_days_in_range,
36
)
37
38
39
class EarningsCalendarLoaderTestCase(TestCase):
    """
    Tests for loading the earnings announcement data.
    """
    # Loader class under test; subclasses override this to run the same
    # expectations against other loader implementations.
    loader_type = EarningsCalendarLoader

    @classmethod
    def setUpClass(cls):
        # ExitStack collects every resource acquired here so that
        # tearDownClass can release them all with a single close().
        cls._cleanup_stack = stack = ExitStack()
        cls.sids = A, B, C, D, E = range(5)
        equity_info = make_simple_equity_info(
            cls.sids,
            start_date=pd.Timestamp('2013-01-01', tz='UTC'),
            end_date=pd.Timestamp('2015-01-01', tz='UTC'),
        )
        cls.finder = stack.enter_context(
            tmp_asset_finder(equities=equity_info),
        )

        # Map sid -> Series of announced earnings dates indexed by the
        # "knowledge date" on which each announcement became known.
        # In the legends below, K<n> is the knowledge date of announcement
        # <n> and E<n> is the corresponding earnings date.
        cls.earnings_dates = {
            # K1--K2--E1--E2.
            A: to_series(
                knowledge_dates=['2014-01-05', '2014-01-10'],
                earning_dates=['2014-01-15', '2014-01-20'],
            ),
            # K1--K2--E2--E1.
            B: to_series(
                knowledge_dates=['2014-01-05', '2014-01-10'],
                earning_dates=['2014-01-20', '2014-01-15']
            ),
            # K1--E1--K2--E2.
            C: to_series(
                knowledge_dates=['2014-01-05', '2014-01-15'],
                earning_dates=['2014-01-10', '2014-01-20']
            ),
            # K1 == K2.
            D: to_series(
                knowledge_dates=['2014-01-05'] * 2,
                earning_dates=['2014-01-10', '2014-01-15'],
            ),
            # E has no earnings announcements at all.
            E: pd.Series(
                data=[],
                index=pd.DatetimeIndex([]),
                dtype='datetime64[ns]',
            ),
        }

    @classmethod
    def tearDownClass(cls):
        # Close every context entered in setUpClass (e.g. the asset finder).
        cls._cleanup_stack.close()

    def loader_args(self, dates):
        """Construct the base earnings announcements object to pass to the
        loader.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The dates we can serve.

        Returns
        -------
        args : tuple[any]
            The arguments to forward to the loader positionally.
        """
        return dates, self.earnings_dates

    def setup(self, dates):
        """
        Make a PipelineEngine and expectation functions for the given dates
        calendar.

        This exists to make it easy to test our various cases with critical
        dates missing from the calendar.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The simulation calendar to build expectations for.

        Returns
        -------
        engine_and_expectations : tuple
            ``(engine, expected_next_announce, expected_next_busday_offset,
            expected_previous_announce, expected_previous_busday_offset)``.
        """
        A, B, C, D, E = self.sids

        def num_days_between(start_date, end_date):
            # Number of entries of ``dates`` within [start_date, end_date];
            # a ``None`` bound leaves that side of the range open.
            return num_days_in_range(dates, start_date, end_date)

        def zip_with_dates(dts):
            # Align a list of date strings with the simulation calendar.
            return pd.Series(pd.to_datetime(dts), index=dates)

        # Expected next-announcement date for each sid on each calendar day.
        # The value changes on each knowledge date and after each earnings
        # date passes; NaT means no known upcoming announcement.
        _expected_next_announce = pd.DataFrame({
            A: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-15'] * num_days_between('2014-01-05', '2014-01-15') +
                ['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
                ['NaT'] * num_days_between('2014-01-21', None)
            ),
            B: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-20'] * num_days_between('2014-01-05', '2014-01-09') +
                ['2014-01-15'] * num_days_between('2014-01-10', '2014-01-15') +
                ['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
                ['NaT'] * num_days_between('2014-01-21', None)
            ),
            C: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
                ['NaT'] * num_days_between('2014-01-11', '2014-01-14') +
                ['2014-01-20'] * num_days_between('2014-01-15', '2014-01-20') +
                ['NaT'] * num_days_between('2014-01-21', None)
            ),
            D: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-04') +
                ['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
                ['2014-01-15'] * num_days_between('2014-01-11', '2014-01-15') +
                ['NaT'] * num_days_between('2014-01-16', None)
            ),
            E: zip_with_dates(['NaT'] * len(dates)),
        }, index=dates)

        # Expected most-recent-past announcement date for each sid on each
        # calendar day; NaT until the first announcement has occurred.
        _expected_previous_announce = pd.DataFrame({
            A: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-14') +
                ['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
                ['2014-01-20'] * num_days_between('2014-01-20', None)
            ),
            B: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-14') +
                ['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
                ['2014-01-20'] * num_days_between('2014-01-20', None)
            ),
            C: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-09') +
                ['2014-01-10'] * num_days_between('2014-01-10', '2014-01-19') +
                ['2014-01-20'] * num_days_between('2014-01-20', None)
            ),
            D: zip_with_dates(
                ['NaT'] * num_days_between(None, '2014-01-09') +
                ['2014-01-10'] * num_days_between('2014-01-10', '2014-01-14') +
                ['2014-01-15'] * num_days_between('2014-01-15', None)
            ),
            E: zip_with_dates(['NaT'] * len(dates)),
        }, index=dates)

        _expected_next_busday_offsets = self._compute_busday_offsets(
            _expected_next_announce
        )
        _expected_previous_busday_offsets = self._compute_busday_offsets(
            _expected_previous_announce
        )

        def expected_next_announce(sid):
            """
            Return the expected next announcement dates for ``sid``.
            """
            return _expected_next_announce[sid]

        def expected_next_busday_offset(sid):
            """
            Return the expected number of days to the next announcement for
            ``sid``.
            """
            return _expected_next_busday_offsets[sid]

        def expected_previous_announce(sid):
            """
            Return the expected previous announcement dates for ``sid``.
            """
            return _expected_previous_announce[sid]

        def expected_previous_busday_offset(sid):
            """
            Return the expected number of days since the previous announcement
            for ``sid``.
            """
            return _expected_previous_busday_offsets[sid]

        loader = self.loader_type(*self.loader_args(dates))
        engine = SimplePipelineEngine(lambda _: loader, dates, self.finder)
        return (
            engine,
            expected_next_announce,
            expected_next_busday_offset,
            expected_previous_announce,
            expected_previous_busday_offset,
        )

    @staticmethod
    def _compute_busday_offsets(announcement_dates):
        """
        Compute expected business day offsets from a DataFrame of announcement
        dates.

        Parameters
        ----------
        announcement_dates : pd.DataFrame
            Frame of expected announcement dates indexed by calendar date,
            one column per sid; NaT where no announcement is expected.

        Returns
        -------
        offsets : pd.DataFrame
            Absolute business-day counts between each calendar date and the
            corresponding announcement date, with NaN where the input was NaT.
        """
        # Column-vector of dates on which factor `compute` will be called.
        raw_call_dates = announcement_dates.index.values.astype(
            'datetime64[D]'
        )[:, None]

        # 2D array of dates containing the expected announcements.
        raw_announce_dates = (
            announcement_dates.values.astype('datetime64[D]')
        )

        # Set NaTs to 0 temporarily because busday_count doesn't support NaT.
        # We fill these entries with NaNs later.
        whereNaT = raw_announce_dates == np_NaT
        raw_announce_dates[whereNaT] = make_datetime64D(0)

        # The abs call here makes it so that we can use this function to
        # compute offsets for both next and previous earnings (previous
        # earnings offsets come back negative).
        expected = abs(np.busday_count(
            raw_call_dates,
            raw_announce_dates
        ).astype(float))

        expected[whereNaT] = np.nan
        return pd.DataFrame(
            data=expected,
            columns=announcement_dates.columns,
            index=announcement_dates.index,
        )

    @parameterized.expand(gen_calendars(
        '2014-01-01',
        '2014-01-31',
        critical_dates=pd.to_datetime([
            '2014-01-05',
            '2014-01-10',
            '2014-01-15',
            '2014-01-20',
        ]),
    ))
    def test_compute_earnings(self, dates):
        """
        Run a pipeline over ``dates`` and check every earnings column/factor
        against the expectations built in ``setup``.
        """
        (
            engine,
            expected_next,
            expected_next_busday_offset,
            expected_previous,
            expected_previous_busday_offset,
        ) = self.setup(dates)

        pipe = Pipeline(
            columns={
                'next': EarningsCalendar.next_announcement.latest,
                'previous': EarningsCalendar.previous_announcement.latest,
                'days_to_next': BusinessDaysUntilNextEarnings(),
                'days_since_prev': BusinessDaysSincePreviousEarnings(),
            }
        )

        result = engine.run_pipeline(
            pipe,
            start_date=dates[0],
            end_date=dates[-1],
        )

        computed_next = result['next']
        computed_previous = result['previous']
        computed_next_busday_offset = result['days_to_next']
        computed_previous_busday_offset = result['days_since_prev']

        # NaTs in next/prev should correspond to NaNs in offsets.
        assert_series_equal(
            computed_next.isnull(),
            computed_next_busday_offset.isnull(),
        )
        assert_series_equal(
            computed_previous.isnull(),
            computed_previous_busday_offset.isnull(),
        )

        # Compare each sid's column against the expectation functions; the
        # third argument labels failures with the offending sid.
        for sid in self.sids:

            assert_series_equal(
                computed_next.xs(sid, level=1),
                expected_next(sid),
                sid,
            )

            assert_series_equal(
                computed_previous.xs(sid, level=1),
                expected_previous(sid),
                sid,
            )

            assert_series_equal(
                computed_next_busday_offset.xs(sid, level=1),
                expected_next_busday_offset(sid),
                sid,
            )

            assert_series_equal(
                computed_previous_busday_offset.xs(sid, level=1),
                expected_previous_busday_offset(sid),
                sid,
            )
class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
    """
    Run the earnings-calendar expectations against the blaze-backed loader.
    """
    loader_type = BlazeEarningsCalendarLoader

    def loader_args(self, dates):
        """Flatten the sid -> announcements mapping into a single long-form
        frame and wrap it as an interactive blaze expression.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The dates we can serve.

        Returns
        -------
        args : tuple[any]
            A one-tuple containing the bound blaze expression.
        """
        _, mapping = super(
            BlazeEarningsCalendarLoaderTestCase,
            self,
        ).loader_args(dates)
        # One frame per sid: announcement date, its knowledge timestamp,
        # and the sid repeated down the column.
        per_sid_frames = [
            pd.DataFrame({
                ANNOUNCEMENT_FIELD_NAME: announcements,
                TS_FIELD_NAME: announcements.index,
                SID_FIELD_NAME: sid,
            })
            for sid, announcements in iteritems(mapping)
        ]
        combined = pd.concat(per_sid_frames).reset_index(drop=True)
        return (bz.Data(combined),)
class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
        BlazeEarningsCalendarLoaderTestCase):
    """Test case for passing a non-interactive symbol and a dict of resources.
    """
    def loader_args(self, dates):
        """Rebind the parent's interactive expression into a non-interactive
        expression plus an (empty) explicit resources mapping.
        """
        parent_args = super(
            BlazeEarningsCalendarLoaderNotInteractiveTestCase,
            self,
        ).loader_args(dates)
        # The parent returns exactly one bound expression; unpacking asserts
        # that invariant.
        (bound_expr,) = parent_args
        return swap_resources_into_scope(bound_expr, {})
class EarningsCalendarLoaderInferTimestampTestCase(TestCase):
    """
    Tests for ``infer_timestamps=True`` handling in EarningsCalendarLoader.
    """
    def test_infer_timestamp(self):
        calendar = pd.date_range('2014-01-01', '2014-01-10')
        announcement_dates = {
            0: calendar,
            1: pd.Series(calendar, calendar),
        }
        loader = EarningsCalendarLoader(
            calendar,
            announcement_dates,
            infer_timestamps=True,
        )
        # Inference should neither add nor drop any sids.
        self.assertEqual(
            loader.announcement_dates.keys(),
            announcement_dates.keys(),
        )
        # A bare DatetimeIndex is expected to come back as a Series whose
        # index (the inferred timestamps) is the first calendar date repeated.
        inferred = pd.Series(index=[calendar[0]] * 10, data=calendar)
        assert_series_equal(
            loader.announcement_dates[0],
            inferred,
        )
        # A Series that already carries timestamps passes through unchanged.
        assert_series_equal(
            loader.announcement_dates[1],
            announcement_dates[1],
        )