Completed
Pull Request — master (#858)
by Eddie
01:50
created

zipline.data.DataPortal._get_ctable()   B

Complexity

Conditions 5

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 17
rs 8.5455
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
from zipline.pipeline.data.equity_pricing import USEquityPricing
31
32
from zipline.utils import tradingcalendar
33
from zipline.errors import (
34
    NoTradeDataAvailableTooEarly,
35
    NoTradeDataAvailableTooLate
36
)
37
38
FIRST_TRADING_MINUTE = pd.Timestamp("2002-01-02 14:31:00", tz='UTC')
39
40
# FIXME should this be passed in (is this qexec specific?)?
41
INDEX_OF_FIRST_TRADING_DAY = 3028
42
43
log = Logger('DataPortal')
44
45
HISTORY_FREQUENCIES = ["1d", "1m"]
46
47
BASE_FIELDS = {
48
    'open': 'open',
49
    'open_price': 'open',
50
    'high': 'high',
51
    'low': 'low',
52
    'close': 'close',
53
    'close_price': 'close',
54
    'volume': 'volume',
55
    'price': 'close'
56
}
57
58
59
US_EQUITY_COLUMNS = {
60
    'open': USEquityPricing.open,
61
    'open_price': USEquityPricing.open,
62
    'high': USEquityPricing.high,
63
    'low': USEquityPricing.low,
64
    'close': USEquityPricing.close,
65
    'close_price': USEquityPricing.close,
66
    'volume': USEquityPricing.volume,
67
    'price': USEquityPricing.close,
68
}
69
70
71
class DataPortal(object):
72
    def __init__(self,
                 env,
                 sim_params=None,
                 minutes_equities_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 equity_sid_path_func=None,
                 futures_sid_path_func=None):
        """
        Central access point for price data during a simulation.

        Parameters
        ----------
        env : TradingEnvironment
            Supplies the asset finder and calendar helpers used throughout.
        sim_params : SimulationParameters, optional
            If given, its ``data_frequency`` is used; otherwise "minute"
            is assumed.
        minutes_equities_path : str, optional
            Root directory holding the per-sid minute bcolz files.
        daily_equities_path : str, optional
            Path to the daily-bar bcolz store; when given, a
            BcolzDailyBarReader is opened on it.
        adjustment_reader : optional
            Reader used to look up splits/mergers/dividends.
        equity_sid_path_func : callable, optional
            Override mapping (root, sid) -> minute-file path for equities.
        futures_sid_path_func : callable, optional
            Override mapping (root, sid) -> minute-file path for futures.
        """
        self.env = env

        # Internal pointers to the current dt (can be minute) and current day.
        # In daily mode, they point to the same thing. In minute mode, it's
        # useful to have separate pointers to the current day and to the
        # current minute.  These pointers are updated by the
        # AlgorithmSimulator's transform loop.
        self.current_dt = None
        self.current_day = None

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here
        self.cur_data_offset = 0

        self.views = {}

        self._minutes_equities_path = minutes_equities_path
        self._daily_equities_path = daily_equities_path
        self._asset_finder = env.asset_finder

        # Per-field caches of sid (string) -> bcolz carray; populated lazily
        # by _open_minute_file.
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            self._data_frequency = "minute"

        self._equity_sid_path_func = equity_sid_path_func
        self._futures_sid_path_func = futures_sid_path_func

        # Minute bars store prices as scaled integers; multiply by this
        # factor to recover the real value (see the spot-value methods).
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None

        # The following values are used by _minute_offset to calculate the
        # index into the minute bcolz date.

        # A lookup of table every minute to the corresponding day, to avoid
        # calling `.date()` on every lookup.
        self._minutes_to_day = {}
        # A map of days (keyed by midnight) to a DatetimeIndex of market
        # minutes for that day.
        self._minutes_by_day = {}
        # A dict of day to the offset into the minute bcolz on which that
        # days data starts.
        self._day_offsets = None
159
160
    def handle_extra_source(self, source_df):
        """
        Register a Fetcher-style extra data source.

        Extra sources always have a sid column.  The given data is expanded
        (by forward filling) to the full range of the simulation dates, so
        that lookups are fast during the simulation.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # The sid column may contain assets we know about (e.g. sid(24)) or
        # arbitrary identifiers we don't (e.g. "palladium").  Either way we
        # split source_df into one frame per identifier, de-dupe it, reindex
        # it to the simulation's dates with forward-fill, and file it under
        # self._augmented_sources_map[column][identifier].
        if self._sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )

        # One sub-frame per sid: makes accurate start/end dates, de-duping,
        # and expansion to the backtest range straightforward.
        grouped = source_df.groupby(["sid"])
        frames = {name: grouped.get_group(name)
                  for name in grouped.groups.keys()}

        for identifier in frames:
            frame = frames[identifier]

            # Remember the raw date span before reindexing.
            earliest_date = frame.index[0]
            latest_date = frame.index[-1]

            # Each frame holds a single sid, so de-duping on the dt index
            # (keeping the last row per dt) is safe.
            frame = frame.groupby(level=0).last()

            # Stretch to the full backtest date range; this makes reads
            # during the simulation trivial.
            frame = frame.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # Fake assets have no metadata elsewhere, so record their
                # trading span here.
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in frame.columns.difference(['sid']):
                self._augmented_sources_map.setdefault(
                    col_name, {})[identifier] = frame
234
235
    def _open_minute_file(self, field, asset):
236
        sid_str = str(int(asset))
237
238
        try:
239
            carray = self._carrays[field][sid_str]
240
        except KeyError:
241
            carray = self._carrays[field][sid_str] = \
242
                self._get_ctable(asset)[field]
243
244
        return carray
245
246
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute bcolz ctable backing *asset*.

        Path resolution: a futures/equity override callable is used when one
        was supplied for the matching asset type; otherwise the default
        "<root>/<sid>.bcolz" layout is assumed.
        """
        sid = int(asset)

        use_futures_func = (isinstance(asset, Future)
                            and self._futures_sid_path_func is not None)
        use_equity_func = (isinstance(asset, Equity)
                           and self._equity_sid_path_func is not None)

        if use_futures_func:
            path = self._futures_sid_path_func(
                self._minutes_equities_path, sid
            )
        elif use_equity_func:
            path = self._equity_sid_path_func(
                self._minutes_equities_path, sid
            )
        else:
            path = "{0}/{1}.bcolz".format(self._minutes_equities_path, sid)

        return bcolz.open(path, mode='r')
263
264
    def get_previous_value(self, asset, field, dt):
        """
        Given an asset and a column and a dt, returns the previous value for
        the same asset/column pair.  If this data portal is in minute mode,
        it's the previous minute value, otherwise it's the previous day's
        value.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp from which to go back in time one slot.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        if self._data_frequency == 'daily':
            prev_dt = self.env.previous_trading_day(dt)
        elif self._data_frequency == 'minute':
            prev_dt = self.env.previous_market_minute(dt)
        else:
            # Previously an unexpected frequency fell through and raised an
            # opaque UnboundLocalError on `prev_dt`; fail loudly instead.
            raise ValueError(
                "Unknown data frequency: {0}".format(self._data_frequency)
            )

        return self.get_spot_value(asset, field, prev_dt)
294
295
    def _check_extra_sources(self, asset, column, day):
        """
        Look up (asset, column, day) in the extra (Fetcher) sources.

        Returns the extra-source value, or None when extra sources do not
        apply to this lookup.  Raises KeyError if the column/asset pair is
        registered but the value cannot be retrieved.
        """
        # If we have an extra source with a column called "price", only look
        # at it if it's on something like palladium and not AAPL (since our
        # own price data always wins when dealing with assets).
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a field in an extra source
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except Exception:
                # Was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit; narrowed to Exception.
                # Callers depend on the KeyError raised below.
                log.error(
                    "Could not find value for asset={0}, current_day={1},"
                    "column={2}".format(
                        str(asset),
                        str(self.current_day),
                        str(column)))

                raise KeyError
316
317
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If ``field`` is not one of the BASE_FIELDS keys.
        """
        # Extra (Fetcher) sources win for non-Asset identifiers; see
        # _check_extra_sources for the precedence rules.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            (dt or self.current_dt)
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        # Map API aliases (e.g. "price", "open_price") to storage columns.
        column_to_use = BASE_FIELDS[field]

        if isinstance(asset, int):
            # Allow raw sids; resolve them to Asset objects.
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = dt or self.current_day
            day_to_use = normalize_date(day_to_use)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            dt_to_use = dt or self.current_dt

            # Futures are stored on a different (24h) minute layout.
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt_to_use)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt_to_use)
372
373
    def _get_minute_spot_value_future(self, asset, column, dt):
        """
        Spot lookup for a future's minute bar at *dt*, forward-filling
        backwards over missing (zero) bars.
        """
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a
        # week.  The file attributes contain the "start_dt" and "last_dt"
        # fields, which represent the time period for this bcolz file.
        # start_dt is midnight of the first day the future traded, so the
        # bar index is simply the number of minutes since then.
        start_date = self._get_asset_start_date(asset)
        offset = int((dt - start_date).total_seconds() / 60)

        if offset < 0:
            # dt predates this asset's start date; nothing to return.
            return 0.0

        carray = self._open_minute_file(column, asset)
        value = carray[offset]

        # Missing bars are stored as zero; seek backwards until we find data
        # or run out of file.
        while value == 0 and offset > 0:
            offset -= 1
            value = carray[offset]

        if column == 'volume':
            return value
        # Prices are stored as scaled ints; rescale to real values.
        return value * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
402
403
    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
        """
        Install the minute-lookup caches used by _minute_offset.

        All minute bcolz files are written starting on 1/2/2002 with 390
        minutes per day, regardless of when the security started trading.
        Hard-coding 390 minutes per day lets us ignore half days and avoid
        per-asset start-date offset math.
        """
        self._minutes_to_day = minutes_to_day
        self._minutes_by_day = minutes_by_day

        if self._sim_params is None:
            return

        td = tradingcalendar.trading_days
        start = self._sim_params.trading_days[0]
        # Index of the first simulated day relative to 1/2/2002.
        base_idx = td.searchsorted(start) - INDEX_OF_FIRST_TRADING_DAY

        # Day -> index of that day's first bar in the minute bcolz.
        self._day_offsets = {}
        for i, day in enumerate(self._sim_params.trading_days):
            self._day_offsets[day] = (i + base_idx) * 390
421
422
    def _minute_offset(self, dt):
423
        if self._day_offsets is not None:
424
            try:
425
                day = self._minutes_to_day[dt]
426
                minutes = self._minutes_by_day[day]
427
                return self._day_offsets[day] + minutes.get_loc(dt)
428
            except KeyError:
429
                return None
430
431
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot lookup for an equity's minute bar at *dt*.

        Missing (zero) bars are forward-filled by seeking backwards, and the
        found value is adjusted for splits/mergers/dividends when the seek
        crossed a day boundary.  Prices (all columns except 'volume') are
        rescaled from stored ints via MINUTE_PRICE_ADJUSTMENT_FACTOR.
        """
        # Try the fast cache first; fall back to calendar arithmetic below.
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = tradingcalendar.trading_days.searchsorted(
                given_day) - INDEX_OF_FIRST_TRADING_DAY

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            # Files are laid out as a fixed 390 bars per trading day.
            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = tradingcalendar.trading_days.searchsorted(
                start_date) - INDEX_OF_FIRST_TRADING_DAY
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
501
502
    def _get_daily_data(self, asset, column, dt):
        """
        Daily spot lookup: walk backwards from *dt* one trading day at a
        time until the daily bar reader yields a real value (-1 marks a
        missing bar); return 0 if we walk off the calendar (NoDataOnDate).
        """
        try:
            while True:
                value = self._daily_bar_reader.spot_price(asset, dt, column)
                if value != -1:
                    return value
                dt -= tradingcalendar.trading_day
        except NoDataOnDate:
            return 0
512
513
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        # The window is the bar_count trading days ending at end_dt's day.
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if not assets:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures lack daily bcolz files, so they take a different path.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future) else
            self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
545
546
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Build one daily value per day in *days_for_window* for a future by
        aggregating minute bars (there are no daily bcolz files for futures
        yet).  The last day is treated as partial, ending at *end_dt*.

        Returns a list of aggregated values, one per non-empty day.
        """
        data = []
        data_groups = []

        # get all the minutes for the days NOT including today
        for day in days_for_window[:-1]:
            minutes = self.env.market_minutes_for_day(day)

            values_for_day = np.zeros(len(minutes), dtype=np.float64)

            for idx, minute in enumerate(minutes):
                minute_val = self._get_minute_spot_value_future(
                    asset, column, minute
                )

                values_for_day[idx] = minute_val

            data_groups.append(values_for_day)

        # get the minutes for today, from the open up to end_dt only
        # (the last bar of the window may be intraday).
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )

        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)

        for idx, minute in enumerate(last_day_minutes):
            minute_val = self._get_minute_spot_value_future(
                asset, column, minute
            )

            values_for_last_day[idx] = minute_val

        data_groups.append(values_for_last_day)

        # Collapse each day's minutes into a single daily value using the
        # aggregation appropriate for the column.
        for group in data_groups:
            if len(group) == 0:
                # NOTE(review): empty days are skipped entirely, so the
                # returned list can be shorter than days_for_window —
                # presumably the caller's window never contains empty days;
                # verify against _get_history_daily_window.
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data
602
603
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Build daily history bars for an equity over *days_for_window*.

        Uses daily data for the whole range when the window ends at
        midnight UTC or extends past the asset's end date; otherwise the
        final (partial) day is aggregated from minute data.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # Daily data for all but the last day; its final slot is
            # overwritten with the minute aggregation below.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Aggregate the partial day's minutes with the rule matching
            # the requested field.
            # NOTE(review): if field_to_use is not one of these five values,
            # minute_value is unbound below — presumably callers always
            # validate the field against BASE_FIELDS first; confirm.
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
661
662
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Raises
        ------
        ValueError
            If the requested window ends before FIRST_TRADING_MINUTE.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        # but then cut it down to only the minutes after
        # FIRST_TRADING_MINUTE
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(FIRST_TRADING_MINUTE)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count and \
           (modified_minutes_for_window[0] == FIRST_TRADING_MINUTE):
            # the beginning of the window goes before our global trading
            # start date, so pad the front of each column with NaNs.
            bars_to_prepend = bar_count - modified_minutes_length
            nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # Note: the frame is indexed by the *unclipped* minutes, so the
        # NaN-padded rows line up with the pre-2002 minutes.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
719
720
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If `field` or `frequency` is not recognized.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # sanity check in case sids were passed in
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
771
772
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        if isinstance(asset, int):
            # Allow raw sids; resolve them to Asset objects first.
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and equities are stored differently; dispatch accordingly.
        handler = (self._get_minute_window_for_future
                   if isinstance(asset, Future)
                   else self._get_minute_window_for_equity)
        return handler(asset, field, minutes_for_window)
805
806
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
807
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
808
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
809
        # do this is to simply do a spot lookup for each desired minute.
810
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
811
        for idx, minute in enumerate(minutes_for_window):
812
            return_data[idx] = \
813
                self._get_minute_spot_value_future(asset, field, minute)
814
815
        # Note: an improvement could be to find the consecutive runs within
816
        # minutes_for_window, and use them to read the underlying ctable
817
        # more efficiently.
818
819
        # Once futures are on 24-hour clock, then we can just grab all the
820
        # requested minutes in one shot from the ctable.
821
822
        # no adjustments for futures, yay.
823
        return return_data
824
825
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Get a window of adjusted minute data for an equity.

        Reads the raw bars out of the per-sid bcolz minute file, trims away
        the zero padding that early-close days introduce, then applies
        adjustments via _apply_all_adjustments.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field: string
            The specific field to return.  "open", "close", "volume", etc.

        minutes_for_window: pd.DatetimeIndex
            The trading minutes in the desired window.

        Returns
        -------
        np.array of float64, one entry per requested minute.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        # clamp to 0 so a window starting before the first stored minute
        # still reads from the start of the file.
        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            # NOTE(review): 180 assumes every early close ends exactly at
            # 1pm Eastern (3 padded hours before a 4pm close) — confirm
            # against the minute-file writer.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each prior deletion shifted the remaining indices left by
                # 180, so compensate before deleting this day's padding.
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
900
                               price_adj_factor=1.0):
901
        """
902
        Internal method that applies all the necessary adjustments on the
903
        given data array.
904
905
        The adjustments are:
906
        - splits
907
        - if field != "volume":
908
            - mergers
909
            - dividends
910
            - * 0.001
911
            - any zero fields replaced with NaN
912
        - all values rounded to 3 digits after the decimal point.
913
914
        Parameters
915
        ----------
916
        data : np.array
917
            The data to be adjusted.
918
919
        asset: Asset
920
            The asset whose data is being adjusted.
921
922
        dts: pd.DateTimeIndex
923
            The list of minutes or days representing the desired window.
924
925
        field: string
926
            The field whose values are in the data array.
927
928
        price_adj_factor: float
929
            Factor with which to adjust OHLC values.
930
        Returns
931
        -------
932
        None.  The data array is modified in place.
933
        """
934
        self._apply_adjustments_to_window(
935
            self._get_adjustment_list(
936
                asset, self._splits_dict, "SPLITS"
937
            ),
938
            data,
939
            dts,
940
            field != 'volume'
941
        )
942
943
        if field != 'volume':
944
            self._apply_adjustments_to_window(
945
                self._get_adjustment_list(
946
                    asset, self._mergers_dict, "MERGERS"
947
                ),
948
                data,
949
                dts,
950
                True
951
            )
952
953
            self._apply_adjustments_to_window(
954
                self._get_adjustment_list(
955
                    asset, self._dividends_dict, "DIVIDENDS"
956
                ),
957
                data,
958
                dts,
959
                True
960
            )
961
962
            data *= price_adj_factor
963
964
            # if anything is zero, it's a missing bar, so replace it with NaN.
965
            # we only want to do this for non-volume fields, because a missing
966
            # volume should be 0.
967
            data[data == 0] = np.NaN
968
969
        np.around(data, 3, out=data)
970
971
    @staticmethod
    def _find_position_of_minute(minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The position of the given minute in the list of all trading minutes
        since market open on 1/2/2002, or -1 if the minute predates the
        first stored trading day.
        """
        session = minute_dt.date()

        # number of trading days between the first stored day and the
        # minute's session.
        session_offset = \
            tradingcalendar.trading_days.searchsorted(session) -\
            INDEX_OF_FIRST_TRADING_DAY
        if session_offset < 0:
            # the minute predates the stored data, so it has no position.
            return -1

        # first bar of the session: 9:31 AM Eastern, expressed in UTC.
        session_open = pd.Timestamp(
            datetime(
                year=session.year,
                month=session.month,
                day=session.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        minutes_into_session = \
            int((minute_dt - session_open).total_seconds()) / 60

        return int((390 * session_offset) + minutes_into_session)
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired; the window spans
            days_in_window[0] through days_in_window[-1], inclusive.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.

        """
        column = US_EQUITY_COLUMNS[field]
        data = self._daily_bar_reader.load_raw_arrays([column],
                                                      days_in_window[0],
                                                      days_in_window[-1],
                                                      [asset])

        bar_count = len(days_in_window)

        # create an np.array of size bar_count, optionally with one extra
        # slot for the current (partial) day.
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        # initialize to NaN so the extra slot (and any unfilled bars) read
        # as missing.
        return_array[:] = np.NAN
        sid = int(asset)

        # load_raw_arrays returns one (days x assets) array per requested
        # column; extract the single column/asset vector.
        return_array[0:bar_count] = data[0].T[0]

        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
        )

        return return_array
    @staticmethod
1074
    def _apply_adjustments_to_window(adjustments_list, window_data,
1075
                                     dts_in_window, multiply):
1076
        if len(adjustments_list) == 0:
1077
            return
1078
1079
        # advance idx to the correct spot in the adjustments list, based on
1080
        # when the window starts
1081
        idx = 0
1082
1083
        while idx < len(adjustments_list) and dts_in_window[0] >\
1084
                adjustments_list[idx][0]:
1085
            idx += 1
1086
1087
        # if we've advanced through all the adjustments, then there's nothing
1088
        # to do.
1089
        if idx == len(adjustments_list):
1090
            return
1091
1092
        while idx < len(adjustments_list):
1093
            adjustment_to_apply = adjustments_list[idx]
1094
1095
            if adjustment_to_apply[0] > dts_in_window[-1]:
1096
                break
1097
1098
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1099
            if multiply:
1100
                window_data[0:range_end] *= adjustment_to_apply[1]
1101
            else:
1102
                window_data[0:range_end] /= adjustment_to_apply[1]
1103
1104
            idx += 1
1105
1106
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1107
        """
1108
        Internal method that returns a list of adjustments for the given sid.
1109
1110
        Parameters
1111
        ----------
1112
        asset : Asset
1113
            The asset for which to return adjustments.
1114
1115
        adjustments_dict: dict
1116
            A dictionary of sid -> list that is used as a cache.
1117
1118
        table_name: string
1119
            The table that contains this data in the adjustments db.
1120
1121
        Returns
1122
        -------
1123
        adjustments: list
1124
            A list of [multiplier, pd.Timestamp], earliest first
1125
1126
        """
1127
        if self._adjustment_reader is None:
1128
            return []
1129
1130
        sid = int(asset)
1131
1132
        try:
1133
            adjustments = adjustments_dict[sid]
1134
        except KeyError:
1135
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1136
                get_adjustments_for_sid(table_name, sid)
1137
1138
        return adjustments
1139
1140
    def get_equity_price_view(self, asset):
        """
        Returns a DataPortalSidView for the given asset.  Used to support the
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.

        Parameters
        ----------
        asset : Asset
            Asset that is being queried.

        Returns
        -------
        DataPortalSidView: Accessor into the given asset's data.
        """
        # views are cached per asset; create one lazily on first access.
        if asset not in self.views:
            self.views[asset] = DataPortalSidView(asset, self)
        return self.views[asset]
    def _check_is_currently_alive(self, asset, dt):
1162
        if dt is None:
1163
            dt = self.current_day
1164
1165
        sid = int(asset)
1166
1167
        if sid not in self._asset_start_dates:
1168
            self._get_asset_start_date(asset)
1169
1170
        start_date = self._asset_start_dates[sid]
1171
        if self._asset_start_dates[sid] > dt:
1172
            raise NoTradeDataAvailableTooEarly(
1173
                sid=sid,
1174
                dt=dt,
1175
                start_dt=start_date
1176
            )
1177
1178
        end_date = self._asset_end_dates[sid]
1179
        if self._asset_end_dates[sid] < dt:
1180
            raise NoTradeDataAvailableTooLate(
1181
                sid=sid,
1182
                dt=dt,
1183
                end_dt=end_date
1184
            )
1185
1186
    def _get_asset_start_date(self, asset):
1187
        self._ensure_asset_dates(asset)
1188
        return self._asset_start_dates[asset]
1189
1190
    def _get_asset_end_date(self, asset):
1191
        self._ensure_asset_dates(asset)
1192
        return self._asset_end_dates[asset]
1193
1194
    def _ensure_asset_dates(self, asset):
1195
        sid = int(asset)
1196
1197
        if sid not in self._asset_start_dates:
1198
            self._asset_start_dates[sid] = asset.start_date
1199
            self._asset_end_dates[sid] = asset.end_date
1200
1201
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        if self._adjustment_reader is None or len(sids) == 0:
            # no adjustments db, or nothing to look up.  (previously this
            # returned {}, which contradicted the documented list return
            # type; [] is equally falsy/iterable for callers.)
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # build the membership set once for O(1) per-split filtering.
        sids_set = set(sids)
        splits = [split for split in splits if split[0] in sids_set]

        return splits
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of dicts with all relevant attributes populated.
        The ex_date, pay_date, and record_date fields are converted to
        pd.Timestamps; declared_date is returned as stored in the db.
        Returns an empty list if there is no adjustment reader or the
        trading range is empty.
        """

        if self._adjustment_reader is None:
            return []

        if len(trading_days) == 0:
            return []

        # convert to seconds since epoch, the units used in the db.
        start_dt = trading_days[0].value / 1e9
        end_dt = trading_days[-1].value / 1e9

        dividends = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
            fetchall()

        dividend_info = []
        for dividend_tuple in dividends:
            # NOTE(review): the tuple indices below depend on the column
            # order of stock_dividend_payouts (SELECT *); verify against
            # the adjustments db schema if it ever changes.
            dividend_info.append({
                "declared_date": dividend_tuple[1],
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
                "payment_sid": dividend_tuple[4],
                "ratio": dividend_tuple[5],
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
                "sid": dividend_tuple[7]
            })

        return dividend_info
    def contains(self, asset, field):
        """Return True if `field` is queryable for `asset`."""
        # base OHLCV/price fields are available for every asset.
        if field in BASE_FIELDS:
            return True
        # otherwise, the field must come from an augmented (fetcher) source
        # that covers this asset.
        augmented = self._augmented_sources_map
        return field in augmented and asset in augmented[field]
    def get_fetcher_assets(self):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        source_df = self._extra_source_df
        if source_df is None:
            # no fetcher source was registered.
            return []

        day = self.current_day
        if day not in source_df.index:
            # forward-fill: fall back to the most recent prior day with
            # fetcher data, if any exists.
            pos = source_df.index.searchsorted(day)
            if pos == 0:
                return []
            day = source_df.index[pos - 1]

        # NOTE(review): if only a single row exists for `day`, .loc returns
        # a scalar rather than a sequence here — confirm the fetcher source
        # always yields multiple rows per day.
        candidates = source_df.loc[day]["sid"]

        # fetcher rows may carry non-asset sid values; keep only real Assets.
        return [candidate for candidate in candidates
                if isinstance(candidate, Asset)]
class DataPortalSidView(object):
    """
    Thin per-asset accessor that proxies attribute, item, and membership
    lookups to a DataPortal.  Backs the data[sid(N)] public API.
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # attribute access (e.g. data[sid(N)].close) is a spot-value lookup.
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        # membership checks delegate to the portal's field availability.
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # item access is equivalent to attribute access.
        return self.__getattr__(column)