Completed
Pull Request — master (#858)
by Eddie
01:40
created

zipline.data.DataPortal.get_spot_value()   B

Complexity

Conditions 6

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 56
rs 7.861

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import bcolz
17
from logbook import Logger
18
19
import numpy as np
20
import pandas as pd
21
from pandas.tslib import normalize_date
22
from six import iteritems
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import NoDataOnDate
26
from zipline.pipeline.data.equity_pricing import USEquityPricing
27
28
from zipline.utils import tradingcalendar
29
from zipline.errors import (
30
    NoTradeDataAvailableTooEarly,
31
    NoTradeDataAvailableTooLate
32
)
33
34
# Module-level logger used for data-access error reporting.
log = Logger('DataPortal')
35
36
# Field names accepted by the spot-value/history APIs, mapped to the
# underlying OHLCV column that actually backs each one.  Both "price"
# and the "*_price" aliases resolve to existing columns.
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close',
}
46
47
48
class DataPortal(object):
49
    def __init__(self,
                 env,
                 equity_daily_reader=None,
                 equity_minute_reader=None,
                 future_daily_reader=None,
                 future_minute_reader=None,
                 adjustment_reader=None):
        """
        Central access point for all pricing data used by a simulation.

        Parameters
        ----------
        env : object
            Trading environment; must expose an ``asset_finder`` attribute
            (presumably a TradingEnvironment -- TODO confirm).
        equity_daily_reader : object, optional
            Reader for daily equity bars.
        equity_minute_reader : object, optional
            Reader for minute equity bars.
        future_daily_reader : object, optional
            Reader for daily futures bars.
        future_minute_reader : object, optional
            Reader for minute futures bars.
        adjustment_reader : object, optional
            Reader providing splits/mergers/dividends adjustments.
        """
        self.env = env

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here.
        self.cur_data_offset = 0

        self.views = {}

        self._asset_finder = env.asset_finder

        # Per-field caches of opened bcolz carrays, keyed by sid string
        # (see _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        # Scale factor applied to minute prices (but not volume) when read
        # back out of the bcolz files.
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        self._equity_daily_reader = equity_daily_reader
        self._equity_minute_reader = equity_minute_reader
        self._future_daily_reader = future_daily_reader
        self._future_minute_reader = future_minute_reader

        # The following values are used by _minute_offset to calculate the
        # index into the minute bcolz data.

        # A lookup table from every minute to the corresponding day, to avoid
        # calling `.date()` on every lookup.
        self._minutes_to_day = {}
        # A map of days (keyed by midnight) to a DatetimeIndex of market
        # minutes for that day.
        self._minutes_by_day = {}
        # A dict of day to the offset into the minute bcolz on which that
        # day's data starts.
        self._day_offsets = None
116
117
    def handle_extra_source(self, source_df, sim_params):
        """
        Register an extra (Fetcher-style) data source.

        Extra sources always have a sid column.  The data is expanded (by
        forward filling) to the full range of the simulation dates so that
        lookup is fast during the simulation.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # The sid column can contain assets we know about (e.g. sid(24)) or
        # identifiers we don't (e.g. "palladium").  Either way we split
        # source_df into one sub-frame per sid, reindex each to the
        # simulation's date range with forward-filling, and store each
        # column under self._augmented_sources_map[column][identifier].

        if sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )

        # One sub-frame per sid.  This makes it easy to calculate accurate
        # start/end dates per sid, de-dup the data, and expand it to fit
        # the backtest start/end date.
        grouped_by_sid = source_df.groupby(["sid"])
        group_dict = {
            group_name: grouped_by_sid.get_group(group_name)
            for group_name in grouped_by_sid.groups.keys()
        }

        for identifier, df in iteritems(group_dict):
            # Remember the true extent of the data before reindexing.
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # Only one sid is present, so de-duping by the dt index is safe.
            df = df.groupby(level=0).last()

            # Reindex onto the backtest's date range; reads become simple
            # positional lookups during the simulation.
            df = df.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # For fake assets we need to record start/end dates ourselves.
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in df.columns.difference(['sid']):
                per_asset = self._augmented_sources_map.setdefault(
                    col_name, {})
                per_asset[identifier] = df
191
192
    def _open_minute_file(self, field, asset):
193
        sid_str = str(int(asset))
194
195
        try:
196
            carray = self._carrays[field][sid_str]
197
        except KeyError:
198
            carray = self._carrays[field][sid_str] = \
199
                self._get_ctable(asset)[field]
200
201
        return carray
202
203
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute-level bcolz ctable backing `asset`.

        The Equity branch and the fallback branch of the original were
        byte-for-byte duplicates; both now share one path computation.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            reader = self._future_minute_reader
        else:
            # Equities use the equity reader.  Anything that is neither a
            # Future nor an Equity also falls through to the equity reader.
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            reader = self._equity_minute_reader

        if reader.sid_path_func is not None:
            path = reader.sid_path_func(reader.rootdir, sid)
        else:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
235
236
    def get_previous_value(self, asset, field, dt, data_frequency):
237
        """
238
        Given an asset and a column and a dt, returns the previous value for
239
        the same asset/column pair.  If this data portal is in minute mode,
240
        it's the previous minute value, otherwise it's the previous day's
241
        value.
242
243
        Parameters
244
        ---------
245
        asset : Asset
246
            The asset whose data is desired.
247
248
        field: string
249
            The desired field of the asset.  Valid values are "open",
250
            "open_price", "high", "low", "close", "close_price", "volume", and
251
            "price".
252
253
        dt: pd.Timestamp
254
            The timestamp from which to go back in time one slot.
255
256
        data_frequency: string
257
            The frequency of the data to query; i.e. whether the data is
258
            'daily' or 'minute' bars
259
260
        Returns
261
        -------
262
        The value of the desired field at the desired time.
263
        """
264
        if data_frequency == 'daily':
265
            prev_dt = self.env.previous_trading_day(dt)
266
        elif data_frequency == 'minute':
267
            prev_dt = self.env.previous_market_minute(dt)
268
269
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
270
271
    def _check_extra_sources(self, asset, column, day):
272
        # If we have an extra source with a column called "price", only look
273
        # at it if it's on something like palladium and not AAPL (since our
274
        # own price data always wins when dealing with assets).
275
        look_in_augmented_sources = column in self._augmented_sources_map and \
276
            not (column in BASE_FIELDS and isinstance(asset, Asset))
277
278
        if look_in_augmented_sources:
279
            # we're being asked for a field in an extra source
280
            try:
281
                return self._augmented_sources_map[column][asset].\
282
                    loc[day, column]
283
            except:
284
                log.error(
285
                    "Could not find value for asset={0}, day={1},"
286
                    "column={2}".format(
287
                        str(asset),
288
                        str(day),
289
                        str(column)))
290
291
                raise KeyError
292
293
    def get_spot_value(self, asset, field, dt, data_frequency):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at the given dt.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp for the desired value.

        data_frequency: string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If `field` is not one of the valid field names above.
        """
        # Extra (Fetcher-style) sources take precedence when applicable;
        # _check_extra_sources returns None when they do not apply.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            dt,
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        column_to_use = BASE_FIELDS[field]

        # Allow raw sids to be passed in place of Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if data_frequency == "daily":
            # Daily data is keyed by the normalized (midnight) session label.
            day_to_use = normalize_date(dt)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt)
349
350
    def _get_minute_spot_value_future(self, asset, column, dt):
351
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
352
        # The file attributes contain the "start_dt" and "last_dt" fields,
353
        # which represent the time period for this bcolz file.
354
355
        # The start_dt is midnight of the first day that this future started
356
        # trading.
357
358
        # figure out the # of minutes between dt and this asset's start_dt
359
        start_date = self._get_asset_start_date(asset)
360
        minute_offset = int((dt - start_date).total_seconds() / 60)
361
362
        if minute_offset < 0:
363
            # asking for a date that is before the asset's start date, no dice
364
            return 0.0
365
366
        # then just index into the bcolz carray at that offset
367
        carray = self._open_minute_file(column, asset)
368
        result = carray[minute_offset]
369
370
        # if there's missing data, go backwards until we run out of file
371
        while result == 0 and minute_offset > 0:
372
            minute_offset -= 1
373
            result = carray[minute_offset]
374
375
        if column != 'volume':
376
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
377
        else:
378
            return result
379
380
    def setup_offset_cache(self, minutes_by_day, minutes_to_day, trading_days):
381
        # TODO: This case should not be hit, but is when tests are setup
382
        # with data_frequency of daily, but run with minutely.
383
        if self._equity_minute_reader is None:
384
            return
385
386
        self._minutes_to_day = minutes_to_day
387
        self._minutes_by_day = minutes_by_day
388
        start = trading_days[0]
389
        first_trading_day_idx = self._equity_minute_reader.trading_days.\
390
            searchsorted(start)
391
        self._day_offsets = {
392
            day: (i + first_trading_day_idx) * 390
393
            for i, day in enumerate(trading_days)}
394
395
    def _minute_offset(self, dt):
396
        if self._day_offsets is not None:
397
            try:
398
                day = self._minutes_to_day[dt]
399
                minutes = self._minutes_by_day[day]
400
                return self._day_offsets[day] + minutes.get_loc(dt)
401
            except KeyError:
402
                return None
403
404
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Return the value of `column` for the equity `asset` at minute `dt`,
        seeking backwards past missing (zero) bars and applying adjustments
        when the backwards seek crosses a day boundary.
        """
        # if dt is before the first market minute, minute_index
        # will be 0.  if it's after the last market minute, it'll
        # be len(minutes_for_day)
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            # Offset-cache miss: compute the offset from the trading
            # calendar instead (390 minute bars per trading day).
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self._equity_minute_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=dt,
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Prices are stored scaled and converted back here; volume is
        # returned as-is.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
474
475
    def _get_daily_data(self, asset, column, dt):
476
        while True:
477
            try:
478
                value = self._equity_daily_reader.spot_price(asset, dt, column)
479
                if value != -1:
480
                    return value
481
                else:
482
                    dt -= tradingcalendar.trading_day
483
            except NoDataOnDate:
484
                return 0
485
486
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        data = []
        for asset in assets:
            # Futures have no daily bcolz files, so their daily bars are
            # aggregated from minute data; equities read daily data
            # directly.
            if isinstance(asset, Future):
                window = self._get_history_daily_window_future(
                    asset, days_for_window, end_dt, field_to_use)
            else:
                window = self._get_history_daily_window_equity(
                    asset, days_for_window, end_dt, field_to_use)
            data.append(window)

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets,
        )
518
519
    def _get_history_daily_window_future(self, asset, days_for_window,
520
                                         end_dt, column):
521
        # Since we don't have daily bcolz files for futures (yet), use minute
522
        # bars to calculate the daily values.
523
        data = []
524
        data_groups = []
525
526
        # get all the minutes for the days NOT including today
527
        for day in days_for_window[:-1]:
528
            minutes = self.env.market_minutes_for_day(day)
529
530
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
531
532
            for idx, minute in enumerate(minutes):
533
                minute_val = self._get_minute_spot_value_future(
534
                    asset, column, minute
535
                )
536
537
                values_for_day[idx] = minute_val
538
539
            data_groups.append(values_for_day)
540
541
        # get the minutes for today
542
        last_day_minutes = pd.date_range(
543
            start=self.env.get_open_and_close(end_dt)[0],
544
            end=end_dt,
545
            freq="T"
546
        )
547
548
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
549
550
        for idx, minute in enumerate(last_day_minutes):
551
            minute_val = self._get_minute_spot_value_future(
552
                asset, column, minute
553
            )
554
555
            values_for_last_day[idx] = minute_val
556
557
        data_groups.append(values_for_last_day)
558
559
        for group in data_groups:
560
            if len(group) == 0:
561
                continue
562
563
            if column == 'volume':
564
                data.append(np.sum(group))
565
            elif column == 'open':
566
                data.append(group[0])
567
            elif column == 'close':
568
                data.append(group[-1])
569
            elif column == 'high':
570
                data.append(np.amax(group))
571
            elif column == 'low':
572
                data.append(np.amin(group))
573
574
        return data
575
576
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Return daily bars for an equity over `days_for_window`, ending at
        `end_dt`.

        If the window ends at midnight UTC, or extends past the asset's
        last trading day, daily data is used for the whole range.
        Otherwise the final (partial) day is aggregated from minute data
        so the last bar reflects only the minutes up to `end_dt`.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            # NOTE(review): this branch passes `asset` while the branch
            # below passes `sid` -- presumably _get_daily_window_for_sid
            # accepts either; confirm.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # Daily bars for everything except the last day; extra_slot is
            # left at its default here, presumably leaving a trailing slot
            # for the partial day overwritten below -- TODO confirm.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Collapse the partial day's minutes into a single daily value.
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
634
635
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date, so pad the front with NaNs.
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # NOTE(review): the frame is indexed by the *untruncated*
        # minutes_for_window; if the window was truncated without taking
        # the NaN-prepend branch above, the data length may not match
        # this index -- confirm upstream guarantees.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=assets
        )
695
696
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        end_dt: pd.Timestamp
            The last timestamp covered by the window.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If `field` or `frequency` is not a recognized value.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # Raw sids may have been passed in; resolve them to Asset objects.
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int)
            else asset
            for asset in assets
        ]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if field == "price" and ffill:
            df.fillna(method='ffill', inplace=True)

        return df
747
748
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Raw sids may be passed in; resolve them first.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and equities are stored differently, so dispatch to the
        # appropriate reader.
        handler = (
            self._get_minute_window_for_future
            if isinstance(asset, Future)
            else self._get_minute_window_for_equity
        )
        return handler(asset, field, minutes_for_window)
781
782
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        """
        Fetch a window of minute bars for a future via per-minute spot
        lookups.

        THIS IS TEMPORARY.  For now, futures are only exposed within equity
        trading hours (9:30 am to 4pm, Eastern), so the easiest approach is
        simply a spot lookup for each desired minute.
        """
        window = np.zeros(len(minutes_for_window), dtype=np.float64)
        for bar_idx, minute_dt in enumerate(minutes_for_window):
            window[bar_idx] = self._get_minute_spot_value_future(
                asset, field, minute_dt)

        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.

        # Once futures are on a 24-hour clock, we can just grab all the
        # requested minutes in one shot from the ctable.

        # no adjustments for futures, yay.
        return window
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Get a window of adjusted minute bars for an equity.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field : string
            The field to read from the minute bcolz file, e.g. "open".

        minutes_for_window : pd.DatetimeIndex
            The minutes making up the desired window.

        Returns
        -------
        np.array of float64 with one entry per requested minute, with all
        adjustments (splits/mergers/dividends and the minute price factor)
        applied in place.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        # clamp to 0 in case the requested window starts before the file's
        # first recorded minute.
        start_idx = max(
            self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0]),
            0)
        end_idx = self._equity_minute_reader._find_position_of_minute(
            minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                # 1:00 pm Eastern followed by a non-1-o'clock minute marks
                # the close of an early-close session.
                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            # NOTE: 180 is the number of padded zero bars per early close
            # (1pm-4pm); each deletion shifts later markers left by 180*idx.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # apply splits/mergers/dividends and the minute price factor in place.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
                               price_adj_factor=1.0):
        """
        Internal method that applies all the necessary adjustments on the
        given data array, in place.

        The adjustments are:
        - splits
        - if field != "volume":
            - mergers
            - dividends
            - multiplication by price_adj_factor
            - any zero fields replaced with NaN
        - all values rounded to 3 digits after the decimal point.

        Parameters
        ----------
        data : np.array
            The data to be adjusted.

        asset: Asset
            The asset whose data is being adjusted.

        dts: pd.DateTimeIndex
            The list of minutes or days representing the desired window.

        field: string
            The field whose values are in the data array.

        price_adj_factor: float
            Factor with which to adjust OHLC values.

        Returns
        -------
        None.  The data array is modified in place.
        """
        is_price_field = field != 'volume'

        # Splits apply to every field; volume is divided rather than
        # multiplied by the split ratio.
        self._apply_adjustments_to_window(
            self._get_adjustment_list(asset, self._splits_dict, "SPLITS"),
            data,
            dts,
            is_price_field
        )

        if is_price_field:
            # Mergers and dividends only affect price fields.
            for cache, table in ((self._mergers_dict, "MERGERS"),
                                 (self._dividends_dict, "DIVIDENDS")):
                self._apply_adjustments_to_window(
                    self._get_adjustment_list(asset, cache, table),
                    data,
                    dts,
                    True
                )

            data *= price_adj_factor

            # if anything is zero, it's a missing bar, so replace it with NaN.
            # we only want to do this for non-volume fields, because a missing
            # volume should be 0.
            data[data == 0] = np.NaN

        np.around(data, 3, out=data)
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days making up the desired window.  Each entry is a
            pd.Timestamp.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.

        """
        bar_count = len(days_in_window)
        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        # start from all-NaN; only days the asset was alive get real data.
        return_array[:] = np.NAN

        # restrict the window to the asset's lifetime.
        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if active_days.shape[0]:
            data = self._equity_daily_reader.history_window(field,
                                                            active_days[0],
                                                            active_days[-1],
                                                            asset)
            return_array[day_slice] = data
            # apply splits/mergers/dividends in place (daily bars use the
            # default price adjustment factor of 1.0).
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array
    @staticmethod
1013
    def _apply_adjustments_to_window(adjustments_list, window_data,
1014
                                     dts_in_window, multiply):
1015
        if len(adjustments_list) == 0:
1016
            return
1017
1018
        # advance idx to the correct spot in the adjustments list, based on
1019
        # when the window starts
1020
        idx = 0
1021
1022
        while idx < len(adjustments_list) and dts_in_window[0] >\
1023
                adjustments_list[idx][0]:
1024
            idx += 1
1025
1026
        # if we've advanced through all the adjustments, then there's nothing
1027
        # to do.
1028
        if idx == len(adjustments_list):
1029
            return
1030
1031
        while idx < len(adjustments_list):
1032
            adjustment_to_apply = adjustments_list[idx]
1033
1034
            if adjustment_to_apply[0] > dts_in_window[-1]:
1035
                break
1036
1037
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1038
            if multiply:
1039
                window_data[0:range_end] *= adjustment_to_apply[1]
1040
            else:
1041
                window_data[0:range_end] /= adjustment_to_apply[1]
1042
1043
            idx += 1
1044
1045
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1046
        """
1047
        Internal method that returns a list of adjustments for the given sid.
1048
1049
        Parameters
1050
        ----------
1051
        asset : Asset
1052
            The asset for which to return adjustments.
1053
1054
        adjustments_dict: dict
1055
            A dictionary of sid -> list that is used as a cache.
1056
1057
        table_name: string
1058
            The table that contains this data in the adjustments db.
1059
1060
        Returns
1061
        -------
1062
        adjustments: list
1063
            A list of [multiplier, pd.Timestamp], earliest first
1064
1065
        """
1066
        if self._adjustment_reader is None:
1067
            return []
1068
1069
        sid = int(asset)
1070
1071
        try:
1072
            adjustments = adjustments_dict[sid]
1073
        except KeyError:
1074
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1075
                get_adjustments_for_sid(table_name, sid)
1076
1077
        return adjustments
1078
1079
    def _check_is_currently_alive(self, asset, dt):
1080
        sid = int(asset)
1081
1082
        if sid not in self._asset_start_dates:
1083
            self._get_asset_start_date(asset)
1084
1085
        start_date = self._asset_start_dates[sid]
1086
        if self._asset_start_dates[sid] > dt:
1087
            raise NoTradeDataAvailableTooEarly(
1088
                sid=sid,
1089
                dt=normalize_date(dt),
1090
                start_dt=start_date
1091
            )
1092
1093
        end_date = self._asset_end_dates[sid]
1094
        if self._asset_end_dates[sid] < dt:
1095
            raise NoTradeDataAvailableTooLate(
1096
                sid=sid,
1097
                dt=normalize_date(dt),
1098
                end_dt=end_date
1099
            )
1100
1101
    def _get_asset_start_date(self, asset):
1102
        self._ensure_asset_dates(asset)
1103
        return self._asset_start_dates[asset]
1104
1105
    def _get_asset_end_date(self, asset):
1106
        self._ensure_asset_dates(asset)
1107
        return self._asset_end_dates[asset]
1108
1109
    def _ensure_asset_dates(self, asset):
1110
        sid = int(asset)
1111
1112
        if sid not in self._asset_start_dates:
1113
            self._asset_start_dates[sid] = asset.start_date
1114
            self._asset_end_dates[sid] = asset.end_date
1115
1116
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        # Return an empty list (not a dict, as before) so the return type is
        # consistent with the documented contract and the populated path.
        if self._adjustment_reader is None or len(sids) == 0:
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db.  Integer floor division avoids float
        # rounding on the large nanosecond value.
        seconds = dt.value // 10 ** 9

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # keep only splits for the requested sids.
        sids_set = set(sids)
        splits = [split for split in splits if split[0] in sids_set]

        return splits
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of objects with all relevant attributes populated.
        All timestamp fields are converted to pd.Timestamps.
        """
        # No reader or an empty range means no dividends to report.
        if self._adjustment_reader is None or len(trading_days) == 0:
            return []

        # The adjustments db stores dates as seconds since the epoch.
        range_start = trading_days[0].value / 1e9
        range_end = trading_days[-1].value / 1e9

        rows = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?",
            (int(sid), range_start, range_end,)).fetchall()

        return [
            {
                "declared_date": row[1],
                "ex_date": pd.Timestamp(row[2], unit="s"),
                "pay_date": pd.Timestamp(row[3], unit="s"),
                "payment_sid": row[4],
                "ratio": row[5],
                "record_date": pd.Timestamp(row[6], unit="s"),
                "sid": row[7],
            }
            for row in rows
        ]
    def contains(self, asset, field):
        """
        True if ``field`` is a known pricing field, or a fetcher-provided
        column defined for ``asset``.
        """
        if field in BASE_FIELDS:
            return True
        return (field in self._augmented_sources_map and
                asset in self._augmented_sources_map[field])
    def get_fetcher_assets(self, day):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        source_df = self._extra_source_df
        if source_df is None:
            # no fetcher source at all.
            return []

        if day in source_df.index:
            date_to_use = day
        else:
            # current day isn't in the fetcher df; forward-fill from the
            # last available day.
            pos = source_df.index.searchsorted(day)
            if pos == 0:
                # nothing before this day either.
                return []

            date_to_use = source_df.index[pos - 1]

        sid_values = source_df.loc[date_to_use]["sid"]

        # the "sid" column may hold non-Asset entries; keep Assets only.
        return [entry for entry in sid_values if isinstance(entry, Asset)]