Completed
Pull Request — master (#858)
by Eddie
01:32
created

zipline.data.DataPortal.get_history_window()   C

Complexity

Conditions 8

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 51
rs 5.2592

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import bcolz
17
from logbook import Logger
18
19
import numpy as np
20
import pandas as pd
21
from pandas.tslib import normalize_date
22
from six import iteritems
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import NoDataOnDate
26
from zipline.pipeline.data.equity_pricing import USEquityPricing
27
28
from zipline.utils import tradingcalendar
29
from zipline.errors import (
30
    NoTradeDataAvailableTooEarly,
31
    NoTradeDataAvailableTooLate
32
)
33
34
log = Logger('DataPortal')

# Frequencies accepted by the history API ("1d" = daily bars, "1m" = minute
# bars); see get_history_window().
HISTORY_FREQUENCIES = ["1d", "1m"]

# Maps every user-facing field name -- including the legacy "open_price" /
# "close_price" aliases and "price" -- to the canonical OHLCV column name
# stored on disk.  Note that "price" resolves to "close".
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}


# The same user-facing field names mapped to the corresponding pipeline
# USEquityPricing columns (same aliasing as BASE_FIELDS).
US_EQUITY_COLUMNS = {
    'open': USEquityPricing.open,
    'open_price': USEquityPricing.open,
    'high': USEquityPricing.high,
    'low': USEquityPricing.low,
    'close': USEquityPricing.close,
    'close_price': USEquityPricing.close,
    'volume': USEquityPricing.volume,
    'price': USEquityPricing.close,
}
62
class DataPortal(object):
63
    def __init__(self,
                 env,
                 equity_daily_reader=None,
                 equity_minute_reader=None,
                 future_daily_reader=None,
                 future_minute_reader=None,
                 adjustment_reader=None):
        """
        Interface to the pricing data needed by a simulation.

        Parameters
        ----------
        env : TradingEnvironment
            Provides the asset finder and calendar helpers (e.g.
            ``days_in_range``, ``market_minutes_for_day``).
        equity_daily_reader, equity_minute_reader : optional
            Readers for equity daily/minute pricing data.
        future_daily_reader, future_minute_reader : optional
            Readers for future daily/minute pricing data.
        adjustment_reader : optional
            Reader for split/merger/dividend adjustments.
        """
        self.env = env

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here
        self.cur_data_offset = 0

        self.views = {}

        self._asset_finder = env.asset_finder

        # Per-field cache of sid -> open bcolz carray (see _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        # Factor applied to stored minute prices (except volume) to convert
        # them to the values returned by the spot/window methods.
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        self._equity_daily_reader = equity_daily_reader
        self._equity_minute_reader = equity_minute_reader
        self._future_daily_reader = future_daily_reader
        self._future_minute_reader = future_minute_reader

        # The following values are used by _minute_offset to calculate the
        # index into the minute bcolz date.

        # A lookup of table every minute to the corresponding day, to avoid
        # calling `.date()` on every lookup.
        self._minutes_to_day = {}
        # A map of days (keyed by midnight) to a DatetimeIndex of market
        # minutes for that day.
        self._minutes_by_day = {}
        # A dict of day to the offset into the minute bcolz on which that
        # days data starts.
        self._day_offsets = None
    def handle_extra_source(self, source_df, sim_params):
132
        """
133
        Extra sources always have a sid column.
134
135
        We expand the given data (by forward filling) to the full range of
136
        the simulation dates, so that lookup is fast during simulation.
137
        """
138
        if source_df is None:
139
            return
140
141
        self._extra_source_df = source_df
142
143
        # source_df's sid column can either consist of assets we know about
144
        # (such as sid(24)) or of assets we don't know about (such as
145
        # palladium).
146
        #
147
        # In both cases, we break up the dataframe into individual dfs
148
        # that only contain a single asset's information.  ie, if source_df
149
        # has data for PALLADIUM and GOLD, we split source_df into two
150
        # dataframes, one for each. (same applies if source_df has data for
151
        # AAPL and IBM).
152
        #
153
        # We then take each child df and reindex it to the simulation's date
154
        # range by forward-filling missing values. this makes reads simpler.
155
        #
156
        # Finally, we store the data. For each column, we store a mapping in
157
        # self.augmented_sources_map from the column to a dictionary of
158
        # asset -> df.  In other words,
159
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
160
        # holding that data.
161
162
        if sim_params.emission_rate == "daily":
163
            source_date_index = self.env.days_in_range(
164
                start=sim_params.period_start,
165
                end=sim_params.period_end
166
            )
167
        else:
168
            source_date_index = self.env.minutes_for_days_in_range(
169
                start=sim_params.period_start,
170
                end=sim_params.period_end
171
            )
172
173
        # Break the source_df up into one dataframe per sid.  This lets
174
        # us (more easily) calculate accurate start/end dates for each sid,
175
        # de-dup data, and expand the data to fit the backtest start/end date.
176
        grouped_by_sid = source_df.groupby(["sid"])
177
        group_names = grouped_by_sid.groups.keys()
178
        group_dict = {}
179
        for group_name in group_names:
180
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
181
182
        for identifier, df in iteritems(group_dict):
183
            # Before reindexing, save the earliest and latest dates
184
            earliest_date = df.index[0]
185
            latest_date = df.index[-1]
186
187
            # Since we know this df only contains a single sid, we can safely
188
            # de-dupe by the index (dt)
189
            df = df.groupby(level=0).last()
190
191
            # Reindex the dataframe based on the backtest start/end date.
192
            # This makes reads easier during the backtest.
193
            df = df.reindex(index=source_date_index, method='ffill')
194
195
            if not isinstance(identifier, Asset):
196
                # for fake assets we need to store a start/end date
197
                self._asset_start_dates[identifier] = earliest_date
198
                self._asset_end_dates[identifier] = latest_date
199
200
            for col_name in df.columns.difference(['sid']):
201
                if col_name not in self._augmented_sources_map:
202
                    self._augmented_sources_map[col_name] = {}
203
204
                self._augmented_sources_map[col_name][identifier] = df
205
206
    def _open_minute_file(self, field, asset):
207
        sid_str = str(int(asset))
208
209
        try:
210
            carray = self._carrays[field][sid_str]
211
        except KeyError:
212
            carray = self._carrays[field][sid_str] = \
213
                self._get_ctable(asset)[field]
214
215
        return carray
216
217
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute bcolz ctable for ``asset``.

        Futures use the future minute reader's root directory; everything
        else falls back to the equity minute reader.  The path is either
        produced by the reader's ``sid_path_func`` or defaults to
        ``<rootdir>/<sid>.bcolz``.
        """
        sid = int(asset)

        # Previously the path-building logic was copy-pasted across three
        # branches; pick the reader once, then build the path once.
        if isinstance(asset, Future):
            reader = self._future_minute_reader
        elif isinstance(asset, Equity):
            reader = self._equity_minute_reader
        else:
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            reader = self._equity_minute_reader

        if reader.sid_path_func is not None:
            path = reader.sid_path_func(reader.rootdir, sid)
        else:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
    def get_previous_value(self, asset, field, dt, data_frequency):
251
        """
252
        Given an asset and a column and a dt, returns the previous value for
253
        the same asset/column pair.  If this data portal is in minute mode,
254
        it's the previous minute value, otherwise it's the previous day's
255
        value.
256
257
        Parameters
258
        ---------
259
        asset : Asset
260
            The asset whose data is desired.
261
262
        field: string
263
            The desired field of the asset.  Valid values are "open",
264
            "open_price", "high", "low", "close", "close_price", "volume", and
265
            "price".
266
267
        dt: pd.Timestamp
268
            The timestamp from which to go back in time one slot.
269
270
        data_frequency: string
271
            The frequency of the data to query; i.e. whether the data is
272
            'daily' or 'minute' bars
273
274
        Returns
275
        -------
276
        The value of the desired field at the desired time.
277
        """
278
        if data_frequency == 'daily':
279
            prev_dt = self.env.previous_trading_day(dt)
280
        elif data_frequency == 'minute':
281
            prev_dt = self.env.previous_market_minute(dt)
282
283
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
284
285
    def _check_extra_sources(self, asset, column, day):
286
        # If we have an extra source with a column called "price", only look
287
        # at it if it's on something like palladium and not AAPL (since our
288
        # own price data always wins when dealing with assets).
289
        look_in_augmented_sources = column in self._augmented_sources_map and \
290
            not (column in BASE_FIELDS and isinstance(asset, Asset))
291
292
        if look_in_augmented_sources:
293
            # we're being asked for a field in an extra source
294
            try:
295
                return self._augmented_sources_map[column][asset].\
296
                    loc[day, column]
297
            except:
298
                log.error(
299
                    "Could not find value for asset={0}, day={1},"
300
                    "column={2}".format(
301
                        str(asset),
302
                        str(day),
303
                        str(column)))
304
305
                raise KeyError
306
307
    def get_spot_value(self, asset, field, dt, data_frequency):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at the given dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt : pd.Timestamp
            The timestamp for the desired value.

        data_frequency : string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If ``field`` is not one of the valid values above.
        """
        # Extra sources (e.g. Fetcher) win for non-asset identifiers.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            dt,
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        column_to_use = BASE_FIELDS[field]

        # Allow raw sids to be passed in.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if data_frequency == "daily":
            day_to_use = normalize_date(dt)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt)
    def _get_minute_spot_value_future(self, asset, column, dt):
365
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
366
        # The file attributes contain the "start_dt" and "last_dt" fields,
367
        # which represent the time period for this bcolz file.
368
369
        # The start_dt is midnight of the first day that this future started
370
        # trading.
371
372
        # figure out the # of minutes between dt and this asset's start_dt
373
        start_date = self._get_asset_start_date(asset)
374
        minute_offset = int((dt - start_date).total_seconds() / 60)
375
376
        if minute_offset < 0:
377
            # asking for a date that is before the asset's start date, no dice
378
            return 0.0
379
380
        # then just index into the bcolz carray at that offset
381
        carray = self._open_minute_file(column, asset)
382
        result = carray[minute_offset]
383
384
        # if there's missing data, go backwards until we run out of file
385
        while result == 0 and minute_offset > 0:
386
            minute_offset -= 1
387
            result = carray[minute_offset]
388
389
        if column != 'volume':
390
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
391
        else:
392
            return result
393
394
    def setup_offset_cache(self, minutes_by_day, minutes_to_day, trading_days):
395
        # TODO: This case should not be hit, but is when tests are setup
396
        # with data_frequency of daily, but run with minutely.
397
        if self._equity_minute_reader is None:
398
            return
399
400
        self._minutes_to_day = minutes_to_day
401
        self._minutes_by_day = minutes_by_day
402
        start = trading_days[0]
403
        first_trading_day_idx = self._equity_minute_reader.trading_days.\
404
            searchsorted(start)
405
        self._day_offsets = {
406
            day: (i + first_trading_day_idx) * 390
407
            for i, day in enumerate(trading_days)}
408
409
    def _minute_offset(self, dt):
410
        if self._day_offsets is not None:
411
            try:
412
                day = self._minutes_to_day[dt]
413
                minutes = self._minutes_by_day[day]
414
                return self._day_offsets[day] + minutes.get_loc(dt)
415
            except KeyError:
416
                return None
417
418
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value for an equity at minute ``dt``, read from the equity
        minute data.  Missing (zero) bars are forward-filled by walking
        backwards, applying adjustments when the walk crosses a day
        boundary.
        """
        # Fast path: use the precomputed offset cache when it covers dt.
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            # Slow path: compute day index * 390 + minute index directly.
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self._equity_minute_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                # NOTE(review): `.order()` is a long-deprecated pandas
                # method — confirm it exists on the installed pandas.
                minutes = self.env.market_minute_window(
                    start=dt,
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column != 'volume':
            # Prices are stored scaled (presumably as ints x 1000 — the
            # 0.001 factor converts back; confirm against the writer).
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
    def _get_daily_data(self, asset, column, dt):
490
        while True:
491
            try:
492
                value = self._equity_daily_reader.spot_price(asset, dt, column)
493
                if value != -1:
494
                    return value
495
                else:
496
                    dt -= tradingcalendar.trading_day
497
            except NoDataOnDate:
498
                return 0
499
500
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        end_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (end_idx - bar_count + 1):(end_idx + 1)]

        if not assets:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # One column of daily values per asset; futures are aggregated
        # from minutes, equities come from daily data.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Build daily history values for a future by aggregating minute
        bars, since we don't have daily bcolz files for futures (yet).

        Returns a list of one aggregated value per day in
        ``days_for_window``; a day whose minute group is empty is skipped.
        NOTE(review): that skip can make the result shorter than
        days_for_window — confirm the caller tolerates this.
        """
        data = []
        data_groups = []

        # get all the minutes for the days NOT including today
        for day in days_for_window[:-1]:
            minutes = self.env.market_minutes_for_day(day)

            values_for_day = np.zeros(len(minutes), dtype=np.float64)

            for idx, minute in enumerate(minutes):
                minute_val = self._get_minute_spot_value_future(
                    asset, column, minute
                )

                values_for_day[idx] = minute_val

            data_groups.append(values_for_day)

        # get the minutes for today, from the open up to end_dt.
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )

        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)

        for idx, minute in enumerate(last_day_minutes):
            minute_val = self._get_minute_spot_value_future(
                asset, column, minute
            )

            values_for_last_day[idx] = minute_val

        data_groups.append(values_for_last_day)

        # Collapse each day's minute values into a single daily value:
        # volume sums, open/close take the first/last bar, high/low take
        # the extremes.
        for group in data_groups:
            if len(group) == 0:
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history values for an equity.  Uses daily data for the whole
        window when possible; otherwise the last (partial) day is built by
        aggregating minute data up to ``end_dt``.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # NOTE(review): the branch above passes `asset` to
            # _get_daily_window_for_sid while this one passes `sid`;
            # presumably both are accepted — confirm.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Aggregate the partial day's minutes into one daily value.
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            # NOTE(review): this writes into the last slot of daily_data;
            # presumably _get_daily_window_for_sid's default extra_slot
            # leaves room for it — confirm.
            daily_data[-1] = minute_value

            return daily_data
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # NOTE(review): this message is missing a space between "ends"
            # and "before".
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date; pad the front of each column with NaNs.
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # NOTE(review): the index is the full minutes_for_window, while the
        # columns come from the (possibly truncated, NaN-padded) modified
        # minutes — lengths match only when the padding branch above ran;
        # confirm the truncated-but-unpadded case cannot occur.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        end_dt : pd.Timestamp
            The last bar of the window.

        bar_count : int
            The number of bars desired.

        frequency : string
            "1d" or "1m"

        field : string
            The desired field of the asset.

        ffill : boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If ``field`` or ``frequency`` is not recognized.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = [(self.env.asset_finder.retrieve_asset(asset) if
                   isinstance(asset, int) else asset) for asset in assets]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed; only the 'price' field is filled.
        if field == "price" and ffill:
            df.fillna(method='ffill', inplace=True)

        return df
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window : pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Resolve raw sids to assets first.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Dispatch on asset type: futures and equities are stored
        # differently.
        if isinstance(asset, Future):
            window_getter = self._get_minute_window_for_future
        else:
            window_getter = self._get_minute_window_for_equity

        return window_getter(asset, field, minutes_for_window)
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
797
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
798
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
799
        # do this is to simply do a spot lookup for each desired minute.
800
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
801
        for idx, minute in enumerate(minutes_for_window):
802
            return_data[idx] = \
803
                self._get_minute_spot_value_future(asset, field, minute)
804
805
        # Note: an improvement could be to find the consecutive runs within
806
        # minutes_for_window, and use them to read the underlying ctable
807
        # more efficiently.
808
809
        # Once futures are on 24-hour clock, then we can just grab all the
810
        # requested minutes in one shot from the ctable.
811
812
        # no adjustments for futures, yay.
813
        return return_data
814
815
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Fetch a window of minute-level data for an equity, with early-close
        padding removed and splits/mergers/dividends applied in place.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field: string
            The OHLCV field to read, e.g. "open", "close", "volume".

        minutes_for_window: pd.DateTimeIndex
            The trading minutes making up the desired window.

        Returns
        -------
        A numpy array with one value per requested minute.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        # NOTE(review): _find_position_of_minute presumably returns a
        # negative index for minutes before the start of the data (hence the
        # clamp to 0 here and the end_idx == 0 check below) — TODO confirm.
        start_idx = max(
            self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0]),
            0)
        end_idx = self._equity_minute_reader._find_position_of_minute(
            minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                # the last requested minute can never start an early-close
                # comparison, since there is no "next minute" to inspect.
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each earlier deletion shifted the array left by 180 slots,
                # so rebase this marker before deleting.
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # apply splits/mergers/dividends and the minute price factor in
        # place; this also maps zero prices to NaN for non-volume fields.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
893
                               price_adj_factor=1.0):
894
        """
895
        Internal method that applies all the necessary adjustments on the
896
        given data array.
897
898
        The adjustments are:
899
        - splits
900
        - if field != "volume":
901
            - mergers
902
            - dividends
903
            - * 0.001
904
            - any zero fields replaced with NaN
905
        - all values rounded to 3 digits after the decimal point.
906
907
        Parameters
908
        ----------
909
        data : np.array
910
            The data to be adjusted.
911
912
        asset: Asset
913
            The asset whose data is being adjusted.
914
915
        dts: pd.DateTimeIndex
916
            The list of minutes or days representing the desired window.
917
918
        field: string
919
            The field whose values are in the data array.
920
921
        price_adj_factor: float
922
            Factor with which to adjust OHLC values.
923
        Returns
924
        -------
925
        None.  The data array is modified in place.
926
        """
927
        self._apply_adjustments_to_window(
928
            self._get_adjustment_list(
929
                asset, self._splits_dict, "SPLITS"
930
            ),
931
            data,
932
            dts,
933
            field != 'volume'
934
        )
935
936
        if field != 'volume':
937
            self._apply_adjustments_to_window(
938
                self._get_adjustment_list(
939
                    asset, self._mergers_dict, "MERGERS"
940
                ),
941
                data,
942
                dts,
943
                True
944
            )
945
946
            self._apply_adjustments_to_window(
947
                self._get_adjustment_list(
948
                    asset, self._dividends_dict, "DIVIDENDS"
949
                ),
950
                data,
951
                dts,
952
                True
953
            )
954
955
            data *= price_adj_factor
956
957
            # if anything is zero, it's a missing bar, so replace it with NaN.
958
            # we only want to do this for non-volume fields, because a missing
959
            # volume should be 0.
960
            data[data == 0] = np.NaN
961
962
        np.around(data, 3, out=data)
963
964
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days making up the desired window, one entry per day.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.  If extra_slot is True, the array has one trailing extra slot,
        also initialized to nan.

        """
        bar_count = len(days_in_window)
        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        # start every slot as NaN; slots outside the asset's lifetime below
        # are simply never overwritten.
        return_array[:] = np.NAN

        # clamp the window to the date range during which the asset existed.
        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if active_days.shape[0]:
            data = self._equity_daily_reader.history_window(field,
                                                            active_days[0],
                                                            active_days[-1],
                                                            asset)
            return_array[day_slice] = data
            # apply splits/mergers/dividends in place (also rounds values to
            # 3 decimal places).
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array
    @staticmethod
1027
    def _apply_adjustments_to_window(adjustments_list, window_data,
1028
                                     dts_in_window, multiply):
1029
        if len(adjustments_list) == 0:
1030
            return
1031
1032
        # advance idx to the correct spot in the adjustments list, based on
1033
        # when the window starts
1034
        idx = 0
1035
1036
        while idx < len(adjustments_list) and dts_in_window[0] >\
1037
                adjustments_list[idx][0]:
1038
            idx += 1
1039
1040
        # if we've advanced through all the adjustments, then there's nothing
1041
        # to do.
1042
        if idx == len(adjustments_list):
1043
            return
1044
1045
        while idx < len(adjustments_list):
1046
            adjustment_to_apply = adjustments_list[idx]
1047
1048
            if adjustment_to_apply[0] > dts_in_window[-1]:
1049
                break
1050
1051
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1052
            if multiply:
1053
                window_data[0:range_end] *= adjustment_to_apply[1]
1054
            else:
1055
                window_data[0:range_end] /= adjustment_to_apply[1]
1056
1057
            idx += 1
1058
1059
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1060
        """
1061
        Internal method that returns a list of adjustments for the given sid.
1062
1063
        Parameters
1064
        ----------
1065
        asset : Asset
1066
            The asset for which to return adjustments.
1067
1068
        adjustments_dict: dict
1069
            A dictionary of sid -> list that is used as a cache.
1070
1071
        table_name: string
1072
            The table that contains this data in the adjustments db.
1073
1074
        Returns
1075
        -------
1076
        adjustments: list
1077
            A list of [multiplier, pd.Timestamp], earliest first
1078
1079
        """
1080
        if self._adjustment_reader is None:
1081
            return []
1082
1083
        sid = int(asset)
1084
1085
        try:
1086
            adjustments = adjustments_dict[sid]
1087
        except KeyError:
1088
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1089
                get_adjustments_for_sid(table_name, sid)
1090
1091
        return adjustments
1092
1093
    def _check_is_currently_alive(self, asset, dt):
1094
        sid = int(asset)
1095
1096
        if sid not in self._asset_start_dates:
1097
            self._get_asset_start_date(asset)
1098
1099
        start_date = self._asset_start_dates[sid]
1100
        if self._asset_start_dates[sid] > dt:
1101
            raise NoTradeDataAvailableTooEarly(
1102
                sid=sid,
1103
                dt=normalize_date(dt),
1104
                start_dt=start_date
1105
            )
1106
1107
        end_date = self._asset_end_dates[sid]
1108
        if self._asset_end_dates[sid] < dt:
1109
            raise NoTradeDataAvailableTooLate(
1110
                sid=sid,
1111
                dt=normalize_date(dt),
1112
                end_dt=end_date
1113
            )
1114
1115
    def _get_asset_start_date(self, asset):
1116
        self._ensure_asset_dates(asset)
1117
        return self._asset_start_dates[asset]
1118
1119
    def _get_asset_end_date(self, asset):
1120
        self._ensure_asset_dates(asset)
1121
        return self._asset_end_dates[asset]
1122
1123
    def _ensure_asset_dates(self, asset):
1124
        sid = int(asset)
1125
1126
        if sid not in self._asset_start_dates:
1127
            self._asset_start_dates[sid] = asset.start_date
1128
            self._asset_end_dates[sid] = asset.end_date
1129
1130
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        # Fixed: this path previously returned {}, contradicting the
        # documented (and non-empty-path) list return type.
        if self._adjustment_reader is None or len(sids) == 0:
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # restrict the day's splits to the sids the caller asked about; the
        # set makes membership checks O(1).
        sids_set = set(sids)
        return [split for split in splits if split[0] in sids_set]
    def get_stock_dividends(self, sid, trading_days):
        """
        Return every stock dividend for ``sid`` that falls inside the given
        trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of dicts with all relevant attributes populated.
        All timestamp fields are converted to pd.Timestamps.
        """
        # Without an adjustments db, or without a range, there is nothing
        # to look up.
        if self._adjustment_reader is None or len(trading_days) == 0:
            return []

        # the adjustments db stores dates as seconds since the epoch
        range_start = trading_days[0].value / 1e9
        range_end = trading_days[-1].value / 1e9

        rows = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?",
            (int(sid), range_start, range_end,)).fetchall()

        return [
            {
                "declared_date": row[1],
                "ex_date": pd.Timestamp(row[2], unit="s"),
                "pay_date": pd.Timestamp(row[3], unit="s"),
                "payment_sid": row[4],
                "ratio": row[5],
                "record_date": pd.Timestamp(row[6], unit="s"),
                "sid": row[7]
            }
            for row in rows
        ]
    def contains(self, asset, field):
        """
        Whether ``field`` is either a base pricing field or a
        fetcher-augmented column defined for this particular asset.
        """
        if field in BASE_FIELDS:
            return True
        return (field in self._augmented_sources_map and
                asset in self._augmented_sources_map[field])
    def get_fetcher_assets(self, day):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        # no fetcher source registered at all
        source_df = self._extra_source_df
        if source_df is None:
            return []

        if day in source_df.index:
            date_to_use = day
        else:
            # current day isn't in the fetcher df; forward-fill from the most
            # recent prior day that has data.
            insertion_point = source_df.index.searchsorted(day)
            if insertion_point == 0:
                # nothing on or before this day
                return []

            date_to_use = source_df.index[insertion_point - 1]

        candidates = source_df.loc[date_to_use]["sid"]

        # the sid column can contain non-asset values; keep only real Assets
        return [candidate for candidate in candidates
                if isinstance(candidate, Asset)]