Completed
Pull Request — master (#858)
by Eddie
01:45
created

zipline.data.DataPortal.get_history_window()   C

Complexity

Conditions 8

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 51
rs 5.2592

How to fix: Long Method

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
from zipline.pipeline.data.equity_pricing import USEquityPricing
31
32
from zipline.utils import tradingcalendar
33
from zipline.errors import (
34
    NoTradeDataAvailableTooEarly,
35
    NoTradeDataAvailableTooLate
36
)
37
38
# Module-level logger for all DataPortal diagnostics.
log = Logger('DataPortal')

# Bar frequencies accepted by the history API ("1d" = daily, "1m" = minute).
HISTORY_FREQUENCIES = ["1d", "1m"]

# Maps every user-facing field name (including the "*_price" aliases) to the
# canonical OHLCV column name stored on disk.  Note "price" aliases "close".
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}


# Same aliasing as BASE_FIELDS, but mapping to USEquityPricing pipeline
# column objects instead of raw column-name strings.
US_EQUITY_COLUMNS = {
    'open': USEquityPricing.open,
    'open_price': USEquityPricing.open,
    'high': USEquityPricing.high,
    'low': USEquityPricing.low,
    'close': USEquityPricing.close,
    'close_price': USEquityPricing.close,
    'volume': USEquityPricing.volume,
    'price': USEquityPricing.close,
}
64
65
66
class DataPortal(object):
67
    def __init__(self,
                 env,
                 sim_params=None,
                 equity_minute_reader=None,
                 minutes_futures_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 futures_sid_path_func=None):
        """
        Central access point for simulation price data.

        Parameters
        ----------
        env : TradingEnvironment
            Provides the asset finder and trading-calendar helpers.
        sim_params : SimulationParameters, optional
            When absent, data frequency defaults to "minute".
        equity_minute_reader : object, optional
            Reader for equity minute bars (exposes rootdir / sid_path_func /
            trading_days, per usage elsewhere in this class).
        minutes_futures_path : str, optional
            Directory holding per-sid futures minute bcolz files.
        daily_equities_path : str, optional
            Path to the daily equity bcolz data; when given, a
            BcolzDailyBarReader is opened on it.
        adjustment_reader : object, optional
            Source for splits/mergers/dividends adjustments.
        futures_sid_path_func : callable, optional
            Overrides the default "<root>/<sid>.bcolz" layout for futures.
        """
        self.env = env

        # Internal pointers to the current dt (can be minute) and current day.
        # In daily mode, they point to the same thing. In minute mode, it's
        # useful to have separate pointers to the current day and to the
        # current minute.  These pointers are updated by the
        # AlgorithmSimulator's transform loop.
        self.current_dt = None
        self.current_day = None

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here
        self.cur_data_offset = 0

        self.views = {}

        self._daily_equities_path = daily_equities_path
        self._minutes_futures_path = minutes_futures_path

        self._asset_finder = env.asset_finder

        # Per-field caches of sid -> open bcolz carray (see _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            # No sim params: assume minute-frequency data.
            self._data_frequency = "minute"

        self._futures_sid_path_func = futures_sid_path_func

        # Minute prices are stored as scaled integers; multiply by this
        # factor to recover the real price.
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None

        self._equity_minute_reader = equity_minute_reader

        # The following values are used by _minute_offset to calculate the
        # index into the minute bcolz date.

        # A lookup of table every minute to the corresponding day, to avoid
        # calling `.date()` on every lookup.
        self._minutes_to_day = {}
        # A map of days (keyed by midnight) to a DatetimeIndex of market
        # minutes for that day.
        self._minutes_by_day = {}
        # A dict of day to the offset into the minute bcolz on which that
        # days data starts.
        self._day_offsets = None
    def handle_extra_source(self, source_df):
        """
        Prepare extra data sources (e.g. Fetcher) for fast lookup.

        Extra sources always have a sid column.  The given data is split into
        one frame per sid, de-duplicated by dt, and forward-filled across the
        full simulation date range so reads during simulation are cheap.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # The sid column can hold assets we know about (such as sid(24)) or
        # arbitrary identifiers we don't (such as "palladium").  Either way,
        # each sid's rows are handled as an independent sub-frame.

        # Pick the index matching the simulation's emission rate: trading
        # days for daily emission, market minutes otherwise.
        if self._sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )

        # One sub-frame per sid; this makes start/end-date calculation,
        # de-duping, and reindexing straightforward.
        grouped = source_df.groupby(["sid"])
        frames = {name: grouped.get_group(name) for name in grouped.groups}

        for identifier, frame in iteritems(frames):
            # Remember the raw date span before reindexing widens it.
            earliest_date = frame.index[0]
            latest_date = frame.index[-1]

            # A single sid per frame means de-duping by the dt index is safe.
            frame = frame.groupby(level=0).last()

            # Stretch to the backtest's full date range, forward-filling gaps.
            frame = frame.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # Fake assets have no metadata elsewhere, so record their
                # start/end dates here.
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            # Store column -> {identifier -> df}, e.g.
            # self._augmented_sources_map['days_to_cover']['AAPL'] is the
            # frame holding that data.
            for col_name in frame.columns.difference(['sid']):
                self._augmented_sources_map.setdefault(
                    col_name, {})[identifier] = frame
    def _open_minute_file(self, field, asset):
233
        sid_str = str(int(asset))
234
235
        try:
236
            carray = self._carrays[field][sid_str]
237
        except KeyError:
238
            carray = self._carrays[field][sid_str] = \
239
                self._get_ctable(asset)[field]
240
241
        return carray
242
243
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute-bar bcolz ctable for the given asset.

        Futures live under ``self._minutes_futures_path``; everything else is
        read from the equity minute reader's root directory.  A sid-path
        function, when configured, overrides the default
        "<root>/<sid>.bcolz" layout.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            if self._futures_sid_path_func is not None:
                path = self._futures_sid_path_func(
                    self._minutes_futures_path, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(self._minutes_futures_path, sid)
        else:
            # Equities and any other asset type read from the equity minute
            # reader.  The sid_path_func override only applied to the Equity
            # branch originally, so keep that behavior; the duplicated
            # default-path construction is collapsed into one place.
            if (isinstance(asset, Equity) and
                    self._equity_minute_reader.sid_path_func is not None):
                path = self._equity_minute_reader.sid_path_func(
                    self._equity_minute_reader.rootdir, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(
                    self._equity_minute_reader.rootdir, sid)

        return bcolz.open(path, mode='r')
    def get_previous_value(self, asset, field, dt):
        """
        Given an asset and a column and a dt, returns the previous value for
        the same asset/column pair.  If this data portal is in minute mode,
        it's the previous minute value, otherwise it's the previous day's
        value.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp from which to go back in time one slot.

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        ValueError
            If this portal's data frequency is neither "daily" nor "minute".
        """
        if self._data_frequency == 'daily':
            prev_dt = self.env.previous_trading_day(dt)
        elif self._data_frequency == 'minute':
            prev_dt = self.env.previous_market_minute(dt)
        else:
            # Previously an unexpected frequency fell through to an
            # UnboundLocalError on prev_dt; fail with a clear message instead.
            raise ValueError(
                "Unknown data frequency: {0}".format(self._data_frequency))

        return self.get_spot_value(asset, field, prev_dt)
    def _check_extra_sources(self, asset, column, day):
        """
        Return the value of ``column`` for ``asset`` on ``day`` from an extra
        source, or None when no extra source serves this asset/column pair.

        Raises KeyError (after logging) when an extra source should serve the
        pair but the lookup fails.
        """
        # If we have an extra source with a column called "price", only look
        # at it if it's on something like palladium and not AAPL (since our
        # own price data always wins when dealing with assets).
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a field in an extra source
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except Exception:
                # Was a bare ``except:``, which also swallowed SystemExit and
                # KeyboardInterrupt; Exception is as broad as we should go.
                log.error(
                    "Could not find value for asset={0}, current_day={1},"
                    "column={2}".format(
                        str(asset),
                        str(self.current_day),
                        str(column)))

                raise KeyError
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.
        field : string
            One of "open", "open_price", "high", "low", "close",
            "close_price", "volume", or "price".
        dt : pd.Timestamp, optional
            The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        # Extra sources (e.g. Fetcher data) win when they serve this pair.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            (dt or self.current_dt)
        )
        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))
        column_to_use = BASE_FIELDS[field]

        # Accept raw sids as well as Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = normalize_date(dt or self.current_day)
            return self._get_daily_data(asset, column_to_use, day_to_use)

        # Minute mode: futures and equities are stored differently.
        dt_to_use = dt or self.current_dt
        if isinstance(asset, Future):
            return self._get_minute_spot_value_future(
                asset, column_to_use, dt_to_use)
        return self._get_minute_spot_value(
            asset, column_to_use, dt_to_use)
    def _get_minute_spot_value_future(self, asset, column, dt):
        """
        Spot value of ``column`` for a future at minute ``dt``.

        Futures bcolz files have 1440 bars per day (24 hours), 7 days a week,
        beginning at midnight of the asset's first trading day, so the bar
        index is simply the number of minutes since that start.
        """
        start_date = self._get_asset_start_date(asset)
        offset = int((dt - start_date).total_seconds() / 60)

        # Requests before the asset's start date have no data.
        if offset < 0:
            return 0.0

        bars = self._open_minute_file(column, asset)
        result = bars[offset]

        # Zero means "no bar here"; walk backwards to forward-fill from the
        # most recent populated minute, stopping at the start of the file.
        while result == 0 and offset > 0:
            offset -= 1
            result = bars[offset]

        if column == 'volume':
            return result
        # Prices are stored as scaled integers.
        return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
        """
        Install the caches used by _minute_offset to translate a minute dt
        into an index in the equity minute bcolz files.
        """
        # TODO: This case should not be hit, but is when tests are setup
        # with data_frequency of daily, but run with minutely.
        if self._equity_minute_reader is None:
            return

        self._minutes_to_day = minutes_to_day
        self._minutes_by_day = minutes_by_day

        if self._sim_params is None:
            return

        # Offset of each simulation day within the minute files: the file
        # holds 390 bars per trading day, so a day's data starts at
        # (absolute trading-day index) * 390.
        sim_days = self._sim_params.trading_days
        base_idx = self._equity_minute_reader.trading_days.searchsorted(
            sim_days[0])
        self._day_offsets = {
            day: (position + base_idx) * 390
            for position, day in enumerate(sim_days)
        }
    def _minute_offset(self, dt):
425
        if self._day_offsets is not None:
426
            try:
427
                day = self._minutes_to_day[dt]
428
                minutes = self._minutes_by_day[day]
429
                return self._day_offsets[day] + minutes.get_loc(dt)
430
            except KeyError:
431
                return None
432
433
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value of ``column`` for an equity at minute ``dt``, read from the
        minute bcolz file (390 bars per trading day).  Missing bars are
        forward-filled by walking backwards, applying price adjustments when
        the walk crosses a day boundary.
        """
        # if dt is before the first market minute, minute_index
        # will be 0.  if it's after the last market minute, it'll
        # be len(minutes_for_day)
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            # Cache miss: compute the offset from the trading-day index
            # (390 bars per day) plus the minute's position within its day.
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self._equity_minute_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                # NOTE(review): ``dt or self.current_dt`` -- dt is always
                # supplied by callers in this file, so the fallback looks
                # vestigial; confirm.  ``.order()`` is the pre-0.17 pandas
                # spelling of sort_values().
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Prices (everything but volume) are stored as scaled integers.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
    def _get_daily_data(self, asset, column, dt):
        """
        Daily spot value for asset/column at dt.  Walks backwards one trading
        day at a time past missing values (marked -1 by the reader); returns
        0 once the reader has no data for the requested date at all.
        """
        while True:
            try:
                value = self._daily_bar_reader.spot_price(asset, dt, column)
            except NoDataOnDate:
                return 0

            if value != -1:
                return value

            # -1 marks a missing bar; try the previous trading day.
            dt -= tradingcalendar.trading_day
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        # The window is the bar_count trading days ending at end_dt's day.
        end_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (end_idx - bar_count + 1):(end_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures have no daily bcolz files yet, so their daily bars are
        # rolled up from minute data; equities read the daily files directly.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Daily history bars for a future, built by aggregating minute-level
        spot values (there are no daily bcolz files for futures yet).

        The last day of the window only covers the minutes from the open up
        to ``end_dt``; all earlier days use their full set of market minutes.
        Returns a list with one aggregated value per non-empty day.
        """
        # How to collapse one day's minute values into a single daily bar.
        # ``column`` is always one of these five (see BASE_FIELDS values),
        # so the previous silent skip of unknown columns is unreachable.
        aggregators = {
            'volume': np.sum,
            'open': lambda group: group[0],
            'close': lambda group: group[-1],
            'high': np.amax,
            'low': np.amin,
        }

        data_groups = []

        # All full days in the window (everything but the last day).
        for day in days_for_window[:-1]:
            minutes = self.env.market_minutes_for_day(day)
            data_groups.append(
                self._minute_values_for_future(asset, column, minutes))

        # The (possibly partial) last day runs from the open up to end_dt.
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )
        data_groups.append(
            self._minute_values_for_future(asset, column, last_day_minutes))

        aggregate = aggregators[column]
        return [aggregate(group) for group in data_groups if len(group) != 0]

    def _minute_values_for_future(self, asset, column, minutes):
        # One spot lookup per minute; see _get_minute_spot_value_future.
        values = np.zeros(len(minutes), dtype=np.float64)
        for idx, minute in enumerate(minutes):
            values[idx] = self._get_minute_spot_value_future(
                asset, column, minute
            )
        return values
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history bars for an equity.  Uses daily data alone when the
        window ends at midnight UTC or extends past the asset's last trading
        day; otherwise the final (partial) day is aggregated from minute data.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # NOTE(review): this branch passes the raw ``sid`` where the
            # branch above passes ``asset`` -- presumably
            # _get_daily_window_for_sid (not visible here) accepts both;
            # confirm.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Collapse the partial day's minutes into one daily value.
            # field_to_use is always one of these five (BASE_FIELDS values).
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Windows that would begin before the first trading day are truncated
        and, when they start exactly on the first trading day, padded at the
        front with NaNs so the frame still has ``bar_count`` rows.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # Was "ends""before", which rendered as "endsbefore".
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)
                # NOTE(review): when the truncated window does NOT start on
                # first_trading_day, no NaNs are prepended and the per-asset
                # arrays below are shorter than minutes_for_window -- confirm
                # that case cannot occur.

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # list(map(...)) is a no-op on Python 2 (map already returns a list)
        # but gives pandas a concrete sequence on Python 3.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=list(map(int, assets))
        )
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.
        bar_count : int
            The number of bars desired.
        frequency : string
            "1d" or "1m"
        field : string
            The desired field of the asset.
        ffill : boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            window = self._get_history_daily_window(
                assets, end_dt, bar_count, field_to_use)
        elif frequency == "1m":
            window = self._get_history_minute_window(
                assets, end_dt, bar_count, field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # Price is the only field eligible for forward-filling.
        if field == "price" and ffill:
            window.fillna(method='ffill', inplace=True)

        return window
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.
        field : string
            The specific field to return.  "open", "high", "close_price", etc.
        minutes_for_window : pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # sanity check in case a sid was passed in
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and equities are stored differently, so dispatch here.
        reader = (
            self._get_minute_window_for_future
            if isinstance(asset, Future)
            else self._get_minute_window_for_equity
        )
        return reader(asset, field, minutes_for_window)
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
812
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
813
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
814
        # do this is to simply do a spot lookup for each desired minute.
815
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
816
        for idx, minute in enumerate(minutes_for_window):
817
            return_data[idx] = \
818
                self._get_minute_spot_value_future(asset, field, minute)
819
820
        # Note: an improvement could be to find the consecutive runs within
821
        # minutes_for_window, and use them to read the underlying ctable
822
        # more efficiently.
823
824
        # Once futures are on 24-hour clock, then we can just grab all the
825
        # requested minutes in one shot from the ctable.
826
827
        # no adjustments for futures, yay.
828
        return return_data
829
830
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Read a window of minute bars for an equity from its bcolz minute
        file and apply all adjustments, returning one value per requested
        minute (NaN where no data exists).
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        # _find_position_of_minute returns -1 when the minute is not found,
        # so end_idx == 0 means the whole window precedes the minute data.
        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            # (180 * idx compensates for the minutes already removed by
            # earlier iterations.)
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
905
                               price_adj_factor=1.0):
906
        """
907
        Internal method that applies all the necessary adjustments on the
908
        given data array.
909
910
        The adjustments are:
911
        - splits
912
        - if field != "volume":
913
            - mergers
914
            - dividends
915
            - * 0.001
916
            - any zero fields replaced with NaN
917
        - all values rounded to 3 digits after the decimal point.
918
919
        Parameters
920
        ----------
921
        data : np.array
922
            The data to be adjusted.
923
924
        asset: Asset
925
            The asset whose data is being adjusted.
926
927
        dts: pd.DateTimeIndex
928
            The list of minutes or days representing the desired window.
929
930
        field: string
931
            The field whose values are in the data array.
932
933
        price_adj_factor: float
934
            Factor with which to adjust OHLC values.
935
        Returns
936
        -------
937
        None.  The data array is modified in place.
938
        """
939
        self._apply_adjustments_to_window(
940
            self._get_adjustment_list(
941
                asset, self._splits_dict, "SPLITS"
942
            ),
943
            data,
944
            dts,
945
            field != 'volume'
946
        )
947
948
        if field != 'volume':
949
            self._apply_adjustments_to_window(
950
                self._get_adjustment_list(
951
                    asset, self._mergers_dict, "MERGERS"
952
                ),
953
                data,
954
                dts,
955
                True
956
            )
957
958
            self._apply_adjustments_to_window(
959
                self._get_adjustment_list(
960
                    asset, self._dividends_dict, "DIVIDENDS"
961
                ),
962
                data,
963
                dts,
964
                True
965
            )
966
967
            data *= price_adj_factor
968
969
            # if anything is zero, it's a missing bar, so replace it with NaN.
970
            # we only want to do this for non-volume fields, because a missing
971
            # volume should be 0.
972
            data[data == 0] = np.NaN
973
974
        np.around(data, 3, out=data)
975
976
    def _find_position_of_minute(self, minute_dt):
977
        """
978
        Internal method that returns the position of the given minute in the
979
        list of every trading minute since market open on 1/2/2002.
980
981
        IMPORTANT: This method assumes every day is 390 minutes long, even
982
        early closes.  Our minute bcolz files are generated like this to
983
        support fast lookup.
984
985
        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.
986
987
        Parameters
988
        ----------
989
        minute_dt: pd.Timestamp
990
            The minute whose position should be calculated.
991
992
        Returns
993
        -------
994
        The position of the given minute in the list of all trading minutes
995
        since market open on 1/2/2002.
996
        """
997
        day = minute_dt.date()
998
        day_idx = self._equity_minute_reader.trading_days.searchsorted(day)
999
        if day_idx < 0:
1000
            return -1
1001
1002
        day_open = pd.Timestamp(
1003
            datetime(
1004
                year=day.year,
1005
                month=day.month,
1006
                day=day.day,
1007
                hour=9,
1008
                minute=31),
1009
            tz='US/Eastern').tz_convert('UTC')
1010
1011
        minutes_offset = int((minute_dt - day_open).total_seconds()) / 60
1012
1013
        return int((390 * day_idx) + minutes_offset)
1014
1015
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The trading days making up the desired window; the raw read spans
            days_in_window[0] through days_in_window[-1].

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.

        """
        column = US_EQUITY_COLUMNS[field]
        data = self._daily_bar_reader.load_raw_arrays([column],
                                                      days_in_window[0],
                                                      days_in_window[-1],
                                                      [asset])

        bar_count = len(days_in_window)

        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.NAN
        sid = int(asset)

        # data is a list with one 2-D array per requested column; take the
        # single requested column/sid as a 1-D series.
        return_array[0:bar_count] = data[0].T[0]

        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
        )

        return return_array
    @staticmethod
1077
    def _apply_adjustments_to_window(adjustments_list, window_data,
1078
                                     dts_in_window, multiply):
1079
        if len(adjustments_list) == 0:
1080
            return
1081
1082
        # advance idx to the correct spot in the adjustments list, based on
1083
        # when the window starts
1084
        idx = 0
1085
1086
        while idx < len(adjustments_list) and dts_in_window[0] >\
1087
                adjustments_list[idx][0]:
1088
            idx += 1
1089
1090
        # if we've advanced through all the adjustments, then there's nothing
1091
        # to do.
1092
        if idx == len(adjustments_list):
1093
            return
1094
1095
        while idx < len(adjustments_list):
1096
            adjustment_to_apply = adjustments_list[idx]
1097
1098
            if adjustment_to_apply[0] > dts_in_window[-1]:
1099
                break
1100
1101
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1102
            if multiply:
1103
                window_data[0:range_end] *= adjustment_to_apply[1]
1104
            else:
1105
                window_data[0:range_end] /= adjustment_to_apply[1]
1106
1107
            idx += 1
1108
1109
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1110
        """
1111
        Internal method that returns a list of adjustments for the given sid.
1112
1113
        Parameters
1114
        ----------
1115
        asset : Asset
1116
            The asset for which to return adjustments.
1117
1118
        adjustments_dict: dict
1119
            A dictionary of sid -> list that is used as a cache.
1120
1121
        table_name: string
1122
            The table that contains this data in the adjustments db.
1123
1124
        Returns
1125
        -------
1126
        adjustments: list
1127
            A list of [multiplier, pd.Timestamp], earliest first
1128
1129
        """
1130
        if self._adjustment_reader is None:
1131
            return []
1132
1133
        sid = int(asset)
1134
1135
        try:
1136
            adjustments = adjustments_dict[sid]
1137
        except KeyError:
1138
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1139
                get_adjustments_for_sid(table_name, sid)
1140
1141
        return adjustments
1142
1143
    def get_equity_price_view(self, asset):
1144
        """
1145
        Returns a DataPortalSidView for the given asset.  Used to support the
1146
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.
1147
1148
        Parameters
1149
        ----------
1150
        asset : Asset
1151
            Asset that is being queried.
1152
1153
        Returns
1154
        -------
1155
        DataPortalSidView: Accessor into the given asset's data.
1156
        """
1157
        try:
1158
            view = self.views[asset]
1159
        except KeyError:
1160
            view = self.views[asset] = DataPortalSidView(asset, self)
1161
1162
        return view
1163
1164
    def _check_is_currently_alive(self, asset, dt):
1165
        if dt is None:
1166
            dt = self.current_day
1167
1168
        sid = int(asset)
1169
1170
        if sid not in self._asset_start_dates:
1171
            self._get_asset_start_date(asset)
1172
1173
        start_date = self._asset_start_dates[sid]
1174
        if self._asset_start_dates[sid] > dt:
1175
            raise NoTradeDataAvailableTooEarly(
1176
                sid=sid,
1177
                dt=dt,
1178
                start_dt=start_date
1179
            )
1180
1181
        end_date = self._asset_end_dates[sid]
1182
        if self._asset_end_dates[sid] < dt:
1183
            raise NoTradeDataAvailableTooLate(
1184
                sid=sid,
1185
                dt=dt,
1186
                end_dt=end_date
1187
            )
1188
1189
    def _get_asset_start_date(self, asset):
        """
        Return the cached start date for `asset`, populating the cache on
        first access.
        """
        self._ensure_asset_dates(asset)
        # CONSISTENCY FIX: _ensure_asset_dates keys the cache by int sid;
        # look up with the same key instead of relying on the asset object
        # hashing/comparing like its sid.
        return self._asset_start_dates[int(asset)]
    def _get_asset_end_date(self, asset):
        """
        Return the cached end date for `asset`, populating the cache on
        first access.
        """
        self._ensure_asset_dates(asset)
        # CONSISTENCY FIX: _ensure_asset_dates keys the cache by int sid;
        # look up with the same key instead of relying on the asset object
        # hashing/comparing like its sid.
        return self._asset_end_dates[int(asset)]
    def _ensure_asset_dates(self, asset):
1198
        sid = int(asset)
1199
1200
        if sid not in self._asset_start_dates:
1201
            self._asset_start_dates[sid] = asset.start_date
1202
            self._asset_end_dates[sid] = asset.end_date
1203
1204
    def get_splits(self, sids, dt):
1205
        """
1206
        Returns any splits for the given sids and the given dt.
1207
1208
        Parameters
1209
        ----------
1210
        sids : list
1211
            Sids for which we want splits.
1212
1213
        dt: pd.Timestamp
1214
            The date for which we are checking for splits.  Note: this is
1215
            expected to be midnight UTC.
1216
1217
        Returns
1218
        -------
1219
        list: List of splits, where each split is a (sid, ratio) tuple.
1220
        """
1221
        if self._adjustment_reader is None or len(sids) == 0:
1222
            return {}
1223
1224
        # convert dt to # of seconds since epoch, because that's what we use
1225
        # in the adjustments db
1226
        seconds = int(dt.value / 1e9)
1227
1228
        splits = self._adjustment_reader.conn.execute(
1229
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1230
            (seconds,)).fetchall()
1231
1232
        sids_set = set(sids)
1233
        splits = [split for split in splits if split[0] in sids_set]
1234
1235
        return splits
1236
1237
    def get_stock_dividends(self, sid, trading_days):
1238
        """
1239
        Returns all the stock dividends for a specific sid that occur
1240
        in the given trading range.
1241
1242
        Parameters
1243
        ----------
1244
        sid: int
1245
            The asset whose stock dividends should be returned.
1246
1247
        trading_days: pd.DatetimeIndex
1248
            The trading range.
1249
1250
        Returns
1251
        -------
1252
        list: A list of objects with all relevant attributes populated.
1253
        All timestamp fields are converted to pd.Timestamps.
1254
        """
1255
1256
        if self._adjustment_reader is None:
1257
            return []
1258
1259
        if len(trading_days) == 0:
1260
            return []
1261
1262
        start_dt = trading_days[0].value / 1e9
1263
        end_dt = trading_days[-1].value / 1e9
1264
1265
        dividends = self._adjustment_reader.conn.execute(
1266
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1267
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1268
            fetchall()
1269
1270
        dividend_info = []
1271
        for dividend_tuple in dividends:
1272
            dividend_info.append({
1273
                "declared_date": dividend_tuple[1],
1274
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1275
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1276
                "payment_sid": dividend_tuple[4],
1277
                "ratio": dividend_tuple[5],
1278
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1279
                "sid": dividend_tuple[7]
1280
            })
1281
1282
        return dividend_info
1283
1284
    def contains(self, asset, field):
        """
        Whether `field` can be queried for `asset`: either a base pricing
        field, or an augmented (fetcher) field registered for this asset.
        """
        if field in BASE_FIELDS:
            return True
        extra_sources = self._augmented_sources_map
        return field in extra_sources and asset in extra_sources[field]
    def get_fetcher_assets(self):
1290
        """
1291
        Returns a list of assets for the current date, as defined by the
1292
        fetcher data.
1293
1294
        Notes
1295
        -----
1296
        Data is forward-filled.  If there is no fetcher data defined for day
1297
        N, we use day N-1's data (if available, otherwise we keep going back).
1298
1299
        Returns
1300
        -------
1301
        list: a list of Asset objects.
1302
        """
1303
        # return a list of assets for the current date, as defined by the
1304
        # fetcher source
1305
        if self._extra_source_df is None:
1306
            return []
1307
1308
        if self.current_day in self._extra_source_df.index:
1309
            date_to_use = self.current_day
1310
        else:
1311
            # current day isn't in the fetcher df, go back the last
1312
            # available day
1313
            idx = self._extra_source_df.index.searchsorted(self.current_day)
1314
            if idx == 0:
1315
                return []
1316
1317
            date_to_use = self._extra_source_df.index[idx - 1]
1318
1319
        asset_list = self._extra_source_df.loc[date_to_use]["sid"]
1320
1321
        # make sure they're actually assets
1322
        asset_list = [asset for asset in asset_list
1323
                      if isinstance(asset, Asset)]
1324
1325
        return asset_list
1326
1327
1328
class DataPortalSidView(object):
    """
    Thin accessor bound to a single asset that proxies attribute, item, and
    membership lookups back to its DataPortal (supports data[sid(N)].field).
    """

    def __init__(self, asset, portal):
        # the asset this view is bound to, and the portal that answers all
        # queries on its behalf
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # any attribute not found on the instance is treated as a
        # spot-value query against the portal
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # bracket access behaves exactly like attribute access; call
        # __getattr__ directly so instance attributes are never shadowed
        return self.__getattr__(column)