Completed
Pull Request — master (#858)
by Eddie
01:35
created

tests.TestDataPortal   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 325
Duplicated Lines 0 %
Metric Value
dl 0
loc 325
rs 10
wmc 20

4 Methods

Rating   Name   Duplication   Size   Complexity  
C test_adjust_forward_fill_minute() 0 135 7
B test_forward_fill_minute() 0 64 5
A test_spot_value_futures() 0 59 2
B test_forward_fill_daily() 0 63 6
1
import os
2
from datetime import timedelta
3
import bcolz
4
import numpy as np
5
import pandas as pd
6
7
from unittest import TestCase
8
from pandas.tslib import normalize_date
9
from testfixtures import TempDirectory
10
from zipline.data.data_portal import DataPortal
11
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
12
    SQLiteAdjustmentReader
13
from zipline.finance.trading import TradingEnvironment, SimulationParameters
14
from zipline.data.us_equity_minutes import MinuteBarWriterFromDataFrames
15
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
16
17
18
class TestDataPortal(TestCase):
    """Exercise ``DataPortal.get_spot_value`` against freshly written bars.

    Each test writes its own bar data into a temporary directory, builds a
    ``DataPortal`` over it, and compares the value returned for every
    bar/field with the value the test expects for that bar.
    """

    # Fields queried by the tests.  The fixtures offset each field's values
    # by a multiple of the field's position in this tuple, which is how the
    # assertions derive the expected value from ``field_idx``.
    OHLCV_FIELDS = ("open", "high", "low", "close", "volume")

    def _write_half_split(self, adjustments_path, start_day, end_day):
        """Write an adjustments db containing a single 0.5 split for sid 0.

        The split's ``effective_date`` is ``end_day``; the mergers and
        dividends frames handed to the writer are empty.

        Parameters
        ----------
        adjustments_path : str
            Path of the SQLite file to create.
        start_day, end_day : pd.Timestamp
            Bounds of the calendar handed to the writer.
        """
        writer = SQLiteAdjustmentWriter(
            adjustments_path,
            pd.date_range(start=start_day, end=end_day),
            None
        )

        splits = pd.DataFrame([{
            # Timestamp.value is in nanoseconds; the frame stores seconds.
            'effective_date': int(end_day.value / 1e9),
            'ratio': 0.5,
            'sid': 0
        }])

        # Hackery to make the dtypes correct on an empty frame.
        dividend_data = {
            'ex_date': np.array([], dtype='datetime64[ns]'),
            'pay_date': np.array([], dtype='datetime64[ns]'),
            'record_date': np.array([], dtype='datetime64[ns]'),
            'declared_date': np.array([], dtype='datetime64[ns]'),
            'amount': np.array([], dtype=float),
            'sid': np.array([], dtype=int),
        }
        dividends = pd.DataFrame(
            dividend_data,
            index=pd.DatetimeIndex([], tz='UTC'),
            columns=['ex_date',
                     'pay_date',
                     'record_date',
                     'declared_date',
                     'amount',
                     'sid']
        )

        # Hackery to make the dtypes correct on an empty frame.
        merger_data = {
            'effective_date': np.array([], dtype=int),
            'ratio': np.array([], dtype=float),
            'sid': np.array([], dtype=int),
        }
        mergers = pd.DataFrame(
            merger_data,
            index=pd.DatetimeIndex([], tz='UTC')
        )

        writer.write(splits, mergers, dividends)

    def test_forward_fill_minute(self):
        """Zeroed minute bars after real data report the last real values."""
        first_day = pd.Timestamp("2015-09-28", tz='UTC')
        last_day = pd.Timestamp("2015-09-29", tz='UTC')

        # Leaving the ``with`` block cleans the directory up, pass or fail.
        with TempDirectory() as tempdir:
            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': first_day,
                        'end_date': last_day + timedelta(days=1)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=first_day,
                end=last_day
            )

            def bars(first_value):
                # one missing bar, then 200 bars of real data,
                # then 1.5 days of missing data
                return (
                    [0] +
                    list(range(first_value, first_value + 200)) +
                    [0] * 579
                )

            df = pd.DataFrame({
                # Prices are written multiplied by 1000 while volume is
                # written as-is; the assertions below expect the
                # unmultiplied numbers back for every field.
                "open": np.array(bars(0)) * 1000,
                "high": np.array(bars(1000)) * 1000,
                "low": np.array(bars(2000)) * 1000,
                "close": np.array(bars(3000)) * 1000,
                "volume": bars(4000),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path,
                sim_params=sim_params
            )

            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(self.OHLCV_FIELDS):
                    val = dp.get_spot_value(0, field, dt=minute)
                    if minute_idx == 0:
                        # the very first bar is missing and has nothing
                        # before it, so zero is expected
                        self.assertEqual(0, val)
                    elif minute_idx < 200:
                        self.assertEqual((minute_idx - 1) +
                                         (field_idx * 1000), val)
                    else:
                        # index 200 is the last real bar (value 199); the
                        # missing bars after it are expected to repeat it
                        self.assertEqual(199 + (field_idx * 1000), val)

    def test_forward_fill_daily(self):
        """Zeroed daily bars after real data report the last real values."""
        with TempDirectory() as tempdir:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            def bars(first_value):
                # first bar is missing.  then 8 real bars.  then 8 more
                # missing bars.
                return (
                    [0] +
                    list(range(first_value, first_value + 8)) +
                    [0] * 8
                )

            df = pd.DataFrame({
                "open": bars(0),
                "high": bars(10),
                "low": bars(20),
                "close": bars(30),
                "volume": bars(40),
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            # NOTE(review): unlike the minute tests, ``env`` is not passed
            # to SimulationParameters here -- confirm this is intentional.
            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            dp = DataPortal(
                env,
                daily_equities_path=path,
                sim_params=sim_params
            )

            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(self.OHLCV_FIELDS):
                    val = dp.get_spot_value(0, field, dt=day)
                    if day_idx == 0:
                        self.assertEqual(0, val)
                    elif day_idx < 9:
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
                    else:
                        # the last real bar holds value 7
                        self.assertEqual(7 + (field_idx * 10), val)

    def test_adjust_forward_fill_minute(self):
        """Forward-filled minute values are adjusted for a split."""
        with TempDirectory() as tempdir:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            def bars(first_value):
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                return np.array(
                    list(range(first_value, first_value + 390)) +
                    [0] * 100 +
                    list(range(first_value + 390, first_value + 680))
                )

            df = pd.DataFrame({
                "open": bars(0) * 1000,
                "high": bars(1000) * 1000,
                "low": bars(2000) * 1000,
                "close": bars(3000) * 1000,
                "volume": bars(4000),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            self._write_half_split(adjustments_path, start_day, end_day)

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path,
                sim_params=sim_params,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            for idx, minute in enumerate(minutes[:390]):
                for field_idx, field in enumerate(self.OHLCV_FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        idx + (1000 * field_idx)
                    )

            for minute in minutes[390:490]:
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(self.OHLCV_FIELDS[:-1]):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                self.assertEqual(
                    dp.get_spot_value(0, "volume", dt=minute),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(self.OHLCV_FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        (390 + idx + (1000 * field_idx))
                    )

    def test_spot_value_futures(self):
        """Every minute of a futures bcolz table returns what was written."""
        with TempDirectory() as tempdir:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            # One zero row for each minute between the normalized date of
            # start_dt and start_dt itself, so the real rows begin at
            # start_dt's offset into the table.
            minutes_into_day = int(
                (start_dt - normalize_date(start_dt)).total_seconds() / 60
            )
            zeroes_buffer = [0] * minutes_into_day

            def bars(first_value):
                return np.array(
                    zeroes_buffer +
                    list(range(first_value, first_value + 10000))
                )

            df = pd.DataFrame({
                "open": bars(0) * 1000,
                "high": bars(10000) * 1000,
                "low": bars(20000) * 1000,
                "close": bars(30000) * 1000,
                "volume": bars(40000)
            })

            # The table directory is named after the future's sid (123).
            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            dp = DataPortal(
                env,
                minutes_futures_path=tempdir.path
            )

            future123 = env.asset_finder.retrieve_asset(123)

            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                # each field's values start 10000 above the previous one's
                for field_idx, field in enumerate(self.OHLCV_FIELDS):
                    self.assertEqual(
                        i + (10000 * field_idx),
                        dp.get_spot_value(future123, field, dt)
                    )
343