Completed
Pull Request — master (#905)
by
unknown
01:32
created

setup()   D

Complexity

Conditions 8

Size

Total Lines 111

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 111
rs 4

6 Methods

Rating   Name   Duplication   Size   Complexity  
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_next_announce() 0 5 1
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_previous_announce() 0 5 1
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_previous_busday_offset() 0 6 1
A tests.pipeline.EarningsCalendarLoaderTestCase.expected_next_busday_offset() 0 6 1
A tests.pipeline.EarningsCalendarLoaderTestCase.zip_with_dates() 0 2 1
A tests.pipeline.EarningsCalendarLoaderTestCase.num_days_between() 0 2 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
"""
2
Tests for the reference loader for EarningsCalendar.
3
"""
4
from unittest import TestCase
5
6
import blaze as bz
7
from blaze.compute.core import swap_resources_into_scope
8
from contextlib2 import ExitStack
9
from nose_parameterized import parameterized
10
import pandas as pd
11
import numpy as np
12
from pandas.util.testing import assert_series_equal
13
from six import iteritems
14
15
from zipline.pipeline import Pipeline
16
from zipline.pipeline.data import EarningsCalendar
17
from zipline.pipeline.engine import SimplePipelineEngine
18
from zipline.pipeline.factors.events import (
19
    BusinessDaysUntilNextEarnings,
20
    BusinessDaysSincePreviousEarnings,
21
)
22
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
23
from zipline.pipeline.loaders.blaze import (
24
    ANNOUNCEMENT_FIELD_NAME,
25
    BlazeEarningsCalendarLoader,
26
    SID_FIELD_NAME,
27
    TS_FIELD_NAME,
28
)
29
from zipline.utils.numpy_utils import make_datetime64D, np_NaT
30
from zipline.utils.tradingcalendar import trading_days
31
from zipline.utils.test_utils import (
32
    make_simple_equity_info,
33
    powerset,
34
    tmp_asset_finder,
35
)
36
37
38
def _to_series(knowledge_dates, earning_dates):
39
    """
40
    Helper for converting a dict of strings to a Series of datetimes.
41
42
    This is just for making the test cases more readable.
43
    """
44
    return pd.Series(
45
        index=pd.to_datetime(knowledge_dates),
46
        data=pd.to_datetime(earning_dates),
47
    )
48
49
50
def num_days_in_range(dates, start, end):
    """
    Count the entries of `dates` that lie between start and end, inclusive.

    The bounds are handed straight to ``dates.slice_locs``; callers in this
    module pass ``None`` for a bound they want left open.
    """
    bounds = dates.slice_locs(start, end)
    return bounds[1] - bounds[0]
56
57
58
def gen_calendars():
    """
    Yield the calendars that parameterize test_compute_earnings.

    Each yielded value is a 1-tuple holding a DatetimeIndex for January 2014.
    """
    first_day, last_day = '2014-01-01', '2014-01-31'
    every_day = pd.date_range(first_day, last_day, tz='utc')

    # Announcements and knowledge dates in the fixtures fall on these days.
    # Every subset of them is dropped from the calendar in turn, so each
    # combination of missing critical dates gets its own test case.
    critical_dates = pd.to_datetime([
        '2014-01-05',
        '2014-01-10',
        '2014-01-15',
        '2014-01-20',
    ])
    for missing in powerset(critical_dates):
        # One tuple of positional arguments per generated test case.
        yield (every_day.drop(list(missing)),)

    # Finish with the same window cut out of the trading calendar.
    window = trading_days.slice_indexer(first_day, last_day)
    yield (trading_days[window],)
79
80
81
class EarningsCalendarLoaderTestCase(TestCase):
    """
    Tests for loading the earnings announcement data.

    Subclasses point the same tests at another loader by overriding
    ``loader_type`` and ``loader_args``.
    """
    loader_type = EarningsCalendarLoader

    @classmethod
    def setUpClass(cls):
        """Create the asset finder and announcement fixtures shared by all
        tests in the class.
        """
        cls._cleanup_stack = stack = ExitStack()
        cls.sids = A, B, C, D, E = range(5)
        equity_info = make_simple_equity_info(
            cls.sids,
            start_date=pd.Timestamp('2013-01-01', tz='UTC'),
            end_date=pd.Timestamp('2015-01-01', tz='UTC'),
        )
        cls.finder = stack.enter_context(
            tmp_asset_finder(equities=equity_info),
        )

        # K<n> is the n-th knowledge date, E<n> the announcement learned on
        # it; each comment gives their chronological order.
        cls.earnings_dates = {
            # K1--K2--E1--E2.
            A: _to_series(
                knowledge_dates=['2014-01-05', '2014-01-10'],
                earning_dates=['2014-01-15', '2014-01-20'],
            ),
            # K1--K2--E2--E1.
            B: _to_series(
                knowledge_dates=['2014-01-05', '2014-01-10'],
                earning_dates=['2014-01-20', '2014-01-15']
            ),
            # K1--E1--K2--E2.
            C: _to_series(
                knowledge_dates=['2014-01-05', '2014-01-15'],
                earning_dates=['2014-01-10', '2014-01-20']
            ),
            # K1 == K2.
            D: _to_series(
                knowledge_dates=['2014-01-05'] * 2,
                earning_dates=['2014-01-10', '2014-01-15'],
            ),
            # No announcements at all.
            E: pd.Series(
                data=[],
                index=pd.DatetimeIndex([]),
                dtype='datetime64[ns]',
            ),
        }

    @classmethod
    def tearDownClass(cls):
        """Release everything registered on the cleanup stack."""
        cls._cleanup_stack.close()

    def loader_args(self, dates):
        """Construct the base earnings announcements object to pass to the
        loader.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The dates we can serve.

        Returns
        -------
        args : tuple[any]
            The arguments to forward to the loader positionally.
        """
        return dates, self.earnings_dates

    @staticmethod
    def _expected_announcements(dates, runs_by_sid):
        """
        Expand per-sid runs into a DataFrame of expected announcement dates.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The calendar under test; it becomes the index of the result.
        runs_by_sid : dict[int -> list[tuple]]
            For each sid, consecutive ``(announcement, start, end)`` runs.
            ``announcement`` is repeated once for every entry of ``dates``
            between ``start`` and ``end`` inclusive; ``None`` leaves that
            bound open.

        Returns
        -------
        expected : pd.DataFrame
            One datetime column per sid, indexed by ``dates``.
        """
        def expand(runs):
            labels = []
            for announcement, start, end in runs:
                labels.extend(
                    [announcement] * num_days_in_range(dates, start, end)
                )
            return pd.Series(pd.to_datetime(labels), index=dates)

        return pd.DataFrame(
            {sid: expand(runs) for sid, runs in iteritems(runs_by_sid)},
            index=dates,
        )

    def _expected_next_announcements(self, dates):
        """
        Expected ``next_announcement`` values for every sid over ``dates``.
        """
        A, B, C, D, E = self.sids
        return self._expected_announcements(dates, {
            A: [
                ('NaT', None, '2014-01-04'),
                ('2014-01-15', '2014-01-05', '2014-01-15'),
                ('2014-01-20', '2014-01-16', '2014-01-20'),
                ('NaT', '2014-01-21', None),
            ],
            B: [
                ('NaT', None, '2014-01-04'),
                ('2014-01-20', '2014-01-05', '2014-01-09'),
                ('2014-01-15', '2014-01-10', '2014-01-15'),
                ('2014-01-20', '2014-01-16', '2014-01-20'),
                ('NaT', '2014-01-21', None),
            ],
            C: [
                ('NaT', None, '2014-01-04'),
                ('2014-01-10', '2014-01-05', '2014-01-10'),
                ('NaT', '2014-01-11', '2014-01-14'),
                ('2014-01-20', '2014-01-15', '2014-01-20'),
                ('NaT', '2014-01-21', None),
            ],
            D: [
                ('NaT', None, '2014-01-04'),
                ('2014-01-10', '2014-01-05', '2014-01-10'),
                ('2014-01-15', '2014-01-11', '2014-01-15'),
                ('NaT', '2014-01-16', None),
            ],
            # A single open-ended run covers the whole calendar.
            E: [('NaT', None, None)],
        })

    def _expected_previous_announcements(self, dates):
        """
        Expected ``previous_announcement`` values for every sid over
        ``dates``.
        """
        A, B, C, D, E = self.sids
        return self._expected_announcements(dates, {
            A: [
                ('NaT', None, '2014-01-14'),
                ('2014-01-15', '2014-01-15', '2014-01-19'),
                ('2014-01-20', '2014-01-20', None),
            ],
            B: [
                ('NaT', None, '2014-01-14'),
                ('2014-01-15', '2014-01-15', '2014-01-19'),
                ('2014-01-20', '2014-01-20', None),
            ],
            C: [
                ('NaT', None, '2014-01-09'),
                ('2014-01-10', '2014-01-10', '2014-01-19'),
                ('2014-01-20', '2014-01-20', None),
            ],
            D: [
                ('NaT', None, '2014-01-09'),
                ('2014-01-10', '2014-01-10', '2014-01-14'),
                ('2014-01-15', '2014-01-15', None),
            ],
            # A single open-ended run covers the whole calendar.
            E: [('NaT', None, None)],
        })

    def setup(self, dates):
        """
        Make a PipelineEngine and expectation functions for the given dates
        calendar.

        This exists to make it easy to test our various cases with critical
        dates missing from the calendar.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The calendar to run the engine over.

        Returns
        -------
        (engine, expected_next_announce, expected_next_busday_offset,
         expected_previous_announce, expected_previous_busday_offset)
            The engine, followed by four functions that each map a sid to
            its expected Series indexed by ``dates``.
        """
        next_announce = self._expected_next_announcements(dates)
        previous_announce = self._expected_previous_announcements(dates)
        next_busday_offsets = self._compute_busday_offsets(next_announce)
        previous_busday_offsets = self._compute_busday_offsets(
            previous_announce
        )

        def expected_next_announce(sid):
            """
            Return the expected next announcement dates for ``sid``.
            """
            return next_announce[sid]

        def expected_next_busday_offset(sid):
            """
            Return the expected number of business days until the next
            announcement for ``sid``.
            """
            return next_busday_offsets[sid]

        def expected_previous_announce(sid):
            """
            Return the expected previous announcement dates for ``sid``.
            """
            return previous_announce[sid]

        def expected_previous_busday_offset(sid):
            """
            Return the expected number of business days since the previous
            announcement for ``sid``.
            """
            return previous_busday_offsets[sid]

        loader = self.loader_type(*self.loader_args(dates))
        engine = SimplePipelineEngine(lambda _: loader, dates, self.finder)
        return (
            engine,
            expected_next_announce,
            expected_next_busday_offset,
            expected_previous_announce,
            expected_previous_busday_offset,
        )

    @staticmethod
    def _compute_busday_offsets(announcement_dates):
        """
        Compute expected business day offsets from a DataFrame of announcement
        dates.

        Parameters
        ----------
        announcement_dates : pd.DataFrame
            Expected announcement dates, indexed by the dates on which the
            factor is computed, with one column per sid.

        Returns
        -------
        offsets : pd.DataFrame
            Absolute business-day distance between each index date and the
            announcement in that cell, NaN where the announcement is NaT.
            Same index and columns as the input.
        """
        # Column-vector of dates on which factor `compute` will be called.
        raw_call_dates = announcement_dates.index.values.astype(
            'datetime64[D]'
        )[:, None]

        # 2D array of dates containing the expected announcements.
        raw_announce_dates = (
            announcement_dates.values.astype('datetime64[D]')
        )

        # Take the mask from the frame rather than from `== NaT`: NumPy >=
        # 1.16 evaluates every equality comparison against NaT to False,
        # which would leave the mask empty.
        whereNaT = announcement_dates.isnull().values

        # Set NaTs to 0 temporarily because busday_count doesn't support NaT.
        # We fill these entries with NaNs later.
        raw_announce_dates[whereNaT] = make_datetime64D(0)

        # The abs call here makes it so that we can use this function to
        # compute offsets for both next and previous earnings (previous
        # earnings offsets come back negative).
        expected = abs(np.busday_count(
            raw_call_dates,
            raw_announce_dates
        ).astype(float))

        expected[whereNaT] = np.nan
        return pd.DataFrame(
            data=expected,
            columns=announcement_dates.columns,
            index=announcement_dates.index,
        )

    @parameterized.expand(gen_calendars())
    def test_compute_earnings(self, dates):
        """
        Run all four earnings columns through the engine over ``dates`` and
        compare each sid's output with the hand-written expectations.
        """
        (
            engine,
            expected_next,
            expected_next_busday_offset,
            expected_previous,
            expected_previous_busday_offset,
        ) = self.setup(dates)

        pipe = Pipeline(
            columns={
                'next': EarningsCalendar.next_announcement.latest,
                'previous': EarningsCalendar.previous_announcement.latest,
                'days_to_next': BusinessDaysUntilNextEarnings(),
                'days_since_prev': BusinessDaysSincePreviousEarnings(),
            }
        )

        result = engine.run_pipeline(
            pipe,
            start_date=dates[0],
            end_date=dates[-1],
        )

        computed_next = result['next']
        computed_previous = result['previous']
        computed_next_busday_offset = result['days_to_next']
        computed_previous_busday_offset = result['days_since_prev']

        # NaTs in next/prev should correspond to NaNs in offsets.
        assert_series_equal(
            computed_next.isnull(),
            computed_next_busday_offset.isnull(),
        )
        assert_series_equal(
            computed_previous.isnull(),
            computed_previous_busday_offset.isnull(),
        )

        comparisons = (
            (computed_next, expected_next),
            (computed_previous, expected_previous),
            (computed_next_busday_offset, expected_next_busday_offset),
            (
                computed_previous_busday_offset,
                expected_previous_busday_offset,
            ),
        )
        for sid in self.sids:
            for computed, expected in comparisons:
                # Only the two series are passed: a third positional
                # argument is taken as `check_dtype`, so passing the sid
                # there turned dtype checking off for sid 0.
                assert_series_equal(
                    computed.xs(sid, level=1),
                    expected(sid),
                )
363
364
365
class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
    """
    Runs the inherited earnings tests against BlazeEarningsCalendarLoader.
    """
    loader_type = BlazeEarningsCalendarLoader

    def loader_args(self, dates):
        """
        Flatten the per-sid announcement mapping into one table and wrap it
        in a blaze expression.

        Returns a 1-tuple holding ``bz.Data`` over a frame with one row per
        announcement: the announcement date, its knowledge timestamp and the
        sid it belongs to.
        """
        parent = super(BlazeEarningsCalendarLoaderTestCase, self)
        _, mapping = parent.loader_args(dates)

        per_sid_frames = [
            pd.DataFrame({
                ANNOUNCEMENT_FIELD_NAME: earning_dates,
                TS_FIELD_NAME: earning_dates.index,
                SID_FIELD_NAME: sid,
            })
            for sid, earning_dates in iteritems(mapping)
        ]
        table = pd.concat(per_sid_frames).reset_index(drop=True)
        return (bz.Data(table),)
381
382
383
class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
        BlazeEarningsCalendarLoaderTestCase):
    """Test case for passing a non-interactive symbol and a dict of resources.
    """
    def loader_args(self, dates):
        """
        Take the parent's single bound expression and return whatever
        ``swap_resources_into_scope`` produces for it with an empty scope.
        """
        parent = super(
            BlazeEarningsCalendarLoaderNotInteractiveTestCase,
            self,
        )
        (interactive_expr,) = parent.loader_args(dates)
        return swap_resources_into_scope(interactive_expr, {})
393
394
395
class EarningsCalendarLoaderInferTimestampTestCase(TestCase):
    """
    Checks what EarningsCalendarLoader stores when ``infer_timestamps=True``.
    """
    def test_infer_timestamp(self):
        dtx = pd.date_range('2014-01-01', '2014-01-10')
        # sid 0 supplies bare dates; sid 1 already carries an index.
        announcement_dates = {
            0: dtx,
            1: pd.Series(dtx, dtx),
        }
        loader = EarningsCalendarLoader(
            dtx,
            announcement_dates,
            infer_timestamps=True,
        )
        loaded = loader.announcement_dates

        self.assertEqual(loaded.keys(), announcement_dates.keys())

        # The bare dates are expected back indexed by the first calendar
        # date, repeated once per announcement.
        expected_inferred = pd.Series(index=[dtx[0]] * 10, data=dtx)
        assert_series_equal(loaded[0], expected_inferred)

        # The indexed input is expected back unchanged.
        assert_series_equal(loaded[1], announcement_dates[1])
419