Completed
Pull Request — master (#858)
by Eddie
05:34 queued 02:25
created

zipline.data.DataPortal.handle_extra_source()   D

Complexity

Conditions 8

Size

Total Lines 74

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 74
rs 4.9793

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import (
26
    BcolzDailyBarReader,
27
    NoDataOnDate
28
)
29
30
from zipline.utils import tradingcalendar
31
from zipline.errors import (
32
    NoTradeDataAvailableTooEarly,
33
    NoTradeDataAvailableTooLate
34
)
35
36
# FIXME anything to do with 2002-01-02 probably belongs in qexec, right?
37
FIRST_TRADING_DAY = pd.Timestamp("2002-01-02 00:00:00", tz='UTC')
FIRST_TRADING_MINUTE = pd.Timestamp("2002-01-02 14:31:00", tz='UTC')

# FIXME should this be passed in (is this qexec specific?)?
# Index of FIRST_TRADING_DAY within the tradingcalendar.trading_days index;
# used to turn a calendar position into an offset into the bcolz files.
INDEX_OF_FIRST_TRADING_DAY = 3028

log = Logger('DataPortal')

# Frequencies accepted by the history API ("1d" = daily, "1m" = minute).
HISTORY_FREQUENCIES = ["1d", "1m"]

# Mapping from user-facing field names to the physical column names stored
# in the bcolz files.  Several aliases ("open_price", "close_price",
# "price") resolve to the same underlying column.
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}
57
58
59
class DataPortal(object):
60
    def __init__(self,
                 env,
                 sim_params=None,
                 minutes_equities_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 equity_sid_path_func=None,
                 futures_sid_path_func=None):
        """
        Interface for fetching pricing (and fetcher) data for a simulation.

        Parameters
        ----------
        env : object
            Trading environment.  Only ``env.asset_finder`` is read here;
            other methods use its calendar helpers.
        sim_params : optional
            Simulation parameters.  If given, ``sim_params.data_frequency``
            selects daily vs. minute mode; otherwise "minute" is assumed.
        minutes_equities_path : str, optional
            Root directory containing the per-sid minute bcolz files.
        daily_equities_path : str, optional
            Path to the daily pricing bcolz data.
        adjustment_reader : optional
            Reader used to look up splits/mergers/dividends.
        equity_sid_path_func : callable, optional
            ``(root_path, sid) -> path`` override used to locate an
            equity's minute bcolz file.
        futures_sid_path_func : callable, optional
            Same as ``equity_sid_path_func``, for futures.

        Raises
        ------
        ValueError
            If neither a minute nor a daily data path is provided.
        """
        self.env = env

        # Internal pointers to the current dt (can be minute) and current day.
        # In daily mode, they point to the same thing. In minute mode, it's
        # useful to have separate pointers to the current day and to the
        # current minute.  These pointers are updated by the
        # AlgorithmSimulator's transform loop.
        self.current_dt = None
        self.current_day = None

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here
        self.cur_data_offset = 0

        self.views = {}

        if minutes_equities_path is None and daily_equities_path is None:
            raise ValueError("Must provide at least one of minute or "
                             "daily data path!")

        self._minutes_equities_path = minutes_equities_path
        self._daily_equities_path = daily_equities_path
        self._asset_finder = env.asset_finder

        # Per-field cache of open minute-level carrays, keyed by sid string
        # (see _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Pointer to the daily bcolz file.
        self._daily_equities_data = None

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Fetcher state
        self._augmented_sources_map = {}
        self._fetcher_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            self._data_frequency = "minute"

        self._equity_sid_path_func = equity_sid_path_func
        self._futures_sid_path_func = futures_sid_path_func

        self.DAILY_PRICE_ADJUSTMENT_FACTOR = 0.001
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None
142
143
    def handle_extra_source(self, source_df):
        """
        Store an extra (fetcher) data source for fast lookup during the
        simulation.

        Extra sources always have a sid column.  The sid column can consist
        of assets we know about (such as sid(24)) or of assets we don't
        know about (such as palladium).  In both cases we break source_df
        into one dataframe per sid, reindex each to the full simulation
        date range (forward-filling missing values), and store the result.

        For each fetcher column we keep a mapping in
        ``self._augmented_sources_map`` from column name -> {asset -> df},
        e.g. ``self._augmented_sources_map['days_to_cover']['AAPL']`` gives
        us the df holding that data.
        """
        if source_df is None:
            return

        self._fetcher_df = source_df

        fetcher_date_index = self._fetcher_date_index()

        # One dataframe per sid lets us (more easily) calculate accurate
        # start/end dates per sid, de-dup data, and expand the data to the
        # backtest's date range.
        for identifier, df in source_df.groupby(["sid"]):
            self._store_fetcher_group(identifier, df, fetcher_date_index)

    def _fetcher_date_index(self):
        """Simulation dates (days or minutes, per the emission rate)."""
        if self._sim_params.emission_rate == "daily":
            return self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        return self.env.minutes_for_days_in_range(
            start=self._sim_params.period_start,
            end=self._sim_params.period_end
        )

    def _store_fetcher_group(self, identifier, df, fetcher_date_index):
        """Reindex a single sid's fetcher dataframe and record it."""
        # Before reindexing, save the earliest and latest dates.
        earliest_date = df.index[0]
        latest_date = df.index[-1]

        # Since this df only contains a single sid, we can safely de-dupe
        # by the index (dt).
        df = df.groupby(level=0).last()

        # Reindex the dataframe based on the backtest start/end date.
        # This makes reads easier during the backtest.
        df = df.reindex(index=fetcher_date_index, method='ffill')

        if not isinstance(identifier, Asset):
            # For fake (non-Asset) identifiers we need to store a
            # start/end date ourselves.
            self._asset_start_dates[identifier] = earliest_date
            self._asset_end_dates[identifier] = latest_date

        for col_name in df.columns.difference(['sid']):
            if col_name not in self._augmented_sources_map:
                self._augmented_sources_map[col_name] = {}
            self._augmented_sources_map[col_name][identifier] = df
217
218
    def _open_daily_file(self):
        """Lazily open the daily bcolz file; cache the handle and attrs."""
        if self._daily_equities_data is None:
            table = bcolz.open(self._daily_equities_path)
            self._daily_equities_data = table
            self.daily_equities_attrs = table.attrs
        return self._daily_equities_data, self.daily_equities_attrs
224
225
    def _open_minute_file(self, field, asset):
        """Return the minute-level carray for field/asset, caching it."""
        sid_str = str(int(asset))
        field_cache = self._carrays[field]
        if sid_str not in field_cache:
            field_cache[sid_str] = self._get_ctable(asset)[field]
        return field_cache[sid_str]
235
236
    def _get_ctable(self, asset):
        """Open (read-only) the bcolz ctable holding minute data for asset."""
        sid = int(asset)

        # Prefer a caller-supplied path function for the asset's type;
        # otherwise fall back to the default <root>/<sid>.bcolz layout.
        path_func = None
        if isinstance(asset, Future):
            path_func = self._futures_sid_path_func
        elif isinstance(asset, Equity):
            path_func = self._equity_sid_path_func

        if path_func is not None:
            path = path_func(self._minutes_equities_path, sid)
        else:
            path = "{0}/{1}.bcolz".format(self._minutes_equities_path, sid)

        return bcolz.open(path, mode='r')
253
254
    def get_previous_value(self, asset, field, dt):
        """
        Given an asset and a column and a dt, returns the previous value for
        the same asset/column pair.  If this data portal is in minute mode,
        it's the previous minute value, otherwise it's the previous day's
        value.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume",
            and "price".

        dt: pd.Timestamp
            The timestamp from which to go back in time one slot.

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        ValueError
            If the portal's data frequency is neither "daily" nor "minute".
        """
        if self._data_frequency == 'daily':
            prev_dt = self.env.previous_trading_day(dt)
        elif self._data_frequency == 'minute':
            prev_dt = self.env.previous_market_minute(dt)
        else:
            # Previously an unexpected frequency left prev_dt unbound and
            # raised NameError below; fail explicitly instead.
            raise ValueError(
                "Unknown data frequency: {0}".format(self._data_frequency)
            )

        return self.get_spot_value(asset, field, prev_dt)
284
285
    def _check_fetcher(self, asset, column, day):
        """
        Return the fetcher (extra source) value for asset/column/day, or
        None when this column/asset pair is not served by a fetcher source.

        Raises KeyError if the pair is fetcher-served but no value can be
        found for the given day.
        """
        # If there is a fetcher column called "price", only look at it if
        # it's on something like palladium and not AAPL (since our own price
        # data always wins when dealing with assets).
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a fetcher field
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except Exception:
                # Narrowed from a bare ``except`` so that SystemExit and
                # KeyboardInterrupt still propagate.
                log.error(
                    "Could not find value for asset={0}, current_day={1},"
                    "column={2}".format(
                        str(asset),
                        str(self.current_day),
                        str(column)))

                # Re-raise as KeyError (the documented failure mode), now
                # with a message instead of a bare ``raise KeyError``.
                raise KeyError(
                    "No fetcher value for asset={0}, column={1}, "
                    "day={2}".format(asset, column, day)
                )
306
307
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method returning a scalar: the value of the given
        asset's field at ``dt`` (or at this portal's current_dt when
        ``dt`` is omitted).

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume",
            and "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        # Fetcher data, when present for this asset/field, wins.
        fetcher_val = self._check_fetcher(asset, field,
                                          dt or self.current_dt)
        if fetcher_val is not None:
            return fetcher_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))
        column_to_use = BASE_FIELDS[field]

        # Allow raw sids; resolve them to Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = normalize_date(dt or self.current_day)
            return self._get_daily_data(asset, column_to_use, day_to_use)

        # Minute mode.
        dt_to_use = dt or self.current_dt
        if isinstance(asset, Future):
            return self._get_minute_spot_value_future(
                asset, column_to_use, dt_to_use)
        return self._get_minute_spot_value(
            asset, column_to_use, dt_to_use)
359
360
    def _get_minute_spot_value_future(self, asset, column, dt):
        """
        Spot value for a future from its minute bcolz file.

        Futures bcolz files have 1440 bars per day (24 hours), 7 days a
        week; the file's attributes carry "start_dt" (midnight of the
        first day the future traded) and "last_dt".
        """
        # Offset into the file = whole minutes between dt and the asset's
        # start date.
        start_date = self._get_asset_start_date(asset)
        offset = int((dt - start_date).total_seconds() / 60)

        if offset < 0:
            # dt predates the asset's start date; nothing to return.
            return 0.0

        carray = self._open_minute_file(column, asset)
        value = carray[offset]

        # Missing bars are stored as zero; walk backwards until we find
        # data or run out of file (forward-fill semantics).
        while value == 0 and offset > 0:
            offset -= 1
            value = carray[offset]

        if column == 'volume':
            return value
        return value * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
389
390
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value for an equity from its minute bcolz file, forward-filling
        backwards over missing (zero) bars and applying adjustments when the
        backwards fill crosses a day boundary.
        """
        # All our minute bcolz files are written starting on 1/2/2002,
        # with 390 minutes per day, regardless of when the security started
        # trading. This lets us avoid doing an offset calculation related
        # to the asset start date.  Hard-coding 390 minutes per day lets us
        # ignore half days.

        if dt == self.current_dt:
            # Fast path: the simulation clock precomputed this offset.
            minute_offset_to_use = self.cur_data_offset
        else:
            # this is the slow path.
            # dt was passed in, so calculate the offset.
            # = (390 * number of trading days since 1/2/2002) +
            #   (index of minute in day)
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = tradingcalendar.trading_days.searchsorted(
                given_day) - INDEX_OF_FIRST_TRADING_DAY

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = tradingcalendar.trading_days.searchsorted(
                start_date) - INDEX_OF_FIRST_TRADING_DAY
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Prices (but not volume) are stored scaled; convert back.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
467
468
    def _get_daily_data(self, asset, column, dt):
        """
        Daily spot value, walking backwards one trading day at a time when
        the reader reports a missing value (-1).  Walking off the start of
        the data raises NoDataOnDate, which maps to 0.
        """
        while True:
            try:
                value = self._daily_bar_reader.spot_price(asset, dt, column)
            except NoDataOnDate:
                return 0
            if value != -1:
                return value
            dt -= tradingcalendar.trading_day
478
479
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Build a daily-frequency history DataFrame (one column per asset,
        one row per trading day) ending at end_dt.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if not assets:
            # No assets requested: empty frame over the window's dates.
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        def window_for(asset):
            # Futures have no daily bcolz files; their values are
            # aggregated from minute data.
            if isinstance(asset, Future):
                return self._get_history_daily_window_future(
                    asset, days_for_window, end_dt, field_to_use)
            return self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)

        data = [window_for(asset) for asset in assets]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
511
512
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Daily history values for a future, aggregated from minute bars.
        (We don't have daily bcolz files for futures yet.)
        """
        def minute_values(minutes):
            # Spot-look-up every minute of a session into a float array.
            values = np.zeros(len(minutes), dtype=np.float64)
            for idx, minute in enumerate(minutes):
                values[idx] = self._get_minute_spot_value_future(
                    asset, column, minute
                )
            return values

        # Full sessions for every day in the window except the last...
        data_groups = [
            minute_values(self.env.market_minutes_for_day(day))
            for day in days_for_window[:-1]
        ]

        # ...plus the (possibly partial) final day, from the open
        # through end_dt.
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )
        data_groups.append(minute_values(last_day_minutes))

        # How to collapse a session's minutes into one daily value.
        reducers = {
            'volume': np.sum,
            'open': lambda group: group[0],
            'close': lambda group: group[-1],
            'high': np.amax,
            'low': np.amin,
        }
        reducer = reducers.get(column)

        data = []
        for group in data_groups:
            if len(group) == 0 or reducer is None:
                continue
            data.append(reducer(group))

        return data
568
569
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history values for an equity.

        Uses daily data for the whole window, except that when the window
        ends intraday (and before the asset's end date) the final day is
        aggregated from minute data.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)
            else:
                # Previously an unknown field fell through and raised
                # NameError on minute_value below; fail explicitly.
                raise ValueError(
                    "Unknown field: {0}".format(field_to_use))

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
627
628
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Minutes before FIRST_TRADING_MINUTE are represented as leading
        NaN rows rather than looked up.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        # but then cut it down to only the minutes after
        # FIRST_TRADING_MINUTE
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(FIRST_TRADING_MINUTE)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # NOTE: the original message was missing the space between
            # "ends" and "before" ("endsbefore"); fixed.
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count and \
           (modified_minutes_for_window[0] == FIRST_TRADING_MINUTE):
            # the beginning of the window goes before our global trading
            # start date
            bars_to_prepend = bar_count - modified_minutes_length
            nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            # Pad the front with NaNs for minutes before the global start.
            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
685
686
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # Sanity check in case raw sids were passed in: resolve to Assets.
        assets = [
            (self.env.asset_finder.retrieve_asset(asset)
             if isinstance(asset, int) else asset)
            for asset in assets
        ]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # Forward-fill if needed.
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
737
738
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price",
            etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each
            minute is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Resolve raw sids to Asset objects.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Dispatch on asset type.
        handler = (
            self._get_minute_window_for_future
            if isinstance(asset, Future)
            else self._get_minute_window_for_equity
        )
        return handler(asset, field, minutes_for_window)
771
772
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        """
        Minute window for a future, built via per-minute spot lookups.
        No adjustments apply to futures.
        """
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way
        # to do this is to simply do a spot lookup for each desired minute.
        #
        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.  Once futures are on a 24-hour clock, we can
        # grab all the requested minutes in one shot from the ctable.
        return np.array(
            [self._get_minute_spot_value_future(asset, field, minute)
             for minute in minutes_for_window],
            dtype=np.float64
        )
790
791
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Read a window of unadjusted minute bars for an equity from its bcolz
        file, trim away zero-padding for early-close days, and apply splits,
        mergers, and dividends in place.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field : string
            The OHLCV field to read.

        minutes_for_window : pd.DatetimeIndex
            The trading minutes (UTC) making up the desired window.

        Returns
        -------
        A float64 numpy array with one value per requested minute.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each earlier deletion shifted the remaining indices left by
                # 180 slots, so compensate before deleting this day's padding.
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
861
                               price_adj_factor=1.0):
862
        """
863
        Internal method that applies all the necessary adjustments on the
864
        given data array.
865
866
        The adjustments are:
867
        - splits
868
        - if field != "volume":
869
            - mergers
870
            - dividends
871
            - * 0.001
872
            - any zero fields replaced with NaN
873
        - all values rounded to 3 digits after the decimal point.
874
875
        Parameters
876
        ----------
877
        data : np.array
878
            The data to be adjusted.
879
880
        asset: Asset
881
            The asset whose data is being adjusted.
882
883
        dts: pd.DateTimeIndex
884
            The list of minutes or days representing the desired window.
885
886
        field: string
887
            The field whose values are in the data array.
888
889
        price_adj_factor: float
890
            Factor with which to adjust OHLC values.
891
        Returns
892
        -------
893
        None.  The data array is modified in place.
894
        """
895
        self._apply_adjustments_to_window(
896
            self._get_adjustment_list(
897
                asset, self._splits_dict, "SPLITS"
898
            ),
899
            data,
900
            dts,
901
            field != 'volume'
902
        )
903
904
        if field != 'volume':
905
            self._apply_adjustments_to_window(
906
                self._get_adjustment_list(
907
                    asset, self._mergers_dict, "MERGERS"
908
                ),
909
                data,
910
                dts,
911
                True
912
            )
913
914
            self._apply_adjustments_to_window(
915
                self._get_adjustment_list(
916
                    asset, self._dividends_dict, "DIVIDENDS"
917
                ),
918
                data,
919
                dts,
920
                True
921
            )
922
923
            data *= price_adj_factor
924
925
            # if anything is zero, it's a missing bar, so replace it with NaN.
926
            # we only want to do this for non-volume fields, because a missing
927
            # volume should be 0.
928
            data[data == 0] = np.NaN
929
930
        np.around(data, 3, out=data)
931
932
    @staticmethod
    def _find_position_of_minute(minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The position of the given minute in the list of all trading minutes
        since market open on 1/2/2002.
        """
        day = minute_dt.date()
        day_idx = tradingcalendar.trading_days.searchsorted(day) -\
            INDEX_OF_FIRST_TRADING_DAY

        # Market open for that day, expressed in UTC to match minute_dt.
        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # BUG FIX: this used ``int(...) / 60``, which under Python 3's true
        # division yields a float.  Floor division keeps the offset integral.
        minutes_offset = int((minute_dt - day_open).total_seconds()) // 60

        return int((390 * day_idx) + minutes_offset)
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.

        """
        daily_data, daily_attrs = self._open_daily_file()

        # the daily file stores each sid's daily OHLCV in a contiguous block.
        # the first row per sid is either 1/2/2002, or the sid's start_date if
        # it started after 1/2/2002.  once a sid stops trading, there are no
        # rows for it.

        bar_count = len(days_in_window)

        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.NAN
        sid = int(asset)

        # find the start index in the daily file for this asset
        asset_file_index = daily_attrs['first_row'][str(sid)]

        trading_days = tradingcalendar.trading_days

        # Calculate the starting day to use (either the asset's first trading
        # day, or 1/1/2002 (which is the 3028th day in the trading calendar).
        first_trading_day_to_use = max(trading_days.searchsorted(
            self._asset_start_dates[asset]), INDEX_OF_FIRST_TRADING_DAY)

        # find the # of trading days between max(asset's first trade date,
        # 2002-01-02) and start_dt
        window_offset = (trading_days.searchsorted(days_in_window[0]) -
                         first_trading_day_to_use)

        # a negative window_offset means the window starts before the asset's
        # first available row, so clamp to the asset's first row.
        start_index = max(asset_file_index, asset_file_index + window_offset)

        if window_offset < 0 and (abs(window_offset) > bar_count):
            # consumer is requesting a history window that starts AND ends
            # before this equity started trading, so gtfo
            return return_array

        # find the end index in the daily file. make sure it doesn't extend
        # past the end of this asset's data in the daily file.
        if window_offset < 0:
            # if the window_offset is negative, we need to decrease the
            # end_index accordingly.
            end_index = min(start_index + window_offset + bar_count,
                            daily_attrs['last_row'][str(sid)] + 1)

            # get data from bcolz file
            data = daily_data[field][start_index:end_index]

            # have to leave a bunch of empty slots at the beginning of
            # return_array, since they represent days before this asset
            # started trading.
            return_array[abs(window_offset):bar_count] = data
        else:
            end_index = min(start_index + bar_count,
                            daily_attrs['last_row'][str(sid)])
            data = daily_data[field][start_index:(end_index + 1)]

            if len(data) > len(return_array):
                return_array[:] = data[0:len(return_array)]
            else:
                return_array[0:len(data)] = data

        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
            self.DAILY_PRICE_ADJUSTMENT_FACTOR
        )

        return return_array
    @staticmethod
1080
    def _apply_adjustments_to_window(adjustments_list, window_data,
1081
                                     dts_in_window, multiply):
1082
        if len(adjustments_list) == 0:
1083
            return
1084
1085
        # advance idx to the correct spot in the adjustments list, based on
1086
        # when the window starts
1087
        idx = 0
1088
1089
        while idx < len(adjustments_list) and dts_in_window[0] >\
1090
                adjustments_list[idx][0]:
1091
            idx += 1
1092
1093
        # if we've advanced through all the adjustments, then there's nothing
1094
        # to do.
1095
        if idx == len(adjustments_list):
1096
            return
1097
1098
        while idx < len(adjustments_list):
1099
            adjustment_to_apply = adjustments_list[idx]
1100
1101
            if adjustment_to_apply[0] > dts_in_window[-1]:
1102
                break
1103
1104
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1105
            if multiply:
1106
                window_data[0:range_end] *= adjustment_to_apply[1]
1107
            else:
1108
                window_data[0:range_end] /= adjustment_to_apply[1]
1109
1110
            idx += 1
1111
1112
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1113
        """
1114
        Internal method that returns a list of adjustments for the given sid.
1115
1116
        Parameters
1117
        ----------
1118
        asset : Asset
1119
            The asset for which to return adjustments.
1120
1121
        adjustments_dict: dict
1122
            A dictionary of sid -> list that is used as a cache.
1123
1124
        table_name: string
1125
            The table that contains this data in the adjustments db.
1126
1127
        Returns
1128
        -------
1129
        adjustments: list
1130
            A list of [multiplier, pd.Timestamp], earliest first
1131
1132
        """
1133
        if self._adjustment_reader is None:
1134
            return []
1135
1136
        sid = int(asset)
1137
1138
        try:
1139
            adjustments = adjustments_dict[sid]
1140
        except KeyError:
1141
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1142
                get_adjustments_for_sid(table_name, sid)
1143
1144
        return adjustments
1145
1146
    def get_equity_price_view(self, asset):
1147
        """
1148
        Returns a DataPortalSidView for the given asset.  Used to support the
1149
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.
1150
1151
        Parameters
1152
        ----------
1153
        asset : Asset
1154
            Asset that is being queried.
1155
1156
        Returns
1157
        -------
1158
        DataPortalSidView: Accessor into the given asset's data.
1159
        """
1160
        try:
1161
            view = self.views[asset]
1162
        except KeyError:
1163
            view = self.views[asset] = DataPortalSidView(asset, self)
1164
1165
        return view
1166
1167
    def _check_is_currently_alive(self, asset, dt):
1168
        if dt is None:
1169
            dt = self.current_day
1170
1171
        sid = int(asset)
1172
1173
        if sid not in self._asset_start_dates:
1174
            self._get_asset_start_date(asset)
1175
1176
        start_date = self._asset_start_dates[sid]
1177
        if self._asset_start_dates[sid] > dt:
1178
            raise NoTradeDataAvailableTooEarly(
1179
                sid=sid,
1180
                dt=dt,
1181
                start_dt=start_date
1182
            )
1183
1184
        end_date = self._asset_end_dates[sid]
1185
        if self._asset_end_dates[sid] < dt:
1186
            raise NoTradeDataAvailableTooLate(
1187
                sid=sid,
1188
                dt=dt,
1189
                end_dt=end_date
1190
            )
1191
1192
    def _get_asset_start_date(self, asset):
1193
        self._ensure_asset_dates(asset)
1194
        return self._asset_start_dates[asset]
1195
1196
    def _get_asset_end_date(self, asset):
1197
        self._ensure_asset_dates(asset)
1198
        return self._asset_end_dates[asset]
1199
1200
    def _ensure_asset_dates(self, asset):
1201
        sid = int(asset)
1202
1203
        if sid not in self._asset_start_dates:
1204
            self._asset_start_dates[sid] = asset.start_date
1205
            self._asset_end_dates[sid] = asset.end_date
1206
1207
    def get_splits(self, sids, dt):
1208
        """
1209
        Returns any splits for the given sids and the given dt.
1210
1211
        Parameters
1212
        ----------
1213
        sids : list
1214
            Sids for which we want splits.
1215
1216
        dt: pd.Timestamp
1217
            The date for which we are checking for splits.  Note: this is
1218
            expected to be midnight UTC.
1219
1220
        Returns
1221
        -------
1222
        list: List of splits, where each split is a (sid, ratio) tuple.
1223
        """
1224
        if self._adjustment_reader is None or len(sids) == 0:
1225
            return {}
1226
1227
        # convert dt to # of seconds since epoch, because that's what we use
1228
        # in the adjustments db
1229
        seconds = int(dt.value / 1e9)
1230
1231
        splits = self._adjustment_reader.conn.execute(
1232
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1233
            (seconds,)).fetchall()
1234
1235
        sids_set = set(sids)
1236
        splits = [split for split in splits if split[0] in sids_set]
1237
1238
        return splits
1239
1240
    def get_stock_dividends(self, sid, trading_days):
1241
        """
1242
        Returns all the stock dividends for a specific sid that occur
1243
        in the given trading range.
1244
1245
        Parameters
1246
        ----------
1247
        sid: int
1248
            The asset whose stock dividends should be returned.
1249
1250
        trading_days: pd.DatetimeIndex
1251
            The trading range.
1252
1253
        Returns
1254
        -------
1255
        list: A list of objects with all relevant attributes populated.
1256
        All timestamp fields are converted to pd.Timestamps.
1257
        """
1258
1259
        if self._adjustment_reader is None:
1260
            return []
1261
1262
        if len(trading_days) == 0:
1263
            return []
1264
1265
        start_dt = trading_days[0].value / 1e9
1266
        end_dt = trading_days[-1].value / 1e9
1267
1268
        dividends = self._adjustment_reader.conn.execute(
1269
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1270
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1271
            fetchall()
1272
1273
        dividend_info = []
1274
        for dividend_tuple in dividends:
1275
            dividend_info.append({
1276
                "declared_date": dividend_tuple[1],
1277
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1278
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1279
                "payment_sid": dividend_tuple[4],
1280
                "ratio": dividend_tuple[5],
1281
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1282
                "sid": dividend_tuple[7]
1283
            })
1284
1285
        return dividend_info
1286
1287
    def contains(self, asset, field):
        """Return True if ``field`` is queryable for ``asset``."""
        # Built-in OHLCV-style fields are always available; anything else
        # must come from an augmented (fetcher) source for this asset.
        if field in BASE_FIELDS:
            return True
        return (field in self._augmented_sources_map and
                asset in self._augmented_sources_map[field])
    def get_fetcher_assets(self):
1293
        """
1294
        Returns a list of assets for the current date, as defined by the
1295
        fetcher data.
1296
1297
        Notes
1298
        -----
1299
        Data is forward-filled.  If there is no fetcher data defined for day
1300
        N, we use day N-1's data (if available, otherwise we keep going back).
1301
1302
        Returns
1303
        -------
1304
        list: a list of Asset objects.
1305
        """
1306
        # return a list of assets for the current date, as defined by the
1307
        # fetcher source
1308
        if self._fetcher_df is None:
1309
            return []
1310
1311
        if self.current_day in self._fetcher_df.index:
1312
            date_to_use = self.current_day
1313
        else:
1314
            # current day isn't in the fetcher df, go back the last
1315
            # available day
1316
            idx = self._fetcher_df.index.searchsorted(self.current_day)
1317
            if idx == 0:
1318
                return []
1319
1320
            date_to_use = self._fetcher_df.index[idx - 1]
1321
1322
        asset_list = self._fetcher_df.loc[date_to_use]["sid"]
1323
1324
        # make sure they're actually assets
1325
        asset_list = [asset for asset in asset_list
1326
                      if isinstance(asset, Asset)]
1327
1328
        return asset_list
1329
1330
1331
class DataPortalSidView(object):
    """
    Accessor exposing a single asset's data through attribute, item, and
    membership syntax, delegating every lookup to a DataPortal.
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # Any attribute not found normally is treated as a field lookup.
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # data[sid(N)]["close"] behaves like data[sid(N)].close.  Calling
        # __getattr__ directly (rather than getattr) deliberately bypasses
        # the instance's real attributes.
        return self.__getattr__(column)