Completed
Pull Request — master (#858)
by Eddie
01:58
created

tests.TestDataPortal.test_forward_fill_minute()   B

Complexity

Conditions 5

Size

Total Lines 66

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 66
rs 8.2488

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: moving a coherent part of the method body into a new, well-named method.

1
import os
2
from datetime import timedelta
3
import bcolz
4
import numpy as np
5
import pandas as pd
6
7
from unittest import TestCase
8
from pandas.tslib import normalize_date
9
from testfixtures import TempDirectory
10
from zipline.data.data_portal import DataPortal
11
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
12
    SQLiteAdjustmentReader
13
from zipline.finance.trading import TradingEnvironment, SimulationParameters
14
from zipline.data.us_equity_minutes import (
15
    MinuteBarWriterFromDataFrames,
16
    BcolzMinuteBarReader
17
)
18
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
19
20
21
class TestDataPortal(TestCase):
22
    def test_forward_fill_minute(self):
        """Check minute-level spot values when bars are missing.

        Asset 0 is written with one empty bar, 200 populated bars and then
        579 empty bars.  The first minute is asserted to read as 0; every
        later minute is asserted to read as the most recent populated bar.
        """
        tempdir = TempDirectory()
        try:
            first_day = pd.Timestamp("2015-09-28", tz='UTC')
            second_day = pd.Timestamp("2015-09-29", tz='UTC')

            env = TradingEnvironment()
            asset_info = {
                'start_date': first_day,
                'end_date': second_day + timedelta(days=1)
            }
            env.write_data(equities_data={0: asset_info})

            minutes = env.minutes_for_days_in_range(
                start=first_day,
                end=second_day
            )

            def padded(first_value):
                # one missing bar, then 200 bars of real data,
                # then 1.5 days of missing data
                real_bars = list(range(first_value, first_value + 200))
                return [0] + real_bars + [0] * 579

            # Price columns are written multiplied by 1000 while the
            # assertions below expect the unscaled numbers; volume is
            # written as-is.
            df = pd.DataFrame({
                "open": np.array(padded(0)) * 1000,
                "high": np.array(padded(1000)) * 1000,
                "low": np.array(padded(2000)) * 1000,
                "close": np.array(padded(3000)) * 1000,
                "volume": padded(4000),
                "minute": minutes
            })

            bar_writer = MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC'))
            bar_writer.write(tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            dp = DataPortal(
                env,
                equity_minute_reader=BcolzMinuteBarReader(tempdir.path),
                sim_params=sim_params
            )

            field_names = ["open", "high", "low", "close", "volume"]
            for minute_idx, minute in enumerate(minutes):
                # Bars 1..200 hold the values 0..199; anything after bar
                # 200 is expected to repeat the value 199.
                last_real = min(minute_idx, 200) - 1
                for field_idx, field in enumerate(field_names):
                    val = dp.get_spot_value(0, field, dt=minute)
                    if minute_idx == 0:
                        # nothing to fill forward from yet
                        expected = 0
                    else:
                        expected = last_real + (field_idx * 1000)
                    self.assertEqual(expected, val)
        finally:
            tempdir.cleanup()
88
89
    def test_forward_fill_daily(self):
        """Check daily spot values when bars are missing.

        Asset 0 is written with one empty bar, 8 populated bars and then 8
        more empty bars.  The first day is asserted to read as 0; every
        later day is asserted to read as the most recent populated bar.
        """
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            asset_info = {'start_date': start_day, 'end_date': end_day}
            env.write_data(equities_data={0: asset_info})

            days = env.days_in_range(start_day, end_day)

            field_names = ["open", "high", "low", "close", "volume"]

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.  Field number k starts counting at 10 * k.
            columns = {
                name: [0] + list(range(base, base + 8)) + [0] * 8
                for base, name in zip(range(0, 50, 10), field_names)
            }
            columns["day"] = [day.value for day in days]
            df = pd.DataFrame(columns, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            daily_writer = DailyBarWriterFromDataFrames(assets)
            daily_writer.write(path, days, assets)

            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            dp = DataPortal(
                env,
                daily_equities_path=path,
                sim_params=sim_params
            )

            for day_idx, day in enumerate(days):
                # Bars 1..8 hold the values 0..7; later bars are expected
                # to repeat the value 7.
                last_real = min(day_idx, 8) - 1
                for field_idx, field in enumerate(field_names):
                    val = dp.get_spot_value(0, field, dt=day)
                    if day_idx == 0:
                        # nothing to fill forward from yet
                        expected = 0
                    else:
                        expected = last_real + (field_idx * 10)
                    self.assertEqual(expected, val)
        finally:
            tempdir.cleanup()
152
153
    def test_adjust_forward_fill_minute(self):
        """Check that forward-filled minute values have the split applied.

        Asset 0 is written with 390 populated bars, 100 empty bars and 290
        populated bars, and a split with ratio 0.5 is recorded for the end
        day.  Across the empty stretch the prices are asserted to be half
        of the last populated bar and the volume to be double.
        """
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            asset_info = {
                'start_date': start_day,
                'end_date': env.next_trading_day(end_day)
            }
            env.write_data(equities_data={0: asset_info})

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            def bars_with_gap(first_value):
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                before_gap = list(range(first_value, first_value + 390))
                after_gap = list(range(first_value + 390, first_value + 680))
                return np.array(before_gap + [0] * 100 + after_gap)

            # Price columns are written multiplied by 1000; volume is
            # written as-is.
            df = pd.DataFrame({
                "open": bars_with_gap(0) * 1000,
                "high": bars_with_gap(1000) * 1000,
                "low": bars_with_gap(2000) * 1000,
                "close": bars_with_gap(3000) * 1000,
                "volume": bars_with_gap(4000),
                "minute": minutes
            })

            bar_writer = MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC'))
            bar_writer.write(tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            calendar = pd.date_range(start=start_day, end=end_day)
            writer = SQLiteAdjustmentWriter(adjustments_path, calendar, None)

            split_row = {
                'effective_date': int(end_day.value / 1e9),
                'ratio': 0.5,
                'sid': 0
            }
            splits = pd.DataFrame([split_row])

            # Hackery to make the dtypes correct on an empty frame.
            dividend_columns = ['ex_date',
                                'pay_date',
                                'record_date',
                                'declared_date',
                                'amount',
                                'sid']
            dividend_data = {
                name: np.array([], dtype='datetime64[ns]')
                for name in dividend_columns[:4]
            }
            dividend_data['amount'] = np.array([], dtype=float)
            dividend_data['sid'] = np.array([], dtype=int)
            dividends = pd.DataFrame(
                dividend_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=dividend_columns
            )

            # Hackery to make the dtypes correct on an empty frame.
            merger_data = {
                'effective_date': np.array([], dtype=int),
                'ratio': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            mergers = pd.DataFrame(
                merger_data,
                index=pd.DatetimeIndex([], tz='UTC')
            )

            writer.write(splits, mergers, dividends)

            dp = DataPortal(
                env,
                equity_minute_reader=BcolzMinuteBarReader(tempdir.path),
                sim_params=sim_params,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            price_fields = ["open", "high", "low", "close"]
            all_fields = price_fields + ["volume"]

            before_gap = minutes[:390]
            inside_gap = minutes[390:490]
            after_gap = minutes[490:]

            for idx, minute in enumerate(before_gap):
                for field_idx, field in enumerate(all_fields):
                    expected = idx + (1000 * field_idx)
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        expected
                    )

            for minute in inside_gap:
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(price_fields):
                    expected = (389 + (1000 * field_idx)) / 2.0
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        expected
                    )

                self.assertEqual(
                    dp.get_spot_value(0, "volume", dt=minute),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(after_gap):
                # back to real data
                for field_idx, field in enumerate(all_fields):
                    expected = 390 + idx + (1000 * field_idx)
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        expected
                    )
        finally:
            tempdir.cleanup()
290
291
    def test_spot_value_futures(self):
        """Check minute spot values for a future read from a bcolz table.

        A table for sid 123 is written by hand with 10000 consecutive
        minutes of data starting at ``start_dt``; each of those minutes is
        then read back through ``DataPortal.get_spot_value``.
        """
        tempdir = TempDirectory()
        try:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))
            start_midnight = normalize_date(start_dt)

            # One zero per minute between midnight and start_dt, placed in
            # front of the real values.
            lead_in = start_dt - start_midnight
            zeroes_buffer = [0] * int(lead_in.total_seconds() / 60)

            def series(first_value):
                values = list(range(first_value, first_value + 10000))
                return np.array(zeroes_buffer + values)

            # Price columns are written multiplied by 1000; volume is
            # written as-is.
            df = pd.DataFrame({
                "open": series(0) * 1000,
                "high": series(10000) * 1000,
                "low": series(20000) * 1000,
                "close": series(30000) * 1000,
                "volume": series(40000)
            })

            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            future_info = {
                "start_date": start_midnight,
                "end_date": env.next_trading_day(normalize_date(end_dt)),
                'symbol': 'TEST_FUTURE',
                'asset_type': 'future',
            }
            env.write_data(futures_data={123: future_info})

            dp = DataPortal(
                env,
                minutes_futures_path=tempdir.path
            )

            future123 = env.asset_finder.retrieve_asset(123)

            field_names = ["open", "high", "low", "close", "volume"]
            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                # Field number k was written starting at 10000 * k.
                for field_idx, field in enumerate(field_names):
                    self.assertEqual(
                        i + 10000 * field_idx,
                        dp.get_spot_value(future123, field, dt))

        finally:
            tempdir.cleanup()
350