Completed
Pull Request — master (#858)
by Eddie
01:58
created

tests.TestDataPortal.test_spot_value_futures()   A

Complexity

Conditions 2

Size

Total Lines 66

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 66
rs 9.3191

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a coherent, commentable part of the long body into a new method whose name captures that comment's intent.

1
import os
2
from datetime import timedelta
3
import bcolz
4
import numpy as np
5
import pandas as pd
6
7
from unittest import TestCase
8
from pandas.tslib import normalize_date
9
from testfixtures import TempDirectory
10
from zipline.data.data_portal import DataPortal
11
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
12
    SQLiteAdjustmentReader
13
from zipline.finance.trading import TradingEnvironment, SimulationParameters
14
from zipline.data.us_equity_minutes import (
15
    MinuteBarWriterFromDataFrames,
16
    BcolzMinuteBarReader
17
)
18
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
19
20
21
class TestDataPortal(TestCase):
    """Integration tests for ``DataPortal.get_spot_value``.

    Covers four scenarios:

    * forward-filling of missing minute bars for an equity,
    * forward-filling of missing daily bars for an equity,
    * forward-filled minute values being adjusted across a split,
    * minute-level spot values for a future read from a raw bcolz ctable.

    Each test builds its own bar data on disk inside a ``TempDirectory``
    (cleaned up in ``finally``), writes asset metadata into a fresh
    ``TradingEnvironment``, then asserts exact values out of a
    ``DataPortal``.
    """

    def test_forward_fill_minute(self):
        """A missing leading minute bar reads as 0; later gaps forward-fill.

        Fixture layout per field: 1 missing bar, 200 real bars, then 579
        missing bars (the rest of the two-day minute range).  Field values
        are offset by ``field_idx * 1000`` so each OHLCV column is
        distinguishable in the assertions.
        """
        tempdir = TempDirectory()
        try:
            env = TradingEnvironment()
            # Asset 0 must exist through the day after the last data day so
            # lookups on 2015-09-29 are in range.
            env.write_data(
                equities_data={
                    0: {
                        'start_date': pd.Timestamp("2015-09-28", tz='UTC'),
                        'end_date': pd.Timestamp("2015-09-29", tz='UTC')
                        + timedelta(days=1)
                    }
                }
            )

            # Two trading days of market minutes (780 minutes total).
            minutes = env.minutes_for_days_in_range(
                start=pd.Timestamp("2015-09-28", tz='UTC'),
                end=pd.Timestamp("2015-09-29", tz='UTC')
            )

            df = pd.DataFrame({
                # one missing bar, then 200 bars of real data,
                # then 1.5 days of missing data
                # NOTE(review): prices are multiplied by 1000 but volume is
                # not — presumably the minute bar format stores prices as
                # integer thousandths; confirm against
                # MinuteBarWriterFromDataFrames.
                "open": np.array([0] + list(range(0, 200)) + [0] * 579)
                * 1000,
                "high": np.array([0] + list(range(1000, 1200)) + [0] * 579)
                * 1000,
                "low": np.array([0] + list(range(2000, 2200)) + [0] * 579)
                * 1000,
                "close": np.array([0] + list(range(3000, 3200)) + [0] * 579)
                * 1000,
                "volume": [0] + list(range(4000, 4200)) + [0] * 579,
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
                sim_params=sim_params
            )

            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(
                        ["open", "high", "low", "close", "volume"]):
                    val = dp.get_spot_value(
                        0, field,
                        dt=minute,
                        data_frequency=sim_params.data_frequency)
                    if minute_idx == 0:
                        # Leading gap: nothing to fill from, so value is 0.
                        self.assertEqual(0, val)
                    elif minute_idx < 200:
                        # Real data region; -1 accounts for the leading gap.
                        self.assertEqual((minute_idx - 1) +
                                         (field_idx * 1000), val)
                    else:
                        # Trailing gap: forward-filled from the last real
                        # bar (value 199 per field offset).
                        self.assertEqual(199 + (field_idx * 1000), val)
        finally:
            tempdir.cleanup()

    def test_forward_fill_daily(self):
        """Daily-frequency analogue of the minute forward-fill test.

        Fixture layout per field: 1 missing day, 8 real days, 8 missing
        days; field values offset by ``field_idx * 10``.
        """
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.
            df = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 8,
                "high": [0] + list(range(10, 18)) + [0] * 8,
                "low": [0] + list(range(20, 28)) + [0] * 8,
                "close": [0] + list(range(30, 38)) + [0] * 8,
                "volume": [0] + list(range(40, 48)) + [0] * 8,
                # "day" carries the epoch-ns Timestamp.value for each day.
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            dp = DataPortal(
                env,
                daily_equities_path=path,
                sim_params=sim_params
            )

            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(
                        ["open", "high", "low", "close", "volume"]):
                    val = dp.get_spot_value(
                        0, field,
                        dt=day,
                        data_frequency=sim_params.data_frequency)
                    if day_idx == 0:
                        # Leading gap: nothing to fill from.
                        self.assertEqual(0, val)
                    elif day_idx < 9:
                        # Real data region (days 1..8).
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
                    else:
                        # Trailing gap: forward-filled from day 8's value.
                        self.assertEqual(7 + (field_idx * 10), val)
        finally:
            tempdir.cleanup()

    def test_adjust_forward_fill_minute(self):
        """Forward-filled minute values must be adjusted across a split.

        Fixture: 390 real bars (one full session), 100 missing bars, then
        290 real bars, spanning 2013-06-21 and 2013-06-24.  A 0.5-ratio
        split effective on the second day should halve forward-filled
        prices and double forward-filled volume.
        """
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            writer = SQLiteAdjustmentWriter(adjustments_path,
                                            pd.date_range(start=start_day,
                                                          end=end_day),
                                            None)

            # effective_date is stored as epoch seconds (ns value / 1e9).
            splits = pd.DataFrame([{
                'effective_date': int(end_day.value / 1e9),
                'ratio': 0.5,
                'sid': 0
            }])

            dividend_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'ex_date': np.array([], dtype='datetime64[ns]'),
                'pay_date': np.array([], dtype='datetime64[ns]'),
                'record_date': np.array([], dtype='datetime64[ns]'),
                'declared_date': np.array([], dtype='datetime64[ns]'),
                'amount': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            dividends = pd.DataFrame(
                dividend_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=['ex_date',
                         'pay_date',
                         'record_date',
                         'declared_date',
                         'amount',
                         'sid']
            )

            merger_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'effective_date': np.array([], dtype=int),
                'ratio': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            mergers = pd.DataFrame(
                merger_data,
                index=pd.DatetimeIndex([], tz='UTC')
            )

            writer.write(splits, mergers, dividends)

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
                sim_params=sim_params,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            for idx, minute in enumerate(minutes[:390]):
                # First session: raw, unadjusted data.
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        idx + (1000 * field_idx)
                    )

            for idx, minute in enumerate(minutes[390:490]):
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close"]):
                    # Last pre-gap price is 389 (+field offset); the 0.5
                    # split halves it.
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                # Volume adjusts inversely to price under a split.
                self.assertEqual(
                    dp.get_spot_value(
                        0, "volume",
                        dt=minute,
                        data_frequency=sim_params.data_frequency),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency
                        ),
                        (390 + idx + (1000 * field_idx))
                    )
        finally:
            tempdir.cleanup()

    def test_spot_value_futures(self):
        """Minute spot values for a future read from a hand-built ctable.

        Writes 10000 minutes of data for sid 123 directly via bcolz
        (rather than a writer class), padded with zero bars from midnight
        up to the 20:11 UTC start, and checks every minute's OHLCV.
        """
        tempdir = TempDirectory()
        try:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            # Zero bars for each minute from midnight to start_dt, so the
            # real data begins at the correct intraday offset.
            zeroes_buffer = \
                [0] * int((start_dt -
                           normalize_date(start_dt)).total_seconds() / 60)

            # NOTE(review): unlike the equity fixtures above, prices here
            # are scaled by 1000 while volume is not — presumably matching
            # the futures minute storage format; confirm in DataPortal.
            df = pd.DataFrame({
                "open": np.array(zeroes_buffer + list(range(0, 10000))) * 1000,
                "high": np.array(
                    zeroes_buffer + list(range(10000, 20000))) * 1000,
                "low": np.array(
                    zeroes_buffer + list(range(20000, 30000))) * 1000,
                "close": np.array(
                    zeroes_buffer + list(range(30000, 40000))) * 1000,
                "volume": np.array(zeroes_buffer + list(range(40000, 50000)))
            })

            # Rootdir is named after the sid; start/last timestamps are
            # stored in the ctable attrs as epoch seconds.
            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            dp = DataPortal(
                env,
                minutes_futures_path=tempdir.path
            )

            future123 = env.asset_finder.retrieve_asset(123)

            data_frequency = 'minute'

            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                self.assertEqual(i,
                                 dp.get_spot_value(
                                     future123, "open", dt, data_frequency))
                self.assertEqual(i + 10000,
                                 dp.get_spot_value(
                                     future123, "high", dt, data_frequency))
                self.assertEqual(i + 20000,
                                 dp.get_spot_value(
                                     future123, "low", dt, data_frequency))
                self.assertEqual(i + 30000,
                                 dp.get_spot_value(
                                     future123, "close", dt, data_frequency))
                self.assertEqual(i + 40000,
                                 dp.get_spot_value(
                                     future123, "volume", dt, data_frequency))

        finally:
            tempdir.cleanup()