Completed
Pull Request — master (#858)
by Eddie
01:43
created

zipline.data.DataPortal._get_asset_start_date()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
from zipline.pipeline.data.equity_pricing import USEquityPricing
31
32
from zipline.utils import tradingcalendar
33
from zipline.errors import (
34
    NoTradeDataAvailableTooEarly,
35
    NoTradeDataAvailableTooLate
36
)
37
38
# Logger shared by all DataPortal instances.
log = Logger('DataPortal')

# Bar frequencies understood by the history API.
HISTORY_FREQUENCIES = ["1d", "1m"]

# User-facing field name -> canonical OHLCV column name.  "price" and the
# "*_price" aliases resolve to the underlying open/close columns.
BASE_FIELDS = dict(
    open='open',
    open_price='open',
    high='high',
    low='low',
    close='close',
    close_price='close',
    volume='volume',
    price='close',
)


# User-facing field name -> USEquityPricing pipeline column, derived from
# the same alias mapping as BASE_FIELDS.
US_EQUITY_COLUMNS = {
    field: getattr(USEquityPricing, column)
    for field, column in BASE_FIELDS.items()
}
64
65
66
class DataPortal(object):
67
    def __init__(self,
68
                 env,
69
                 sim_params=None,
70
                 equity_minute_reader=None,
71
                 minutes_futures_path=None,
72
                 daily_equities_path=None,
73
                 adjustment_reader=None,
74
                 futures_sid_path_func=None):
75
        self.env = env
76
77
        # Internal pointers to the current dt (can be minute) and current day.
78
        # In daily mode, they point to the same thing. In minute mode, it's
79
        # useful to have separate pointers to the current day and to the
80
        # current minute.  These pointers are updated by the
81
        # AlgorithmSimulator's transform loop.
82
        self.current_dt = None
83
        self.current_day = None
84
85
        # This is a bit ugly, but is here for performance reasons.  In minute
86
        # simulations, we need to very quickly go from dt -> (# of minutes
87
        # since Jan 1 2002 9:30 Eastern).
88
        #
89
        # The clock that heartbeats the simulation has all the necessary
90
        # information to do this calculation very quickly.  This value is
91
        # calculated there, and then set here
92
        self.cur_data_offset = 0
93
94
        self.views = {}
95
96
        self._daily_equities_path = daily_equities_path
97
        self._minutes_futures_path = minutes_futures_path
98
99
        self._asset_finder = env.asset_finder
100
101
        self._carrays = {
102
            'open': {},
103
            'high': {},
104
            'low': {},
105
            'close': {},
106
            'volume': {},
107
            'sid': {},
108
            'dt': {},
109
        }
110
111
        self._adjustment_reader = adjustment_reader
112
113
        # caches of sid -> adjustment list
114
        self._splits_dict = {}
115
        self._mergers_dict = {}
116
        self._dividends_dict = {}
117
118
        # Cache of sid -> the first trading day of an asset, even if that day
119
        # is before 1/2/2002.
120
        self._asset_start_dates = {}
121
        self._asset_end_dates = {}
122
123
        # Handle extra sources, like Fetcher.
124
        self._augmented_sources_map = {}
125
        self._extra_source_df = None
126
127
        self._sim_params = sim_params
128
        if self._sim_params is not None:
129
            self._data_frequency = self._sim_params.data_frequency
130
        else:
131
            self._data_frequency = "minute"
132
133
        self._futures_sid_path_func = futures_sid_path_func
134
135
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
136
137
        if daily_equities_path is not None:
138
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
139
        else:
140
            self._daily_bar_reader = None
141
142
        self._equity_minute_reader = equity_minute_reader
143
144
        # The following values are used by _minute_offset to calculate the
145
        # index into the minute bcolz date.
146
147
        # A lookup of table every minute to the corresponding day, to avoid
148
        # calling `.date()` on every lookup.
149
        self._minutes_to_day = {}
150
        # A map of days (keyed by midnight) to a DatetimeIndex of market
151
        # minutes for that day.
152
        self._minutes_by_day = {}
153
        # A dict of day to the offset into the minute bcolz on which that
154
        # days data starts.
155
        self._day_offsets = None
156
157
    def handle_extra_source(self, source_df):
        """
        Ingest an extra data source (e.g. Fetcher).

        Extra sources always carry a ``sid`` column.  The data is expanded
        (by forward-filling) over the full range of simulation dates so
        that lookups are fast during the simulation.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # The sid column may hold assets we know about (such as sid(24)) or
        # arbitrary labels we don't (such as "palladium").  Either way we
        # split the frame into one frame per sid, de-dupe each one, reindex
        # it to the simulation date range with forward-filling, and file
        # each column away so that e.g.
        # self._augmented_sources_map['days_to_cover']['AAPL'] yields the
        # frame holding that data.
        if self._sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )

        # One frame per sid: makes accurate start/end dates, de-duping, and
        # reindexing straightforward.
        grouped = source_df.groupby(["sid"])
        per_sid_frames = {
            name: grouped.get_group(name)
            for name in grouped.groups.keys()
        }

        for identifier, frame in iteritems(per_sid_frames):
            # Remember the raw date span before reindexing.
            earliest_date = frame.index[0]
            latest_date = frame.index[-1]

            # A single sid per frame means de-duping by the dt index is
            # safe.
            frame = frame.groupby(level=0).last()

            # Stretch the frame over the backtest's date range; this makes
            # reads easier during the backtest.
            frame = frame.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # Fake assets need an explicit start/end date recorded.
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in frame.columns.difference(['sid']):
                self._augmented_sources_map.setdefault(
                    col_name, {})[identifier] = frame
231
232
    def _open_minute_file(self, field, asset):
233
        sid_str = str(int(asset))
234
235
        try:
236
            carray = self._carrays[field][sid_str]
237
        except KeyError:
238
            carray = self._carrays[field][sid_str] = \
239
                self._get_ctable(asset)[field]
240
241
        return carray
242
243
    def _get_ctable(self, asset):
        """Open and return the bcolz ctable holding minute data for
        ``asset``.

        Futures resolve against the futures minute store; everything else
        resolves against the equity minute store.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            if self._futures_sid_path_func is not None:
                path = self._futures_sid_path_func(
                    self._minutes_futures_path, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(self._minutes_futures_path, sid)
        else:
            # Equities and non-Equity/non-Future assets both read from the
            # equity minute store; the two branches were previously
            # duplicated verbatim, so they are merged here.
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            if self._equity_minute_reader.sid_path_func is not None:
                path = self._equity_minute_reader.sid_path_func(
                    self._equity_minute_reader.rootdir, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(
                    self._equity_minute_reader.rootdir, sid)

        return bcolz.open(path, mode='r')
274
275
    def get_previous_value(self, asset, field, dt):
276
        """
277
        Given an asset and a column and a dt, returns the previous value for
278
        the same asset/column pair.  If this data portal is in minute mode,
279
        it's the previous minute value, otherwise it's the previous day's
280
        value.
281
282
        Parameters
283
        ---------
284
        asset : Asset
285
            The asset whose data is desired.
286
287
        field: string
288
            The desired field of the asset.  Valid values are "open",
289
            "open_price", "high", "low", "close", "close_price", "volume", and
290
            "price".
291
292
        dt: pd.Timestamp
293
            The timestamp from which to go back in time one slot.
294
295
        Returns
296
        -------
297
        The value of the desired field at the desired time.
298
        """
299
        if self._data_frequency == 'daily':
300
            prev_dt = self.env.previous_trading_day(dt)
301
        elif self._data_frequency == 'minute':
302
            prev_dt = self.env.previous_market_minute(dt)
303
304
        return self.get_spot_value(asset, field, prev_dt)
305
306
    def _check_extra_sources(self, asset, column, day):
307
        # If we have an extra source with a column called "price", only look
308
        # at it if it's on something like palladium and not AAPL (since our
309
        # own price data always wins when dealing with assets).
310
        look_in_augmented_sources = column in self._augmented_sources_map and \
311
            not (column in BASE_FIELDS and isinstance(asset, Asset))
312
313
        if look_in_augmented_sources:
314
            # we're being asked for a field in an extra source
315
            try:
316
                return self._augmented_sources_map[column][asset].\
317
                    loc[day, column]
318
            except:
319
                log.error(
320
                    "Could not find value for asset={0}, current_day={1},"
321
                    "column={2}".format(
322
                        str(asset),
323
                        str(self.current_day),
324
                        str(column)))
325
326
                raise KeyError
327
328
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If ``field`` is not one of the BASE_FIELDS aliases.
        """
        # Extra sources (e.g. Fetcher) win over our own data, except for
        # base OHLCV fields on real Assets (see _check_extra_sources).
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            (dt or self.current_dt)
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        # Map user-facing aliases ("price", "open_price", ...) to the
        # canonical column name.
        column_to_use = BASE_FIELDS[field]

        # Raw sids are tolerated; resolve them to Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = dt or self.current_day
            # Normalize to midnight so the daily bars index correctly.
            day_to_use = normalize_date(day_to_use)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            dt_to_use = dt or self.current_dt

            # Futures and equities live in different minute stores.
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt_to_use)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt_to_use)
383
384
    def _get_minute_spot_value_future(self, asset, column, dt):
385
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
386
        # The file attributes contain the "start_dt" and "last_dt" fields,
387
        # which represent the time period for this bcolz file.
388
389
        # The start_dt is midnight of the first day that this future started
390
        # trading.
391
392
        # figure out the # of minutes between dt and this asset's start_dt
393
        start_date = self._get_asset_start_date(asset)
394
        minute_offset = int((dt - start_date).total_seconds() / 60)
395
396
        if minute_offset < 0:
397
            # asking for a date that is before the asset's start date, no dice
398
            return 0.0
399
400
        # then just index into the bcolz carray at that offset
401
        carray = self._open_minute_file(column, asset)
402
        result = carray[minute_offset]
403
404
        # if there's missing data, go backwards until we run out of file
405
        while result == 0 and minute_offset > 0:
406
            minute_offset -= 1
407
            result = carray[minute_offset]
408
409
        if column != 'volume':
410
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
411
        else:
412
            return result
413
414
    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
415
        # TODO: This case should not be hit, but is when tests are setup
416
        # with data_frequency of daily, but run with minutely.
417
        if self._equity_minute_reader is None:
418
            return
419
420
        self._minutes_to_day = minutes_to_day
421
        self._minutes_by_day = minutes_by_day
422
        if self._sim_params is not None:
423
            start = self._sim_params.trading_days[0]
424
            first_trading_day_idx = self._equity_minute_reader.trading_days.\
425
                searchsorted(start)
426
            self._day_offsets = {
427
                day: (i + first_trading_day_idx) * 390
428
                for i, day in enumerate(
429
                    self._sim_params.trading_days)}
430
431
    def _minute_offset(self, dt):
432
        if self._day_offsets is not None:
433
            try:
434
                day = self._minutes_to_day[dt]
435
                minutes = self._minutes_by_day[day]
436
                return self._day_offsets[day] + minutes.get_loc(dt)
437
            except KeyError:
438
                return None
439
440
    def _get_minute_spot_value(self, asset, column, dt):
        """Spot value for an equity's ``column`` at minute ``dt``.

        Reads from the equity minute store (390 bars per session).  A zero
        bar means "no data" and is forward-filled by scanning backwards,
        applying adjustments if the scan crosses a day boundary.
        """
        # Fast path: precomputed offset cache (see setup_offset_cache).
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            # Slow path: derive the offset from the day index and the
            # position of dt within that day's market minutes.
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self._equity_minute_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                # NOTE(review): ``.order()`` is an old-pandas Index method
                # (superseded by ``sort_values``) — confirm against the
                # pinned pandas version.
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Volume is returned raw; prices are unscaled via the adjustment
        # factor.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
510
511
    def _get_daily_data(self, asset, column, dt):
512
        while True:
513
            try:
514
                value = self._daily_bar_reader.spot_price(asset, dt, column)
515
                if value != -1:
516
                    return value
517
                else:
518
                    dt -= tradingcalendar.trading_day
519
            except NoDataOnDate:
520
                return 0
521
522
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        end_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (end_idx - bar_count + 1):(end_idx + 1)]

        if len(assets) == 0:
            # No assets: an empty frame over the window's dates.
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures have no daily bcolz store, so they take a different
        # aggregation path than equities.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
554
555
    def _get_history_daily_window_future(self, asset, days_for_window,
556
                                         end_dt, column):
557
        # Since we don't have daily bcolz files for futures (yet), use minute
558
        # bars to calculate the daily values.
559
        data = []
560
        data_groups = []
561
562
        # get all the minutes for the days NOT including today
563
        for day in days_for_window[:-1]:
564
            minutes = self.env.market_minutes_for_day(day)
565
566
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
567
568
            for idx, minute in enumerate(minutes):
569
                minute_val = self._get_minute_spot_value_future(
570
                    asset, column, minute
571
                )
572
573
                values_for_day[idx] = minute_val
574
575
            data_groups.append(values_for_day)
576
577
        # get the minutes for today
578
        last_day_minutes = pd.date_range(
579
            start=self.env.get_open_and_close(end_dt)[0],
580
            end=end_dt,
581
            freq="T"
582
        )
583
584
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
585
586
        for idx, minute in enumerate(last_day_minutes):
587
            minute_val = self._get_minute_spot_value_future(
588
                asset, column, minute
589
            )
590
591
            values_for_last_day[idx] = minute_val
592
593
        data_groups.append(values_for_last_day)
594
595
        for group in data_groups:
596
            if len(group) == 0:
597
                continue
598
599
            if column == 'volume':
600
                data.append(np.sum(group))
601
            elif column == 'open':
602
                data.append(group[0])
603
            elif column == 'close':
604
                data.append(group[-1])
605
            elif column == 'high':
606
                data.append(np.amax(group))
607
            elif column == 'low':
608
                data.append(np.amin(group))
609
610
        return data
611
612
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """Daily history bars for an equity over ``days_for_window``.

        Uses daily data for the whole range except when the window ends
        mid-session, in which case the final day's value is aggregated
        from minute data up to ``end_dt``.
        """
        # NOTE(review): the daily branch passes ``asset`` to
        # _get_daily_window_for_sid while the minute branch passes ``sid``;
        # presumably the callee accepts both — confirm.
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Collapse the partial day's minutes into a single value.
            # NOTE(review): if minute_data were ever empty,
            # ``minute_data[0]``/``[-1]`` would raise IndexError — assumes
            # at least one minute in the partial day; confirm.
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
671
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # NOTE(review): message is missing a space between "ends" and
            # "before" (implicit string concatenation).
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            # No assets: empty frame over the truncated window.
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            # Pad with leading NaNs so each row count matches bar_count.
            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # NOTE(review): ``map(int, assets)`` as ``columns`` relies on
        # py2 map returning a list — verify under py3 / current pandas.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
731
732
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If ``field`` or ``frequency`` is not recognized.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # Tolerate raw sids: resolve each one to an Asset object.
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # Only the "price" field is forward-filled, and only on request.
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
783
784
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Raw sids are tolerated; resolve them first.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and equities read from different minute stores.
        if isinstance(asset, Future):
            window_fetcher = self._get_minute_window_for_future
        else:
            window_fetcher = self._get_minute_window_for_equity

        return window_fetcher(asset, field, minutes_for_window)
817
818
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
        # do this is to simply do a spot lookup for each desired minute.
        #
        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.  Once futures are on a 24-hour clock, we can just
        # grab all the requested minutes in one shot from the ctable.
        #
        # No adjustments are applied to futures data.
        spot_value = self._get_minute_spot_value_future
        return np.fromiter(
            (spot_value(asset, field, minute) for minute in minutes_for_window),
            dtype=np.float64,
            count=len(minutes_for_window),
        )
836
837
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Build a window of adjusted minute data for an equity.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DatetimeIndex
            The trading minutes in the desired window.

        Returns
        -------
        A numpy float64 array with one slot per requested minute.  For price
        fields, bars stored as zero are converted to NaN by the adjustment
        pass; for volume they remain 0.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        # clamp to 0 in case the window starts before the bcolz calendar does
        start_idx = max(
            self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0]),
            0)
        end_idx = self._equity_minute_reader._find_position_of_minute(
            minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                # a 1:00pm bar followed by a non-1pm bar marks the close of
                # an early-close session.
                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            # each deletion shifts later indices left by 180, hence the
            # (180 * idx) correction below.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # apply splits/mergers/dividends and the minute price scaling factor
        # in place.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
913
914
    def _apply_all_adjustments(self, data, asset, dts, field,
                               price_adj_factor=1.0):
        """
        Internal method that applies all the necessary adjustments on the
        given data array.

        The adjustments are:
        - splits
        - if field != "volume":
            - mergers
            - dividends
            - * price_adj_factor
            - any zero fields replaced with NaN
        - all values rounded to 3 digits after the decimal point.

        Parameters
        ----------
        data : np.array
            The data to be adjusted.

        asset: Asset
            The asset whose data is being adjusted.

        dts: pd.DateTimeIndex
            The list of minutes or days representing the desired window.

        field: string
            The field whose values are in the data array.

        price_adj_factor: float
            Factor with which to adjust OHLC values.

        Returns
        -------
        None.  The data array is modified in place.
        """
        is_price_field = field != 'volume'

        # Splits affect volume as well, but volume is divided by the ratio
        # instead of multiplied.
        self._apply_adjustments_to_window(
            self._get_adjustment_list(asset, self._splits_dict, "SPLITS"),
            data,
            dts,
            is_price_field,
        )

        if is_price_field:
            # Mergers and dividends only apply to price-like fields.
            for cache, table in ((self._mergers_dict, "MERGERS"),
                                 (self._dividends_dict, "DIVIDENDS")):
                self._apply_adjustments_to_window(
                    self._get_adjustment_list(asset, cache, table),
                    data,
                    dts,
                    True,
                )

            data *= price_adj_factor

            # if anything is zero, it's a missing bar, so replace it with NaN.
            # we only want to do this for non-volume fields, because a missing
            # volume should be 0.
            data[data == 0] = np.nan

        np.around(data, 3, out=data)
985
986
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days representing the desired window.  Each day is a
            pd.Timestamp.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.
        """
        bar_count = len(days_in_window)

        # Allocate the result (plus the optional partial-day slot) up front,
        # pre-filled with NaN so untouched slots read as missing.
        return_array = np.full(bar_count + 1 if extra_slot else bar_count,
                               np.nan)

        # Restrict the requested window to the span when the asset existed.
        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if active_days.shape[0]:
            data = self._daily_bar_reader.history_window(field,
                                                         active_days[0],
                                                         active_days[-1],
                                                         asset)
            return_array[day_slice] = data
            # Apply splits/mergers/dividends in place over the active span.
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array
1047
1048
    @staticmethod
1049
    def _apply_adjustments_to_window(adjustments_list, window_data,
1050
                                     dts_in_window, multiply):
1051
        if len(adjustments_list) == 0:
1052
            return
1053
1054
        # advance idx to the correct spot in the adjustments list, based on
1055
        # when the window starts
1056
        idx = 0
1057
1058
        while idx < len(adjustments_list) and dts_in_window[0] >\
1059
                adjustments_list[idx][0]:
1060
            idx += 1
1061
1062
        # if we've advanced through all the adjustments, then there's nothing
1063
        # to do.
1064
        if idx == len(adjustments_list):
1065
            return
1066
1067
        while idx < len(adjustments_list):
1068
            adjustment_to_apply = adjustments_list[idx]
1069
1070
            if adjustment_to_apply[0] > dts_in_window[-1]:
1071
                break
1072
1073
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1074
            if multiply:
1075
                window_data[0:range_end] *= adjustment_to_apply[1]
1076
            else:
1077
                window_data[0:range_end] /= adjustment_to_apply[1]
1078
1079
            idx += 1
1080
1081
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1082
        """
1083
        Internal method that returns a list of adjustments for the given sid.
1084
1085
        Parameters
1086
        ----------
1087
        asset : Asset
1088
            The asset for which to return adjustments.
1089
1090
        adjustments_dict: dict
1091
            A dictionary of sid -> list that is used as a cache.
1092
1093
        table_name: string
1094
            The table that contains this data in the adjustments db.
1095
1096
        Returns
1097
        -------
1098
        adjustments: list
1099
            A list of [multiplier, pd.Timestamp], earliest first
1100
1101
        """
1102
        if self._adjustment_reader is None:
1103
            return []
1104
1105
        sid = int(asset)
1106
1107
        try:
1108
            adjustments = adjustments_dict[sid]
1109
        except KeyError:
1110
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1111
                get_adjustments_for_sid(table_name, sid)
1112
1113
        return adjustments
1114
1115
    def get_equity_price_view(self, asset):
        """
        Returns a DataPortalSidView for the given asset.  Used to support the
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.

        Parameters
        ----------
        asset : Asset
            Asset that is being queried.

        Returns
        -------
        DataPortalSidView: Accessor into the given asset's data.
        """
        # Memoize one view per asset so repeated data[sid(N)] lookups reuse
        # the same accessor.
        if asset not in self.views:
            self.views[asset] = DataPortalSidView(asset, self)

        return self.views[asset]
1135
1136
    def _check_is_currently_alive(self, asset, dt):
        """
        Raise if ``asset`` was not alive (between its start and end dates)
        on ``dt``.  A ``dt`` of None means the portal's current day.
        """
        if dt is None:
            dt = self.current_day

        sid = int(asset)

        # Populate the start/end date caches for this sid if necessary.
        if sid not in self._asset_start_dates:
            self._get_asset_start_date(asset)

        start_date = self._asset_start_dates[sid]
        if start_date > dt:
            raise NoTradeDataAvailableTooEarly(
                sid=sid,
                dt=dt,
                start_dt=start_date
            )

        end_date = self._asset_end_dates[sid]
        if end_date < dt:
            raise NoTradeDataAvailableTooLate(
                sid=sid,
                dt=dt,
                end_dt=end_date
            )
1160
1161
    def _get_asset_start_date(self, asset):
        """
        Return the first trading date for ``asset``.

        The cache in ``self._asset_start_dates`` is keyed by sid (see
        ``_ensure_asset_dates``), so look up by ``int(asset)`` explicitly
        instead of relying on the asset object hashing like its sid.
        """
        self._ensure_asset_dates(asset)
        return self._asset_start_dates[int(asset)]
1164
1165
    def _get_asset_end_date(self, asset):
        """
        Return the last trading date for ``asset``.

        The cache in ``self._asset_end_dates`` is keyed by sid (see
        ``_ensure_asset_dates``), so look up by ``int(asset)`` explicitly
        instead of relying on the asset object hashing like its sid.
        """
        self._ensure_asset_dates(asset)
        return self._asset_end_dates[int(asset)]
1168
1169
    def _ensure_asset_dates(self, asset):
1170
        sid = int(asset)
1171
1172
        if sid not in self._asset_start_dates:
1173
            self._asset_start_dates[sid] = asset.start_date
1174
            self._asset_end_dates[sid] = asset.end_date
1175
1176
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        # Return an empty list (not a dict, as before) so the result type
        # always matches the documented contract.
        if self._adjustment_reader is None or len(sids) == 0:
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # Keep only the splits for sids the caller asked about; a set makes
        # each membership test O(1).
        requested_sids = set(sids)
        return [split for split in splits if split[0] in requested_sids]
1208
1209
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of objects with all relevant attributes populated.
        All timestamp fields are converted to pd.Timestamps.
        """
        # Nothing to do without an adjustments db or a non-empty range.
        if self._adjustment_reader is None:
            return []

        if len(trading_days) == 0:
            return []

        # The adjustments db stores dates as seconds since the epoch.
        start_dt = trading_days[0].value / 1e9
        end_dt = trading_days[-1].value / 1e9

        dividends = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
            fetchall()

        return [
            {
                "declared_date": row[1],
                "ex_date": pd.Timestamp(row[2], unit="s"),
                "pay_date": pd.Timestamp(row[3], unit="s"),
                "payment_sid": row[4],
                "ratio": row[5],
                "record_date": pd.Timestamp(row[6], unit="s"),
                "sid": row[7],
            }
            for row in dividends
        ]
1255
1256
    def contains(self, asset, field):
        """
        Whether ``field`` is queryable for ``asset``: either one of the base
        OHLCV fields, or a fetcher-supplied column registered for this asset.
        """
        if field in BASE_FIELDS:
            return True

        augmented_assets = self._augmented_sources_map.get(field)
        return augmented_assets is not None and asset in augmented_assets
1260
1261
    def get_fetcher_assets(self):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        source_df = self._extra_source_df
        if source_df is None:
            return []

        lookup_day = self.current_day
        if lookup_day not in source_df.index:
            # current day isn't in the fetcher df; forward-fill by falling
            # back to the most recent available day before it.
            insert_pos = source_df.index.searchsorted(lookup_day)
            if insert_pos == 0:
                return []

            lookup_day = source_df.index[insert_pos - 1]

        candidates = source_df.loc[lookup_day]["sid"]

        # the sid column may contain non-asset values; keep only real Assets
        return [candidate for candidate in candidates
                if isinstance(candidate, Asset)]
1298
1299
1300
class DataPortalSidView(object):
    """
    Thin per-asset accessor that backs the data[sid(N)] public API.

    Attribute access, item access, and ``in`` checks on a view all delegate
    to the owning DataPortal for a single fixed asset.
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # Only invoked for names not found on the instance (i.e. data
        # columns); resolve them as spot values through the portal.
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # view[column] behaves exactly like view.column.
        return self.__getattr__(column)
1313