Completed
Pull Request — master (#858)
by Eddie
01:58
created

zipline.data.DataPortal.get_spot_value()   B

Complexity

Conditions 6

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 56
rs 7.861

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import bcolz
17
from logbook import Logger
18
19
import numpy as np
20
import pandas as pd
21
from pandas.tslib import normalize_date
22
from six import iteritems
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import (
26
    BcolzDailyBarReader,
27
    NoDataOnDate
28
)
29
from zipline.pipeline.data.equity_pricing import USEquityPricing
30
31
from zipline.utils import tradingcalendar
32
from zipline.errors import (
33
    NoTradeDataAvailableTooEarly,
34
    NoTradeDataAvailableTooLate
35
)
36
37
# Module-level logger used by the DataPortal code in this file.
log = Logger('DataPortal')

# Frequency strings: "1d" for daily bars, "1m" for minute bars.
HISTORY_FREQUENCIES = ["1d", "1m"]

# Maps every field name a caller may ask for (including the aliases
# 'open_price', 'close_price' and 'price') to the underlying bar column.
BASE_FIELDS = dict(
    open='open',
    open_price='open',
    high='high',
    low='low',
    close='close',
    close_price='close',
    volume='volume',
    price='close',
)

# Same keys as BASE_FIELDS, but pointing at the matching USEquityPricing
# column object instead of the column name.
US_EQUITY_COLUMNS = {
    alias: getattr(USEquityPricing, base_column)
    for alias, base_column in BASE_FIELDS.items()
}
63
64
65
class DataPortal(object):
66
    def __init__(self,
67
                 env,
68
                 sim_params=None,
69
                 equity_minute_reader=None,
70
                 minutes_futures_path=None,
71
                 daily_equities_path=None,
72
                 adjustment_reader=None,
73
                 futures_sid_path_func=None):
74
        self.env = env
75
76
        # This is a bit ugly, but is here for performance reasons.  In minute
77
        # simulations, we need to very quickly go from dt -> (# of minutes
78
        # since Jan 1 2002 9:30 Eastern).
79
        #
80
        # The clock that heartbeats the simulation has all the necessary
81
        # information to do this calculation very quickly.  This value is
82
        # calculated there, and then set here
83
        self.cur_data_offset = 0
84
85
        self.views = {}
86
87
        self._daily_equities_path = daily_equities_path
88
        self._minutes_futures_path = minutes_futures_path
89
90
        self._asset_finder = env.asset_finder
91
92
        self._carrays = {
93
            'open': {},
94
            'high': {},
95
            'low': {},
96
            'close': {},
97
            'volume': {},
98
            'sid': {},
99
            'dt': {},
100
        }
101
102
        self._adjustment_reader = adjustment_reader
103
104
        # caches of sid -> adjustment list
105
        self._splits_dict = {}
106
        self._mergers_dict = {}
107
        self._dividends_dict = {}
108
109
        # Cache of sid -> the first trading day of an asset, even if that day
110
        # is before 1/2/2002.
111
        self._asset_start_dates = {}
112
        self._asset_end_dates = {}
113
114
        # Handle extra sources, like Fetcher.
115
        self._augmented_sources_map = {}
116
        self._extra_source_df = None
117
118
        self._sim_params = sim_params
119
120
        self._futures_sid_path_func = futures_sid_path_func
121
122
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
123
124
        if daily_equities_path is not None:
125
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
126
        else:
127
            self._daily_bar_reader = None
128
129
        self._equity_minute_reader = equity_minute_reader
130
131
        # The following values are used by _minute_offset to calculate the
132
        # index into the minute bcolz date.
133
134
        # A lookup of table every minute to the corresponding day, to avoid
135
        # calling `.date()` on every lookup.
136
        self._minutes_to_day = {}
137
        # A map of days (keyed by midnight) to a DatetimeIndex of market
138
        # minutes for that day.
139
        self._minutes_by_day = {}
140
        # A dict of day to the offset into the minute bcolz on which that
141
        # days data starts.
142
        self._day_offsets = None
143
144
    def handle_extra_source(self, source_df):
145
        """
146
        Extra sources always have a sid column.
147
148
        We expand the given data (by forward filling) to the full range of
149
        the simulation dates, so that lookup is fast during simulation.
150
        """
151
        if source_df is None:
152
            return
153
154
        self._extra_source_df = source_df
155
156
        # source_df's sid column can either consist of assets we know about
157
        # (such as sid(24)) or of assets we don't know about (such as
158
        # palladium).
159
        #
160
        # In both cases, we break up the dataframe into individual dfs
161
        # that only contain a single asset's information.  ie, if source_df
162
        # has data for PALLADIUM and GOLD, we split source_df into two
163
        # dataframes, one for each. (same applies if source_df has data for
164
        # AAPL and IBM).
165
        #
166
        # We then take each child df and reindex it to the simulation's date
167
        # range by forward-filling missing values. this makes reads simpler.
168
        #
169
        # Finally, we store the data. For each column, we store a mapping in
170
        # self.augmented_sources_map from the column to a dictionary of
171
        # asset -> df.  In other words,
172
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
173
        # holding that data.
174
175
        if self._sim_params.emission_rate == "daily":
176
            source_date_index = self.env.days_in_range(
177
                start=self._sim_params.period_start,
178
                end=self._sim_params.period_end
179
            )
180
        else:
181
            source_date_index = self.env.minutes_for_days_in_range(
182
                start=self._sim_params.period_start,
183
                end=self._sim_params.period_end
184
            )
185
186
        # Break the source_df up into one dataframe per sid.  This lets
187
        # us (more easily) calculate accurate start/end dates for each sid,
188
        # de-dup data, and expand the data to fit the backtest start/end date.
189
        grouped_by_sid = source_df.groupby(["sid"])
190
        group_names = grouped_by_sid.groups.keys()
191
        group_dict = {}
192
        for group_name in group_names:
193
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
194
195
        for identifier, df in iteritems(group_dict):
196
            # Before reindexing, save the earliest and latest dates
197
            earliest_date = df.index[0]
198
            latest_date = df.index[-1]
199
200
            # Since we know this df only contains a single sid, we can safely
201
            # de-dupe by the index (dt)
202
            df = df.groupby(level=0).last()
203
204
            # Reindex the dataframe based on the backtest start/end date.
205
            # This makes reads easier during the backtest.
206
            df = df.reindex(index=source_date_index, method='ffill')
207
208
            if not isinstance(identifier, Asset):
209
                # for fake assets we need to store a start/end date
210
                self._asset_start_dates[identifier] = earliest_date
211
                self._asset_end_dates[identifier] = latest_date
212
213
            for col_name in df.columns.difference(['sid']):
214
                if col_name not in self._augmented_sources_map:
215
                    self._augmented_sources_map[col_name] = {}
216
217
                self._augmented_sources_map[col_name][identifier] = df
218
219
    def _open_minute_file(self, field, asset):
220
        sid_str = str(int(asset))
221
222
        try:
223
            carray = self._carrays[field][sid_str]
224
        except KeyError:
225
            carray = self._carrays[field][sid_str] = \
226
                self._get_ctable(asset)[field]
227
228
        return carray
229
230
    def _get_ctable(self, asset):
        """
        Open (read-only) the bcolz table holding ``asset``'s minute bars.

        Futures are located under ``self._minutes_futures_path``; every
        other asset is located under the equity minute reader's rootdir.
        When a sid-path function is configured for that family it is called
        as ``func(root, sid)``; otherwise the path is ``<root>/<sid>.bcolz``.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            path_func = self._futures_sid_path_func
            root = self._minutes_futures_path
        else:
            # Equities, and also assets that are neither Future nor Equity.
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            reader = self._equity_minute_reader
            path_func = reader.sid_path_func
            root = reader.rootdir

        if path_func is not None:
            path = path_func(root, sid)
        else:
            path = "{0}/{1}.bcolz".format(root, sid)

        return bcolz.open(path, mode='r')
261
262
    def get_previous_value(self, asset, field, dt, data_frequency):
263
        """
264
        Given an asset and a column and a dt, returns the previous value for
265
        the same asset/column pair.  If this data portal is in minute mode,
266
        it's the previous minute value, otherwise it's the previous day's
267
        value.
268
269
        Parameters
270
        ---------
271
        asset : Asset
272
            The asset whose data is desired.
273
274
        field: string
275
            The desired field of the asset.  Valid values are "open",
276
            "open_price", "high", "low", "close", "close_price", "volume", and
277
            "price".
278
279
        dt: pd.Timestamp
280
            The timestamp from which to go back in time one slot.
281
282
        data_frequency: string
283
            The frequency of the data to query; i.e. whether the data is
284
            'daily' or 'minute' bars
285
286
        Returns
287
        -------
288
        The value of the desired field at the desired time.
289
        """
290
        if data_frequency == 'daily':
291
            prev_dt = self.env.previous_trading_day(dt)
292
        elif data_frequency == 'minute':
293
            prev_dt = self.env.previous_market_minute(dt)
294
295
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
296
297
    def _check_extra_sources(self, asset, column, day):
298
        # If we have an extra source with a column called "price", only look
299
        # at it if it's on something like palladium and not AAPL (since our
300
        # own price data always wins when dealing with assets).
301
        look_in_augmented_sources = column in self._augmented_sources_map and \
302
            not (column in BASE_FIELDS and isinstance(asset, Asset))
303
304
        if look_in_augmented_sources:
305
            # we're being asked for a field in an extra source
306
            try:
307
                return self._augmented_sources_map[column][asset].\
308
                    loc[day, column]
309
            except:
310
                log.error(
311
                    "Could not find value for asset={0}, day={1},"
312
                    "column={2}".format(
313
                        str(asset),
314
                        str(day),
315
                        str(column)))
316
317
                raise KeyError
318
319
    def get_spot_value(self, asset, field, dt, data_frequency):
320
        """
321
        Public API method that returns a scalar value representing the value
322
        of the desired asset's field at either the given dt.
323
324
        Parameters
325
        ---------
326
        asset : Asset
327
            The asset whose data is desired.gith
328
329
        field: string
330
            The desired field of the asset.  Valid values are "open",
331
            "open_price", "high", "low", "close", "close_price", "volume", and
332
            "price".
333
334
        dt: pd.Timestamp
335
            The timestamp for the desired value.
336
337
        data_frequency: string
338
            The frequency of the data to query; i.e. whether the data is
339
            'daily' or 'minute' bars
340
341
        Returns
342
        -------
343
        The value of the desired field at the desired time.
344
        """
345
        extra_source_val = self._check_extra_sources(
346
            asset,
347
            field,
348
            dt,
349
        )
350
351
        if extra_source_val is not None:
352
            return extra_source_val
353
354
        if field not in BASE_FIELDS:
355
            raise KeyError("Invalid column: " + str(field))
356
357
        column_to_use = BASE_FIELDS[field]
358
359
        if isinstance(asset, int):
360
            asset = self._asset_finder.retrieve_asset(asset)
361
362
        self._check_is_currently_alive(asset, dt)
363
364
        if data_frequency == "daily":
365
            day_to_use = dt
366
            day_to_use = normalize_date(day_to_use)
367
            return self._get_daily_data(asset, column_to_use, day_to_use)
368
        else:
369
            if isinstance(asset, Future):
370
                return self._get_minute_spot_value_future(
371
                    asset, column_to_use, dt)
372
            else:
373
                return self._get_minute_spot_value(
374
                    asset, column_to_use, dt)
375
376
    def _get_minute_spot_value_future(self, asset, column, dt):
377
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
378
        # The file attributes contain the "start_dt" and "last_dt" fields,
379
        # which represent the time period for this bcolz file.
380
381
        # The start_dt is midnight of the first day that this future started
382
        # trading.
383
384
        # figure out the # of minutes between dt and this asset's start_dt
385
        start_date = self._get_asset_start_date(asset)
386
        minute_offset = int((dt - start_date).total_seconds() / 60)
387
388
        if minute_offset < 0:
389
            # asking for a date that is before the asset's start date, no dice
390
            return 0.0
391
392
        # then just index into the bcolz carray at that offset
393
        carray = self._open_minute_file(column, asset)
394
        result = carray[minute_offset]
395
396
        # if there's missing data, go backwards until we run out of file
397
        while result == 0 and minute_offset > 0:
398
            minute_offset -= 1
399
            result = carray[minute_offset]
400
401
        if column != 'volume':
402
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
403
        else:
404
            return result
405
406
    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
407
        # TODO: This case should not be hit, but is when tests are setup
408
        # with data_frequency of daily, but run with minutely.
409
        if self._equity_minute_reader is None:
410
            return
411
412
        self._minutes_to_day = minutes_to_day
413
        self._minutes_by_day = minutes_by_day
414
        if self._sim_params is not None:
415
            start = self._sim_params.trading_days[0]
416
            first_trading_day_idx = self._equity_minute_reader.trading_days.\
417
                searchsorted(start)
418
            self._day_offsets = {
419
                day: (i + first_trading_day_idx) * 390
420
                for i, day in enumerate(
421
                    self._sim_params.trading_days)}
422
423
    def _minute_offset(self, dt):
424
        if self._day_offsets is not None:
425
            try:
426
                day = self._minutes_to_day[dt]
427
                minutes = self._minutes_by_day[day]
428
                return self._day_offsets[day] + minutes.get_loc(dt)
429
            except KeyError:
430
                return None
431
432
    def _get_minute_spot_value(self, asset, column, dt):
433
        # if dt is before the first market minute, minute_index
434
        # will be 0.  if it's after the last market minute, it'll
435
        # be len(minutes_for_day)
436
        minute_offset_to_use = self._minute_offset(dt)
437
438
        if minute_offset_to_use is None:
439
            given_day = pd.Timestamp(dt.date(), tz='utc')
440
            day_index = self._equity_minute_reader.trading_days.searchsorted(
441
                given_day)
442
443
            # if dt is before the first market minute, minute_index
444
            # will be 0.  if it's after the last market minute, it'll
445
            # be len(minutes_for_day)
446
            minute_index = self.env.market_minutes_for_day(given_day).\
447
                searchsorted(dt)
448
449
            minute_offset_to_use = (day_index * 390) + minute_index
450
451
        carray = self._equity_minute_reader._open_minute_file(column, asset)
452
        result = carray[minute_offset_to_use]
453
454
        if result == 0:
455
            # if the given minute doesn't have data, we need to seek
456
            # backwards until we find data. This makes the data
457
            # forward-filled.
458
459
            # get this asset's start date, so that we don't look before it.
460
            start_date = self._get_asset_start_date(asset)
461
            start_date_idx = self._equity_minute_reader.trading_days.\
462
                searchsorted(start_date)
463
            start_day_offset = start_date_idx * 390
464
465
            original_start = minute_offset_to_use
466
467
            while result == 0 and minute_offset_to_use > start_day_offset:
468
                minute_offset_to_use -= 1
469
                result = carray[minute_offset_to_use]
470
471
            # once we've found data, we need to check whether it needs
472
            # to be adjusted.
473
            if result != 0:
474
                minutes = self.env.market_minute_window(
475
                    start=dt,
476
                    count=(original_start - minute_offset_to_use + 1),
477
                    step=-1
478
                ).order()
479
480
                # only need to check for adjustments if we've gone back
481
                # far enough to cross the day boundary.
482
                if minutes[0].date() != minutes[-1].date():
483
                    # create a np array of size minutes, fill it all with
484
                    # the same value.  and adjust the array.
485
                    arr = np.array([result] * len(minutes),
486
                                   dtype=np.float64)
487
                    self._apply_all_adjustments(
488
                        data=arr,
489
                        asset=asset,
490
                        dts=minutes,
491
                        field=column
492
                    )
493
494
                    # The first value of the adjusted array is the value
495
                    # we want.
496
                    result = arr[0]
497
498
        if column != 'volume':
499
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
500
        else:
501
            return result
502
503
    def _get_daily_data(self, asset, column, dt):
504
        while True:
505
            try:
506
                value = self._daily_bar_reader.spot_price(asset, dt, column)
507
                if value != -1:
508
                    return value
509
                else:
510
                    dt -= tradingcalendar.trading_day
511
            except NoDataOnDate:
512
                return 0
513
514
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.

        The frame is indexed by the ``bar_count`` trading days ending on
        end_dt's date and has one column per asset.  With no assets an
        empty frame over those days is returned.
        """
        all_days = tradingcalendar.trading_days
        last_idx = all_days.searchsorted(end_dt.date())
        days_for_window = all_days[
            (last_idx - bar_count + 1):(last_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        def window_for(asset):
            # Futures and everything else are served by different readers.
            if isinstance(asset, Future):
                fetch = self._get_history_daily_window_future
            else:
                fetch = self._get_history_daily_window_equity
            return fetch(asset, days_for_window, end_dt, field_to_use)

        per_asset = [window_for(asset) for asset in assets]

        return pd.DataFrame(
            np.array(per_asset).T,
            index=days_for_window,
            columns=assets
        )
546
547
    def _get_history_daily_window_future(self, asset, days_for_window,
548
                                         end_dt, column):
549
        # Since we don't have daily bcolz files for futures (yet), use minute
550
        # bars to calculate the daily values.
551
        data = []
552
        data_groups = []
553
554
        # get all the minutes for the days NOT including today
555
        for day in days_for_window[:-1]:
556
            minutes = self.env.market_minutes_for_day(day)
557
558
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
559
560
            for idx, minute in enumerate(minutes):
561
                minute_val = self._get_minute_spot_value_future(
562
                    asset, column, minute
563
                )
564
565
                values_for_day[idx] = minute_val
566
567
            data_groups.append(values_for_day)
568
569
        # get the minutes for today
570
        last_day_minutes = pd.date_range(
571
            start=self.env.get_open_and_close(end_dt)[0],
572
            end=end_dt,
573
            freq="T"
574
        )
575
576
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
577
578
        for idx, minute in enumerate(last_day_minutes):
579
            minute_val = self._get_minute_spot_value_future(
580
                asset, column, minute
581
            )
582
583
            values_for_last_day[idx] = minute_val
584
585
        data_groups.append(values_for_last_day)
586
587
        for group in data_groups:
588
            if len(group) == 0:
589
                continue
590
591
            if column == 'volume':
592
                data.append(np.sum(group))
593
            elif column == 'open':
594
                data.append(group[0])
595
            elif column == 'close':
596
                data.append(group[-1])
597
            elif column == 'high':
598
                data.append(np.amax(group))
599
            elif column == 'low':
600
                data.append(np.amin(group))
601
602
        return data
603
604
    def _get_history_daily_window_equity(self, asset, days_for_window,
605
                                         end_dt, field_to_use):
606
        sid = int(asset)
607
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0
608
609
        # get the start and end dates for this sid
610
        end_date = self._get_asset_end_date(asset)
611
612
        if ends_at_midnight or (days_for_window[-1] > end_date):
613
            # two cases where we use daily data for the whole range:
614
            # 1) the history window ends at midnight utc.
615
            # 2) the last desired day of the window is after the
616
            # last trading day, use daily data for the whole range.
617
            return self._get_daily_window_for_sid(
618
                asset,
619
                field_to_use,
620
                days_for_window,
621
                extra_slot=False
622
            )
623
        else:
624
            # for the last day of the desired window, use minute
625
            # data and aggregate it.
626
            all_minutes_for_day = self.env.market_minutes_for_day(
627
                pd.Timestamp(end_dt.date()))
628
629
            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)
630
631
            # these are the minutes for the partial day
632
            minutes_for_partial_day =\
633
                all_minutes_for_day[0:(last_minute_idx + 1)]
634
635
            daily_data = self._get_daily_window_for_sid(
636
                sid,
637
                field_to_use,
638
                days_for_window[0:-1]
639
            )
640
641
            minute_data = self._get_minute_window_for_equity(
642
                sid,
643
                field_to_use,
644
                minutes_for_partial_day
645
            )
646
647
            if field_to_use == 'volume':
648
                minute_value = np.sum(minute_data)
649
            elif field_to_use == 'open':
650
                minute_value = minute_data[0]
651
            elif field_to_use == 'close':
652
                minute_value = minute_data[-1]
653
            elif field_to_use == 'high':
654
                minute_value = np.amax(minute_data)
655
            elif field_to_use == 'low':
656
                minute_value = np.amin(minute_data)
657
658
            # append the partial day.
659
            daily_data[-1] = minute_value
660
661
            return daily_data
662
663
    def _get_history_minute_window(self, assets, end_dt, bar_count,
664
                                   field_to_use):
665
        """
666
        Internal method that returns a dataframe containing history bars
667
        of minute frequency for the given sids.
668
        """
669
        # get all the minutes for this window
670
        minutes_for_window = self.env.market_minute_window(
671
            end_dt, bar_count, step=-1)[::-1]
672
673
        first_trading_day = self._equity_minute_reader.first_trading_day
674
675
        # but then cut it down to only the minutes after
676
        # the first trading day.
677
        modified_minutes_for_window = minutes_for_window[
678
            minutes_for_window.slice_indexer(first_trading_day)]
679
680
        modified_minutes_length = len(modified_minutes_for_window)
681
682
        if modified_minutes_length == 0:
683
            raise ValueError("Cannot calculate history window that ends"
684
                             "before 2002-01-02 14:31 UTC!")
685
686
        data = []
687
        bars_to_prepend = 0
688
        nans_to_prepend = None
689
690
        if modified_minutes_length < bar_count:
691
            first_trading_date = first_trading_day.date()
692
            if modified_minutes_for_window[0].date() == first_trading_date:
693
                # the beginning of the window goes before our global trading
694
                # start date
695
                bars_to_prepend = bar_count - modified_minutes_length
696
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)
697
698
        if len(assets) == 0:
699
            return pd.DataFrame(
700
                None,
701
                index=modified_minutes_for_window,
702
                columns=None
703
            )
704
705
        for asset in assets:
706
            asset_minute_data = self._get_minute_window_for_asset(
707
                asset,
708
                field_to_use,
709
                modified_minutes_for_window
710
            )
711
712
            if bars_to_prepend != 0:
713
                asset_minute_data = np.insert(asset_minute_data, 0,
714
                                              nans_to_prepend)
715
716
            data.append(asset_minute_data)
717
718
        return pd.DataFrame(
719
            np.array(data).T,
720
            index=minutes_for_window,
721
            columns=map(int, assets)
722
        )
723
724
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.  Int sids are also accepted
            and are resolved through the asset finder.

        end_dt: pd.Timestamp
            The timestamp at which the window ends.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If the field or the frequency is not recognised.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        resolved_assets = []
        for candidate in assets:
            if isinstance(candidate, int):
                candidate = self.env.asset_finder.retrieve_asset(candidate)
            resolved_assets.append(candidate)

        if frequency == "1d":
            build_window = self._get_history_daily_window
        elif frequency == "1m":
            build_window = self._get_history_minute_window
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        df = build_window(resolved_assets, end_dt, bar_count, field_to_use)

        # forward-fill if needed
        if field == "price" and ffill:
            df.fillna(method='ffill', inplace=True)

        return df
775
776
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.  An int sid is also accepted
            and is resolved through the asset finder.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and everything else are read through separate helpers.
        if isinstance(asset, Future):
            read_window = self._get_minute_window_for_future
        else:
            read_window = self._get_minute_window_for_equity

        return read_window(asset, field, minutes_for_window)
809
810
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
811
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
812
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
813
        # do this is to simply do a spot lookup for each desired minute.
814
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
815
        for idx, minute in enumerate(minutes_for_window):
816
            return_data[idx] = \
817
                self._get_minute_spot_value_future(asset, field, minute)
818
819
        # Note: an improvement could be to find the consecutive runs within
820
        # minutes_for_window, and use them to read the underlying ctable
821
        # more efficiently.
822
823
        # Once futures are on 24-hour clock, then we can just grab all the
824
        # requested minutes in one shot from the ctable.
825
826
        # no adjustments for futures, yay.
827
        return return_data
828
829
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
830
        # each sid's minutes are stored in a bcolz file
831
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
832
        # of when the asset started trading and regardless of half days.
833
        # for a half day, the second half is filled with zeroes.
834
835
        # find the position of start_dt in the entire timeline, go back
836
        # bar_count bars, and that's the unadjusted data
837
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)
838
839
        start_idx = max(
840
            self._equity_minute_reader._find_position_of_minute(
841
                minutes_for_window[0]),
842
            0)
843
        end_idx = self._equity_minute_reader._find_position_of_minute(
844
            minutes_for_window[-1]) + 1
845
846
        if end_idx == 0:
847
            # No data to return for minute window.
848
            return np.full(len(minutes_for_window), np.nan)
849
850
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
851
852
        data_to_copy = raw_data[start_idx:end_idx]
853
854
        num_minutes = len(minutes_for_window)
855
856
        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
857
        # close).  num_minutes is the number of actual trading minutes.  if
858
        # these two have different lengths, that means that we need to trim
859
        # away data due to early closes.
860
        if len(data_to_copy) != num_minutes:
861
            # get a copy of the minutes in Eastern time, since we depend on
862
            # an early close being at 1pm Eastern.
863
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")
864
865
            # accumulate a list of indices of the last minute of an early
866
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
867
            # there are five minutes of real data before 180 zeroes, we would
868
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
869
            # minute is the last "real" minute of the day.
870
            last_minute_idx_of_early_close_day = []
871
            for minute_idx, minute_dt in enumerate(eastern_minutes):
872
                if minute_idx == (num_minutes - 1):
873
                    break
874
875
                if minute_dt.hour == 13 and minute_dt.minute == 0:
876
                    next_minute = eastern_minutes[minute_idx + 1]
877
                    if next_minute.hour != 13:
878
                        # minute_dt is the last minute of an early close day
879
                        last_minute_idx_of_early_close_day.append(minute_idx)
880
881
            # spin through the list of early close markers, and use them to
882
            # chop off 180 minutes at a time from data_to_copy.
883
            for idx, early_close_minute_idx in \
884
                    enumerate(last_minute_idx_of_early_close_day):
885
                early_close_minute_idx -= (180 * idx)
886
                data_to_copy = np.delete(
887
                    data_to_copy,
888
                    range(
889
                        early_close_minute_idx + 1,
890
                        early_close_minute_idx + 181
891
                    )
892
                )
893
894
        return_data[0:len(data_to_copy)] = data_to_copy
895
896
        self._apply_all_adjustments(
897
            return_data,
898
            asset,
899
            minutes_for_window,
900
            field,
901
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
902
        )
903
904
        return return_data
905
906
    def _apply_all_adjustments(self, data, asset, dts, field,
907
                               price_adj_factor=1.0):
908
        """
909
        Internal method that applies all the necessary adjustments on the
910
        given data array.
911
912
        The adjustments are:
913
        - splits
914
        - if field != "volume":
915
            - mergers
916
            - dividends
917
            - * 0.001
918
            - any zero fields replaced with NaN
919
        - all values rounded to 3 digits after the decimal point.
920
921
        Parameters
922
        ----------
923
        data : np.array
924
            The data to be adjusted.
925
926
        asset: Asset
927
            The asset whose data is being adjusted.
928
929
        dts: pd.DateTimeIndex
930
            The list of minutes or days representing the desired window.
931
932
        field: string
933
            The field whose values are in the data array.
934
935
        price_adj_factor: float
936
            Factor with which to adjust OHLC values.
937
        Returns
938
        -------
939
        None.  The data array is modified in place.
940
        """
941
        self._apply_adjustments_to_window(
942
            self._get_adjustment_list(
943
                asset, self._splits_dict, "SPLITS"
944
            ),
945
            data,
946
            dts,
947
            field != 'volume'
948
        )
949
950
        if field != 'volume':
951
            self._apply_adjustments_to_window(
952
                self._get_adjustment_list(
953
                    asset, self._mergers_dict, "MERGERS"
954
                ),
955
                data,
956
                dts,
957
                True
958
            )
959
960
            self._apply_adjustments_to_window(
961
                self._get_adjustment_list(
962
                    asset, self._dividends_dict, "DIVIDENDS"
963
                ),
964
                data,
965
                dts,
966
                True
967
            )
968
969
            data *= price_adj_factor
970
971
            # if anything is zero, it's a missing bar, so replace it with NaN.
972
            # we only want to do this for non-volume fields, because a missing
973
            # volume should be 0.
974
            data[data == 0] = np.NaN
975
976
        np.around(data, 3, out=data)
977
978
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
979
                                  extra_slot=True):
980
        """
981
        Internal method that gets a window of adjusted daily data for a sid
982
        and specified date range.  Used to support the history API method for
983
        daily bars.
984
985
        Parameters
986
        ----------
987
        asset : Asset
988
            The asset whose data is desired.
989
990
        start_dt: pandas.Timestamp
991
            The start of the desired window of data.
992
993
        bar_count: int
994
            The number of days of data to return.
995
996
        field: string
997
            The specific field to return.  "open", "high", "close_price", etc.
998
999
        extra_slot: boolean
1000
            Whether to allocate an extra slot in the returned numpy array.
1001
            This extra slot will hold the data for the last partial day.  It's
1002
            much better to create it here than to create a copy of the array
1003
            later just to add a slot.
1004
1005
        Returns
1006
        -------
1007
        A numpy array with requested values.  Any missing slots filled with
1008
        nan.
1009
1010
        """
1011
        bar_count = len(days_in_window)
1012
        # create an np.array of size bar_count
1013
        if extra_slot:
1014
            return_array = np.zeros((bar_count + 1,))
1015
        else:
1016
            return_array = np.zeros((bar_count,))
1017
1018
        return_array[:] = np.NAN
1019
1020
        start_date = self._get_asset_start_date(asset)
1021
        end_date = self._get_asset_end_date(asset)
1022
        day_slice = days_in_window.slice_indexer(start_date, end_date)
1023
        active_days = days_in_window[day_slice]
1024
1025
        if active_days.shape[0]:
1026
            data = self._daily_bar_reader.history_window(field,
1027
                                                         active_days[0],
1028
                                                         active_days[-1],
1029
                                                         asset)
1030
            return_array[day_slice] = data
1031
            self._apply_all_adjustments(
1032
                return_array,
1033
                asset,
1034
                active_days,
1035
                field,
1036
            )
1037
1038
        return return_array
1039
1040
    @staticmethod
1041
    def _apply_adjustments_to_window(adjustments_list, window_data,
1042
                                     dts_in_window, multiply):
1043
        if len(adjustments_list) == 0:
1044
            return
1045
1046
        # advance idx to the correct spot in the adjustments list, based on
1047
        # when the window starts
1048
        idx = 0
1049
1050
        while idx < len(adjustments_list) and dts_in_window[0] >\
1051
                adjustments_list[idx][0]:
1052
            idx += 1
1053
1054
        # if we've advanced through all the adjustments, then there's nothing
1055
        # to do.
1056
        if idx == len(adjustments_list):
1057
            return
1058
1059
        while idx < len(adjustments_list):
1060
            adjustment_to_apply = adjustments_list[idx]
1061
1062
            if adjustment_to_apply[0] > dts_in_window[-1]:
1063
                break
1064
1065
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1066
            if multiply:
1067
                window_data[0:range_end] *= adjustment_to_apply[1]
1068
            else:
1069
                window_data[0:range_end] /= adjustment_to_apply[1]
1070
1071
            idx += 1
1072
1073
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1074
        """
1075
        Internal method that returns a list of adjustments for the given sid.
1076
1077
        Parameters
1078
        ----------
1079
        asset : Asset
1080
            The asset for which to return adjustments.
1081
1082
        adjustments_dict: dict
1083
            A dictionary of sid -> list that is used as a cache.
1084
1085
        table_name: string
1086
            The table that contains this data in the adjustments db.
1087
1088
        Returns
1089
        -------
1090
        adjustments: list
1091
            A list of [multiplier, pd.Timestamp], earliest first
1092
1093
        """
1094
        if self._adjustment_reader is None:
1095
            return []
1096
1097
        sid = int(asset)
1098
1099
        try:
1100
            adjustments = adjustments_dict[sid]
1101
        except KeyError:
1102
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1103
                get_adjustments_for_sid(table_name, sid)
1104
1105
        return adjustments
1106
1107
    def _check_is_currently_alive(self, asset, dt):
1108
        sid = int(asset)
1109
1110
        if sid not in self._asset_start_dates:
1111
            self._get_asset_start_date(asset)
1112
1113
        start_date = self._asset_start_dates[sid]
1114
        if self._asset_start_dates[sid] > dt:
1115
            raise NoTradeDataAvailableTooEarly(
1116
                sid=sid,
1117
                dt=normalize_date(dt),
1118
                start_dt=start_date
1119
            )
1120
1121
        end_date = self._asset_end_dates[sid]
1122
        if self._asset_end_dates[sid] < dt:
1123
            raise NoTradeDataAvailableTooLate(
1124
                sid=sid,
1125
                dt=normalize_date(dt),
1126
                end_dt=end_date
1127
            )
1128
1129
    def _get_asset_start_date(self, asset):
1130
        self._ensure_asset_dates(asset)
1131
        return self._asset_start_dates[asset]
1132
1133
    def _get_asset_end_date(self, asset):
1134
        self._ensure_asset_dates(asset)
1135
        return self._asset_end_dates[asset]
1136
1137
    def _ensure_asset_dates(self, asset):
1138
        sid = int(asset)
1139
1140
        if sid not in self._asset_start_dates:
1141
            self._asset_start_dates[sid] = asset.start_date
1142
            self._asset_end_dates[sid] = asset.end_date
1143
1144
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) row from
        the adjustments db.  The list is empty when there is no adjustment
        reader or when no sids are given.
        """
        if self._adjustment_reader is None or len(sids) == 0:
            # same type as the regular return value, so callers always get
            # a list.
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        rows = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # the query returns every split on that date; keep only the
        # requested sids.
        wanted_sids = set(sids)
        return [row for row in rows if row[0] in wanted_sids]
1176
1177
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of dicts, one per dividend whose ex_date is after the
        first trading day and whose pay_date is before the last one.  The
        ex_date, pay_date and record_date values are converted to
        pd.Timestamps; declared_date, payment_sid, ratio and sid are passed
        through as stored.
        """
        reader = self._adjustment_reader
        if reader is None or len(trading_days) == 0:
            return []

        # the adjustments db stores dates as seconds since epoch.
        range_start = trading_days[0].value / 1e9
        range_end = trading_days[-1].value / 1e9

        query = ("SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
                 "ex_date > ? AND pay_date < ?")
        rows = reader.conn.execute(
            query, (int(sid), range_start, range_end,)).fetchall()

        # columns are read by position in the row.
        return [
            {
                "declared_date": row[1],
                "ex_date": pd.Timestamp(row[2], unit="s"),
                "pay_date": pd.Timestamp(row[3], unit="s"),
                "payment_sid": row[4],
                "ratio": row[5],
                "record_date": pd.Timestamp(row[6], unit="s"),
                "sid": row[7]
            }
            for row in rows
        ]
1223
1224
    def contains(self, asset, field):
        """
        Report whether the given field is available for the given asset:
        either it is one of the base fields, or the augmented sources map
        has an entry for the field that includes the asset.
        """
        if field in BASE_FIELDS:
            return True

        augmented_sources = self._augmented_sources_map
        return (field in augmented_sources and
                asset in augmented_sources[field])
1228
1229
    def get_fetcher_assets(self, day):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use the most recent earlier date present in the fetcher data
        (if there is one, otherwise the result is empty).

        Returns
        -------
        list: a list of Asset objects.
        """
        fetcher_df = self._extra_source_df
        if fetcher_df is None:
            # no fetcher source at all.
            return []

        fetcher_dates = fetcher_df.index
        if day in fetcher_dates:
            date_to_use = day
        else:
            # current day isn't in the fetcher df, go back to the last
            # available day
            insert_at = fetcher_dates.searchsorted(day)
            if insert_at == 0:
                return []

            date_to_use = fetcher_dates[insert_at - 1]

        candidates = fetcher_df.loc[date_to_use]["sid"]

        # make sure they're actually assets
        return [candidate for candidate in candidates
                if isinstance(candidate, Asset)]
1266