Completed
Pull Request — master (#858)
by Eddie
01:55
created

zipline.data.DataPortal.get_previous_value()   B

Complexity

Conditions 3

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 34
rs 8.8571
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import bcolz
17
from logbook import Logger
18
19
import numpy as np
20
import pandas as pd
21
from pandas.tslib import normalize_date
22
from six import iteritems
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import NoDataOnDate
26
27
from zipline.utils import tradingcalendar
28
from zipline.errors import (
29
    NoTradeDataAvailableTooEarly,
30
    NoTradeDataAvailableTooLate
31
)
32
33
# Module-level logbook logger used by the DataPortal code in this file.
log = Logger('DataPortal')
34
35
# Maps every field name accepted by the data API in this module onto the
# name of the underlying column that is actually read.  Aliases such as
# 'price' and 'close_price' both resolve to the 'close' column.
BASE_FIELDS = dict(
    open='open',
    open_price='open',
    high='high',
    low='low',
    close='close',
    close_price='close',
    volume='volume',
    price='close',
)
45
46
47
class DataPortal(object):
48
    def __init__(self,
49
                 env,
50
                 equity_daily_reader=None,
51
                 equity_minute_reader=None,
52
                 future_daily_reader=None,
53
                 future_minute_reader=None,
54
                 adjustment_reader=None):
55
        self.env = env
56
57
        # This is a bit ugly, but is here for performance reasons.  In minute
58
        # simulations, we need to very quickly go from dt -> (# of minutes
59
        # since Jan 1 2002 9:30 Eastern).
60
        #
61
        # The clock that heartbeats the simulation has all the necessary
62
        # information to do this calculation very quickly.  This value is
63
        # calculated there, and then set here
64
        self.cur_data_offset = 0
65
66
        self.views = {}
67
68
        self._asset_finder = env.asset_finder
69
70
        self._carrays = {
71
            'open': {},
72
            'high': {},
73
            'low': {},
74
            'close': {},
75
            'volume': {},
76
            'sid': {},
77
            'dt': {},
78
        }
79
80
        self._adjustment_reader = adjustment_reader
81
82
        # caches of sid -> adjustment list
83
        self._splits_dict = {}
84
        self._mergers_dict = {}
85
        self._dividends_dict = {}
86
87
        # Cache of sid -> the first trading day of an asset, even if that day
88
        # is before 1/2/2002.
89
        self._asset_start_dates = {}
90
        self._asset_end_dates = {}
91
92
        # Handle extra sources, like Fetcher.
93
        self._augmented_sources_map = {}
94
        self._extra_source_df = None
95
96
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
97
98
        self._equity_daily_reader = equity_daily_reader
99
        self._equity_minute_reader = equity_minute_reader
100
        self._future_daily_reader = future_daily_reader
101
        self._future_minute_reader = future_minute_reader
102
103
        # The following values are used by _minute_offset to calculate the
104
        # index into the minute bcolz date.
105
106
        # A lookup of table every minute to the corresponding day, to avoid
107
        # calling `.date()` on every lookup.
108
        self._minutes_to_day = {}
109
        # A map of days (keyed by midnight) to a DatetimeIndex of market
110
        # minutes for that day.
111
        self._minutes_by_day = {}
112
        # A dict of day to the offset into the minute bcolz on which that
113
        # days data starts.
114
        self._day_offsets = None
115
116
    def handle_extra_source(self, source_df, sim_params):
117
        """
118
        Extra sources always have a sid column.
119
120
        We expand the given data (by forward filling) to the full range of
121
        the simulation dates, so that lookup is fast during simulation.
122
        """
123
        if source_df is None:
124
            return
125
126
        self._extra_source_df = source_df
127
128
        # source_df's sid column can either consist of assets we know about
129
        # (such as sid(24)) or of assets we don't know about (such as
130
        # palladium).
131
        #
132
        # In both cases, we break up the dataframe into individual dfs
133
        # that only contain a single asset's information.  ie, if source_df
134
        # has data for PALLADIUM and GOLD, we split source_df into two
135
        # dataframes, one for each. (same applies if source_df has data for
136
        # AAPL and IBM).
137
        #
138
        # We then take each child df and reindex it to the simulation's date
139
        # range by forward-filling missing values. this makes reads simpler.
140
        #
141
        # Finally, we store the data. For each column, we store a mapping in
142
        # self.augmented_sources_map from the column to a dictionary of
143
        # asset -> df.  In other words,
144
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
145
        # holding that data.
146
147
        if sim_params.emission_rate == "daily":
148
            source_date_index = self.env.days_in_range(
149
                start=sim_params.period_start,
150
                end=sim_params.period_end
151
            )
152
        else:
153
            source_date_index = self.env.minutes_for_days_in_range(
154
                start=sim_params.period_start,
155
                end=sim_params.period_end
156
            )
157
158
        # Break the source_df up into one dataframe per sid.  This lets
159
        # us (more easily) calculate accurate start/end dates for each sid,
160
        # de-dup data, and expand the data to fit the backtest start/end date.
161
        grouped_by_sid = source_df.groupby(["sid"])
162
        group_names = grouped_by_sid.groups.keys()
163
        group_dict = {}
164
        for group_name in group_names:
165
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
166
167
        for identifier, df in iteritems(group_dict):
168
            # Before reindexing, save the earliest and latest dates
169
            earliest_date = df.index[0]
170
            latest_date = df.index[-1]
171
172
            # Since we know this df only contains a single sid, we can safely
173
            # de-dupe by the index (dt)
174
            df = df.groupby(level=0).last()
175
176
            # Reindex the dataframe based on the backtest start/end date.
177
            # This makes reads easier during the backtest.
178
            df = df.reindex(index=source_date_index, method='ffill')
179
180
            if not isinstance(identifier, Asset):
181
                # for fake assets we need to store a start/end date
182
                self._asset_start_dates[identifier] = earliest_date
183
                self._asset_end_dates[identifier] = latest_date
184
185
            for col_name in df.columns.difference(['sid']):
186
                if col_name not in self._augmented_sources_map:
187
                    self._augmented_sources_map[col_name] = {}
188
189
                self._augmented_sources_map[col_name][identifier] = df
190
191
    def _open_minute_file(self, field, asset):
192
        sid_str = str(int(asset))
193
194
        try:
195
            carray = self._carrays[field][sid_str]
196
        except KeyError:
197
            carray = self._carrays[field][sid_str] = \
198
                self._get_ctable(asset)[field]
199
200
        return carray
201
202
    def _get_ctable(self, asset):
        """
        Open (read-only) the bcolz ctable holding minute data for ``asset``.

        Futures are located through the future minute reader; every other
        asset is located through the equity minute reader.  The reader's
        ``sid_path_func`` is used to build the path when it is set,
        otherwise the path is ``<rootdir>/<sid>.bcolz``.
        """
        sid = int(asset)

        # TODO: Figure out if assets should be allowed if they are neither
        # Future nor Equity, and why that code path is being hit.  For now
        # such assets are resolved like equities.
        if isinstance(asset, Future):
            reader = self._future_minute_reader
        else:
            reader = self._equity_minute_reader

        build_path = reader.sid_path_func
        if build_path is None:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)
        else:
            path = build_path(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
234
235
    def get_previous_value(self, asset, field, dt, data_frequency):
236
        """
237
        Given an asset and a column and a dt, returns the previous value for
238
        the same asset/column pair.  If this data portal is in minute mode,
239
        it's the previous minute value, otherwise it's the previous day's
240
        value.
241
242
        Parameters
243
        ---------
244
        asset : Asset
245
            The asset whose data is desired.
246
247
        field: string
248
            The desired field of the asset.  Valid values are "open",
249
            "open_price", "high", "low", "close", "close_price", "volume", and
250
            "price".
251
252
        dt: pd.Timestamp
253
            The timestamp from which to go back in time one slot.
254
255
        data_frequency: string
256
            The frequency of the data to query; i.e. whether the data is
257
            'daily' or 'minute' bars
258
259
        Returns
260
        -------
261
        The value of the desired field at the desired time.
262
        """
263
        if data_frequency == 'daily':
264
            prev_dt = self.env.previous_trading_day(dt)
265
        elif data_frequency == 'minute':
266
            prev_dt = self.env.previous_market_minute(dt)
267
268
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
269
270
    def _check_extra_sources(self, asset, column, day):
271
        # If we have an extra source with a column called "price", only look
272
        # at it if it's on something like palladium and not AAPL (since our
273
        # own price data always wins when dealing with assets).
274
        look_in_augmented_sources = column in self._augmented_sources_map and \
275
            not (column in BASE_FIELDS and isinstance(asset, Asset))
276
277
        if look_in_augmented_sources:
278
            # we're being asked for a field in an extra source
279
            try:
280
                return self._augmented_sources_map[column][asset].\
281
                    loc[day, column]
282
            except:
283
                log.error(
284
                    "Could not find value for asset={0}, day={1},"
285
                    "column={2}".format(
286
                        str(asset),
287
                        str(day),
288
                        str(column)))
289
290
                raise KeyError
291
292
    def get_spot_value(self, asset, field, dt, data_frequency):
293
        """
294
        Public API method that returns a scalar value representing the value
295
        of the desired asset's field at either the given dt.
296
297
        Parameters
298
        ---------
299
        asset : Asset
300
            The asset whose data is desired.gith
301
302
        field: string
303
            The desired field of the asset.  Valid values are "open",
304
            "open_price", "high", "low", "close", "close_price", "volume", and
305
            "price".
306
307
        dt: pd.Timestamp
308
            The timestamp for the desired value.
309
310
        data_frequency: string
311
            The frequency of the data to query; i.e. whether the data is
312
            'daily' or 'minute' bars
313
314
        Returns
315
        -------
316
        The value of the desired field at the desired time.
317
        """
318
        extra_source_val = self._check_extra_sources(
319
            asset,
320
            field,
321
            dt,
322
        )
323
324
        if extra_source_val is not None:
325
            return extra_source_val
326
327
        if field not in BASE_FIELDS:
328
            raise KeyError("Invalid column: " + str(field))
329
330
        column_to_use = BASE_FIELDS[field]
331
332
        if isinstance(asset, int):
333
            asset = self._asset_finder.retrieve_asset(asset)
334
335
        self._check_is_currently_alive(asset, dt)
336
337
        if data_frequency == "daily":
338
            day_to_use = dt
339
            day_to_use = normalize_date(day_to_use)
340
            return self._get_daily_data(asset, column_to_use, day_to_use)
341
        else:
342
            if isinstance(asset, Future):
343
                return self._get_minute_spot_value_future(
344
                    asset, column_to_use, dt)
345
            else:
346
                return self._get_minute_spot_value(
347
                    asset, column_to_use, dt)
348
349
    def _get_minute_spot_value_future(self, asset, column, dt):
350
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
351
        # The file attributes contain the "start_dt" and "last_dt" fields,
352
        # which represent the time period for this bcolz file.
353
354
        # The start_dt is midnight of the first day that this future started
355
        # trading.
356
357
        # figure out the # of minutes between dt and this asset's start_dt
358
        start_date = self._get_asset_start_date(asset)
359
        minute_offset = int((dt - start_date).total_seconds() / 60)
360
361
        if minute_offset < 0:
362
            # asking for a date that is before the asset's start date, no dice
363
            return 0.0
364
365
        # then just index into the bcolz carray at that offset
366
        carray = self._open_minute_file(column, asset)
367
        result = carray[minute_offset]
368
369
        # if there's missing data, go backwards until we run out of file
370
        while result == 0 and minute_offset > 0:
371
            minute_offset -= 1
372
            result = carray[minute_offset]
373
374
        if column != 'volume':
375
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
376
        else:
377
            return result
378
379
    def setup_offset_cache(self, minutes_by_day, minutes_to_day, trading_days):
380
        # TODO: This case should not be hit, but is when tests are setup
381
        # with data_frequency of daily, but run with minutely.
382
        if self._equity_minute_reader is None:
383
            return
384
385
        self._minutes_to_day = minutes_to_day
386
        self._minutes_by_day = minutes_by_day
387
        start = trading_days[0]
388
        first_trading_day_idx = self._equity_minute_reader.trading_days.\
389
            searchsorted(start)
390
        self._day_offsets = {
391
            day: (i + first_trading_day_idx) * 390
392
            for i, day in enumerate(trading_days)}
393
394
    def _minute_offset(self, dt):
395
        if self._day_offsets is not None:
396
            try:
397
                day = self._minutes_to_day[dt]
398
                minutes = self._minutes_by_day[day]
399
                return self._day_offsets[day] + minutes.get_loc(dt)
400
            except KeyError:
401
                return None
402
403
    def _get_minute_spot_value(self, asset, column, dt):
404
        # if dt is before the first market minute, minute_index
405
        # will be 0.  if it's after the last market minute, it'll
406
        # be len(minutes_for_day)
407
        minute_offset_to_use = self._minute_offset(dt)
408
409
        if minute_offset_to_use is None:
410
            given_day = pd.Timestamp(dt.date(), tz='utc')
411
            day_index = self._equity_minute_reader.trading_days.searchsorted(
412
                given_day)
413
414
            # if dt is before the first market minute, minute_index
415
            # will be 0.  if it's after the last market minute, it'll
416
            # be len(minutes_for_day)
417
            minute_index = self.env.market_minutes_for_day(given_day).\
418
                searchsorted(dt)
419
420
            minute_offset_to_use = (day_index * 390) + minute_index
421
422
        carray = self._equity_minute_reader._open_minute_file(column, asset)
423
        result = carray[minute_offset_to_use]
424
425
        if result == 0:
426
            # if the given minute doesn't have data, we need to seek
427
            # backwards until we find data. This makes the data
428
            # forward-filled.
429
430
            # get this asset's start date, so that we don't look before it.
431
            start_date = self._get_asset_start_date(asset)
432
            start_date_idx = self._equity_minute_reader.trading_days.\
433
                searchsorted(start_date)
434
            start_day_offset = start_date_idx * 390
435
436
            original_start = minute_offset_to_use
437
438
            while result == 0 and minute_offset_to_use > start_day_offset:
439
                minute_offset_to_use -= 1
440
                result = carray[minute_offset_to_use]
441
442
            # once we've found data, we need to check whether it needs
443
            # to be adjusted.
444
            if result != 0:
445
                minutes = self.env.market_minute_window(
446
                    start=dt,
447
                    count=(original_start - minute_offset_to_use + 1),
448
                    step=-1
449
                ).order()
450
451
                # only need to check for adjustments if we've gone back
452
                # far enough to cross the day boundary.
453
                if minutes[0].date() != minutes[-1].date():
454
                    # create a np array of size minutes, fill it all with
455
                    # the same value.  and adjust the array.
456
                    arr = np.array([result] * len(minutes),
457
                                   dtype=np.float64)
458
                    self._apply_all_adjustments(
459
                        data=arr,
460
                        asset=asset,
461
                        dts=minutes,
462
                        field=column
463
                    )
464
465
                    # The first value of the adjusted array is the value
466
                    # we want.
467
                    result = arr[0]
468
469
        if column != 'volume':
470
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
471
        else:
472
            return result
473
474
    def _get_daily_data(self, asset, column, dt):
475
        while True:
476
            try:
477
                value = self._equity_daily_reader.spot_price(asset, dt, column)
478
                if value != -1:
479
                    return value
480
                else:
481
                    dt -= tradingcalendar.trading_day
482
            except NoDataOnDate:
483
                return 0
484
485
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.

        The frame is indexed by the ``bar_count`` trading days ending at
        ``end_dt``'s date and has one column per asset.
        """
        calendar_days = tradingcalendar.trading_days
        last_day_idx = calendar_days.searchsorted(end_dt.date())
        first_day_idx = last_day_idx - bar_count + 1
        days_for_window = calendar_days[first_day_idx:(last_day_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        per_asset_values = []
        for asset in assets:
            # Futures and equities are read through different helpers
            # that share one signature.
            if isinstance(asset, Future):
                read_window = self._get_history_daily_window_future
            else:
                read_window = self._get_history_daily_window_equity

            per_asset_values.append(
                read_window(asset, days_for_window, end_dt, field_to_use))

        # Each entry is one asset's series; transpose so that assets
        # become the columns.
        return pd.DataFrame(
            np.array(per_asset_values).T,
            index=days_for_window,
            columns=assets
        )
517
518
    def _get_history_daily_window_future(self, asset, days_for_window,
519
                                         end_dt, column):
520
        # Since we don't have daily bcolz files for futures (yet), use minute
521
        # bars to calculate the daily values.
522
        data = []
523
        data_groups = []
524
525
        # get all the minutes for the days NOT including today
526
        for day in days_for_window[:-1]:
527
            minutes = self.env.market_minutes_for_day(day)
528
529
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
530
531
            for idx, minute in enumerate(minutes):
532
                minute_val = self._get_minute_spot_value_future(
533
                    asset, column, minute
534
                )
535
536
                values_for_day[idx] = minute_val
537
538
            data_groups.append(values_for_day)
539
540
        # get the minutes for today
541
        last_day_minutes = pd.date_range(
542
            start=self.env.get_open_and_close(end_dt)[0],
543
            end=end_dt,
544
            freq="T"
545
        )
546
547
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
548
549
        for idx, minute in enumerate(last_day_minutes):
550
            minute_val = self._get_minute_spot_value_future(
551
                asset, column, minute
552
            )
553
554
            values_for_last_day[idx] = minute_val
555
556
        data_groups.append(values_for_last_day)
557
558
        for group in data_groups:
559
            if len(group) == 0:
560
                continue
561
562
            if column == 'volume':
563
                data.append(np.sum(group))
564
            elif column == 'open':
565
                data.append(group[0])
566
            elif column == 'close':
567
                data.append(group[-1])
568
            elif column == 'high':
569
                data.append(np.amax(group))
570
            elif column == 'low':
571
                data.append(np.amin(group))
572
573
        return data
574
575
    def _get_history_daily_window_equity(self, asset, days_for_window,
576
                                         end_dt, field_to_use):
577
        sid = int(asset)
578
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0
579
580
        # get the start and end dates for this sid
581
        end_date = self._get_asset_end_date(asset)
582
583
        if ends_at_midnight or (days_for_window[-1] > end_date):
584
            # two cases where we use daily data for the whole range:
585
            # 1) the history window ends at midnight utc.
586
            # 2) the last desired day of the window is after the
587
            # last trading day, use daily data for the whole range.
588
            return self._get_daily_window_for_sid(
589
                asset,
590
                field_to_use,
591
                days_for_window,
592
                extra_slot=False
593
            )
594
        else:
595
            # for the last day of the desired window, use minute
596
            # data and aggregate it.
597
            all_minutes_for_day = self.env.market_minutes_for_day(
598
                pd.Timestamp(end_dt.date()))
599
600
            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)
601
602
            # these are the minutes for the partial day
603
            minutes_for_partial_day =\
604
                all_minutes_for_day[0:(last_minute_idx + 1)]
605
606
            daily_data = self._get_daily_window_for_sid(
607
                sid,
608
                field_to_use,
609
                days_for_window[0:-1]
610
            )
611
612
            minute_data = self._get_minute_window_for_equity(
613
                sid,
614
                field_to_use,
615
                minutes_for_partial_day
616
            )
617
618
            if field_to_use == 'volume':
619
                minute_value = np.sum(minute_data)
620
            elif field_to_use == 'open':
621
                minute_value = minute_data[0]
622
            elif field_to_use == 'close':
623
                minute_value = minute_data[-1]
624
            elif field_to_use == 'high':
625
                minute_value = np.amax(minute_data)
626
            elif field_to_use == 'low':
627
                minute_value = np.amin(minute_data)
628
629
            # append the partial day.
630
            daily_data[-1] = minute_value
631
632
            return daily_data
633
634
    def _get_history_minute_window(self, assets, end_dt, bar_count,
635
                                   field_to_use):
636
        """
637
        Internal method that returns a dataframe containing history bars
638
        of minute frequency for the given sids.
639
        """
640
        # get all the minutes for this window
641
        minutes_for_window = self.env.market_minute_window(
642
            end_dt, bar_count, step=-1)[::-1]
643
644
        first_trading_day = self._equity_minute_reader.first_trading_day
645
646
        # but then cut it down to only the minutes after
647
        # the first trading day.
648
        modified_minutes_for_window = minutes_for_window[
649
            minutes_for_window.slice_indexer(first_trading_day)]
650
651
        modified_minutes_length = len(modified_minutes_for_window)
652
653
        if modified_minutes_length == 0:
654
            raise ValueError("Cannot calculate history window that ends"
655
                             "before 2002-01-02 14:31 UTC!")
656
657
        data = []
658
        bars_to_prepend = 0
659
        nans_to_prepend = None
660
661
        if modified_minutes_length < bar_count:
662
            first_trading_date = first_trading_day.date()
663
            if modified_minutes_for_window[0].date() == first_trading_date:
664
                # the beginning of the window goes before our global trading
665
                # start date
666
                bars_to_prepend = bar_count - modified_minutes_length
667
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)
668
669
        if len(assets) == 0:
670
            return pd.DataFrame(
671
                None,
672
                index=modified_minutes_for_window,
673
                columns=None
674
            )
675
676
        for asset in assets:
677
            asset_minute_data = self._get_minute_window_for_asset(
678
                asset,
679
                field_to_use,
680
                modified_minutes_for_window
681
            )
682
683
            if bars_to_prepend != 0:
684
                asset_minute_data = np.insert(asset_minute_data, 0,
685
                                              nans_to_prepend)
686
687
            data.append(asset_minute_data)
688
689
        return pd.DataFrame(
690
            np.array(data).T,
691
            index=minutes_for_window,
692
            columns=assets
693
        )
694
695
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.  Int sids are resolved
            through the asset finder.

        end_dt: pd.Timestamp
            The last bar of the window.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If ``field`` or ``frequency`` is not a supported value.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # sanity check in case sids were passed in
        finder = self.env.asset_finder
        resolved_assets = []
        for asset in assets:
            if isinstance(asset, int):
                asset = finder.retrieve_asset(asset)
            resolved_assets.append(asset)
        assets = resolved_assets

        if frequency == "1d":
            build_window = self._get_history_daily_window
        elif frequency == "1m":
            build_window = self._get_history_minute_window
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        df = build_window(assets, end_dt, bar_count, field_to_use)

        # forward-fill if needed
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
746
747
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.  An int sid is resolved
            through the asset finder.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and equities are stored differently, so each has its own
        # window reader with the same signature.
        if isinstance(asset, Future):
            read_window = self._get_minute_window_for_future
        else:
            read_window = self._get_minute_window_for_equity

        return read_window(asset, field, minutes_for_window)
780
781
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
782
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
783
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
784
        # do this is to simply do a spot lookup for each desired minute.
785
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
786
        for idx, minute in enumerate(minutes_for_window):
787
            return_data[idx] = \
788
                self._get_minute_spot_value_future(asset, field, minute)
789
790
        # Note: an improvement could be to find the consecutive runs within
791
        # minutes_for_window, and use them to read the underlying ctable
792
        # more efficiently.
793
794
        # Once futures are on 24-hour clock, then we can just grab all the
795
        # requested minutes in one shot from the ctable.
796
797
        # no adjustments for futures, yay.
798
        return return_data
799
800
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
801
        # each sid's minutes are stored in a bcolz file
802
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
803
        # of when the asset started trading and regardless of half days.
804
        # for a half day, the second half is filled with zeroes.
805
806
        # find the position of start_dt in the entire timeline, go back
807
        # bar_count bars, and that's the unadjusted data
808
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)
809
810
        start_idx = max(
811
            self._equity_minute_reader._find_position_of_minute(
812
                minutes_for_window[0]),
813
            0)
814
        end_idx = self._equity_minute_reader._find_position_of_minute(
815
            minutes_for_window[-1]) + 1
816
817
        if end_idx == 0:
818
            # No data to return for minute window.
819
            return np.full(len(minutes_for_window), np.nan)
820
821
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
822
823
        data_to_copy = raw_data[start_idx:end_idx]
824
825
        num_minutes = len(minutes_for_window)
826
827
        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
828
        # close).  num_minutes is the number of actual trading minutes.  if
829
        # these two have different lengths, that means that we need to trim
830
        # away data due to early closes.
831
        if len(data_to_copy) != num_minutes:
832
            # get a copy of the minutes in Eastern time, since we depend on
833
            # an early close being at 1pm Eastern.
834
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")
835
836
            # accumulate a list of indices of the last minute of an early
837
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
838
            # there are five minutes of real data before 180 zeroes, we would
839
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
840
            # minute is the last "real" minute of the day.
841
            last_minute_idx_of_early_close_day = []
842
            for minute_idx, minute_dt in enumerate(eastern_minutes):
843
                if minute_idx == (num_minutes - 1):
844
                    break
845
846
                if minute_dt.hour == 13 and minute_dt.minute == 0:
847
                    next_minute = eastern_minutes[minute_idx + 1]
848
                    if next_minute.hour != 13:
849
                        # minute_dt is the last minute of an early close day
850
                        last_minute_idx_of_early_close_day.append(minute_idx)
851
852
            # spin through the list of early close markers, and use them to
853
            # chop off 180 minutes at a time from data_to_copy.
854
            for idx, early_close_minute_idx in \
855
                    enumerate(last_minute_idx_of_early_close_day):
856
                early_close_minute_idx -= (180 * idx)
857
                data_to_copy = np.delete(
858
                    data_to_copy,
859
                    range(
860
                        early_close_minute_idx + 1,
861
                        early_close_minute_idx + 181
862
                    )
863
                )
864
865
        return_data[0:len(data_to_copy)] = data_to_copy
866
867
        self._apply_all_adjustments(
868
            return_data,
869
            asset,
870
            minutes_for_window,
871
            field,
872
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
873
        )
874
875
        return return_data
876
877
    def _apply_all_adjustments(self, data, asset, dts, field,
878
                               price_adj_factor=1.0):
879
        """
880
        Internal method that applies all the necessary adjustments on the
881
        given data array.
882
883
        The adjustments are:
884
        - splits
885
        - if field != "volume":
886
            - mergers
887
            - dividends
888
            - * 0.001
889
            - any zero fields replaced with NaN
890
        - all values rounded to 3 digits after the decimal point.
891
892
        Parameters
893
        ----------
894
        data : np.array
895
            The data to be adjusted.
896
897
        asset: Asset
898
            The asset whose data is being adjusted.
899
900
        dts: pd.DateTimeIndex
901
            The list of minutes or days representing the desired window.
902
903
        field: string
904
            The field whose values are in the data array.
905
906
        price_adj_factor: float
907
            Factor with which to adjust OHLC values.
908
        Returns
909
        -------
910
        None.  The data array is modified in place.
911
        """
912
        self._apply_adjustments_to_window(
913
            self._get_adjustment_list(
914
                asset, self._splits_dict, "SPLITS"
915
            ),
916
            data,
917
            dts,
918
            field != 'volume'
919
        )
920
921
        if field != 'volume':
922
            self._apply_adjustments_to_window(
923
                self._get_adjustment_list(
924
                    asset, self._mergers_dict, "MERGERS"
925
                ),
926
                data,
927
                dts,
928
                True
929
            )
930
931
            self._apply_adjustments_to_window(
932
                self._get_adjustment_list(
933
                    asset, self._dividends_dict, "DIVIDENDS"
934
                ),
935
                data,
936
                dts,
937
                True
938
            )
939
940
            data *= price_adj_factor
941
942
            # if anything is zero, it's a missing bar, so replace it with NaN.
943
            # we only want to do this for non-volume fields, because a missing
944
            # volume should be 0.
945
            data[data == 0] = np.NaN
946
947
        np.around(data, 3, out=data)
948
949
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
950
                                  extra_slot=True):
951
        """
952
        Internal method that gets a window of adjusted daily data for a sid
953
        and specified date range.  Used to support the history API method for
954
        daily bars.
955
956
        Parameters
957
        ----------
958
        asset : Asset
959
            The asset whose data is desired.
960
961
        start_dt: pandas.Timestamp
962
            The start of the desired window of data.
963
964
        bar_count: int
965
            The number of days of data to return.
966
967
        field: string
968
            The specific field to return.  "open", "high", "close_price", etc.
969
970
        extra_slot: boolean
971
            Whether to allocate an extra slot in the returned numpy array.
972
            This extra slot will hold the data for the last partial day.  It's
973
            much better to create it here than to create a copy of the array
974
            later just to add a slot.
975
976
        Returns
977
        -------
978
        A numpy array with requested values.  Any missing slots filled with
979
        nan.
980
981
        """
982
        bar_count = len(days_in_window)
983
        # create an np.array of size bar_count
984
        if extra_slot:
985
            return_array = np.zeros((bar_count + 1,))
986
        else:
987
            return_array = np.zeros((bar_count,))
988
989
        return_array[:] = np.NAN
990
991
        start_date = self._get_asset_start_date(asset)
992
        end_date = self._get_asset_end_date(asset)
993
        day_slice = days_in_window.slice_indexer(start_date, end_date)
994
        active_days = days_in_window[day_slice]
995
996
        if active_days.shape[0]:
997
            data = self._equity_daily_reader.history_window(field,
998
                                                            active_days[0],
999
                                                            active_days[-1],
1000
                                                            asset)
1001
            return_array[day_slice] = data
1002
            self._apply_all_adjustments(
1003
                return_array,
1004
                asset,
1005
                active_days,
1006
                field,
1007
            )
1008
1009
        return return_array
1010
1011
    @staticmethod
1012
    def _apply_adjustments_to_window(adjustments_list, window_data,
1013
                                     dts_in_window, multiply):
1014
        if len(adjustments_list) == 0:
1015
            return
1016
1017
        # advance idx to the correct spot in the adjustments list, based on
1018
        # when the window starts
1019
        idx = 0
1020
1021
        while idx < len(adjustments_list) and dts_in_window[0] >\
1022
                adjustments_list[idx][0]:
1023
            idx += 1
1024
1025
        # if we've advanced through all the adjustments, then there's nothing
1026
        # to do.
1027
        if idx == len(adjustments_list):
1028
            return
1029
1030
        while idx < len(adjustments_list):
1031
            adjustment_to_apply = adjustments_list[idx]
1032
1033
            if adjustment_to_apply[0] > dts_in_window[-1]:
1034
                break
1035
1036
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1037
            if multiply:
1038
                window_data[0:range_end] *= adjustment_to_apply[1]
1039
            else:
1040
                window_data[0:range_end] /= adjustment_to_apply[1]
1041
1042
            idx += 1
1043
1044
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1045
        """
1046
        Internal method that returns a list of adjustments for the given sid.
1047
1048
        Parameters
1049
        ----------
1050
        asset : Asset
1051
            The asset for which to return adjustments.
1052
1053
        adjustments_dict: dict
1054
            A dictionary of sid -> list that is used as a cache.
1055
1056
        table_name: string
1057
            The table that contains this data in the adjustments db.
1058
1059
        Returns
1060
        -------
1061
        adjustments: list
1062
            A list of [multiplier, pd.Timestamp], earliest first
1063
1064
        """
1065
        if self._adjustment_reader is None:
1066
            return []
1067
1068
        sid = int(asset)
1069
1070
        try:
1071
            adjustments = adjustments_dict[sid]
1072
        except KeyError:
1073
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1074
                get_adjustments_for_sid(table_name, sid)
1075
1076
        return adjustments
1077
1078
    def _check_is_currently_alive(self, asset, dt):
1079
        sid = int(asset)
1080
1081
        if sid not in self._asset_start_dates:
1082
            self._get_asset_start_date(asset)
1083
1084
        start_date = self._asset_start_dates[sid]
1085
        if self._asset_start_dates[sid] > dt:
1086
            raise NoTradeDataAvailableTooEarly(
1087
                sid=sid,
1088
                dt=normalize_date(dt),
1089
                start_dt=start_date
1090
            )
1091
1092
        end_date = self._asset_end_dates[sid]
1093
        if self._asset_end_dates[sid] < dt:
1094
            raise NoTradeDataAvailableTooLate(
1095
                sid=sid,
1096
                dt=normalize_date(dt),
1097
                end_dt=end_date
1098
            )
1099
1100
    def _get_asset_start_date(self, asset):
1101
        self._ensure_asset_dates(asset)
1102
        return self._asset_start_dates[asset]
1103
1104
    def _get_asset_end_date(self, asset):
1105
        self._ensure_asset_dates(asset)
1106
        return self._asset_end_dates[asset]
1107
1108
    def _ensure_asset_dates(self, asset):
1109
        sid = int(asset)
1110
1111
        if sid not in self._asset_start_dates:
1112
            self._asset_start_dates[sid] = asset.start_date
1113
            self._asset_end_dates[sid] = asset.end_date
1114
1115
    def get_splits(self, sids, dt):
1116
        """
1117
        Returns any splits for the given sids and the given dt.
1118
1119
        Parameters
1120
        ----------
1121
        sids : list
1122
            Sids for which we want splits.
1123
1124
        dt: pd.Timestamp
1125
            The date for which we are checking for splits.  Note: this is
1126
            expected to be midnight UTC.
1127
1128
        Returns
1129
        -------
1130
        list: List of splits, where each split is a (sid, ratio) tuple.
1131
        """
1132
        if self._adjustment_reader is None or len(sids) == 0:
1133
            return {}
1134
1135
        # convert dt to # of seconds since epoch, because that's what we use
1136
        # in the adjustments db
1137
        seconds = int(dt.value / 1e9)
1138
1139
        splits = self._adjustment_reader.conn.execute(
1140
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1141
            (seconds,)).fetchall()
1142
1143
        sids_set = set(sids)
1144
        splits = [split for split in splits if split[0] in sids_set]
1145
1146
        return splits
1147
1148
    def get_stock_dividends(self, sid, trading_days):
1149
        """
1150
        Returns all the stock dividends for a specific sid that occur
1151
        in the given trading range.
1152
1153
        Parameters
1154
        ----------
1155
        sid: int
1156
            The asset whose stock dividends should be returned.
1157
1158
        trading_days: pd.DatetimeIndex
1159
            The trading range.
1160
1161
        Returns
1162
        -------
1163
        list: A list of objects with all relevant attributes populated.
1164
        All timestamp fields are converted to pd.Timestamps.
1165
        """
1166
1167
        if self._adjustment_reader is None:
1168
            return []
1169
1170
        if len(trading_days) == 0:
1171
            return []
1172
1173
        start_dt = trading_days[0].value / 1e9
1174
        end_dt = trading_days[-1].value / 1e9
1175
1176
        dividends = self._adjustment_reader.conn.execute(
1177
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1178
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1179
            fetchall()
1180
1181
        dividend_info = []
1182
        for dividend_tuple in dividends:
1183
            dividend_info.append({
1184
                "declared_date": dividend_tuple[1],
1185
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1186
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1187
                "payment_sid": dividend_tuple[4],
1188
                "ratio": dividend_tuple[5],
1189
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1190
                "sid": dividend_tuple[7]
1191
            })
1192
1193
        return dividend_info
1194
1195
    def contains(self, asset, field):
        """
        Return True if field is one of the base fields, or if it is an
        augmented-source field that has an entry for the asset.
        """
        if field in BASE_FIELDS:
            return True

        augmented = self._augmented_sources_map
        return field in augmented and asset in augmented[field]
1199
1200
    def get_fetcher_assets(self, day):
1201
        """
1202
        Returns a list of assets for the current date, as defined by the
1203
        fetcher data.
1204
1205
        Notes
1206
        -----
1207
        Data is forward-filled.  If there is no fetcher data defined for day
1208
        N, we use day N-1's data (if available, otherwise we keep going back).
1209
1210
        Returns
1211
        -------
1212
        list: a list of Asset objects.
1213
        """
1214
        # return a list of assets for the current date, as defined by the
1215
        # fetcher source
1216
        if self._extra_source_df is None:
1217
            return []
1218
1219
        if day in self._extra_source_df.index:
1220
            date_to_use = day
1221
        else:
1222
            # current day isn't in the fetcher df, go back the last
1223
            # available day
1224
            idx = self._extra_source_df.index.searchsorted(day)
1225
            if idx == 0:
1226
                return []
1227
1228
            date_to_use = self._extra_source_df.index[idx - 1]
1229
1230
        asset_list = self._extra_source_df.loc[date_to_use]["sid"]
1231
1232
        # make sure they're actually assets
1233
        asset_list = [asset for asset in asset_list
1234
                      if isinstance(asset, Asset)]
1235
1236
        return asset_list
1237