Completed
Pull Request — master (#858)
by Eddie
01:48
created

tests.TestDataPortal.test_spot_value_futures()   A

Complexity

Conditions 2

Size

Total Lines 68

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 68
rs 9.2448

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
import os
2
from datetime import timedelta
3
import bcolz
4
import numpy as np
5
import pandas as pd
6
7
from unittest import TestCase
8
from pandas.tslib import normalize_date
9
from testfixtures import TempDirectory
10
from zipline.data.data_portal import DataPortal
11
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
12
    SQLiteAdjustmentReader
13
from zipline.finance.trading import TradingEnvironment, SimulationParameters
14
from zipline.data.us_equity_pricing import BcolzDailyBarReader
15
from zipline.data.us_equity_minutes import (
16
    MinuteBarWriterFromDataFrames,
17
    BcolzMinuteBarReader
18
)
19
from zipline.data.future_pricing import FutureMinuteReader
20
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
21
22
23
class TestDataPortal(TestCase):
24
    def test_forward_fill_minute(self):
        """Minute spot values for gaps in the data should be
        forward-filled from the most recent real bar."""
        tempdir = TempDirectory()
        try:
            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': pd.Timestamp("2015-09-28", tz='UTC'),
                        'end_date': pd.Timestamp("2015-09-29", tz='UTC')
                        + timedelta(days=1)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=pd.Timestamp("2015-09-28", tz='UTC'),
                end=pd.Timestamp("2015-09-29", tz='UTC')
            )

            # One missing bar, then 200 bars of real data, then 1.5 days
            # of missing data.  Each field gets its own base offset so a
            # mixup between fields would be caught by the assertions.
            def bar_values(base):
                return [0] + list(range(base, base + 200)) + [0] * 579

            df = pd.DataFrame({
                # prices are written scaled up by 1000
                "open": np.array(bar_values(0)) * 1000,
                "high": np.array(bar_values(1000)) * 1000,
                "low": np.array(bar_values(2000)) * 1000,
                "close": np.array(bar_values(3000)) * 1000,
                "volume": bar_values(4000),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
            )

            fields = ["open", "high", "low", "close", "volume"]
            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(fields):
                    val = dp.get_spot_value(
                        0, field,
                        dt=minute,
                        data_frequency=sim_params.data_frequency)
                    if minute_idx == 0:
                        # first bar is missing and there is nothing
                        # earlier to fill from
                        expected = 0
                    else:
                        # real data through index 200, forward-filled
                        # from the last real bar afterwards
                        expected = (min(minute_idx, 200) - 1 +
                                    field_idx * 1000)
                    self.assertEqual(expected, val)
        finally:
            tempdir.cleanup()
    def test_forward_fill_daily(self):
        """Daily spot values for gaps in the data should be
        forward-filled from the most recent real bar."""
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # First bar is missing, then 8 real bars, then 8 more missing
            # bars.  Each field gets its own base offset so a mixup
            # between fields would be caught by the assertions.
            def bar_values(base):
                return [0] + list(range(base, base + 8)) + [0] * 8

            df = pd.DataFrame({
                "open": bar_values(0),
                "high": bar_values(10),
                "low": bar_values(20),
                "close": bar_values(30),
                "volume": bar_values(40),
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            equity_daily_reader = BcolzDailyBarReader(path)

            dp = DataPortal(
                env,
                equity_daily_reader=equity_daily_reader,
            )

            fields = ["open", "high", "low", "close", "volume"]
            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(fields):
                    val = dp.get_spot_value(
                        0, field,
                        dt=day,
                        data_frequency=sim_params.data_frequency)
                    if day_idx == 0:
                        # first bar is missing and there is nothing
                        # earlier to fill from
                        expected = 0
                    else:
                        # real data through index 8, forward-filled from
                        # the last real bar afterwards
                        expected = min(day_idx, 8) - 1 + field_idx * 10
                    self.assertEqual(expected, val)
        finally:
            tempdir.cleanup()
    def test_adjust_forward_fill_minute(self):
        """Forward-filled minute values must be split-adjusted.

        Writes 390 real bars, a 100-bar gap, then 290 more bars, with a
        0.5-ratio split effective on the second day.  Values filled
        across the gap should come back halved (prices) / doubled
        (volume); values on either side of the gap come back unadjusted.
        """
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            writer = SQLiteAdjustmentWriter(adjustments_path,
                                            pd.date_range(start=start_day,
                                                          end=end_day),
                                            None)

            splits = pd.DataFrame([{
                'effective_date': int(end_day.value / 1e9),
                'ratio': 0.5,
                'sid': 0
            }])

            dividend_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'ex_date': np.array([], dtype='datetime64[ns]'),
                'pay_date': np.array([], dtype='datetime64[ns]'),
                'record_date': np.array([], dtype='datetime64[ns]'),
                'declared_date': np.array([], dtype='datetime64[ns]'),
                'amount': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            dividends = pd.DataFrame(
                dividend_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=['ex_date',
                         'pay_date',
                         'record_date',
                         'declared_date',
                         'amount',
                         'sid']
            )

            merger_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'effective_date': np.array([], dtype=int),
                'ratio': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            # Pass explicit columns, as done for the dividends frame
            # above: deriving column order from the dict's keys makes
            # the frame layout nondeterministic on Pythons without
            # ordered dicts.
            mergers = pd.DataFrame(
                merger_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=['effective_date', 'ratio', 'sid']
            )

            writer.write(splits, mergers, dividends)

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            # First 390 minutes: real, unadjusted data.
            for idx, minute in enumerate(minutes[:390]):
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        idx + (1000 * field_idx)
                    )

            for idx, minute in enumerate(minutes[390:490]):
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                # volume is adjusted in the opposite direction of price
                self.assertEqual(
                    dp.get_spot_value(
                        0, "volume",
                        dt=minute,
                        data_frequency=sim_params.data_frequency),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency
                        ),
                        (390 + idx + (1000 * field_idx))
                    )
        finally:
            tempdir.cleanup()
    def test_spot_value_futures(self):
        """Spot values for a future should read back exactly what was
        written to its minute bcolz file, for all five fields."""
        tempdir = TempDirectory()
        try:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            # pad with zeroes from midnight up to the first real minute
            minutes_since_midnight = int(
                (start_dt - normalize_date(start_dt)).total_seconds() / 60)
            zeroes_buffer = [0] * minutes_since_midnight

            # each field gets its own base offset so a mixup between
            # fields would be caught by the assertions
            def column(base):
                return np.array(
                    zeroes_buffer + list(range(base, base + 10000)))

            df = pd.DataFrame({
                "open": column(0) * 1000,
                "high": column(10000) * 1000,
                "low": column(20000) * 1000,
                "close": column(30000) * 1000,
                "volume": column(40000)
            })

            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            # NOTE(review): last_dt is start_dt + 10000 minutes, one
            # minute past the final written bar at +9999 -- confirm the
            # reader treats this bound as exclusive.
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            future_minute_reader = FutureMinuteReader(tempdir.path)

            dp = DataPortal(
                env,
                future_minute_reader=future_minute_reader
            )

            future123 = env.asset_finder.retrieve_asset(123)

            data_frequency = 'minute'

            field_offsets = [
                ("open", 0),
                ("high", 10000),
                ("low", 20000),
                ("close", 30000),
                ("volume", 40000),
            ]
            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))
                for field, offset in field_offsets:
                    self.assertEqual(
                        i + offset,
                        dp.get_spot_value(
                            future123, field, dt, data_frequency))

        finally:
            tempdir.cleanup()