Completed
Pull Request — master (#858), by Eddie
10:07 (queued 01:13), created

tests.TestDataPortal.test_last_traded_dt() — rank: A

Complexity
    Conditions: 1

Size
    Total Lines: 70

Duplication
    Lines: 0
    Ratio: 0%

Metric                          Value
cc (cyclomatic complexity)      1
dl (duplicated lines)           0
loc (lines of code)             70
rs                              9.1724

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name; and when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, as sketched below.
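For instance, test_adjust_forward_fill_minute and the flagged test_last_traded_dt in the module below repeat the same TradingEnvironment, minute-bar, and DataPortal setup before they reach the behaviour under test. A minimal Extract Method sketch of that shared setup, assuming the module's existing imports and using hypothetical helper names (make_minute_env, make_minute_data_portal) that are not part of the zipline API, could look like this:

import pandas as pd

from zipline.data.data_portal import DataPortal
from zipline.data.us_equity_minutes import (
    MinuteBarWriterFromDataFrames,
    BcolzMinuteBarReader
)
from zipline.finance.trading import TradingEnvironment


def make_minute_env(start_day, end_day):
    # Hypothetical helper: the equity/calendar setup currently repeated
    # inline by both minute-level tests.
    env = TradingEnvironment()
    env.write_data(
        equities_data={
            0: {
                'start_date': start_day,
                'end_date': env.next_trading_day(end_day)
            }
        }
    )
    minutes = env.minutes_for_days_in_range(start=start_day, end=end_day)
    return env, minutes


def make_minute_data_portal(tempdir, env, df):
    # Hypothetical helper: write the minute bars for sid 0 into the temp
    # directory and return a DataPortal backed by them.
    MinuteBarWriterFromDataFrames(
        pd.Timestamp('2002-01-02', tz='UTC')).write(tempdir.path, {0: df})
    equity_minute_reader = BcolzMinuteBarReader(tempdir.path)
    return DataPortal(
        env,
        equity_minute_reader=equity_minute_reader,
    )

With helpers like these, test_last_traded_dt is reduced to building its bar frame and asserting on get_last_traded_dt, and each extracted name documents a step that is currently only implied by the inline code.

The full module under analysis follows.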

#
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import timedelta
import bcolz
import numpy as np
import pandas as pd

from unittest import TestCase
from pandas.tslib import normalize_date
from testfixtures import TempDirectory
from zipline.data.data_portal import DataPortal
from zipline.data.us_equity_pricing import SQLiteAdjustmentWriter, \
    SQLiteAdjustmentReader
from zipline.finance.trading import TradingEnvironment, SimulationParameters
from zipline.data.us_equity_pricing import BcolzDailyBarReader
from zipline.data.us_equity_minutes import (
    MinuteBarWriterFromDataFrames,
    BcolzMinuteBarReader
)
from zipline.data.future_pricing import FutureMinuteReader
from .utils.daily_bar_writer import DailyBarWriterFromDataFrames


class TestDataPortal(TestCase):
    def test_forward_fill_minute(self):
        tempdir = TempDirectory()
        try:
            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': pd.Timestamp("2015-09-28", tz='UTC'),
                        'end_date': pd.Timestamp("2015-09-29", tz='UTC')
                        + timedelta(days=1)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=pd.Timestamp("2015-09-28", tz='UTC'),
                end=pd.Timestamp("2015-09-29", tz='UTC')
            )

            df = pd.DataFrame({
                # one missing bar, then 200 bars of real data,
                # then 1.5 days of missing data
                "open": np.array([0] + list(range(0, 200)) + [0] * 579)
                * 1000,
                "high": np.array([0] + list(range(1000, 1200)) + [0] * 579)
                * 1000,
                "low": np.array([0] + list(range(2000, 2200)) + [0] * 579)
                * 1000,
                "close": np.array([0] + list(range(3000, 3200)) + [0] * 579)
                * 1000,
                "volume": [0] + list(range(4000, 4200)) + [0] * 579,
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env,
            )

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
            )

            for minute_idx, minute in enumerate(minutes):
                for field_idx, field in enumerate(
                        ["open", "high", "low", "close", "volume"]):
                    val = dp.get_spot_value(
                        0, field,
                        dt=minute,
                        data_frequency=sim_params.data_frequency)
                    if minute_idx == 0:
                        self.assertEqual(0, val)
                    elif minute_idx < 200:
                        self.assertEqual((minute_idx - 1) +
                                         (field_idx * 1000), val)
                    else:
                        self.assertEqual(199 + (field_idx * 1000), val)
        finally:
            tempdir.cleanup()

    def test_forward_fill_daily(self):
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.
            df = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 8,
                "high": [0] + list(range(10, 18)) + [0] * 8,
                "low": [0] + list(range(20, 28)) + [0] * 8,
                "close": [0] + list(range(30, 38)) + [0] * 8,
                "volume": [0] + list(range(40, 48)) + [0] * 8,
                "day": [day.value for day in days]
            }, index=days)

            assets = {0: df}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            sim_params = SimulationParameters(
                period_start=days[0],
                period_end=days[-1],
                data_frequency="daily"
            )

            equity_daily_reader = BcolzDailyBarReader(path)

            dp = DataPortal(
                env,
                equity_daily_reader=equity_daily_reader,
            )

            for day_idx, day in enumerate(days):
                for field_idx, field in enumerate(
                        ["open", "high", "low", "close", "volume"]):
                    val = dp.get_spot_value(
                        0, field,
                        dt=day,
                        data_frequency=sim_params.data_frequency)
                    if day_idx == 0:
                        self.assertEqual(0, val)
                    elif day_idx < 9:
                        self.assertEqual((day_idx - 1) + (field_idx * 10), val)
                    else:
                        self.assertEqual(7 + (field_idx * 10), val)
        finally:
            tempdir.cleanup()

    def test_adjust_forward_fill_minute(self):
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            sim_params = SimulationParameters(
                period_start=minutes[0],
                period_end=minutes[-1],
                data_frequency="minute",
                env=env
            )

            # create a split for 6/24
            adjustments_path = os.path.join(tempdir.path, "adjustments.db")
            writer = SQLiteAdjustmentWriter(adjustments_path,
                                            pd.date_range(start=start_day,
                                                          end=end_day),
                                            None)

            splits = pd.DataFrame([{
                'effective_date': int(end_day.value / 1e9),
                'ratio': 0.5,
                'sid': 0
            }])

            dividend_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'ex_date': np.array([], dtype='datetime64[ns]'),
                'pay_date': np.array([], dtype='datetime64[ns]'),
                'record_date': np.array([], dtype='datetime64[ns]'),
                'declared_date': np.array([], dtype='datetime64[ns]'),
                'amount': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            dividends = pd.DataFrame(
                dividend_data,
                index=pd.DatetimeIndex([], tz='UTC'),
                columns=['ex_date',
                         'pay_date',
                         'record_date',
                         'declared_date',
                         'amount',
                         'sid']
            )

            merger_data = {
                # Hackery to make the dtypes correct on an empty frame.
                'effective_date': np.array([], dtype=int),
                'ratio': np.array([], dtype=float),
                'sid': np.array([], dtype=int),
            }
            mergers = pd.DataFrame(
                merger_data,
                index=pd.DatetimeIndex([], tz='UTC')
            )

            writer.write(splits, mergers, dividends)

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
                adjustment_reader=SQLiteAdjustmentReader(adjustments_path)
            )

            # phew, finally ready to start testing.
            for idx, minute in enumerate(minutes[:390]):
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        idx + (1000 * field_idx)
                    )

            for idx, minute in enumerate(minutes[390:490]):
                # no actual data for this part, so we'll forward-fill.
                # make sure the forward-filled values are adjusted.
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency),
                        (389 + (1000 * field_idx)) / 2.0
                    )

                self.assertEqual(
                    dp.get_spot_value(
                        0, "volume",
                        dt=minute,
                        data_frequency=sim_params.data_frequency),
                    8778  # 4389 * 2
                )

            for idx, minute in enumerate(minutes[490:]):
                # back to real data
                for field_idx, field in enumerate(["open", "high", "low",
                                                   "close", "volume"]):
                    self.assertEqual(
                        dp.get_spot_value(
                            0, field,
                            dt=minute,
                            data_frequency=sim_params.data_frequency
                        ),
                        (390 + idx + (1000 * field_idx))
                    )
        finally:
            tempdir.cleanup()

    def test_last_traded_dt(self):
        tempdir = TempDirectory()
        try:
            start_day = pd.Timestamp("2013-06-21", tz='UTC')
            end_day = pd.Timestamp("2013-06-24", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': env.next_trading_day(end_day)
                    }
                }
            )

            minutes = env.minutes_for_days_in_range(
                start=start_day,
                end=end_day
            )

            df = pd.DataFrame({
                # 390 bars of real data, then 100 missing bars, then 290
                # bars of data again
                "open": np.array(list(range(0, 390)) + [0] * 100 +
                                 list(range(390, 680))) * 1000,
                "high": np.array(list(range(1000, 1390)) + [0] * 100 +
                                 list(range(1390, 1680))) * 1000,
                "low": np.array(list(range(2000, 2390)) + [0] * 100 +
                                list(range(2390, 2680))) * 1000,
                "close": np.array(list(range(3000, 3390)) + [0] * 100 +
                                  list(range(3390, 3680))) * 1000,
                "volume": np.array(list(range(4000, 4390)) + [0] * 100 +
                                   list(range(4390, 4680))),
                "minute": minutes
            })

            MinuteBarWriterFromDataFrames(
                pd.Timestamp('2002-01-02', tz='UTC')).write(
                    tempdir.path, {0: df})

            equity_minute_reader = BcolzMinuteBarReader(tempdir.path)

            dp = DataPortal(
                env,
                equity_minute_reader=equity_minute_reader,
            )

            asset = env.asset_finder.retrieve_asset(0)

            minute_with_trade = minutes[389]

            minute_without_trade = minutes[390]

            last_traded = dp.get_last_traded_dt(asset, minute_with_trade,
                                                'minute')

            self.assertEqual(last_traded, minute_with_trade)

            last_traded = dp.get_last_traded_dt(asset, minute_without_trade,
                                                'minute')

            minute_without_trade = minutes[489]

            last_traded = dp.get_last_traded_dt(asset, minute_without_trade,
                                                'minute')

            self.assertEqual(last_traded, minute_with_trade)
        finally:
            tempdir.cleanup()

    def test_last_traded_dt_daily(self):
        tempdir = TempDirectory()
        try:
            # 17 trading days
            start_day = pd.Timestamp("2015-09-07", tz='UTC')
            end_day = pd.Timestamp("2015-09-30", tz='UTC')

            env = TradingEnvironment()
            env.write_data(
                equities_data={
                    0: {
                        'start_date': start_day,
                        'end_date': end_day
                    },
                    1: {
                        'start_date': env.next_trading_day(start_day),
                        'end_date': end_day
                    }
                }
            )

            days = env.days_in_range(start_day, end_day)

            # first bar is missing.  then 8 real bars.  then 8 more missing
            # bars.
            df = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 8,
                "high": [0] + list(range(10, 18)) + [0] * 8,
                "low": [0] + list(range(20, 28)) + [0] * 8,
                "close": [0] + list(range(30, 38)) + [0] * 8,
                "volume": [0] + list(range(40, 48)) + [0] * 8,
                "day": [day.value for day in days]
            }, index=days)
            # Test a second sid, so that edge condition with very first sid
            # in calendar, as well as a sid with a start date after the
            # calendar start are tested for the 'no leading data case'
            df_sid_1 = pd.DataFrame({
                "open": [0] + list(range(0, 8)) + [0] * 7,
                "high": [0] + list(range(10, 18)) + [0] * 7,
                "low": [0] + list(range(20, 28)) + [0] * 7,
                "close": [0] + list(range(30, 38)) + [0] * 7,
                "volume": [0] + list(range(40, 48)) + [0] * 7,
                "day": [day.value for day in days[1:]]
            }, index=days[1:])

            assets = {0: df, 1: df_sid_1}
            path = os.path.join(tempdir.path, "testdaily.bcolz")

            DailyBarWriterFromDataFrames(assets).write(
                path,
                days,
                assets
            )

            equity_daily_reader = BcolzDailyBarReader(path)

            dp = DataPortal(
                env,
                equity_daily_reader=equity_daily_reader,
            )

            asset = env.asset_finder.retrieve_asset(0)

            # Day with trades.
            day_with_trade = df.index[8]
            last_traded = dp.get_last_traded_dt(asset, day_with_trade,
                                                'daily')

            self.assertEqual(last_traded, day_with_trade)

            # Day with no trades, should return most recent with trade.
            day_without_trade = df.index[11]
            last_traded = dp.get_last_traded_dt(asset, day_without_trade,
                                                'daily')

            self.assertEqual(last_traded, day_with_trade)

            first_day_also_no_trade = df.index[0]

            # Beginning bar, should return None.
            last_traded = dp.get_last_traded_dt(asset, first_day_also_no_trade,
                                                'daily')

            self.assertEqual(last_traded, None)

            asset = env.asset_finder.retrieve_asset(1)

            # Day with trades.
            day_with_trade = df_sid_1.index[8]
            last_traded = dp.get_last_traded_dt(asset, day_with_trade,
                                                'daily')

            self.assertEqual(last_traded, day_with_trade)

            # Day with no trades, should return most recent with trade.
            day_without_trade = df_sid_1.index[10]
            last_traded = dp.get_last_traded_dt(asset, day_without_trade,
                                                'daily')

            self.assertEqual(last_traded, day_with_trade)

            first_day_also_no_trade = df_sid_1.index[0]

            # Beginning bar, should return None.
            last_traded = dp.get_last_traded_dt(asset, first_day_also_no_trade,
                                                'daily')

            self.assertEqual(last_traded, None)

        finally:
            tempdir.cleanup()

    def test_spot_value_futures(self):
        tempdir = TempDirectory()
        try:
            start_dt = pd.Timestamp("2015-11-20 20:11", tz='UTC')
            end_dt = pd.Timestamp(start_dt + timedelta(minutes=10000))

            zeroes_buffer = \
                [0] * int((start_dt -
                           normalize_date(start_dt)).total_seconds() / 60)

            df = pd.DataFrame({
                "open": np.array(zeroes_buffer + list(range(0, 10000))) * 1000,
                "high": np.array(
                    zeroes_buffer + list(range(10000, 20000))) * 1000,
                "low": np.array(
                    zeroes_buffer + list(range(20000, 30000))) * 1000,
                "close": np.array(
                    zeroes_buffer + list(range(30000, 40000))) * 1000,
                "volume": np.array(zeroes_buffer + list(range(40000, 50000)))
            })

            path = os.path.join(tempdir.path, "123.bcolz")
            ctable = bcolz.ctable.fromdataframe(df, rootdir=path)
            ctable.attrs["start_dt"] = start_dt.value / 1e9
            ctable.attrs["last_dt"] = end_dt.value / 1e9

            env = TradingEnvironment()
            env.write_data(futures_data={
                123: {
                    "start_date": normalize_date(start_dt),
                    "end_date": env.next_trading_day(normalize_date(end_dt)),
                    'symbol': 'TEST_FUTURE',
                    'asset_type': 'future',
                }
            })

            future_minute_reader = FutureMinuteReader(tempdir.path)

            dp = DataPortal(
                env,
                future_minute_reader=future_minute_reader
            )

            future123 = env.asset_finder.retrieve_asset(123)

            data_frequency = 'minute'

            for i in range(0, 10000):
                dt = pd.Timestamp(start_dt + timedelta(minutes=i))

                self.assertEqual(i,
                                 dp.get_spot_value(
                                     future123, "open", dt, data_frequency))
                self.assertEqual(i + 10000,
                                 dp.get_spot_value(
                                     future123, "high", dt, data_frequency))
                self.assertEqual(i + 20000,
                                 dp.get_spot_value(
                                     future123, "low", dt, data_frequency))
                self.assertEqual(i + 30000,
                                 dp.get_spot_value(
                                     future123, "close", dt, data_frequency))
                self.assertEqual(i + 40000,
                                 dp.get_spot_value(
                                     future123, "volume", dt, data_frequency))

        finally:
            tempdir.cleanup()