Completed
Pull Request — master (#901)
by Eddie
02:06
created

_check_is_currently_alive()   B

Complexity

Conditions 5

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 23
rs 8.2508
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
from zipline.pipeline.data.equity_pricing import USEquityPricing
31
32
from zipline.utils import tradingcalendar
33
from zipline.errors import (
34
    NoTradeDataAvailableTooEarly,
35
    NoTradeDataAvailableTooLate
36
)
37
38
log = Logger('DataPortal')

# The only bar frequencies the history API accepts.
HISTORY_FREQUENCIES = ["1d", "1m"]

# User-facing field name -> canonical OHLCV column name in the bcolz data.
# The "*_price" aliases and "price" resolve to real columns ("price" maps
# to close).
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}


# Same aliases, resolved to pipeline USEquityPricing column objects.
US_EQUITY_COLUMNS = {
    'open': USEquityPricing.open,
    'open_price': USEquityPricing.open,
    'high': USEquityPricing.high,
    'low': USEquityPricing.low,
    'close': USEquityPricing.close,
    'close_price': USEquityPricing.close,
    'volume': USEquityPricing.volume,
    'price': USEquityPricing.close,
}
64
65
66
class DataPortal(object):
67
    def __init__(self,
68
                 env,
69
                 sim_params=None,
70
                 equity_minute_reader=None,
71
                 minutes_futures_path=None,
72
                 daily_equities_path=None,
73
                 adjustment_reader=None,
74
                 futures_sid_path_func=None):
75
        self.env = env
76
77
        # Internal pointers to the current dt (can be minute) and current day.
78
        # In daily mode, they point to the same thing. In minute mode, it's
79
        # useful to have separate pointers to the current day and to the
80
        # current minute.  These pointers are updated by the
81
        # AlgorithmSimulator's transform loop.
82
        self.current_dt = None
83
        self.current_day = None
84
85
        # This is a bit ugly, but is here for performance reasons.  In minute
86
        # simulations, we need to very quickly go from dt -> (# of minutes
87
        # since Jan 1 2002 9:30 Eastern).
88
        #
89
        # The clock that heartbeats the simulation has all the necessary
90
        # information to do this calculation very quickly.  This value is
91
        # calculated there, and then set here
92
        self.cur_data_offset = 0
93
94
        self.views = {}
95
96
        self._daily_equities_path = daily_equities_path
97
        self._minutes_futures_path = minutes_futures_path
98
99
        self._asset_finder = env.asset_finder
100
101
        self._carrays = {
102
            'open': {},
103
            'high': {},
104
            'low': {},
105
            'close': {},
106
            'volume': {},
107
            'sid': {},
108
            'dt': {},
109
        }
110
111
        self._adjustment_reader = adjustment_reader
112
113
        # caches of sid -> adjustment list
114
        self._splits_dict = {}
115
        self._mergers_dict = {}
116
        self._dividends_dict = {}
117
118
        # Cache of sid -> the first trading day of an asset, even if that day
119
        # is before 1/2/2002.
120
        self._asset_start_dates = {}
121
        self._asset_end_dates = {}
122
123
        # Handle extra sources, like Fetcher.
124
        self._augmented_sources_map = {}
125
        self._extra_source_df = None
126
127
        self._sim_params = sim_params
128
        if self._sim_params is not None:
129
            self._data_frequency = self._sim_params.data_frequency
130
        else:
131
            self._data_frequency = "minute"
132
133
        self._futures_sid_path_func = futures_sid_path_func
134
135
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
136
137
        if daily_equities_path is not None:
138
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
139
        else:
140
            self._daily_bar_reader = None
141
142
        self._equity_minute_reader = equity_minute_reader
143
144
        # The following values are used by _minute_offset to calculate the
145
        # index into the minute bcolz date.
146
147
        # A lookup of table every minute to the corresponding day, to avoid
148
        # calling `.date()` on every lookup.
149
        self._minutes_to_day = {}
150
        # A map of days (keyed by midnight) to a DatetimeIndex of market
151
        # minutes for that day.
152
        self._minutes_by_day = {}
153
        # A dict of day to the offset into the minute bcolz on which that
154
        # days data starts.
155
        self._day_offsets = None
156
157
    def handle_extra_source(self, source_df):
158
        """
159
        Extra sources always have a sid column.
160
161
        We expand the given data (by forward filling) to the full range of
162
        the simulation dates, so that lookup is fast during simulation.
163
        """
164
        if source_df is None:
165
            return
166
167
        self._extra_source_df = source_df
168
169
        # source_df's sid column can either consist of assets we know about
170
        # (such as sid(24)) or of assets we don't know about (such as
171
        # palladium).
172
        #
173
        # In both cases, we break up the dataframe into individual dfs
174
        # that only contain a single asset's information.  ie, if source_df
175
        # has data for PALLADIUM and GOLD, we split source_df into two
176
        # dataframes, one for each. (same applies if source_df has data for
177
        # AAPL and IBM).
178
        #
179
        # We then take each child df and reindex it to the simulation's date
180
        # range by forward-filling missing values. this makes reads simpler.
181
        #
182
        # Finally, we store the data. For each column, we store a mapping in
183
        # self.augmented_sources_map from the column to a dictionary of
184
        # asset -> df.  In other words,
185
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
186
        # holding that data.
187
188
        if self._sim_params.emission_rate == "daily":
189
            source_date_index = self.env.days_in_range(
190
                start=self._sim_params.period_start,
191
                end=self._sim_params.period_end
192
            )
193
        else:
194
            source_date_index = self.env.minutes_for_days_in_range(
195
                start=self._sim_params.period_start,
196
                end=self._sim_params.period_end
197
            )
198
199
        # Break the source_df up into one dataframe per sid.  This lets
200
        # us (more easily) calculate accurate start/end dates for each sid,
201
        # de-dup data, and expand the data to fit the backtest start/end date.
202
        grouped_by_sid = source_df.groupby(["sid"])
203
        group_names = grouped_by_sid.groups.keys()
204
        group_dict = {}
205
        for group_name in group_names:
206
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
207
208
        for identifier, df in iteritems(group_dict):
209
            # Before reindexing, save the earliest and latest dates
210
            earliest_date = df.index[0]
211
            latest_date = df.index[-1]
212
213
            # Since we know this df only contains a single sid, we can safely
214
            # de-dupe by the index (dt)
215
            df = df.groupby(level=0).last()
216
217
            # Reindex the dataframe based on the backtest start/end date.
218
            # This makes reads easier during the backtest.
219
            df = df.reindex(index=source_date_index, method='ffill')
220
221
            if not isinstance(identifier, Asset):
222
                # for fake assets we need to store a start/end date
223
                self._asset_start_dates[identifier] = earliest_date
224
                self._asset_end_dates[identifier] = latest_date
225
226
            for col_name in df.columns.difference(['sid']):
227
                if col_name not in self._augmented_sources_map:
228
                    self._augmented_sources_map[col_name] = {}
229
230
                self._augmented_sources_map[col_name][identifier] = df
231
232
    def _open_minute_file(self, field, asset):
233
        sid_str = str(int(asset))
234
235
        try:
236
            carray = self._carrays[field][sid_str]
237
        except KeyError:
238
            carray = self._carrays[field][sid_str] = \
239
                self._get_ctable(asset)[field]
240
241
        return carray
242
243
    def _get_ctable(self, asset):
        """
        Open and return (read-only) the minute-bar ctable for ``asset``.

        Futures resolve against the futures minute path (optionally through
        ``self._futures_sid_path_func``); everything else resolves against
        the equity minute reader's rootdir.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            if self._futures_sid_path_func is not None:
                path = self._futures_sid_path_func(
                    self._minutes_futures_path, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(self._minutes_futures_path, sid)
        else:
            # The original code had identical Equity and fallback branches;
            # they are consolidated here.
            # TODO: Figure out if assets should be allowed if neither
            # Future nor Equity, and why this code path is being hit.
            if self._equity_minute_reader.sid_path_func is not None:
                path = self._equity_minute_reader.sid_path_func(
                    self._equity_minute_reader.rootdir, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(
                    self._equity_minute_reader.rootdir, sid)

        return bcolz.open(path, mode='r')
274
275
    def get_previous_value(self, asset, field, dt):
276
        """
277
        Given an asset and a column and a dt, returns the previous value for
278
        the same asset/column pair.  If this data portal is in minute mode,
279
        it's the previous minute value, otherwise it's the previous day's
280
        value.
281
282
        Parameters
283
        ---------
284
        asset : Asset
285
            The asset whose data is desired.
286
287
        field: string
288
            The desired field of the asset.  Valid values are "open",
289
            "open_price", "high", "low", "close", "close_price", "volume", and
290
            "price".
291
292
        dt: pd.Timestamp
293
            The timestamp from which to go back in time one slot.
294
295
        Returns
296
        -------
297
        The value of the desired field at the desired time.
298
        """
299
        if self._data_frequency == 'daily':
300
            prev_dt = self.env.previous_trading_day(dt)
301
        elif self._data_frequency == 'minute':
302
            prev_dt = self.env.previous_market_minute(dt)
303
304
        return self.get_spot_value(asset, field, prev_dt)
305
306
    def _check_extra_sources(self, asset, column, day):
307
        # If we have an extra source with a column called "price", only look
308
        # at it if it's on something like palladium and not AAPL (since our
309
        # own price data always wins when dealing with assets).
310
        look_in_augmented_sources = column in self._augmented_sources_map and \
311
            not (column in BASE_FIELDS and isinstance(asset, Asset))
312
313
        if look_in_augmented_sources:
314
            # we're being asked for a field in an extra source
315
            try:
316
                return self._augmented_sources_map[column][asset].\
317
                    loc[day, column]
318
            except:
319
                log.error(
320
                    "Could not find value for asset={0}, current_day={1},"
321
                    "column={2}".format(
322
                        str(asset),
323
                        str(self.current_day),
324
                        str(column)))
325
326
                raise KeyError
327
328
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume",
            and "price".

        dt : pd.Timestamp, optional
            The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        # Extra sources (e.g. Fetcher) take precedence when they serve this
        # column/asset pair.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            (dt or self.current_dt)
        )
        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        column_to_use = BASE_FIELDS[field]

        # Accept raw sids for convenience.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day = normalize_date(dt or self.current_day)
            return self._get_daily_data(asset, column_to_use, day)

        # Minute mode; futures and equities live in different files.
        minute = dt or self.current_dt
        if isinstance(asset, Future):
            return self._get_minute_spot_value_future(
                asset, column_to_use, minute)
        return self._get_minute_spot_value(asset, column_to_use, minute)
383
384
    def _get_minute_spot_value_future(self, asset, column, dt):
        """
        Spot value of ``column`` for a future at minute ``dt``, read from
        the futures minute bcolz file, back-filling through missing (zero)
        bars.  Prices are scaled by MINUTE_PRICE_ADJUSTMENT_FACTOR.
        """
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a
        # week.  The file attributes contain "start_dt"/"last_dt" fields
        # giving the time period of the file; start_dt is midnight of the
        # first day the future traded.

        # Number of minutes between dt and the asset's start_dt gives the
        # offset into the carray.
        start_date = self._get_asset_start_date(asset)
        offset = int((dt - start_date).total_seconds() / 60)

        if offset < 0:
            # dt predates the asset's start date; no data.
            return 0.0

        carray = self._open_minute_file(column, asset)
        value = carray[offset]

        # Seek backwards through missing (zero) bars until we run out of
        # file.
        while value == 0 and offset > 0:
            offset -= 1
            value = carray[offset]

        if column == 'volume':
            return value
        return value * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
413
414
    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
415
        # TODO: This case should not be hit, but is when tests are setup
416
        # with data_frequency of daily, but run with minutely.
417
        if self._equity_minute_reader is None:
418
            return
419
420
        self._minutes_to_day = minutes_to_day
421
        self._minutes_by_day = minutes_by_day
422
        if self._sim_params is not None:
423
            start = self._sim_params.trading_days[0]
424
            first_trading_day_idx = self._equity_minute_reader.trading_days.\
425
                searchsorted(start)
426
            self._day_offsets = {
427
                day: (i + first_trading_day_idx) * 390
428
                for i, day in enumerate(
429
                    self._sim_params.trading_days)}
430
431
    def _minute_offset(self, dt):
432
        if self._day_offsets is not None:
433
            try:
434
                day = self._minutes_to_day[dt]
435
                minutes = self._minutes_by_day[day]
436
                return self._day_offsets[day] + minutes.get_loc(dt)
437
            except KeyError:
438
                return None
439
440
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Return the minute-bar value of ``column`` for an equity ``asset`` at
        minute ``dt``, forward-filling from earlier minutes (and applying
        adjustments when the fill crosses a day boundary) if the requested
        bar is missing (zero).

        Prices (every column except 'volume') are scaled by
        MINUTE_PRICE_ADJUSTMENT_FACTOR on the way out.
        """
        # Fast path via the offset caches; falls back to a calendar
        # computation below when they don't cover dt.
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self._equity_minute_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            # The minute files hold 390 bars per trading day.
            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
510
511
    def _get_daily_data(self, asset, column, dt):
512
        while True:
513
            try:
514
                value = self._daily_bar_reader.spot_price(asset, dt, column)
515
                if value != -1:
516
                    return value
517
                else:
518
                    dt -= tradingcalendar.trading_day
519
            except NoDataOnDate:
520
                return 0
521
522
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        end_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (end_idx - bar_count + 1):(end_idx + 1)]

        if not assets:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures are aggregated from minute data; equities come from the
        # daily files (with a minute-aggregated partial final day).
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
554
555
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Build daily bars for a future by aggregating its minute bars; we
        don't have daily bcolz files for futures (yet).
        """
        def minute_values(minutes):
            # Spot-read each minute into a float64 vector.
            vals = np.zeros(len(minutes), dtype=np.float64)
            for i, minute in enumerate(minutes):
                vals[i] = self._get_minute_spot_value_future(
                    asset, column, minute)
            return vals

        # All the minutes for the full days, NOT including today...
        data_groups = [
            minute_values(self.env.market_minutes_for_day(day))
            for day in days_for_window[:-1]
        ]

        # ...plus today's minutes up to end_dt.
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )
        data_groups.append(minute_values(last_day_minutes))

        # Aggregate each day's minutes into a single daily value.
        data = []
        for group in data_groups:
            if len(group) == 0:
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data
611
612
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Build daily bars for an equity.  The final (partial) day is
        aggregated from minute data unless daily data suffices.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # This asset's last trading date.
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # Daily data covers the whole range when either:
            # 1) the history window ends at midnight UTC, or
            # 2) the window's last desired day is after the asset's last
            #    trading day.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )

        # Otherwise, aggregate minute data for the final, partial day.
        all_minutes_for_day = self.env.market_minutes_for_day(
            pd.Timestamp(end_dt.date()))
        last_minute_idx = all_minutes_for_day.searchsorted(end_dt)
        minutes_for_partial_day = \
            all_minutes_for_day[0:(last_minute_idx + 1)]

        daily_data = self._get_daily_window_for_sid(
            sid,
            field_to_use,
            days_for_window[0:-1]
        )

        minute_data = self._get_minute_window_for_equity(
            sid,
            field_to_use,
            minutes_for_partial_day
        )

        if field_to_use == 'volume':
            minute_value = np.sum(minute_data)
        elif field_to_use == 'open':
            minute_value = minute_data[0]
        elif field_to_use == 'close':
            minute_value = minute_data[-1]
        elif field_to_use == 'high':
            minute_value = np.amax(minute_data)
        elif field_to_use == 'low':
            minute_value = np.amin(minute_data)

        # Overwrite the final slot with the partial day's aggregate.
        daily_data[-1] = minute_value
        return daily_data
670
671
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # Fixed: the two adjacent literals used to concatenate to
            # "endsbefore" (missing separating space).
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            # Fixed: `map(int, assets)` is a one-shot iterator on Python 3;
            # pandas needs a materialized sequence.
            columns=[int(asset) for asset in assets]
        )
731
732
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count : int
            The number of bars desired.

        frequency : string
            "1d" or "1m"

        field : string
            The desired field of the asset.

        ffill : boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # Convert any raw sids to Asset objects.
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            window = self._get_history_daily_window(
                assets, end_dt, bar_count, field_to_use)
        elif frequency == "1m":
            window = self._get_history_minute_window(
                assets, end_dt, bar_count, field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # Forward-filling only applies to price requests.
        if field == "price" and ffill:
            window.fillna(method='ffill', inplace=True)

        return window
783
784
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field : string
            The specific field to return.  "open", "high", "close_price",
            etc.

        minutes_for_window : pd.DateTimeIndex
            The list of minutes representing the desired window.  Each
            minute is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Accept raw sids for convenience.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        handler = (
            self._get_minute_window_for_future
            if isinstance(asset, Future)
            else self._get_minute_window_for_equity
        )
        return handler(asset, field, minutes_for_window)
817
818
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
819
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
820
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
821
        # do this is to simply do a spot lookup for each desired minute.
822
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
823
        for idx, minute in enumerate(minutes_for_window):
824
            return_data[idx] = \
825
                self._get_minute_spot_value_future(asset, field, minute)
826
827
        # Note: an improvement could be to find the consecutive runs within
828
        # minutes_for_window, and use them to read the underlying ctable
829
        # more efficiently.
830
831
        # Once futures are on 24-hour clock, then we can just grab all the
832
        # requested minutes in one shot from the ctable.
833
834
        # no adjustments for futures, yay.
835
        return return_data
836
837
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Read a window of unadjusted minute bars for an equity from its bcolz
        file, trim away early-close zero padding, apply adjustments in place,
        and return the result.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field : string
            The specific field to read.  "open", "high", "close", etc.

        minutes_for_window : pd.DatetimeIndex
            The trading minutes making up the desired window.

        Returns
        -------
        A numpy float64 array of len(minutes_for_window) adjusted values.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        # positions in the flat 390-minutes-per-day timeline; clamp the start
        # at 0 in case the window begins before the timeline does.
        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                # a 1:00 pm Eastern bar followed by a non-1pm-hour bar marks
                # the end of an early-close session.
                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each earlier deletion shifted the remaining data left by
                # 180 slots, so compensate before deleting.
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # splits/mergers/dividends, price scaling, zero->NaN, and rounding,
        # all applied in place.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
910
911
    def _apply_all_adjustments(self, data, asset, dts, field,
912
                               price_adj_factor=1.0):
913
        """
914
        Internal method that applies all the necessary adjustments on the
915
        given data array.
916
917
        The adjustments are:
918
        - splits
919
        - if field != "volume":
920
            - mergers
921
            - dividends
922
            - * 0.001
923
            - any zero fields replaced with NaN
924
        - all values rounded to 3 digits after the decimal point.
925
926
        Parameters
927
        ----------
928
        data : np.array
929
            The data to be adjusted.
930
931
        asset: Asset
932
            The asset whose data is being adjusted.
933
934
        dts: pd.DateTimeIndex
935
            The list of minutes or days representing the desired window.
936
937
        field: string
938
            The field whose values are in the data array.
939
940
        price_adj_factor: float
941
            Factor with which to adjust OHLC values.
942
        Returns
943
        -------
944
        None.  The data array is modified in place.
945
        """
946
        self._apply_adjustments_to_window(
947
            self._get_adjustment_list(
948
                asset, self._splits_dict, "SPLITS"
949
            ),
950
            data,
951
            dts,
952
            field != 'volume'
953
        )
954
955
        if field != 'volume':
956
            self._apply_adjustments_to_window(
957
                self._get_adjustment_list(
958
                    asset, self._mergers_dict, "MERGERS"
959
                ),
960
                data,
961
                dts,
962
                True
963
            )
964
965
            self._apply_adjustments_to_window(
966
                self._get_adjustment_list(
967
                    asset, self._dividends_dict, "DIVIDENDS"
968
                ),
969
                data,
970
                dts,
971
                True
972
            )
973
974
            data *= price_adj_factor
975
976
            # if anything is zero, it's a missing bar, so replace it with NaN.
977
            # we only want to do this for non-volume fields, because a missing
978
            # volume should be 0.
979
            data[data == 0] = np.NaN
980
981
        np.around(data, 3, out=data)
982
983
    def _find_position_of_minute(self, minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.
        NOTE(review): with the 9:31 anchor below, 9:32 is offset 1 on day 0,
        which yields 1, not 2 -- this example looks off by one; confirm.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The position of the given minute in the list of all trading minutes
        since market open on 1/2/2002.
        """
        day = minute_dt.date()
        day_idx = self._equity_minute_reader.trading_days.searchsorted(day)
        # NOTE(review): searchsorted never returns a negative value, so this
        # guard can never fire.  Presumably it was meant to detect a day
        # outside the calendar; confirm intent.
        if day_idx < 0:
            return -1

        # Anchor at 9:31 am Eastern -- presumably the label of the first
        # minute bar of the session (TODO confirm against the bcolz writer).
        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # whole minutes elapsed since the session anchor
        minutes_offset = int((minute_dt - day_open).total_seconds()) / 60

        # flat index: 390 slots per prior day plus today's offset
        return int((390 * day_idx) + minutes_offset)
1021
1022
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
1023
                                  extra_slot=True):
1024
        """
1025
        Internal method that gets a window of adjusted daily data for a sid
1026
        and specified date range.  Used to support the history API method for
1027
        daily bars.
1028
1029
        Parameters
1030
        ----------
1031
        asset : Asset
1032
            The asset whose data is desired.
1033
1034
        start_dt: pandas.Timestamp
1035
            The start of the desired window of data.
1036
1037
        bar_count: int
1038
            The number of days of data to return.
1039
1040
        field: string
1041
            The specific field to return.  "open", "high", "close_price", etc.
1042
1043
        extra_slot: boolean
1044
            Whether to allocate an extra slot in the returned numpy array.
1045
            This extra slot will hold the data for the last partial day.  It's
1046
            much better to create it here than to create a copy of the array
1047
            later just to add a slot.
1048
1049
        Returns
1050
        -------
1051
        A numpy array with requested values.  Any missing slots filled with
1052
        nan.
1053
1054
        """
1055
        bar_count = len(days_in_window)
1056
        # create an np.array of size bar_count
1057
        if extra_slot:
1058
            return_array = np.zeros((bar_count + 1,))
1059
        else:
1060
            return_array = np.zeros((bar_count,))
1061
1062
        return_array[:] = np.NAN
1063
1064
        start_date = self._get_asset_start_date(asset)
1065
        end_date = self._get_asset_end_date(asset)
1066
        day_slice = days_in_window.slice_indexer(start_date, end_date)
1067
        active_days = days_in_window[day_slice]
1068
1069
        if active_days.shape[0]:
1070
            data = self._daily_bar_reader.history_window(field,
1071
                                                         active_days[0],
1072
                                                         active_days[-1],
1073
                                                         asset)
1074
            sid = int(asset)
1075
            return_array[day_slice] = data
1076
            self._apply_all_adjustments(
1077
                return_array,
1078
                sid,
1079
                active_days,
1080
                field,
1081
            )
1082
1083
        return return_array
1084
1085
    @staticmethod
1086
    def _apply_adjustments_to_window(adjustments_list, window_data,
1087
                                     dts_in_window, multiply):
1088
        if len(adjustments_list) == 0:
1089
            return
1090
1091
        # advance idx to the correct spot in the adjustments list, based on
1092
        # when the window starts
1093
        idx = 0
1094
1095
        while idx < len(adjustments_list) and dts_in_window[0] >\
1096
                adjustments_list[idx][0]:
1097
            idx += 1
1098
1099
        # if we've advanced through all the adjustments, then there's nothing
1100
        # to do.
1101
        if idx == len(adjustments_list):
1102
            return
1103
1104
        while idx < len(adjustments_list):
1105
            adjustment_to_apply = adjustments_list[idx]
1106
1107
            if adjustment_to_apply[0] > dts_in_window[-1]:
1108
                break
1109
1110
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1111
            if multiply:
1112
                window_data[0:range_end] *= adjustment_to_apply[1]
1113
            else:
1114
                window_data[0:range_end] /= adjustment_to_apply[1]
1115
1116
            idx += 1
1117
1118
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1119
        """
1120
        Internal method that returns a list of adjustments for the given sid.
1121
1122
        Parameters
1123
        ----------
1124
        asset : Asset
1125
            The asset for which to return adjustments.
1126
1127
        adjustments_dict: dict
1128
            A dictionary of sid -> list that is used as a cache.
1129
1130
        table_name: string
1131
            The table that contains this data in the adjustments db.
1132
1133
        Returns
1134
        -------
1135
        adjustments: list
1136
            A list of [multiplier, pd.Timestamp], earliest first
1137
1138
        """
1139
        if self._adjustment_reader is None:
1140
            return []
1141
1142
        sid = int(asset)
1143
1144
        try:
1145
            adjustments = adjustments_dict[sid]
1146
        except KeyError:
1147
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1148
                get_adjustments_for_sid(table_name, sid)
1149
1150
        return adjustments
1151
1152
    def get_equity_price_view(self, asset):
        """
        Returns a cached DataPortalSidView for the given asset, creating one
        on first use.  Used to support the data[sid(N)] public API.  Not
        needed if DataPortal is used standalone.

        Parameters
        ----------
        asset : Asset
            Asset that is being queried.

        Returns
        -------
        DataPortalSidView: Accessor into the given asset's data.
        """
        if asset not in self.views:
            self.views[asset] = DataPortalSidView(asset, self)
        return self.views[asset]
1172
1173
    def _check_is_currently_alive(self, asset, dt):
1174
        if dt is None:
1175
            dt = self.current_day
1176
1177
        sid = int(asset)
1178
1179
        if sid not in self._asset_start_dates:
1180
            self._get_asset_start_date(asset)
1181
1182
        start_date = self._asset_start_dates[sid]
1183
        if self._asset_start_dates[sid] > dt:
1184
            raise NoTradeDataAvailableTooEarly(
1185
                sid=sid,
1186
                dt=dt,
1187
                start_dt=start_date
1188
            )
1189
1190
        end_date = self._asset_end_dates[sid]
1191
        if self._asset_end_dates[sid] < dt:
1192
            raise NoTradeDataAvailableTooLate(
1193
                sid=sid,
1194
                dt=dt,
1195
                end_dt=end_date
1196
            )
1197
1198
    def _get_asset_start_date(self, asset):
1199
        self._ensure_asset_dates(asset)
1200
        return self._asset_start_dates[asset]
1201
1202
    def _get_asset_end_date(self, asset):
1203
        self._ensure_asset_dates(asset)
1204
        return self._asset_end_dates[asset]
1205
1206
    def _ensure_asset_dates(self, asset):
1207
        sid = int(asset)
1208
1209
        if sid not in self._asset_start_dates:
1210
            self._asset_start_dates[sid] = asset.start_date
1211
            self._asset_end_dates[sid] = asset.end_date
1212
1213
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        (An empty dict is returned when no reader is configured or no sids
        were requested, preserving the historical return value.)
        """
        if self._adjustment_reader is None or len(sids) == 0:
            return {}

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        rows = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # keep only the splits for sids that were actually requested
        requested_sids = set(sids)
        return [row for row in rows if row[0] in requested_sids]
1245
1246
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of dicts with all relevant fields populated.
        All timestamp fields are converted to pd.Timestamps.
        """
        # nothing to do without a reader or a non-empty range
        if self._adjustment_reader is None or len(trading_days) == 0:
            return []

        range_start = trading_days[0].value / 1e9
        range_end = trading_days[-1].value / 1e9

        rows = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?",
            (int(sid), range_start, range_end,)).fetchall()

        # NOTE(review): declared_date is passed through raw while the other
        # date columns are converted to Timestamps -- confirm intended.
        return [
            {
                "declared_date": row[1],
                "ex_date": pd.Timestamp(row[2], unit="s"),
                "pay_date": pd.Timestamp(row[3], unit="s"),
                "payment_sid": row[4],
                "ratio": row[5],
                "record_date": pd.Timestamp(row[6], unit="s"),
                "sid": row[7]
            }
            for row in rows
        ]
1292
1293
    def contains(self, asset, field):
        """
        Whether the given field is available for the given asset, either as
        one of the base OHLCV fields or via an augmented (fetcher) source.
        """
        if field in BASE_FIELDS:
            return True
        return (field in self._augmented_sources_map and
                asset in self._augmented_sources_map[field])
1297
1298
    def get_fetcher_assets(self):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        source_df = self._extra_source_df
        if source_df is None:
            # no fetcher data loaded at all
            return []

        if self.current_day in source_df.index:
            date_to_use = self.current_day
        else:
            # current day isn't in the fetcher df; forward-fill by falling
            # back to the most recent available day before it.
            insert_point = source_df.index.searchsorted(self.current_day)
            if insert_point == 0:
                # no earlier data exists
                return []

            date_to_use = source_df.index[insert_point - 1]

        candidates = source_df.loc[date_to_use]["sid"]

        # make sure they're actually assets
        return [candidate for candidate in candidates
                if isinstance(candidate, Asset)]
1335
1336
1337
class DataPortalSidView(object):
    """
    Thin accessor exposing a single asset's data from a DataPortal via
    attribute, item, and membership syntax (data[sid(N)].price, etc.).
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # any unknown attribute is treated as a field lookup on the portal
        return self.portal.get_spot_value(self.asset, column)

    def __getitem__(self, column):
        # data[sid(N)]["price"] behaves exactly like data[sid(N)].price
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)
1350