Completed
Pull Request — master (#858)
by Eddie
05:34 queued 02:25
created

tests.TestDataPortal   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 319
Duplicated Lines 0 %
Metric Value
dl 0
loc 319
rs 10
wmc 20

4 Methods

Rating  Name                               Duplication  Size  Complexity
C       test_adjust_forward_fill_minute()  0            132   7
B       test_forward_fill_minute()         0            61    5
A       test_spot_value_futures()          0            59    2
B       test_forward_fill_daily()          0            63    6
1
import os
2
from datetime import timedelta
3
import bcolz
4
import numpy as np
5
import pandas as pd
6
7
from unittest import TestCase
8
from pandas.tslib import normalize_date
9
from testfixtures import TempDirectory
10
from zipline.data.data_portal import DataPortal
11
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
12
    SQLiteAdjustmentReader
13
from zipline.finance.trading import TradingEnvironment, SimulationParameters
14
from zipline.data.minute_writer import MinuteBarWriterFromDataFrames
15
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
16
17
18
class TestDataPortal(TestCase):
    """Tests for ``DataPortal.get_spot_value``.

    Covers forward-filling of missing minute and daily equity bars, split
    adjustment of forward-filled minute values, and reading minute bars
    for a future from a bcolz ctable.
    """

    # Price fields, in the order the tests offset their values.
    _PRICE_FIELDS = ("open", "high", "low", "close")

    # All bar fields. A field's index in this tuple determines its value
    # offset in each test (e.g. ``field_idx * 1000`` in the minute tests,
    # ``field_idx * 10`` in the daily test).
    _OHLCV_FIELDS = _PRICE_FIELDS + ("volume",)

    @staticmethod
    def _empty_dividends():
        """Return an empty dividends frame with explicit column dtypes."""
        dividend_data = {
            # Hackery to make the dtypes correct on an empty frame.
            'ex_date': np.array([], dtype='datetime64[ns]'),
            'pay_date': np.array([], dtype='datetime64[ns]'),
            'record_date': np.array([], dtype='datetime64[ns]'),
            'declared_date': np.array([], dtype='datetime64[ns]'),
            'amount': np.array([], dtype=float),
            'sid': np.array([], dtype=int),
        }
        return pd.DataFrame(
            dividend_data,
            index=pd.DatetimeIndex([], tz='UTC'),
            columns=['ex_date',
                     'pay_date',
                     'record_date',
                     'declared_date',
                     'amount',
                     'sid']
        )

    @staticmethod
    def _empty_mergers():
        """Return an empty mergers frame with explicit column dtypes."""
        merger_data = {
            # Hackery to make the dtypes correct on an empty frame.
            'effective_date': np.array([], dtype=int),
            'ratio': np.array([], dtype=float),
            'sid': np.array([], dtype=int),
        }
        return pd.DataFrame(
            merger_data,
            index=pd.DatetimeIndex([], tz='UTC')
        )

    def test_forward_fill_minute(self):
        """Missing minute bars take the values of the last real bar.

        The fixture spans two sessions: one missing bar, 200 real bars,
        then 579 missing bars. Before the first real bar the spot value
        is expected to be 0.
        """
        tempdir = TempDirectory()
        try:
            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': pd.Timestamp("2015-09-28", tz='UTC'),
                        'end_date': pd.Timestamp("2015-09-29", tz='UTC')
                        + timedelta(days=1)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=pd.Timestamp("2015-09-28", tz='UTC'),
                end=pd.Timestamp("2015-09-29", tz='UTC')
            )

            df = pd.DataFrame({
                # one missing bar, then 200 bars of real data,
                # then 579 missing bars (roughly 1.5 sessions).
                # Prices are written multiplied by 1000; volume is not.
                "open": np.array([0] + list(range(0, 200)) + [0] * 579)
                * 1000,
                "high": np.array([0] + list(range(1000, 1200)) + [0] * 579)
                * 1000,
                "low": np.array([0] + list(range(2000, 2200)) + [0] * 579)
                * 1000,
                "close": np.array([0] + list(range(3000, 3200)) + [0] * 579)
                * 1000,
                "volume": [0] + list(range(4000, 4200)) + [0] * 579,
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames().write(tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute"
            )

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path,
                sim_params=sim_params
            )

            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(self._OHLCV_FIELDS):
                    val = dp.get_spot_value(0, field, dt=minute)
                    if minute_idx == 0:
                        # No earlier real bar to forward-fill from.
                        self.assertEqual(0, val)
                    elif minute_idx < 200:
                        # The leading missing bar shifts the data by one.
                        self.assertEqual((minute_idx - 1) +
                                         (field_idx * 1000), val)
                    else:
                        # Minute 200 holds the last real value (199); every
                        # later minute is forward-filled from it.
                        self.assertEqual(199 + (field_idx * 1000), val)
        finally:
            tempdir.cleanup()

    def test_forward_fill_daily(self):
        """Missing daily bars take the values of the last real bar.

        The fixture has one missing bar, 8 real bars, then 8 missing bars.
        Before the first real bar the spot value is expected to be 0.
        """
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.
            df = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 8,
                "high": [0] + list(range(10, 18)) + [0] * 8,
                "low": [0] + list(range(20, 28)) + [0] * 8,
                "close": [0] + list(range(30, 38)) + [0] * 8,
                "volume": [0] + list(range(40, 48)) + [0] * 8,
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            dp = DataPortal(
                env,
                daily_equities_path=path,
                sim_params=sim_params
            )

            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(self._OHLCV_FIELDS):
                    val = dp.get_spot_value(0, field, dt=day)
                    if day_idx == 0:
                        # No earlier real bar to forward-fill from.
                        self.assertEqual(0, val)
                    elif day_idx < 9:
                        # The leading missing bar shifts the data by one.
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
                    else:
                        # Forward-filled from the last real bar (value 7).
                        self.assertEqual(7 + (field_idx * 10), val)
        finally:
            tempdir.cleanup()

    def test_adjust_forward_fill_minute(self):
        """Forward-filled minute values are adjusted for a split.

        A split with ratio 0.5 is written effective on the second session.
        That session starts with 100 missing bars. Those minutes are
        expected to return the prior session's last prices halved and its
        last volume doubled. Real bars are expected unadjusted.
        """
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames().write(tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute"
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            writer = SQLiteAdjustmentWriter(adjustments_path,
                                            pd.date_range(start=start_day,
                                                          end=end_day),
                                            None)

            splits = pd.DataFrame([{
                # Integer seconds since the epoch; exact integer division
                # avoids a round-trip through float.
                'effective_date': end_day.value // 10 ** 9,
                'ratio': 0.5,
                'sid': 0
            }])

            writer.write(splits, self._empty_mergers(), self._empty_dividends())

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path,
                sim_params=sim_params,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            for idx, minute in enumerate(minutes[:390]):
                for field_idx, field in enumerate(self._OHLCV_FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        idx + (1000 * field_idx)
                    )

            for minute in minutes[390:490]:
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(self._PRICE_FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                self.assertEqual(
                    dp.get_spot_value(0, "volume", dt=minute),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(self._OHLCV_FIELDS):
                    self.assertEqual(
                        dp.get_spot_value(0, field, dt=minute),
                        (390 + idx + (1000 * field_idx))
                    )
        finally:
            tempdir.cleanup()

    def test_spot_value_futures(self):
        """Minute bars written for a future are read back per field.

        Writes 10000 minute bars for future sid 123 to ``123.bcolz`` in the
        temp directory. Each minute's open/high/low/close/volume must come
        back as ``i``, ``i + 10000``, ..., ``i + 40000`` respectively.
        """
        tempdir = TempDirectory()
        try:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            # One zero row per minute from midnight up to start_dt.
            # Presumably the reader indexes rows from midnight of the
            # start day -- NOTE(review): confirm against DataPortal.
            zeroes_buffer = \
                [0] * int((start_dt -
                           normalize_date(start_dt)).total_seconds() / 60)

            df = pd.DataFrame({
                "open": np.array(zeroes_buffer + list(range(0, 10000))) * 1000,
                "high": np.array(
                    zeroes_buffer + list(range(10000, 20000))) * 1000,
                "low": np.array(
                    zeroes_buffer + list(range(20000, 30000))) * 1000,
                "close": np.array(
                    zeroes_buffer + list(range(30000, 40000))) * 1000,
                "volume": np.array(zeroes_buffer + list(range(40000, 50000)))
            })

            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            dp = DataPortal(
                env,
                minutes_equities_path=tempdir.path
            )

            future123 = env.asset_finder.retrieve_asset(123)

            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                # Field N is offset by N * 10000 (open=i ... volume=i+40000).
                for field_idx, field in enumerate(self._OHLCV_FIELDS):
                    self.assertEqual(
                        i + field_idx * 10000,
                        dp.get_spot_value(future123, field, dt)
                    )
        finally:
            tempdir.cleanup()
337