Completed
Pull Request — master (#858)
by Eddie
01:50
created

tests.TestDataPortal   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 321
Duplicated Lines 0 %
Metric Value
dl 0
loc 321
rs 10
wmc 20

4 Methods

Rating   Name   Duplication   Size   Complexity  
C test_adjust_forward_fill_minute() 0 133 7
B test_forward_fill_minute() 0 62 5
A test_spot_value_futures() 0 59 2
B test_forward_fill_daily() 0 63 6
1
import os
2
from datetime import timedelta
3
import bcolz
4
import numpy as np
5
import pandas as pd
6
7
from unittest import TestCase
8
from pandas.tslib import normalize_date
9
from testfixtures import TempDirectory
10
from zipline.data.data_portal import DataPortal
11
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
12
    SQLiteAdjustmentReader
13
from zipline.finance.trading import TradingEnvironment, SimulationParameters
14
from zipline.data.us_equity_minutes import MinuteBarWriterFromDataFrames
15
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
16
17
18
class TestDataPortal(TestCase):
    """Tests for ``DataPortal.get_spot_value``.

    Each test writes a small synthetic pricing dataset into a temporary
    directory, builds a ``DataPortal`` on top of it, and checks the value
    returned for every bar, including bars that must be forward-filled.
    """

    # Fields checked by the tests.  A field's position in this tuple
    # (``field_idx``) determines the value offset used by the fixtures:
    # field N's data starts at N * 1000 (minute tests), N * 10 (daily test)
    # or N * 10000 (futures test).
    FIELDS = ("open", "high", "low", "close", "volume")

    def test_forward_fill_minute(self):
        """Missing minute bars are forward-filled from the last real bar."""
        with TempDirectory() as tempdir:
            start_day = pd.Timestamp("2015-09-28", tz='UTC')
            end_day = pd.Timestamp("2015-09-29", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day + timedelta(days=1)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # one missing bar, then 200 bars of real data,
                # then 579 missing bars (~1.5 days) to fill both days.
                # Prices are stored scaled by 1000.
                "open": np.array([0] + list(range(0, 200)) + [0] * 579)
                * 1000,
                "high": np.array([0] + list(range(1000, 1200)) + [0] * 579)
                * 1000,
                "low": np.array([0] + list(range(2000, 2200)) + [0] * 579)
                * 1000,
                "close": np.array([0] + list(range(3000, 3200)) + [0] * 579)
                * 1000,
                "volume": [0] + list(range(4000, 4200)) + [0] * 579,
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames().write(tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path,
                sim_params=sim_params
            )

            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(self.FIELDS):
                    val = dp.get_spot_value(0, field, dt=minute)
                    if minute_idx == 0:
                        # Nothing to forward-fill from yet.
                        self.assertEqual(0, val)
                    elif minute_idx < 200:
                        # Real data: bar i holds value i - 1.
                        self.assertEqual((minute_idx - 1) +
                                         (field_idx * 1000), val)
                    else:
                        # Forward-filled from the last real bar (199).
                        self.assertEqual(199 + (field_idx * 1000), val)

    def test_forward_fill_daily(self):
        """Missing daily bars are forward-filled from the last real bar."""
        with TempDirectory() as tempdir:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.
            df = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 8,
                "high": [0] + list(range(10, 18)) + [0] * 8,
                "low": [0] + list(range(20, 28)) + [0] * 8,
                "close": [0] + list(range(30, 38)) + [0] * 8,
                "volume": [0] + list(range(40, 48)) + [0] * 8,
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            # Pass the environment, as the minute tests do, so the
            # simulation parameters are derived from the same calendar.
            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily",
                env=env,
            )

            dp = DataPortal(
                env,
                daily_equities_path=path,
                sim_params=sim_params
            )

            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(self.FIELDS):
                    val = dp.get_spot_value(0, field, dt=day)
                    if day_idx == 0:
                        # Nothing to forward-fill from yet.
                        self.assertEqual(0, val)
                    elif day_idx < 9:
                        # Real data: bar i holds value i - 1.
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
                    else:
                        # Forward-filled from the last real bar (7).
                        self.assertEqual(7 + (field_idx * 10), val)

    def test_adjust_forward_fill_minute(self):
        """Forward-filled minute values are adjusted for a split.

        Minute data covers 2013-06-21 and 2013-06-24: 390 real bars, then
        100 missing bars, then 290 real bars.  A split with ratio 0.5 is
        effective on 2013-06-24, so the forward-filled bars (copied from the
        last bar before the split) are expected to have prices halved and
        volume doubled.
        """
        with TempDirectory() as tempdir:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames().write(tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24.  The writer's calendar is the trading
            # calendar (not every calendar day, which would include the
            # 6/22-6/23 weekend).
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            writer = SQLiteAdjustmentWriter(
                adjustments_path,
                env.days_in_range(start_day, end_day),
                None
            )

            splits = pd.DataFrame([{
                # seconds since the epoch, computed exactly in integers
                'effective_date': end_day.value // 10 ** 9,
                'ratio': 0.5,
                'sid': 0
            }])

            dividend_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'ex_date': np.array([], dtype='datetime64[ns]'),
                'pay_date': np.array([], dtype='datetime64[ns]'),
                'record_date': np.array([], dtype='datetime64[ns]'),
                'declared_date': np.array([], dtype='datetime64[ns]'),
                'amount': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            dividends = pd.DataFrame(
                dividend_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=['ex_date',
                         'pay_date',
                         'record_date',
                         'declared_date',
                         'amount',
                         'sid']
            )

            merger_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'effective_date': np.array([], dtype=int),
                'ratio': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            mergers = pd.DataFrame(
                merger_data,
                index=pd.DatetimeIndex([], tz='UTC')
            )

            writer.write(splits, mergers, dividends)

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path,
                sim_params=sim_params,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            for idx, minute in enumerate(minutes[:390]):
                for field_idx, field in enumerate(self.FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        idx + (1000 * field_idx)
                    )

            for idx, minute in enumerate(minutes[390:490]):
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(self.FIELDS[:4]):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                self.assertEqual(
                    dp.get_spot_value(0, "volume", dt=minute),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(self.FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        (390 + idx + (1000 * field_idx))
                    )

    def test_spot_value_futures(self):
        """Spot values for a future are read from a bcolz minute table.

        The table is written directly as ``123.bcolz`` (named after the
        future's sid) and padded with one zero row per minute between
        midnight and ``start_dt``.
        """
        with TempDirectory() as tempdir:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            # One zero row for every minute from midnight up to start_dt.
            zeroes_buffer = \
                [0] * int((start_dt -
                           normalize_date(start_dt)).total_seconds() / 60)

            df = pd.DataFrame({
                "open": np.array(zeroes_buffer + list(range(0, 10000))) * 1000,
                "high": np.array(
                    zeroes_buffer + list(range(10000, 20000))) * 1000,
                "low": np.array(
                    zeroes_buffer + list(range(20000, 30000))) * 1000,
                "close": np.array(
                    zeroes_buffer + list(range(30000, 40000))) * 1000,
                "volume": np.array(zeroes_buffer + list(range(40000, 50000)))
            })

            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path
            )

            future123 = env.asset_finder.retrieve_asset(123)

            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                # Field N holds i + N * 10000 at minute i after start_dt.
                for field_idx, field in enumerate(self.FIELDS):
                    self.assertEqual(i + 10000 * field_idx,
                                     dp.get_spot_value(future123, field, dt))