Completed
Pull Request — master (#858)
by Eddie
10:07 queued 01:13
created

zipline.data.DataPortal.get_previous_value()   B

Complexity

Conditions 3

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 34
rs 8.8571
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from operator import mul
16
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
from six.moves import reduce
25
26
from zipline.assets import Asset, Future, Equity
27
from zipline.data.us_equity_pricing import NoDataOnDate
28
29
from zipline.utils import tradingcalendar
30
from zipline.errors import (
31
    NoTradeDataAvailableTooEarly,
32
    NoTradeDataAvailableTooLate
33
)
34
35
log = Logger('DataPortal')
36
37
BASE_FIELDS = {
38
    'open': 'open',
39
    'open_price': 'open',
40
    'high': 'high',
41
    'low': 'low',
42
    'close': 'close',
43
    'close_price': 'close',
44
    'volume': 'volume',
45
    'price': 'close'
46
}
47
48
HISTORY_FREQUENCIES = set(["1m", "1d"])
49
50
51
class DataPortal(object):
52
    def __init__(self,
                 env,
                 equity_daily_reader=None,
                 equity_minute_reader=None,
                 future_daily_reader=None,
                 future_minute_reader=None,
                 adjustment_reader=None):
        """
        Central access point for pricing data during a simulation.

        Parameters
        ----------
        env : TradingEnvironment
            Provides the trading calendar and the asset finder.
        equity_daily_reader, equity_minute_reader : optional
            Readers for equity daily/minute pricing data.
        future_daily_reader, future_minute_reader : optional
            Readers for futures daily/minute pricing data.
        adjustment_reader : optional
            Reader for splits/mergers/dividends adjustment data.
        """
        self.env = env

        self.views = {}

        self._asset_finder = env.asset_finder

        # Cache of open bcolz carrays, keyed by field and then by sid
        # string (populated lazily by _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        # Minute bars store prices as scaled integers; multiply by this
        # factor to recover the float price.
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        self._equity_daily_reader = equity_daily_reader
        self._equity_minute_reader = equity_minute_reader
        self._future_daily_reader = future_daily_reader
        self._future_minute_reader = future_minute_reader
97
98
    def handle_extra_source(self, source_df, sim_params):
        """
        Pre-process and store extra-source (e.g. Fetcher) data.

        Extra sources always have a sid column.

        We expand the given data (by forward filling) to the full range of
        the simulation dates, so that lookup is fast during simulation.

        Parameters
        ----------
        source_df : pd.DataFrame or None
            Frame indexed by dt with a 'sid' column; None is a no-op.
        sim_params : SimulationParameters
            Supplies the simulation date range and emission rate.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # source_df's sid column can either consist of assets we know about
        # (such as sid(24)) or of assets we don't know about (such as
        # palladium).
        #
        # In both cases, we break up the dataframe into individual dfs
        # that only contain a single asset's information.  ie, if source_df
        # has data for PALLADIUM and GOLD, we split source_df into two
        # dataframes, one for each. (same applies if source_df has data for
        # AAPL and IBM).
        #
        # We then take each child df and reindex it to the simulation's date
        # range by forward-filling missing values. this makes reads simpler.
        #
        # Finally, we store the data. For each column, we store a mapping in
        # self.augmented_sources_map from the column to a dictionary of
        # asset -> df.  In other words,
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
        # holding that data.

        # The reindex target matches the emission rate: one row per trading
        # day for daily emission, one row per market minute otherwise.
        if sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )

        # Break the source_df up into one dataframe per sid.  This lets
        # us (more easily) calculate accurate start/end dates for each sid,
        # de-dup data, and expand the data to fit the backtest start/end date.
        grouped_by_sid = source_df.groupby(["sid"])
        group_names = grouped_by_sid.groups.keys()
        group_dict = {}
        for group_name in group_names:
            group_dict[group_name] = grouped_by_sid.get_group(group_name)

        for identifier, df in iteritems(group_dict):
            # Before reindexing, save the earliest and latest dates
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # Since we know this df only contains a single sid, we can safely
            # de-dupe by the index (dt)
            df = df.groupby(level=0).last()

            # Reindex the dataframe based on the backtest start/end date.
            # This makes reads easier during the backtest.
            df = df.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # for fake assets we need to store a start/end date
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            # Register this df under every data column (everything except
            # 'sid') so spot lookups can find it by column -> identifier.
            for col_name in df.columns.difference(['sid']):
                if col_name not in self._augmented_sources_map:
                    self._augmented_sources_map[col_name] = {}

                self._augmented_sources_map[col_name][identifier] = df
172
173
    def _open_minute_file(self, field, asset):
174
        sid_str = str(int(asset))
175
176
        try:
177
            carray = self._carrays[field][sid_str]
178
        except KeyError:
179
            carray = self._carrays[field][sid_str] = \
180
                self._get_ctable(asset)[field]
181
182
        return carray
183
184
    def _get_ctable(self, asset):
        """
        Open and return (read-only) the bcolz ctable holding minute data
        for ``asset``.

        Futures resolve their file location through the future minute
        reader; everything else falls back to the equity minute reader.
        The path is either produced by the reader's ``sid_path_func`` or
        defaults to ``<rootdir>/<sid>.bcolz``.

        Parameters
        ----------
        asset : Asset
            The asset whose ctable is desired.

        Returns
        -------
        bcolz.ctable
        """
        sid = int(asset)

        if isinstance(asset, Future):
            reader = self._future_minute_reader
        else:
            # Equities use the equity reader.  Previously there was a
            # third, byte-identical branch for "neither Future nor
            # Equity"; it is folded into this fallback.
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            reader = self._equity_minute_reader

        if reader.sid_path_func is not None:
            path = reader.sid_path_func(reader.rootdir, sid)
        else:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
216
217
    def get_last_traded_dt(self, asset, dt, data_frequency):
218
        """
219
        Given an asset and dt, returns the last traded dt from the viewpoint
220
        of the given dt.
221
222
        If there is a trade on the dt, the answer is dt provided.
223
        """
224
        if data_frequency == 'minute':
225
            return self._equity_minute_reader.get_last_traded_dt(asset, dt)
226
        elif data_frequency == 'daily':
227
            return self._equity_daily_reader.get_last_traded_dt(asset, dt)
228
229
    def get_previous_value(self, asset, field, dt, data_frequency):
230
        """
231
        Given an asset and a column and a dt, returns the previous value for
232
        the same asset/column pair.  If this data portal is in minute mode,
233
        it's the previous minute value, otherwise it's the previous day's
234
        value.
235
236
        Parameters
237
        ---------
238
        asset : Asset
239
            The asset whose data is desired.
240
241
        field: string
242
            The desired field of the asset.  Valid values are "open",
243
            "open_price", "high", "low", "close", "close_price", "volume", and
244
            "price".
245
246
        dt: pd.Timestamp
247
            The timestamp from which to go back in time one slot.
248
249
        data_frequency: string
250
            The frequency of the data to query; i.e. whether the data is
251
            'daily' or 'minute' bars
252
253
        Returns
254
        -------
255
        The value of the desired field at the desired time.
256
        """
257
        if data_frequency == 'daily':
258
            prev_dt = self.env.previous_trading_day(dt)
259
        elif data_frequency == 'minute':
260
            prev_dt = self.env.previous_market_minute(dt)
261
262
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
263
264
    def _check_extra_sources(self, asset, column, day):
265
        # If we have an extra source with a column called "price", only look
266
        # at it if it's on something like palladium and not AAPL (since our
267
        # own price data always wins when dealing with assets).
268
        look_in_augmented_sources = column in self._augmented_sources_map and \
269
            not (column in BASE_FIELDS and isinstance(asset, Asset))
270
271
        if look_in_augmented_sources:
272
            # we're being asked for a field in an extra source
273
            try:
274
                return self._augmented_sources_map[column][asset].\
275
                    loc[day, column]
276
            except:
277
                log.error(
278
                    "Could not find value for asset={0}, day={1},"
279
                    "column={2}".format(
280
                        str(asset),
281
                        str(day),
282
                        str(column)))
283
284
                raise KeyError
285
286
    def get_spot_value(self, asset, field, dt, data_frequency):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at the given dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt : pd.Timestamp
            The timestamp for the desired value.

        data_frequency : string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If ``field`` is not one of the recognized columns.
        """
        # Extra (e.g. Fetcher) sources are consulted first; the helper
        # returns None when the request isn't answerable from them.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            dt,
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        # Map aliases such as "price"/"close_price" onto the stored column.
        column_to_use = BASE_FIELDS[field]

        # Allow raw sids in place of Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if data_frequency == "daily":
            # Daily bars are keyed by midnight-normalized dates.
            day_to_use = dt
            day_to_use = normalize_date(day_to_use)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt)
342
343
    def _get_adjusted_value(self, asset, field, dt,
                            perspective_dt,
                            data_frequency):
        """
        Private method that returns a scalar value representing the value
        of the desired asset's field at the given dt with adjustments applied.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt : pd.Timestamp
            The timestamp for the desired value.

        perspective_dt : pd.Timestamp
            The timestamp from which the data is being viewed back from.

        data_frequency : string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time, scaled by the
        product of all applicable split/merger/dividend ratios.
        """
        # Allow raw sids in place of Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        spot_value = self.get_spot_value(asset, field, dt, data_frequency)

        # Adjustments are only applied to equities.
        if isinstance(asset, Equity):
            adjs = []
            split_adjustments = self._get_adjustment_list(
                asset, self._splits_dict, "SPLITS"
            )
            # NOTE(review): ratios dated strictly before ``dt`` are
            # collected, and iteration stops once an adjustment date
            # reaches ``perspective_dt``.  This assumes the adjustment
            # lists are sorted by date -- confirm against
            # _get_adjustment_list.
            for adj_dt, adj in split_adjustments:
                if adj_dt < dt:
                    if field != 'volume':
                        adjs.append(adj)
                    else:
                        # Splits scale volume inversely to price.
                        adjs.append(1.0 / adj)
                if adj_dt >= perspective_dt:
                    break

            # Mergers and dividends adjust price only; volume is
            # unaffected by them.
            if field != 'volume':
                merger_adjustments = self._get_adjustment_list(
                    asset, self._mergers_dict, "MERGERS"
                )
                for adj_dt, adj in merger_adjustments:
                    if adj_dt < dt:
                        adjs.append(adj)
                    if adj_dt >= perspective_dt:
                        break
                div_adjustments = self._get_adjustment_list(
                    asset, self._dividends_dict, "DIVIDENDS",
                )
                for adj_dt, adj in div_adjustments:
                    if adj_dt < dt:
                        adjs.append(adj)
                    if adj_dt >= perspective_dt:
                        break

            # Collapse all collected ratios into a single multiplier.
            ratio = reduce(mul, adjs, 1.0)

            spot_value *= ratio

        return spot_value
416
417
    def _get_minute_spot_value_future(self, asset, column, dt):
418
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
419
        # The file attributes contain the "start_dt" and "last_dt" fields,
420
        # which represent the time period for this bcolz file.
421
422
        # The start_dt is midnight of the first day that this future started
423
        # trading.
424
425
        # figure out the # of minutes between dt and this asset's start_dt
426
        start_date = self._get_asset_start_date(asset)
427
        minute_offset = int((dt - start_date).total_seconds() / 60)
428
429
        if minute_offset < 0:
430
            # asking for a date that is before the asset's start date, no dice
431
            return 0.0
432
433
        # then just index into the bcolz carray at that offset
434
        carray = self._open_minute_file(column, asset)
435
        result = carray[minute_offset]
436
437
        # if there's missing data, go backwards until we run out of file
438
        while result == 0 and minute_offset > 0:
439
            minute_offset -= 1
440
            result = carray[minute_offset]
441
442
        if column != 'volume':
443
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
444
        else:
445
            return result
446
447
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Equity spot value of ``column`` at minute ``dt``, forward-filling
        missing (zero) bars and adjusting when the backfill crosses a day
        boundary.
        """
        # if dt is before the first market minute, minute_index
        # will be 0.  if it's after the last market minute, it'll
        # be len(minutes_for_day)
        minute_offset_to_use = \
            self._equity_minute_reader._find_position_of_minute(dt)

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        # A zero bar means no trade occurred during that minute.
        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            # 390 minutes per regular US equity session converts a
            # trading-day index into a minute-file offset.
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=dt,
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Prices are stored as scaled integers; volume is stored as-is.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
505
506
    def _get_daily_data(self, asset, column, dt):
507
        while True:
508
            try:
509
                value = self._equity_daily_reader.spot_price(asset, dt, column)
510
                if value != -1:
511
                    return value
512
                else:
513
                    dt -= tradingcalendar.trading_day
514
            except NoDataOnDate:
515
                return 0
516
517
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.

        The frame is indexed by the ``bar_count`` trading days ending at
        ``end_dt`` with one column per asset.
        """
        end_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        window_days = tradingcalendar.trading_days[
            (end_idx - bar_count + 1):(end_idx + 1)]

        # With no assets, return an empty frame over the window's days.
        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=window_days,
                                columns=None)

        columns = []
        for asset in assets:
            if isinstance(asset, Future):
                fetch = self._get_history_daily_window_future
            else:
                fetch = self._get_history_daily_window_equity
            columns.append(fetch(asset, window_days, end_dt, field_to_use))

        return pd.DataFrame(
            np.array(columns).T,
            index=window_days,
            columns=assets
        )
549
550
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Daily history bars for a future, synthesized from minute data.

        Since we don't have daily bcolz files for futures (yet), use minute
        bars to calculate the daily values: each full day's minutes are
        aggregated (sum for volume, first/last/max/min for OHLC) and the
        final day is built from the minutes up to ``end_dt``.
        """
        data = []
        data_groups = []

        # get all the minutes for the days NOT including today
        for day in days_for_window[:-1]:
            minutes = self.env.market_minutes_for_day(day)

            values_for_day = np.zeros(len(minutes), dtype=np.float64)

            for idx, minute in enumerate(minutes):
                minute_val = self._get_minute_spot_value_future(
                    asset, column, minute
                )

                values_for_day[idx] = minute_val

            data_groups.append(values_for_day)

        # get the minutes for today, from the session open through end_dt
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )

        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)

        for idx, minute in enumerate(last_day_minutes):
            minute_val = self._get_minute_spot_value_future(
                asset, column, minute
            )

            values_for_last_day[idx] = minute_val

        data_groups.append(values_for_last_day)

        # Aggregate each day's minute values into a single daily bar.
        # NOTE(review): empty groups are skipped, which would leave the
        # output shorter than days_for_window -- presumably market days
        # always have minutes; confirm upstream.
        for group in data_groups:
            if len(group) == 0:
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data
606
607
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history bars for an equity.

        Uses pure daily data when possible; otherwise aggregates minute
        data for the final (partial) day of the window.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # Daily bars for all but the last day; note extra_slot
            # defaults leave room for the partial-day value appended
            # below.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Aggregate the partial day's minutes into one daily bar.
            # NOTE(review): assumes minute_data is non-empty -- an empty
            # window would raise IndexError for 'open'/'close'; confirm
            # callers never pass a window starting mid-minute-0.
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
665
666
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        The window is clipped to start no earlier than the first trading
        day of the minute data; if the clipped window still begins on that
        first day, the missing leading bars are NaN-padded.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        # With no assets, return an empty frame over the clipped minutes.
        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            # Pad with leading NaNs so every column has bar_count rows.
            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # NOTE(review): the result is indexed by the full (unclipped)
        # minutes_for_window; this assumes the padded columns always have
        # exactly bar_count rows -- confirm for windows clipped but not
        # starting on the first trading day.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=assets
        )
726
727
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        end_dt : pd.Timestamp
            The last bar of the window.

        bar_count : int
            The number of bars desired.

        frequency : string
            "1d" or "1m"

        field : string
            The desired field of the asset.

        ffill : boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If ``field`` or ``frequency`` is not recognized.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = np.array([
            (self.env.asset_finder.retrieve_asset(asset) if
             isinstance(asset, int) else asset) for asset in assets])

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if field == "price" and ffill:
            # Frequency was validated above, so this branch is effectively
            # a defensive re-check.
            if frequency == "1m":
                data_frequency = 'minute'
            elif frequency == "1d":
                data_frequency = 'daily'
            else:
                raise Exception(
                    "Only 1d and 1m are supported for forward-filling.")

            dt_to_fill = df.index[0]

            # A NaN in the first row cannot be forward-filled from within
            # the window, so seed it from the last trade before the
            # window, adjusted to the window-end perspective.
            perspective_dt = df.index[-1]
            assets_with_leading_nan = np.where(pd.isnull(df.iloc[0]))[0]
            for missing_loc in assets_with_leading_nan:
                asset = assets[missing_loc]
                previous_dt = self.get_last_traded_dt(
                    asset, dt_to_fill, data_frequency)
                if pd.isnull(previous_dt):
                    # Asset never traded before the window; leave NaN.
                    continue
                previous_value = self._get_adjusted_value(
                    asset,
                    field,
                    previous_dt,
                    perspective_dt,
                    data_frequency,
                )
                df.iloc[0, missing_loc] = previous_value

            df.fillna(method='ffill', inplace=True)

        return df
806
807
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window : pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Resolve raw sids to Asset objects before dispatching.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        window_fn = (
            self._get_minute_window_for_future
            if isinstance(asset, Future)
            else self._get_minute_window_for_equity
        )
        return window_fn(asset, field, minutes_for_window)
840
841
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        """
        Fetch a window of minute data for a future via per-minute spot
        lookups.  No adjustments are applied to futures data.
        """
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
        # do this is to simply do a spot lookup for each desired minute.
        #
        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.
        #
        # Once futures are on 24-hour clock, then we can just grab all the
        # requested minutes in one shot from the ctable.
        spot = self._get_minute_spot_value_future
        return np.array(
            [spot(asset, field, minute) for minute in minutes_for_window],
            dtype=np.float64,
        )
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Fetch a window of adjusted minute data for an equity from the bcolz
        minute store.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field : string
            The field to read, e.g. "open" or "volume".

        minutes_for_window : pd.DatetimeIndex
            The trading minutes (pd.Timestamps) making up the window.

        Returns
        -------
        np.ndarray
            Adjusted values for each requested minute; NaN where no data
            exists.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        # NOTE(review): a KeyError here silently falls back to index 0,
        # which assumes the window never legitimately starts before the
        # first recorded minute — verify against the reader's calendar.
        try:
            start_idx = self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0])
        except KeyError:
            start_idx = 0

        try:
            end_idx = self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[-1]) + 1
        except KeyError:
            end_idx = 0

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            # (earlier deletions shift later indices left by 180 each time,
            # hence the `180 * idx` correction.)
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # apply splits/mergers/dividends and price scaling in place.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
                               price_adj_factor=1.0):
        """
        Internal method that applies all the necessary adjustments on the
        given data array, in place.

        The adjustments are:
        - splits
        - if field != "volume":
            - mergers
            - dividends
            - * price_adj_factor
            - any zero fields replaced with NaN
        - all values rounded to 3 digits after the decimal point.

        Parameters
        ----------
        data : np.array
            The data to be adjusted.

        asset: Asset
            The asset whose data is being adjusted.

        dts: pd.DateTimeIndex
            The list of minutes or days representing the desired window.

        field: string
            The field whose values are in the data array.

        price_adj_factor: float
            Factor with which to adjust OHLC values.

        Returns
        -------
        None.  The data array is modified in place.
        """
        is_price_field = field != 'volume'

        # splits apply to every field; volume is divided rather than
        # multiplied by the ratio.
        self._apply_adjustments_to_window(
            self._get_adjustment_list(asset, self._splits_dict, "SPLITS"),
            data,
            dts,
            is_price_field,
        )

        if is_price_field:
            # mergers and dividends only affect price fields.
            for cache, table in ((self._mergers_dict, "MERGERS"),
                                 (self._dividends_dict, "DIVIDENDS")):
                self._apply_adjustments_to_window(
                    self._get_adjustment_list(asset, cache, table),
                    data,
                    dts,
                    True,
                )

            data *= price_adj_factor

            # a zero in a price field marks a missing bar, so represent it
            # as NaN.  volume keeps its zeros, because a missing volume
            # really is 0.
            data[data == 0] = np.NaN

        np.around(data, 3, out=data)
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired, as midnight-UTC timestamps.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.
        """
        bar_count = len(days_in_window)

        # allocate the result up front, pre-filled with NaN so that days
        # outside the asset's lifetime stay "missing".
        size = bar_count + 1 if extra_slot else bar_count
        return_array = np.full(size, np.nan)

        # restrict the reader query to days on which the asset existed.
        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if active_days.shape[0]:
            data = self._equity_daily_reader.history_window(field,
                                                            active_days[0],
                                                            active_days[-1],
                                                            asset)
            return_array[day_slice] = data
            # apply splits/mergers/dividends in place on the filled slots.
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array
    @staticmethod
1077
    def _apply_adjustments_to_window(adjustments_list, window_data,
1078
                                     dts_in_window, multiply):
1079
        if len(adjustments_list) == 0:
1080
            return
1081
1082
        # advance idx to the correct spot in the adjustments list, based on
1083
        # when the window starts
1084
        idx = 0
1085
1086
        while idx < len(adjustments_list) and dts_in_window[0] >\
1087
                adjustments_list[idx][0]:
1088
            idx += 1
1089
1090
        # if we've advanced through all the adjustments, then there's nothing
1091
        # to do.
1092
        if idx == len(adjustments_list):
1093
            return
1094
1095
        while idx < len(adjustments_list):
1096
            adjustment_to_apply = adjustments_list[idx]
1097
1098
            if adjustment_to_apply[0] > dts_in_window[-1]:
1099
                break
1100
1101
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1102
            if multiply:
1103
                window_data[0:range_end] *= adjustment_to_apply[1]
1104
            else:
1105
                window_data[0:range_end] /= adjustment_to_apply[1]
1106
1107
            idx += 1
1108
1109
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
        """
        Internal method that returns a list of adjustments for the given sid.

        Parameters
        ----------
        asset : Asset
            The asset for which to return adjustments.

        adjustments_dict: dict
            A dictionary of sid -> list that is used as a cache.

        table_name: string
            The table that contains this data in the adjustments db.

        Returns
        -------
        adjustments: list
            A list of [pd.Timestamp, value] pairs, earliest first.
            (Consumers such as _apply_adjustments_to_window read element 0
            as the effective date and element 1 as the ratio.)  Empty when
            no adjustment reader is configured.
        """
        if self._adjustment_reader is None:
            return []

        sid = int(asset)

        # cache miss: fetch from the adjustments db and memoize, so the
        # reader is hit at most once per sid.
        try:
            adjustments = adjustments_dict[sid]
        except KeyError:
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
                get_adjustments_for_sid(table_name, sid)

        return adjustments
    def _check_is_currently_alive(self, asset, dt):
1144
        sid = int(asset)
1145
1146
        if sid not in self._asset_start_dates:
1147
            self._get_asset_start_date(asset)
1148
1149
        start_date = self._asset_start_dates[sid]
1150
        if self._asset_start_dates[sid] > dt:
1151
            raise NoTradeDataAvailableTooEarly(
1152
                sid=sid,
1153
                dt=normalize_date(dt),
1154
                start_dt=start_date
1155
            )
1156
1157
        end_date = self._asset_end_dates[sid]
1158
        if self._asset_end_dates[sid] < dt:
1159
            raise NoTradeDataAvailableTooLate(
1160
                sid=sid,
1161
                dt=normalize_date(dt),
1162
                end_dt=end_date
1163
            )
1164
1165
    def _get_asset_start_date(self, asset):
        """
        Return the (cached) start date for ``asset``.
        """
        self._ensure_asset_dates(asset)
        # key on int(asset): _ensure_asset_dates stores under the sid, so
        # the read must use the same key to stay consistent.
        return self._asset_start_dates[int(asset)]
    def _get_asset_end_date(self, asset):
        """
        Return the (cached) end date for ``asset``.
        """
        self._ensure_asset_dates(asset)
        # key on int(asset): _ensure_asset_dates stores under the sid, so
        # the read must use the same key to stay consistent.
        return self._asset_end_dates[int(asset)]
    def _ensure_asset_dates(self, asset):
1174
        sid = int(asset)
1175
1176
        if sid not in self._asset_start_dates:
1177
            self._asset_start_dates[sid] = asset.start_date
1178
            self._asset_end_dates[sid] = asset.end_date
1179
1180
    def get_splits(self, sids, dt):
1181
        """
1182
        Returns any splits for the given sids and the given dt.
1183
1184
        Parameters
1185
        ----------
1186
        sids : list
1187
            Sids for which we want splits.
1188
1189
        dt: pd.Timestamp
1190
            The date for which we are checking for splits.  Note: this is
1191
            expected to be midnight UTC.
1192
1193
        Returns
1194
        -------
1195
        list: List of splits, where each split is a (sid, ratio) tuple.
1196
        """
1197
        if self._adjustment_reader is None or len(sids) == 0:
1198
            return {}
1199
1200
        # convert dt to # of seconds since epoch, because that's what we use
1201
        # in the adjustments db
1202
        seconds = int(dt.value / 1e9)
1203
1204
        splits = self._adjustment_reader.conn.execute(
1205
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1206
            (seconds,)).fetchall()
1207
1208
        sids_set = set(sids)
1209
        splits = [split for split in splits if split[0] in sids_set]
1210
1211
        return splits
1212
1213
    def get_stock_dividends(self, sid, trading_days):
1214
        """
1215
        Returns all the stock dividends for a specific sid that occur
1216
        in the given trading range.
1217
1218
        Parameters
1219
        ----------
1220
        sid: int
1221
            The asset whose stock dividends should be returned.
1222
1223
        trading_days: pd.DatetimeIndex
1224
            The trading range.
1225
1226
        Returns
1227
        -------
1228
        list: A list of objects with all relevant attributes populated.
1229
        All timestamp fields are converted to pd.Timestamps.
1230
        """
1231
1232
        if self._adjustment_reader is None:
1233
            return []
1234
1235
        if len(trading_days) == 0:
1236
            return []
1237
1238
        start_dt = trading_days[0].value / 1e9
1239
        end_dt = trading_days[-1].value / 1e9
1240
1241
        dividends = self._adjustment_reader.conn.execute(
1242
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1243
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1244
            fetchall()
1245
1246
        dividend_info = []
1247
        for dividend_tuple in dividends:
1248
            dividend_info.append({
1249
                "declared_date": dividend_tuple[1],
1250
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1251
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1252
                "payment_sid": dividend_tuple[4],
1253
                "ratio": dividend_tuple[5],
1254
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1255
                "sid": dividend_tuple[7]
1256
            })
1257
1258
        return dividend_info
1259
1260
    def contains(self, asset, field):
        """
        Whether ``field`` is queryable for ``asset`` — either as one of the
        base fields or via a fetcher-augmented source for that asset.
        """
        if field in BASE_FIELDS:
            return True

        sources = self._augmented_sources_map
        return field in sources and asset in sources[field]
    def get_fetcher_assets(self, day):
1266
        """
1267
        Returns a list of assets for the current date, as defined by the
1268
        fetcher data.
1269
1270
        Notes
1271
        -----
1272
        Data is forward-filled.  If there is no fetcher data defined for day
1273
        N, we use day N-1's data (if available, otherwise we keep going back).
1274
1275
        Returns
1276
        -------
1277
        list: a list of Asset objects.
1278
        """
1279
        # return a list of assets for the current date, as defined by the
1280
        # fetcher source
1281
        if self._extra_source_df is None:
1282
            return []
1283
1284
        if day in self._extra_source_df.index:
1285
            date_to_use = day
1286
        else:
1287
            # current day isn't in the fetcher df, go back the last
1288
            # available day
1289
            idx = self._extra_source_df.index.searchsorted(day)
1290
            if idx == 0:
1291
                return []
1292
1293
            date_to_use = self._extra_source_df.index[idx - 1]
1294
1295
        asset_list = self._extra_source_df.loc[date_to_use]["sid"]
1296
1297
        # make sure they're actually assets
1298
        asset_list = [asset for asset in asset_list
1299
                      if isinstance(asset, Asset)]
1300
1301
        return asset_list
1302