Completed
Pull Request — master (#858)
by Eddie
01:32
created

zipline.data.DataPortal.get_fetcher_assets()   B

Complexity

Conditions 6

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 37
rs 7.5385
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import bcolz
17
from logbook import Logger
18
19
import numpy as np
20
import pandas as pd
21
from pandas.tslib import normalize_date
22
from six import iteritems
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import NoDataOnDate
26
27
from zipline.utils import tradingcalendar
28
from zipline.errors import (
29
    NoTradeDataAvailableTooEarly,
30
    NoTradeDataAvailableTooLate
31
)
32
33
# Module-level logbook logger for this module; DataPortal reports failed
# extra-source lookups through it.
log = Logger('DataPortal')
34
35
# Maps every field name accepted by the data portal to the name of the
# pricing column that backs it.  "open_price", "close_price" and "price" are
# aliases for the canonical "open" / "close" columns.
BASE_FIELDS = dict(
    open='open',
    open_price='open',
    high='high',
    low='low',
    close='close',
    close_price='close',
    volume='volume',
    price='close',
)

# Bar frequencies understood by the history API: one minute and one day.
HISTORY_FREQUENCIES = {"1m", "1d"}
47
48
49
class DataPortal(object):
50
    def __init__(self,
51
                 env,
52
                 equity_daily_reader=None,
53
                 equity_minute_reader=None,
54
                 future_daily_reader=None,
55
                 future_minute_reader=None,
56
                 adjustment_reader=None):
57
        self.env = env
58
59
        # This is a bit ugly, but is here for performance reasons.  In minute
60
        # simulations, we need to very quickly go from dt -> (# of minutes
61
        # since Jan 1 2002 9:30 Eastern).
62
        #
63
        # The clock that heartbeats the simulation has all the necessary
64
        # information to do this calculation very quickly.  This value is
65
        # calculated there, and then set here
66
        self.cur_data_offset = 0
67
68
        self.views = {}
69
70
        self._asset_finder = env.asset_finder
71
72
        self._carrays = {
73
            'open': {},
74
            'high': {},
75
            'low': {},
76
            'close': {},
77
            'volume': {},
78
            'sid': {},
79
            'dt': {},
80
        }
81
82
        self._adjustment_reader = adjustment_reader
83
84
        # caches of sid -> adjustment list
85
        self._splits_dict = {}
86
        self._mergers_dict = {}
87
        self._dividends_dict = {}
88
89
        # Cache of sid -> the first trading day of an asset, even if that day
90
        # is before 1/2/2002.
91
        self._asset_start_dates = {}
92
        self._asset_end_dates = {}
93
94
        # Handle extra sources, like Fetcher.
95
        self._augmented_sources_map = {}
96
        self._extra_source_df = None
97
98
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
99
100
        self._equity_daily_reader = equity_daily_reader
101
        self._equity_minute_reader = equity_minute_reader
102
        self._future_daily_reader = future_daily_reader
103
        self._future_minute_reader = future_minute_reader
104
105
        # The following values are used by _minute_offset to calculate the
106
        # index into the minute bcolz date.
107
108
        # A lookup of table every minute to the corresponding day, to avoid
109
        # calling `.date()` on every lookup.
110
        self._minutes_to_day = {}
111
        # A map of days (keyed by midnight) to a DatetimeIndex of market
112
        # minutes for that day.
113
        self._minutes_by_day = {}
114
        # A dict of day to the offset into the minute bcolz on which that
115
        # days data starts.
116
        self._day_offsets = None
117
118
    def handle_extra_source(self, source_df, sim_params):
119
        """
120
        Extra sources always have a sid column.
121
122
        We expand the given data (by forward filling) to the full range of
123
        the simulation dates, so that lookup is fast during simulation.
124
        """
125
        if source_df is None:
126
            return
127
128
        self._extra_source_df = source_df
129
130
        # source_df's sid column can either consist of assets we know about
131
        # (such as sid(24)) or of assets we don't know about (such as
132
        # palladium).
133
        #
134
        # In both cases, we break up the dataframe into individual dfs
135
        # that only contain a single asset's information.  ie, if source_df
136
        # has data for PALLADIUM and GOLD, we split source_df into two
137
        # dataframes, one for each. (same applies if source_df has data for
138
        # AAPL and IBM).
139
        #
140
        # We then take each child df and reindex it to the simulation's date
141
        # range by forward-filling missing values. this makes reads simpler.
142
        #
143
        # Finally, we store the data. For each column, we store a mapping in
144
        # self.augmented_sources_map from the column to a dictionary of
145
        # asset -> df.  In other words,
146
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
147
        # holding that data.
148
149
        if sim_params.emission_rate == "daily":
150
            source_date_index = self.env.days_in_range(
151
                start=sim_params.period_start,
152
                end=sim_params.period_end
153
            )
154
        else:
155
            source_date_index = self.env.minutes_for_days_in_range(
156
                start=sim_params.period_start,
157
                end=sim_params.period_end
158
            )
159
160
        # Break the source_df up into one dataframe per sid.  This lets
161
        # us (more easily) calculate accurate start/end dates for each sid,
162
        # de-dup data, and expand the data to fit the backtest start/end date.
163
        grouped_by_sid = source_df.groupby(["sid"])
164
        group_names = grouped_by_sid.groups.keys()
165
        group_dict = {}
166
        for group_name in group_names:
167
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
168
169
        for identifier, df in iteritems(group_dict):
170
            # Before reindexing, save the earliest and latest dates
171
            earliest_date = df.index[0]
172
            latest_date = df.index[-1]
173
174
            # Since we know this df only contains a single sid, we can safely
175
            # de-dupe by the index (dt)
176
            df = df.groupby(level=0).last()
177
178
            # Reindex the dataframe based on the backtest start/end date.
179
            # This makes reads easier during the backtest.
180
            df = df.reindex(index=source_date_index, method='ffill')
181
182
            if not isinstance(identifier, Asset):
183
                # for fake assets we need to store a start/end date
184
                self._asset_start_dates[identifier] = earliest_date
185
                self._asset_end_dates[identifier] = latest_date
186
187
            for col_name in df.columns.difference(['sid']):
188
                if col_name not in self._augmented_sources_map:
189
                    self._augmented_sources_map[col_name] = {}
190
191
                self._augmented_sources_map[col_name][identifier] = df
192
193
    def _open_minute_file(self, field, asset):
194
        sid_str = str(int(asset))
195
196
        try:
197
            carray = self._carrays[field][sid_str]
198
        except KeyError:
199
            carray = self._carrays[field][sid_str] = \
200
                self._get_ctable(asset)[field]
201
202
        return carray
203
204
    def _get_ctable(self, asset):
        """
        Open, read-only, the minute bcolz table for the given asset.

        The path comes from the reader's ``sid_path_func`` when one is set,
        and otherwise is ``<rootdir>/<sid>.bcolz``.
        """
        sid = int(asset)

        # Futures use the future minute reader.  Equities -- and any asset
        # that is neither a Future nor an Equity -- use the equity minute
        # reader.
        # TODO: Figure out if assets should be allowed if neither, and
        # why this code path is being hit.
        if isinstance(asset, Future):
            reader = self._future_minute_reader
        else:
            reader = self._equity_minute_reader

        path_func = reader.sid_path_func
        if path_func is None:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)
        else:
            path = path_func(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
236
237
    def get_previous_value(self, asset, field, dt, data_frequency):
238
        """
239
        Given an asset and a column and a dt, returns the previous value for
240
        the same asset/column pair.  If this data portal is in minute mode,
241
        it's the previous minute value, otherwise it's the previous day's
242
        value.
243
244
        Parameters
245
        ---------
246
        asset : Asset
247
            The asset whose data is desired.
248
249
        field: string
250
            The desired field of the asset.  Valid values are "open",
251
            "open_price", "high", "low", "close", "close_price", "volume", and
252
            "price".
253
254
        dt: pd.Timestamp
255
            The timestamp from which to go back in time one slot.
256
257
        data_frequency: string
258
            The frequency of the data to query; i.e. whether the data is
259
            'daily' or 'minute' bars
260
261
        Returns
262
        -------
263
        The value of the desired field at the desired time.
264
        """
265
        if data_frequency == 'daily':
266
            prev_dt = self.env.previous_trading_day(dt)
267
        elif data_frequency == 'minute':
268
            prev_dt = self.env.previous_market_minute(dt)
269
270
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
271
272
    def _check_extra_sources(self, asset, column, day):
273
        # If we have an extra source with a column called "price", only look
274
        # at it if it's on something like palladium and not AAPL (since our
275
        # own price data always wins when dealing with assets).
276
        look_in_augmented_sources = column in self._augmented_sources_map and \
277
            not (column in BASE_FIELDS and isinstance(asset, Asset))
278
279
        if look_in_augmented_sources:
280
            # we're being asked for a field in an extra source
281
            try:
282
                return self._augmented_sources_map[column][asset].\
283
                    loc[day, column]
284
            except:
285
                log.error(
286
                    "Could not find value for asset={0}, day={1},"
287
                    "column={2}".format(
288
                        str(asset),
289
                        str(day),
290
                        str(column)))
291
292
                raise KeyError
293
294
    def get_spot_value(self, asset, field, dt, data_frequency):
295
        """
296
        Public API method that returns a scalar value representing the value
297
        of the desired asset's field at either the given dt.
298
299
        Parameters
300
        ---------
301
        asset : Asset
302
            The asset whose data is desired.gith
303
304
        field: string
305
            The desired field of the asset.  Valid values are "open",
306
            "open_price", "high", "low", "close", "close_price", "volume", and
307
            "price".
308
309
        dt: pd.Timestamp
310
            The timestamp for the desired value.
311
312
        data_frequency: string
313
            The frequency of the data to query; i.e. whether the data is
314
            'daily' or 'minute' bars
315
316
        Returns
317
        -------
318
        The value of the desired field at the desired time.
319
        """
320
        extra_source_val = self._check_extra_sources(
321
            asset,
322
            field,
323
            dt,
324
        )
325
326
        if extra_source_val is not None:
327
            return extra_source_val
328
329
        if field not in BASE_FIELDS:
330
            raise KeyError("Invalid column: " + str(field))
331
332
        column_to_use = BASE_FIELDS[field]
333
334
        if isinstance(asset, int):
335
            asset = self._asset_finder.retrieve_asset(asset)
336
337
        self._check_is_currently_alive(asset, dt)
338
339
        if data_frequency == "daily":
340
            day_to_use = dt
341
            day_to_use = normalize_date(day_to_use)
342
            return self._get_daily_data(asset, column_to_use, day_to_use)
343
        else:
344
            if isinstance(asset, Future):
345
                return self._get_minute_spot_value_future(
346
                    asset, column_to_use, dt)
347
            else:
348
                return self._get_minute_spot_value(
349
                    asset, column_to_use, dt)
350
351
    def _get_minute_spot_value_future(self, asset, column, dt):
352
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
353
        # The file attributes contain the "start_dt" and "last_dt" fields,
354
        # which represent the time period for this bcolz file.
355
356
        # The start_dt is midnight of the first day that this future started
357
        # trading.
358
359
        # figure out the # of minutes between dt and this asset's start_dt
360
        start_date = self._get_asset_start_date(asset)
361
        minute_offset = int((dt - start_date).total_seconds() / 60)
362
363
        if minute_offset < 0:
364
            # asking for a date that is before the asset's start date, no dice
365
            return 0.0
366
367
        # then just index into the bcolz carray at that offset
368
        carray = self._open_minute_file(column, asset)
369
        result = carray[minute_offset]
370
371
        # if there's missing data, go backwards until we run out of file
372
        while result == 0 and minute_offset > 0:
373
            minute_offset -= 1
374
            result = carray[minute_offset]
375
376
        if column != 'volume':
377
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
378
        else:
379
            return result
380
381
    def setup_offset_cache(self, minutes_by_day, minutes_to_day, trading_days):
382
        # TODO: This case should not be hit, but is when tests are setup
383
        # with data_frequency of daily, but run with minutely.
384
        if self._equity_minute_reader is None:
385
            return
386
387
        self._minutes_to_day = minutes_to_day
388
        self._minutes_by_day = minutes_by_day
389
        start = trading_days[0]
390
        first_trading_day_idx = self._equity_minute_reader.trading_days.\
391
            searchsorted(start)
392
        self._day_offsets = {
393
            day: (i + first_trading_day_idx) * 390
394
            for i, day in enumerate(trading_days)}
395
396
    def _minute_offset(self, dt):
397
        if self._day_offsets is not None:
398
            try:
399
                day = self._minutes_to_day[dt]
400
                minutes = self._minutes_by_day[day]
401
                return self._day_offsets[day] + minutes.get_loc(dt)
402
            except KeyError:
403
                return None
404
405
    def _get_minute_spot_value(self, asset, column, dt):
406
        # if dt is before the first market minute, minute_index
407
        # will be 0.  if it's after the last market minute, it'll
408
        # be len(minutes_for_day)
409
        minute_offset_to_use = self._minute_offset(dt)
410
411
        if minute_offset_to_use is None:
412
            given_day = pd.Timestamp(dt.date(), tz='utc')
413
            day_index = self._equity_minute_reader.trading_days.searchsorted(
414
                given_day)
415
416
            # if dt is before the first market minute, minute_index
417
            # will be 0.  if it's after the last market minute, it'll
418
            # be len(minutes_for_day)
419
            minute_index = self.env.market_minutes_for_day(given_day).\
420
                searchsorted(dt)
421
422
            minute_offset_to_use = (day_index * 390) + minute_index
423
424
        carray = self._equity_minute_reader._open_minute_file(column, asset)
425
        result = carray[minute_offset_to_use]
426
427
        if result == 0:
428
            # if the given minute doesn't have data, we need to seek
429
            # backwards until we find data. This makes the data
430
            # forward-filled.
431
432
            # get this asset's start date, so that we don't look before it.
433
            start_date = self._get_asset_start_date(asset)
434
            start_date_idx = self._equity_minute_reader.trading_days.\
435
                searchsorted(start_date)
436
            start_day_offset = start_date_idx * 390
437
438
            original_start = minute_offset_to_use
439
440
            while result == 0 and minute_offset_to_use > start_day_offset:
441
                minute_offset_to_use -= 1
442
                result = carray[minute_offset_to_use]
443
444
            # once we've found data, we need to check whether it needs
445
            # to be adjusted.
446
            if result != 0:
447
                minutes = self.env.market_minute_window(
448
                    start=dt,
449
                    count=(original_start - minute_offset_to_use + 1),
450
                    step=-1
451
                ).order()
452
453
                # only need to check for adjustments if we've gone back
454
                # far enough to cross the day boundary.
455
                if minutes[0].date() != minutes[-1].date():
456
                    # create a np array of size minutes, fill it all with
457
                    # the same value.  and adjust the array.
458
                    arr = np.array([result] * len(minutes),
459
                                   dtype=np.float64)
460
                    self._apply_all_adjustments(
461
                        data=arr,
462
                        asset=asset,
463
                        dts=minutes,
464
                        field=column
465
                    )
466
467
                    # The first value of the adjusted array is the value
468
                    # we want.
469
                    result = arr[0]
470
471
        if column != 'volume':
472
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
473
        else:
474
            return result
475
476
    def _get_daily_data(self, asset, column, dt):
477
        while True:
478
            try:
479
                value = self._equity_daily_reader.spot_price(asset, dt, column)
480
                if value != -1:
481
                    return value
482
                else:
483
                    dt -= tradingcalendar.trading_day
484
            except NoDataOnDate:
485
                return 0
486
487
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.

        The frame is indexed by the ``bar_count`` trading days ending at
        ``end_dt``'s date and has one column per asset.
        """
        calendar_days = tradingcalendar.trading_days
        last_idx = calendar_days.searchsorted(end_dt.date())
        days_for_window = calendar_days[
            (last_idx - bar_count + 1):(last_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        def window_for(asset):
            # Futures and equities are read through different code paths.
            if isinstance(asset, Future):
                return self._get_history_daily_window_future(
                    asset, days_for_window, end_dt, field_to_use
                )
            return self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use
            )

        per_asset = [window_for(asset) for asset in assets]

        # Each entry is one asset's series; transpose so assets are columns.
        return pd.DataFrame(
            np.array(per_asset).T,
            index=days_for_window,
            columns=assets
        )
519
520
    def _get_history_daily_window_future(self, asset, days_for_window,
521
                                         end_dt, column):
522
        # Since we don't have daily bcolz files for futures (yet), use minute
523
        # bars to calculate the daily values.
524
        data = []
525
        data_groups = []
526
527
        # get all the minutes for the days NOT including today
528
        for day in days_for_window[:-1]:
529
            minutes = self.env.market_minutes_for_day(day)
530
531
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
532
533
            for idx, minute in enumerate(minutes):
534
                minute_val = self._get_minute_spot_value_future(
535
                    asset, column, minute
536
                )
537
538
                values_for_day[idx] = minute_val
539
540
            data_groups.append(values_for_day)
541
542
        # get the minutes for today
543
        last_day_minutes = pd.date_range(
544
            start=self.env.get_open_and_close(end_dt)[0],
545
            end=end_dt,
546
            freq="T"
547
        )
548
549
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
550
551
        for idx, minute in enumerate(last_day_minutes):
552
            minute_val = self._get_minute_spot_value_future(
553
                asset, column, minute
554
            )
555
556
            values_for_last_day[idx] = minute_val
557
558
        data_groups.append(values_for_last_day)
559
560
        for group in data_groups:
561
            if len(group) == 0:
562
                continue
563
564
            if column == 'volume':
565
                data.append(np.sum(group))
566
            elif column == 'open':
567
                data.append(group[0])
568
            elif column == 'close':
569
                data.append(group[-1])
570
            elif column == 'high':
571
                data.append(np.amax(group))
572
            elif column == 'low':
573
                data.append(np.amin(group))
574
575
        return data
576
577
    def _get_history_daily_window_equity(self, asset, days_for_window,
578
                                         end_dt, field_to_use):
579
        sid = int(asset)
580
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0
581
582
        # get the start and end dates for this sid
583
        end_date = self._get_asset_end_date(asset)
584
585
        if ends_at_midnight or (days_for_window[-1] > end_date):
586
            # two cases where we use daily data for the whole range:
587
            # 1) the history window ends at midnight utc.
588
            # 2) the last desired day of the window is after the
589
            # last trading day, use daily data for the whole range.
590
            return self._get_daily_window_for_sid(
591
                asset,
592
                field_to_use,
593
                days_for_window,
594
                extra_slot=False
595
            )
596
        else:
597
            # for the last day of the desired window, use minute
598
            # data and aggregate it.
599
            all_minutes_for_day = self.env.market_minutes_for_day(
600
                pd.Timestamp(end_dt.date()))
601
602
            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)
603
604
            # these are the minutes for the partial day
605
            minutes_for_partial_day =\
606
                all_minutes_for_day[0:(last_minute_idx + 1)]
607
608
            daily_data = self._get_daily_window_for_sid(
609
                sid,
610
                field_to_use,
611
                days_for_window[0:-1]
612
            )
613
614
            minute_data = self._get_minute_window_for_equity(
615
                sid,
616
                field_to_use,
617
                minutes_for_partial_day
618
            )
619
620
            if field_to_use == 'volume':
621
                minute_value = np.sum(minute_data)
622
            elif field_to_use == 'open':
623
                minute_value = minute_data[0]
624
            elif field_to_use == 'close':
625
                minute_value = minute_data[-1]
626
            elif field_to_use == 'high':
627
                minute_value = np.amax(minute_data)
628
            elif field_to_use == 'low':
629
                minute_value = np.amin(minute_data)
630
631
            # append the partial day.
632
            daily_data[-1] = minute_value
633
634
            return daily_data
635
636
    def _get_history_minute_window(self, assets, end_dt, bar_count,
637
                                   field_to_use):
638
        """
639
        Internal method that returns a dataframe containing history bars
640
        of minute frequency for the given sids.
641
        """
642
        # get all the minutes for this window
643
        minutes_for_window = self.env.market_minute_window(
644
            end_dt, bar_count, step=-1)[::-1]
645
646
        first_trading_day = self._equity_minute_reader.first_trading_day
647
648
        # but then cut it down to only the minutes after
649
        # the first trading day.
650
        modified_minutes_for_window = minutes_for_window[
651
            minutes_for_window.slice_indexer(first_trading_day)]
652
653
        modified_minutes_length = len(modified_minutes_for_window)
654
655
        if modified_minutes_length == 0:
656
            raise ValueError("Cannot calculate history window that ends"
657
                             "before 2002-01-02 14:31 UTC!")
658
659
        data = []
660
        bars_to_prepend = 0
661
        nans_to_prepend = None
662
663
        if modified_minutes_length < bar_count:
664
            first_trading_date = first_trading_day.date()
665
            if modified_minutes_for_window[0].date() == first_trading_date:
666
                # the beginning of the window goes before our global trading
667
                # start date
668
                bars_to_prepend = bar_count - modified_minutes_length
669
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)
670
671
        if len(assets) == 0:
672
            return pd.DataFrame(
673
                None,
674
                index=modified_minutes_for_window,
675
                columns=None
676
            )
677
678
        for asset in assets:
679
            asset_minute_data = self._get_minute_window_for_asset(
680
                asset,
681
                field_to_use,
682
                modified_minutes_for_window
683
            )
684
685
            if bars_to_prepend != 0:
686
                asset_minute_data = np.insert(asset_minute_data, 0,
687
                                              nans_to_prepend)
688
689
            data.append(asset_minute_data)
690
691
        return pd.DataFrame(
692
            np.array(data).T,
693
            index=minutes_for_window,
694
            columns=assets
695
        )
696
697
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.  Ints are treated as sids and
            resolved through the asset finder.

        end_dt: pd.Timestamp
            The last timestamp of the window.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If the field or the frequency is not recognised.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # sanity check in case sids were passed in
        resolved_assets = []
        for asset in assets:
            if isinstance(asset, int):
                asset = self.env.asset_finder.retrieve_asset(asset)
            resolved_assets.append(asset)
        assets = resolved_assets

        if frequency == "1d":
            build_window = self._get_history_daily_window
        elif frequency == "1m":
            build_window = self._get_history_minute_window
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        df = build_window(assets, end_dt, bar_count, field_to_use)

        # forward-fill if needed
        if field == "price" and ffill:
            df.fillna(method='ffill', inplace=True)

        return df
748
749
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of minute data for an asset and
        specified date range.  Used to support the history API method for
        minute bars.

        The work is delegated to the future or the equity implementation,
        depending on the type of the asset.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.  An int is treated as a sid
            and resolved through the asset finder.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        if isinstance(asset, Future):
            fetch_window = self._get_minute_window_for_future
        else:
            fetch_window = self._get_minute_window_for_equity

        return fetch_window(asset, field, minutes_for_window)
782
783
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
784
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
785
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
786
        # do this is to simply do a spot lookup for each desired minute.
787
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
788
        for idx, minute in enumerate(minutes_for_window):
789
            return_data[idx] = \
790
                self._get_minute_spot_value_future(asset, field, minute)
791
792
        # Note: an improvement could be to find the consecutive runs within
793
        # minutes_for_window, and use them to read the underlying ctable
794
        # more efficiently.
795
796
        # Once futures are on 24-hour clock, then we can just grab all the
797
        # requested minutes in one shot from the ctable.
798
799
        # no adjustments for futures, yay.
800
        return return_data
801
802
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
803
        # each sid's minutes are stored in a bcolz file
804
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
805
        # of when the asset started trading and regardless of half days.
806
        # for a half day, the second half is filled with zeroes.
807
808
        # find the position of start_dt in the entire timeline, go back
809
        # bar_count bars, and that's the unadjusted data
810
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)
811
812
        start_idx = max(
813
            self._equity_minute_reader._find_position_of_minute(
814
                minutes_for_window[0]),
815
            0)
816
        end_idx = self._equity_minute_reader._find_position_of_minute(
817
            minutes_for_window[-1]) + 1
818
819
        if end_idx == 0:
820
            # No data to return for minute window.
821
            return np.full(len(minutes_for_window), np.nan)
822
823
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
824
825
        data_to_copy = raw_data[start_idx:end_idx]
826
827
        num_minutes = len(minutes_for_window)
828
829
        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
830
        # close).  num_minutes is the number of actual trading minutes.  if
831
        # these two have different lengths, that means that we need to trim
832
        # away data due to early closes.
833
        if len(data_to_copy) != num_minutes:
834
            # get a copy of the minutes in Eastern time, since we depend on
835
            # an early close being at 1pm Eastern.
836
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")
837
838
            # accumulate a list of indices of the last minute of an early
839
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
840
            # there are five minutes of real data before 180 zeroes, we would
841
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
842
            # minute is the last "real" minute of the day.
843
            last_minute_idx_of_early_close_day = []
844
            for minute_idx, minute_dt in enumerate(eastern_minutes):
845
                if minute_idx == (num_minutes - 1):
846
                    break
847
848
                if minute_dt.hour == 13 and minute_dt.minute == 0:
849
                    next_minute = eastern_minutes[minute_idx + 1]
850
                    if next_minute.hour != 13:
851
                        # minute_dt is the last minute of an early close day
852
                        last_minute_idx_of_early_close_day.append(minute_idx)
853
854
            # spin through the list of early close markers, and use them to
855
            # chop off 180 minutes at a time from data_to_copy.
856
            for idx, early_close_minute_idx in \
857
                    enumerate(last_minute_idx_of_early_close_day):
858
                early_close_minute_idx -= (180 * idx)
859
                data_to_copy = np.delete(
860
                    data_to_copy,
861
                    range(
862
                        early_close_minute_idx + 1,
863
                        early_close_minute_idx + 181
864
                    )
865
                )
866
867
        return_data[0:len(data_to_copy)] = data_to_copy
868
869
        self._apply_all_adjustments(
870
            return_data,
871
            asset,
872
            minutes_for_window,
873
            field,
874
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
875
        )
876
877
        return return_data
878
879
    def _apply_all_adjustments(self, data, asset, dts, field,
880
                               price_adj_factor=1.0):
881
        """
882
        Internal method that applies all the necessary adjustments on the
883
        given data array.
884
885
        The adjustments are:
886
        - splits
887
        - if field != "volume":
888
            - mergers
889
            - dividends
890
            - * 0.001
891
            - any zero fields replaced with NaN
892
        - all values rounded to 3 digits after the decimal point.
893
894
        Parameters
895
        ----------
896
        data : np.array
897
            The data to be adjusted.
898
899
        asset: Asset
900
            The asset whose data is being adjusted.
901
902
        dts: pd.DateTimeIndex
903
            The list of minutes or days representing the desired window.
904
905
        field: string
906
            The field whose values are in the data array.
907
908
        price_adj_factor: float
909
            Factor with which to adjust OHLC values.
910
        Returns
911
        -------
912
        None.  The data array is modified in place.
913
        """
914
        self._apply_adjustments_to_window(
915
            self._get_adjustment_list(
916
                asset, self._splits_dict, "SPLITS"
917
            ),
918
            data,
919
            dts,
920
            field != 'volume'
921
        )
922
923
        if field != 'volume':
924
            self._apply_adjustments_to_window(
925
                self._get_adjustment_list(
926
                    asset, self._mergers_dict, "MERGERS"
927
                ),
928
                data,
929
                dts,
930
                True
931
            )
932
933
            self._apply_adjustments_to_window(
934
                self._get_adjustment_list(
935
                    asset, self._dividends_dict, "DIVIDENDS"
936
                ),
937
                data,
938
                dts,
939
                True
940
            )
941
942
            data *= price_adj_factor
943
944
            # if anything is zero, it's a missing bar, so replace it with NaN.
945
            # we only want to do this for non-volume fields, because a missing
946
            # volume should be 0.
947
            data[data == 0] = np.NaN
948
949
        np.around(data, 3, out=data)
950
951
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
952
                                  extra_slot=True):
953
        """
954
        Internal method that gets a window of adjusted daily data for a sid
955
        and specified date range.  Used to support the history API method for
956
        daily bars.
957
958
        Parameters
959
        ----------
960
        asset : Asset
961
            The asset whose data is desired.
962
963
        start_dt: pandas.Timestamp
964
            The start of the desired window of data.
965
966
        bar_count: int
967
            The number of days of data to return.
968
969
        field: string
970
            The specific field to return.  "open", "high", "close_price", etc.
971
972
        extra_slot: boolean
973
            Whether to allocate an extra slot in the returned numpy array.
974
            This extra slot will hold the data for the last partial day.  It's
975
            much better to create it here than to create a copy of the array
976
            later just to add a slot.
977
978
        Returns
979
        -------
980
        A numpy array with requested values.  Any missing slots filled with
981
        nan.
982
983
        """
984
        bar_count = len(days_in_window)
985
        # create an np.array of size bar_count
986
        if extra_slot:
987
            return_array = np.zeros((bar_count + 1,))
988
        else:
989
            return_array = np.zeros((bar_count,))
990
991
        return_array[:] = np.NAN
992
993
        start_date = self._get_asset_start_date(asset)
994
        end_date = self._get_asset_end_date(asset)
995
        day_slice = days_in_window.slice_indexer(start_date, end_date)
996
        active_days = days_in_window[day_slice]
997
998
        if active_days.shape[0]:
999
            data = self._equity_daily_reader.history_window(field,
1000
                                                            active_days[0],
1001
                                                            active_days[-1],
1002
                                                            asset)
1003
            return_array[day_slice] = data
1004
            self._apply_all_adjustments(
1005
                return_array,
1006
                asset,
1007
                active_days,
1008
                field,
1009
            )
1010
1011
        return return_array
1012
1013
    @staticmethod
1014
    def _apply_adjustments_to_window(adjustments_list, window_data,
1015
                                     dts_in_window, multiply):
1016
        if len(adjustments_list) == 0:
1017
            return
1018
1019
        # advance idx to the correct spot in the adjustments list, based on
1020
        # when the window starts
1021
        idx = 0
1022
1023
        while idx < len(adjustments_list) and dts_in_window[0] >\
1024
                adjustments_list[idx][0]:
1025
            idx += 1
1026
1027
        # if we've advanced through all the adjustments, then there's nothing
1028
        # to do.
1029
        if idx == len(adjustments_list):
1030
            return
1031
1032
        while idx < len(adjustments_list):
1033
            adjustment_to_apply = adjustments_list[idx]
1034
1035
            if adjustment_to_apply[0] > dts_in_window[-1]:
1036
                break
1037
1038
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1039
            if multiply:
1040
                window_data[0:range_end] *= adjustment_to_apply[1]
1041
            else:
1042
                window_data[0:range_end] /= adjustment_to_apply[1]
1043
1044
            idx += 1
1045
1046
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1047
        """
1048
        Internal method that returns a list of adjustments for the given sid.
1049
1050
        Parameters
1051
        ----------
1052
        asset : Asset
1053
            The asset for which to return adjustments.
1054
1055
        adjustments_dict: dict
1056
            A dictionary of sid -> list that is used as a cache.
1057
1058
        table_name: string
1059
            The table that contains this data in the adjustments db.
1060
1061
        Returns
1062
        -------
1063
        adjustments: list
1064
            A list of [multiplier, pd.Timestamp], earliest first
1065
1066
        """
1067
        if self._adjustment_reader is None:
1068
            return []
1069
1070
        sid = int(asset)
1071
1072
        try:
1073
            adjustments = adjustments_dict[sid]
1074
        except KeyError:
1075
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1076
                get_adjustments_for_sid(table_name, sid)
1077
1078
        return adjustments
1079
1080
    def _check_is_currently_alive(self, asset, dt):
1081
        sid = int(asset)
1082
1083
        if sid not in self._asset_start_dates:
1084
            self._get_asset_start_date(asset)
1085
1086
        start_date = self._asset_start_dates[sid]
1087
        if self._asset_start_dates[sid] > dt:
1088
            raise NoTradeDataAvailableTooEarly(
1089
                sid=sid,
1090
                dt=normalize_date(dt),
1091
                start_dt=start_date
1092
            )
1093
1094
        end_date = self._asset_end_dates[sid]
1095
        if self._asset_end_dates[sid] < dt:
1096
            raise NoTradeDataAvailableTooLate(
1097
                sid=sid,
1098
                dt=normalize_date(dt),
1099
                end_dt=end_date
1100
            )
1101
1102
    def _get_asset_start_date(self, asset):
1103
        self._ensure_asset_dates(asset)
1104
        return self._asset_start_dates[asset]
1105
1106
    def _get_asset_end_date(self, asset):
1107
        self._ensure_asset_dates(asset)
1108
        return self._asset_end_dates[asset]
1109
1110
    def _ensure_asset_dates(self, asset):
1111
        sid = int(asset)
1112
1113
        if sid not in self._asset_start_dates:
1114
            self._asset_start_dates[sid] = asset.start_date
1115
            self._asset_end_dates[sid] = asset.end_date
1116
1117
    def get_splits(self, sids, dt):
1118
        """
1119
        Returns any splits for the given sids and the given dt.
1120
1121
        Parameters
1122
        ----------
1123
        sids : list
1124
            Sids for which we want splits.
1125
1126
        dt: pd.Timestamp
1127
            The date for which we are checking for splits.  Note: this is
1128
            expected to be midnight UTC.
1129
1130
        Returns
1131
        -------
1132
        list: List of splits, where each split is a (sid, ratio) tuple.
1133
        """
1134
        if self._adjustment_reader is None or len(sids) == 0:
1135
            return {}
1136
1137
        # convert dt to # of seconds since epoch, because that's what we use
1138
        # in the adjustments db
1139
        seconds = int(dt.value / 1e9)
1140
1141
        splits = self._adjustment_reader.conn.execute(
1142
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1143
            (seconds,)).fetchall()
1144
1145
        sids_set = set(sids)
1146
        splits = [split for split in splits if split[0] in sids_set]
1147
1148
        return splits
1149
1150
    def get_stock_dividends(self, sid, trading_days):
1151
        """
1152
        Returns all the stock dividends for a specific sid that occur
1153
        in the given trading range.
1154
1155
        Parameters
1156
        ----------
1157
        sid: int
1158
            The asset whose stock dividends should be returned.
1159
1160
        trading_days: pd.DatetimeIndex
1161
            The trading range.
1162
1163
        Returns
1164
        -------
1165
        list: A list of objects with all relevant attributes populated.
1166
        All timestamp fields are converted to pd.Timestamps.
1167
        """
1168
1169
        if self._adjustment_reader is None:
1170
            return []
1171
1172
        if len(trading_days) == 0:
1173
            return []
1174
1175
        start_dt = trading_days[0].value / 1e9
1176
        end_dt = trading_days[-1].value / 1e9
1177
1178
        dividends = self._adjustment_reader.conn.execute(
1179
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1180
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1181
            fetchall()
1182
1183
        dividend_info = []
1184
        for dividend_tuple in dividends:
1185
            dividend_info.append({
1186
                "declared_date": dividend_tuple[1],
1187
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1188
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1189
                "payment_sid": dividend_tuple[4],
1190
                "ratio": dividend_tuple[5],
1191
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1192
                "sid": dividend_tuple[7]
1193
            })
1194
1195
        return dividend_info
1196
1197
    def contains(self, asset, field):
        """
        Report whether ``field`` can be looked up for ``asset``.

        True when ``field`` is one of BASE_FIELDS, or when the augmented
        sources map has an entry for ``field`` that contains ``asset``.
        """
        if field in BASE_FIELDS:
            return True

        augmented = self._augmented_sources_map
        return field in augmented and asset in augmented[field]
1201
1202
    def get_fetcher_assets(self, day):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.  Empty when there is no fetcher data
        at all, or none on or before ``day``.
        """
        extra_source_df = self._extra_source_df
        if extra_source_df is None:
            return []

        if day in extra_source_df.index:
            date_to_use = day
        else:
            # current day isn't in the fetcher df, go back to the last
            # available day
            idx = extra_source_df.index.searchsorted(day)
            if idx == 0:
                return []

            date_to_use = extra_source_df.index[idx - 1]

        sids = extra_source_df.loc[date_to_use]["sid"]

        # with several rows for the date this is a Series of sids, but with
        # exactly one row it is the bare value, which cannot be iterated.
        if not isinstance(sids, pd.Series):
            sids = [sids]

        # make sure they're actually assets
        return [asset for asset in sids if isinstance(asset, Asset)]
1239