Completed: Pull Request — master (#858), by Eddie, created 01:32

tests.TestDataPortal.test_forward_fill_daily() (grade: B)

Complexity
    Conditions: 6

Size
    Total Lines: 67

Duplication
    Lines: 0
    Ratio: 0 %

Metric    Value
cc        6
dl        0
loc       67
rs        7.6298

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. And when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that you should extract the commented part into a new method and use the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method, Replace Temp with Query, and Decompose Conditional.
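
As an illustration of Extract Method applied to a long test such as test_forward_fill_daily in the listing below, here is a minimal sketch. It reuses the same zipline test APIs called in the file; the class and helper names are hypothetical and not part of the analyzed code.

# Hypothetical sketch: Extract Method applied to a long forward-fill test.
# The helper names (_write_single_equity_env, _check_forward_filled_ohlcv)
# are invented for illustration and do not exist in the zipline test suite.
from unittest import TestCase

from zipline.finance.trading import TradingEnvironment


class ForwardFillTestHelpers(TestCase):

    def _write_single_equity_env(self, start_day, end_day):
        # Extracted setup: a TradingEnvironment with a single equity (sid 0).
        env = TradingEnvironment()
        env.write_data(
            equities_data={
                0: {'start_date': start_day, 'end_date': end_day}
            }
        )
        return env

    def _check_forward_filled_ohlcv(self, dp, dts, data_frequency,
                                    n_real_bars, step):
        # Extracted assertion loop: the first bar is missing, the next
        # n_real_bars bars hold real data offset by `step` per field, and
        # every later bar is expected to forward-fill the last real value.
        for idx, dt in enumerate(dts):
            for field_idx, field in enumerate(
                    ["open", "high", "low", "close", "volume"]):
                val = dp.get_spot_value(0, field, dt=dt,
                                        data_frequency=data_frequency)
                if idx == 0:
                    self.assertEqual(0, val)
                elif idx <= n_real_bars:
                    self.assertEqual((idx - 1) + (field_idx * step), val)
                else:
                    self.assertEqual((n_real_bars - 1) +
                                     (field_idx * step), val)

In practice the helpers would live on TestDataPortal itself; with them, test_forward_fill_daily shrinks to a short sequence of named steps: build the environment, write the daily bars, construct the DataPortal, and call _check_forward_filled_ohlcv(dp, days, "daily", n_real_bars=8, step=10).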

#
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import timedelta
import bcolz
import numpy as np
import pandas as pd

from unittest import TestCase
from pandas.tslib import normalize_date
from testfixtures import TempDirectory
from zipline.data.data_portal import DataPortal
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
    SQLiteAdjustmentReader
from zipline.finance.trading import TradingEnvironment, SimulationParameters
from zipline.data.us_equity_pricing import BcolzDailyBarReader
from zipline.data.us_equity_minutes import (
    MinuteBarWriterFromDataFrames,
    BcolzMinuteBarReader
)
from zipline.data.future_pricing import FutureMinuteReader
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames


class TestDataPortal(TestCase):
    def test_forward_fill_minute(self):
        tempdir = TempDirectory()
        try:
            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': pd.Timestamp("2015-09-28", tz='UTC'),
                        'end_date': pd.Timestamp("2015-09-29", tz='UTC')
                        + timedelta(days=1)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=pd.Timestamp("2015-09-28", tz='UTC'),
                end=pd.Timestamp("2015-09-29", tz='UTC')
            )

            df = pd.DataFrame({
                # one missing bar, then 200 bars of real data,
                # then 1.5 days of missing data
                "open": np.array([0] + list(range(0, 200)) + [0] * 579)
                * 1000,
                "high": np.array([0] + list(range(1000, 1200)) + [0] * 579)
                * 1000,
                "low": np.array([0] + list(range(2000, 2200)) + [0] * 579)
                * 1000,
                "close": np.array([0] + list(range(3000, 3200)) + [0] * 579)
                * 1000,
                "volume": [0] + list(range(4000, 4200)) + [0] * 579,
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
            )

            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(
                        ["open", "high", "low", "close", "volume"]):
                    val = dp.get_spot_value(
                        0, field,
                        dt=minute,
                        data_frequency=sim_params.data_frequency)
                    if minute_idx == 0:
                        self.assertEqual(0, val)
                    elif minute_idx < 200:
                        self.assertEqual((minute_idx - 1) +
                                         (field_idx * 1000), val)
                    else:
                        self.assertEqual(199 + (field_idx * 1000), val)
        finally:
            tempdir.cleanup()

    def test_forward_fill_daily(self):
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.
            df = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 8,
                "high": [0] + list(range(10, 18)) + [0] * 8,
                "low": [0] + list(range(20, 28)) + [0] * 8,
                "close": [0] + list(range(30, 38)) + [0] * 8,
                "volume": [0] + list(range(40, 48)) + [0] * 8,
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            equity_daily_reader = BcolzDailyBarReader(path)

            dp = DataPortal(
                env,
                equity_daily_reader=equity_daily_reader,
            )

            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(
                        ["open", "high", "low", "close", "volume"]):
                    val = dp.get_spot_value(
                        0, field,
                        dt=day,
                        data_frequency=sim_params.data_frequency)
                    if day_idx == 0:
                        self.assertEqual(0, val)
                    elif day_idx < 9:
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
                    else:
                        self.assertEqual(7 + (field_idx * 10), val)
        finally:
            tempdir.cleanup()

    def test_adjust_forward_fill_minute(self):
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            writer = SQLiteAdjustmentWriter(adjustments_path,
                                            pd.date_range(start=start_day,
                                                          end=end_day),
                                            None)

            splits = pd.DataFrame([{
                'effective_date': int(end_day.value / 1e9),
                'ratio': 0.5,
                'sid': 0
            }])

            dividend_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'ex_date': np.array([], dtype='datetime64[ns]'),
                'pay_date': np.array([], dtype='datetime64[ns]'),
                'record_date': np.array([], dtype='datetime64[ns]'),
                'declared_date': np.array([], dtype='datetime64[ns]'),
                'amount': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            dividends = pd.DataFrame(
                dividend_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=['ex_date',
                         'pay_date',
                         'record_date',
                         'declared_date',
                         'amount',
                         'sid']
            )

            merger_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'effective_date': np.array([], dtype=int),
                'ratio': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            mergers = pd.DataFrame(
                merger_data,
                index=pd.DatetimeIndex([], tz='UTC')
            )

            writer.write(splits, mergers, dividends)

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            for idx, minute in enumerate(minutes[:390]):
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        idx + (1000 * field_idx)
                    )

            for idx, minute in enumerate(minutes[390:490]):
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                self.assertEqual(
                    dp.get_spot_value(
                        0, "volume",
                        dt=minute,
                        data_frequency=sim_params.data_frequency),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency
                        ),
                        (390 + idx + (1000 * field_idx))
                    )
        finally:
            tempdir.cleanup()

    def test_spot_value_futures(self):
        tempdir = TempDirectory()
        try:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            zeroes_buffer = \
                [0] * int((start_dt -
                           normalize_date(start_dt)).total_seconds() / 60)

            df = pd.DataFrame({
                "open": np.array(zeroes_buffer + list(range(0, 10000))) * 1000,
                "high": np.array(
                    zeroes_buffer + list(range(10000, 20000))) * 1000,
                "low": np.array(
                    zeroes_buffer + list(range(20000, 30000))) * 1000,
                "close": np.array(
                    zeroes_buffer + list(range(30000, 40000))) * 1000,
                "volume": np.array(zeroes_buffer + list(range(40000, 50000)))
            })

            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            future_minute_reader = FutureMinuteReader(tempdir.path)

            dp = DataPortal(
                env,
                future_minute_reader=future_minute_reader
            )

            future123 = env.asset_finder.retrieve_asset(123)

            data_frequency = 'minute'

            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                self.assertEqual(i,
                                 dp.get_spot_value(
                                     future123, "open", dt, data_frequency))
                self.assertEqual(i + 10000,
                                 dp.get_spot_value(
                                     future123, "high", dt, data_frequency))
                self.assertEqual(i + 20000,
                                 dp.get_spot_value(
                                     future123, "low", dt, data_frequency))
                self.assertEqual(i + 30000,
                                 dp.get_spot_value(
                                     future123, "close", dt, data_frequency))
                self.assertEqual(i + 40000,
                                 dp.get_spot_value(
                                     future123, "volume", dt, data_frequency))

        finally:
            tempdir.cleanup()