Completed
Pull Request — master (#906)
by Eddie
01:40
created

tests.TestDataPortal.test_forward_fill_daily()   B

Complexity

Conditions 6

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 65
rs 7.6697

How to fix: Long Method

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a coherent part of the long method's body into a new, well-named method and call it from the original.

1
# Copyright 2015 Quantopian, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
import os
16
from datetime import timedelta
17
import bcolz
18
import numpy as np
19
import pandas as pd
20
21
from unittest import TestCase
22
from pandas.tslib import normalize_date
23
from testfixtures import TempDirectory
24
from zipline.data.data_portal import DataPortal
25
from zipline.data.us_equity_pricing import (
26
    SQLiteAdjustmentWriter,
27
    SQLiteAdjustmentReader,
28
    BcolzDailyBarReader
29
)
30
from zipline.finance.trading import TradingEnvironment, SimulationParameters
31
from zipline.data.future_pricing import FutureMinuteReader
32
from zipline.data.us_equity_minutes import (
33
    MinuteBarWriterFromDataFrames,
34
    BcolzMinuteBarReader
35
)
36
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames
37
38
39
class TestDataPortal(TestCase):
40
    def test_forward_fill_minute(self):
41
        tempdir = TempDirectory()
42
        try:
43
            env = TradingEnvironment()
44
            env.write_data(
45
                equities_data={
46
                    0: {
47
                        'start_date': pd.Timestamp("2015-09-28", tz='UTC'),
48
                        'end_date': pd.Timestamp("2015-09-29", tz='UTC')
49
                        + timedelta(days=1)
50
                    }
51
                }
52
            )
53
54
            minutes = env.minutes_for_days_in_range(
55
                start=pd.Timestamp("2015-09-28", tz='UTC'),
56
                end=pd.Timestamp("2015-09-29", tz='UTC')
57
            )
58
59
            df = pd.DataFrame({
60
                # one missing bar, then 200 bars of real data,
61
                # then 1.5 days of missing data
62
                "open": np.array([0] + list(range(0, 200)) + [0] * 579)
63
                * 1000,
64
                "high": np.array([0] + list(range(1000, 1200)) + [0] * 579)
65
                * 1000,
66
                "low": np.array([0] + list(range(2000, 2200)) + [0] * 579)
67
                * 1000,
68
                "close": np.array([0] + list(range(3000, 3200)) + [0] * 579)
69
                * 1000,
70
                "volume": [0] + list(range(4000, 4200)) + [0] * 579,
71
                "minute": minutes
72
            })
73
74
            MinuteBarWriterFromDataFrames(
75
                pd.Timestamp('2002-01-02', tz='UTC')).write(
76
                    tempdir.path, {0: df})
77
78
            sim_params = SimulationParameters(
79
                period_start=minutes[0],
80
                period_end=minutes[-1],
81
                data_frequency="minute",
82
                env=env,
83
            )
84
85
            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)
86
87
            dp = DataPortal(
88
                env,
89
                equity_minute_reader=equity_minute_reader,
90
                sim_params=sim_params
91
            )
92
93
            for minute_idx, minute in enumerate(minutes):
94
                for field_idx, field in enumerate(
95
                        ["open", "high", "low", "close", "volume"]):
96
                    val = dp.get_spot_value(0, field, dt=minute)
97
                    if minute_idx == 0:
98
                        self.assertEqual(0, val)
99
                    elif minute_idx < 200:
100
                        self.assertEqual((minute_idx - 1) +
101
                                         (field_idx * 1000), val)
102
                    else:
103
                        self.assertEqual(199 + (field_idx * 1000), val)
104
        finally:
105
            tempdir.cleanup()
106
107
    def test_forward_fill_daily(self):
108
        tempdir = TempDirectory()
109
        try:
110
            # 17 trading days
111
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
112
            end_day = pd.Timestamp("2015-09-30", tz='UTC')
113
114
            env = TradingEnvironment()
115
            env.write_data(
116
                equities_data={
117
                    0: {
118
                        'start_date': start_day,
119
                        'end_date': end_day
120
                    }
121
                }
122
            )
123
124
            days = env.days_in_range(start_day, end_day)
125
126
            # first bar is missing.  then 8 real bars.  then 8 more missing
127
            # bars.
128
            df = pd.DataFrame({
129
                "open": [0] + list(range(0, 8)) + [0] * 8,
130
                "high": [0] + list(range(10, 18)) + [0] * 8,
131
                "low": [0] + list(range(20, 28)) + [0] * 8,
132
                "close": [0] + list(range(30, 38)) + [0] * 8,
133
                "volume": [0] + list(range(40, 48)) + [0] * 8,
134
                "day": [day.value for day in days]
135
            }, index=days)
136
137
            assets = {0: df}
138
            path = os.path.join(tempdir.path, "testdaily.bcolz")
139
140
            DailyBarWriterFromDataFrames(assets).write(
141
                path,
142
                days,
143
                assets
144
            )
145
146
            sim_params = SimulationParameters(
147
                period_start=days[0],
148
                period_end=days[-1],
149
                data_frequency="daily"
150
            )
151
152
            equity_daily_reader = BcolzDailyBarReader(path)
153
154
            dp = DataPortal(
155
                env,
156
                equity_daily_reader=equity_daily_reader,
157
                sim_params=sim_params
158
            )
159
160
            for day_idx, day in enumerate(days):
161
                for field_idx, field in enumerate(
162
                        ["open", "high", "low", "close", "volume"]):
163
                    val = dp.get_spot_value(0, field, dt=day)
164
                    if day_idx == 0:
165
                        self.assertEqual(0, val)
166
                    elif day_idx < 9:
167
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
168
                    else:
169
                        self.assertEqual(7 + (field_idx * 10), val)
170
        finally:
171
            tempdir.cleanup()
172
173
    def test_adjust_forward_fill_minute(self):
174
        tempdir = TempDirectory()
175
        try:
176
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
177
            end_day = pd.Timestamp("2013-06-24", tz='UTC')
178
179
            env = TradingEnvironment()
180
            env.write_data(
181
                equities_data={
182
                    0: {
183
                        'start_date': start_day,
184
                        'end_date': env.next_trading_day(end_day)
185
                    }
186
                }
187
            )
188
189
            minutes = env.minutes_for_days_in_range(
190
                start=start_day,
191
                end=end_day
192
            )
193
194
            df = pd.DataFrame({
195
                # 390 bars of real data, then 100 missing bars, then 290
196
                # bars of data again
197
                "open": np.array(list(range(0, 390)) + [0] * 100 +
198
                                 list(range(390, 680))) * 1000,
199
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
200
                                 list(range(1390, 1680))) * 1000,
201
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
202
                                list(range(2390, 2680))) * 1000,
203
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
204
                                  list(range(3390, 3680))) * 1000,
205
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
206
                                   list(range(4390, 4680))),
207
                "minute": minutes
208
            })
209
210
            MinuteBarWriterFromDataFrames(
211
                pd.Timestamp('2002-01-02', tz='UTC')).write(
212
                    tempdir.path, {0: df})
213
214
            sim_params = SimulationParameters(
215
                period_start=minutes[0],
216
                period_end=minutes[-1],
217
                data_frequency="minute",
218
                env=env
219
            )
220
221
            # create a split for 6/24
222
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
223
            writer = SQLiteAdjustmentWriter(adjustments_path,
224
                                            pd.date_range(start=start_day,
225
                                                          end=end_day),
226
                                            None)
227
228
            splits = pd.DataFrame([{
229
                'effective_date': int(end_day.value / 1e9),
230
                'ratio': 0.5,
231
                'sid': 0
232
            }])
233
234
            dividend_data = {
235
                # Hackery to make the dtypes correct on an empty frame.
236
                'ex_date': np.array([], dtype='datetime64[ns]'),
237
                'pay_date': np.array([], dtype='datetime64[ns]'),
238
                'record_date': np.array([], dtype='datetime64[ns]'),
239
                'declared_date': np.array([], dtype='datetime64[ns]'),
240
                'amount': np.array([], dtype=float),
241
                'sid': np.array([], dtype=int),
242
            }
243
            dividends = pd.DataFrame(
244
                dividend_data,
245
                index=pd.DatetimeIndex([], tz='UTC'),
246
                columns=['ex_date',
247
                         'pay_date',
248
                         'record_date',
249
                         'declared_date',
250
                         'amount',
251
                         'sid']
252
            )
253
254
            merger_data = {
255
                # Hackery to make the dtypes correct on an empty frame.
256
                'effective_date': np.array([], dtype=int),
257
                'ratio': np.array([], dtype=float),
258
                'sid': np.array([], dtype=int),
259
            }
260
            mergers = pd.DataFrame(
261
                merger_data,
262
                index=pd.DatetimeIndex([], tz='UTC')
263
            )
264
265
            writer.write(splits, mergers, dividends)
266
267
            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)
268
269
            dp = DataPortal(
270
                env,
271
                equity_minute_reader=equity_minute_reader,
272
                sim_params=sim_params,
273
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
274
            )
275
276
            # phew, finally ready to start testing.
277
            for idx, minute in enumerate(minutes[:390]):
278
                for field_idx, field in enumerate(["open", "high", "low",
279
                                                   "close", "volume"]):
280
                    self.assertEqual(
281
                        dp.get_spot_value(0, field, dt=minute),
282
                        idx + (1000 * field_idx)
283
                    )
284
285
            for idx, minute in enumerate(minutes[390:490]):
286
                # no actual data for this part, so we'll forward-fill.
287
                # make sure the forward-filled values are adjusted.
288
                for field_idx, field in enumerate(["open", "high", "low",
289
                                                   "close"]):
290
                    self.assertEqual(
291
                        dp.get_spot_value(0, field, dt=minute),
292
                        (389 + (1000 * field_idx)) / 2.0
293
                    )
294
295
                self.assertEqual(
296
                    dp.get_spot_value(0, "volume", dt=minute),
297
                    8778  # 4389 * 2
298
                )
299
300
            for idx, minute in enumerate(minutes[490:]):
301
                # back to real data
302
                for field_idx, field in enumerate(["open", "high", "low",
303
                                                   "close", "volume"]):
304
                    self.assertEqual(
305
                        dp.get_spot_value(0, field, dt=minute),
306
                        (390 + idx + (1000 * field_idx))
307
                    )
308
        finally:
309
            tempdir.cleanup()
310
311
    def test_spot_value_futures(self):
312
        tempdir = TempDirectory()
313
        try:
314
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
315
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))
316
317
            zeroes_buffer = \
318
                [0] * int((start_dt -
319
                           normalize_date(start_dt)).total_seconds() / 60)
320
321
            df = pd.DataFrame({
322
                "open": np.array(zeroes_buffer + list(range(0, 10000))) * 1000,
323
                "high": np.array(
324
                    zeroes_buffer + list(range(10000, 20000))) * 1000,
325
                "low": np.array(
326
                    zeroes_buffer + list(range(20000, 30000))) * 1000,
327
                "close": np.array(
328
                    zeroes_buffer + list(range(30000, 40000))) * 1000,
329
                "volume": np.array(zeroes_buffer + list(range(40000, 50000)))
330
            })
331
332
            path = os.path.join(tempdir.path, "123.bcolz")
333
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
334
            ctable.attrs["start_dt"] = start_dt.value / 1e9
335
            ctable.attrs["last_dt"] = end_dt.value / 1e9
336
337
            env = TradingEnvironment()
338
            env.write_data(futures_data={
339
                123: {
340
                    "start_date": normalize_date(start_dt),
341
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
342
                    'symbol': 'TEST_FUTURE',
343
                    'asset_type': 'future',
344
                }
345
            })
346
347
            future_minute_reader = FutureMinuteReader(tempdir.path)
348
349
            dp = DataPortal(
350
                env,
351
                future_minute_reader=future_minute_reader
352
            )
353
354
            future123 = env.asset_finder.retrieve_asset(123)
355
356
            for i in range(0, 10000):
357
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))
358
359
                self.assertEqual(i,
360
                                 dp.get_spot_value(future123, "open", dt))
361
                self.assertEqual(i + 10000,
362
                                 dp.get_spot_value(future123, "high", dt))
363
                self.assertEqual(i + 20000,
364
                                 dp.get_spot_value(future123, "low", dt))
365
                self.assertEqual(i + 30000,
366
                                 dp.get_spot_value(future123, "close", dt))
367
                self.assertEqual(i + 40000,
368
                                 dp.get_spot_value(future123, "volume", dt))
369
370
        finally:
371
            tempdir.cleanup()
372