Completed
Pull Request — master (#858)
by
unknown
02:41
created

zipline.data.DataPortal._get_minute_spot_value()   D

Complexity

Conditions 8

Size

Total Lines 77

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 77
rs 4.8356

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
from zipline.pipeline.data.equity_pricing import USEquityPricing
31
32
from zipline.utils import tradingcalendar
33
from zipline.errors import (
34
    NoTradeDataAvailableTooEarly,
35
    NoTradeDataAvailableTooLate
36
)
37
38
FIRST_TRADING_MINUTE = pd.Timestamp("2002-01-02 14:31:00", tz='UTC')
39
40
# FIXME should this be passed in (is this qexec specific?)?
41
INDEX_OF_FIRST_TRADING_DAY = 3028
42
43
log = Logger('DataPortal')
44
45
HISTORY_FREQUENCIES = ["1d", "1m"]
46
47
BASE_FIELDS = {
48
    'open': 'open',
49
    'open_price': 'open',
50
    'high': 'high',
51
    'low': 'low',
52
    'close': 'close',
53
    'close_price': 'close',
54
    'volume': 'volume',
55
    'price': 'close'
56
}
57
58
59
US_EQUITY_COLUMNS = {
60
    'open': USEquityPricing.open,
61
    'open_price': USEquityPricing.open,
62
    'high': USEquityPricing.high,
63
    'low': USEquityPricing.low,
64
    'close': USEquityPricing.close,
65
    'close_price': USEquityPricing.close,
66
    'volume': USEquityPricing.volume,
67
    'price': USEquityPricing.close,
68
}
69
70
71
class DataPortal(object):
72
    def __init__(self,
                 env,
                 sim_params=None,
                 minutes_equities_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 equity_sid_path_func=None,
                 futures_sid_path_func=None):
        """
        Interface to minute and daily pricing data for a simulation.

        Parameters
        ----------
        env
            Trading environment; must expose an ``asset_finder`` (used
            below) and the calendar helpers called by other methods.
        sim_params : optional
            Simulation parameters.  If None, ``_data_frequency`` defaults
            to "minute".
        minutes_equities_path : str, optional
            Root directory containing per-sid minute bcolz files.
        daily_equities_path : str, optional
            Path to the daily bcolz file; when given, a
            BcolzDailyBarReader is opened on it.
        adjustment_reader : optional
            Reader used for splits/mergers/dividends adjustments.
        equity_sid_path_func, futures_sid_path_func : callable, optional
            Overrides mapping (minutes_root_path, sid) to a bcolz path;
            see ``_get_ctable``.
        """
        self.env = env

        # Internal pointers to the current dt (can be minute) and current day.
        # In daily mode, they point to the same thing. In minute mode, it's
        # useful to have separate pointers to the current day and to the
        # current minute.  These pointers are updated by the
        # AlgorithmSimulator's transform loop.
        self.current_dt = None
        self.current_day = None

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here
        self.cur_data_offset = 0

        self.views = {}

        self._minutes_equities_path = minutes_equities_path
        self._daily_equities_path = daily_equities_path
        self._asset_finder = env.asset_finder

        # Per-field caches of sid -> open bcolz carray (see
        # _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Pointer to the daily bcolz file.
        self._daily_equities_data = None

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Fetcher state: column -> {asset -> df}, populated by
        # handle_extra_source.
        self._augmented_sources_map = {}
        self._fetcher_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            self._data_frequency = "minute"

        self._equity_sid_path_func = equity_sid_path_func
        self._futures_sid_path_func = futures_sid_path_func

        # Minute files store prices as scaled ints; multiply by this to
        # recover float prices (volume is left unscaled).
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None
149
150
    def handle_extra_source(self, source_df):
        """
        Ingest a fetcher dataframe, expanding it (by forward filling) to
        the full range of the simulation dates so that lookup is fast
        during simulation.

        Extra sources always have a sid column.
        """
        if source_df is None:
            return

        self._fetcher_df = source_df

        # source_df's sid column can either consist of assets we know about
        # (such as sid(24)) or of assets we don't know about (such as
        # palladium).
        #
        # In both cases, we break up the dataframe into individual dfs
        # that only contain a single asset's information.  ie, if source_df
        # has data for PALLADIUM and GOLD, we split source_df into two
        # dataframes, one for each. (same applies if source_df has data for
        # AAPL and IBM).
        #
        # We then take each child df and reindex it to the simulation's date
        # range by forward-filling missing values. this makes reads simpler.
        #
        # Finally, we store the data. For each fetcher column, we store a
        # mapping in self.augmented_sources_map from it to a dictionary of
        # asset -> df.  In other words,
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
        # holding that data.

        if self._sim_params.emission_rate == "daily":
            fetcher_date_index = self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        else:
            fetcher_date_index = self.env.minutes_for_days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )

        # break the source_df up into one dataframe per sid.  this lets
        # us (more easily) calculate accurate start/end dates for each sid,
        # de-dup data, and expand the data to fit the backtest start/end
        # date.  Iterating the groupby directly avoids materializing an
        # intermediate dict of group name -> group, as the old code did.
        for identifier, df in source_df.groupby(["sid"]):
            # before reindexing, save the earliest and latest dates
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # since we know this df only contains a single sid, we can safely
            # de-dupe by the index (dt)
            df = df.groupby(level=0).last()

            # reindex the dataframe based on the backtest start/end date.
            # this makes reads easier during the backtest.
            df = df.reindex(index=fetcher_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # for fake assets we need to store a start/end date
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in df.columns.difference(['sid']):
                # one {asset -> df} dict per fetcher column
                self._augmented_sources_map.setdefault(
                    col_name, {})[identifier] = df
224
225
    def _open_minute_file(self, field, asset):
226
        sid_str = str(int(asset))
227
228
        try:
229
            carray = self._carrays[field][sid_str]
230
        except KeyError:
231
            carray = self._carrays[field][sid_str] = \
232
                self._get_ctable(asset)[field]
233
234
        return carray
235
236
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute bcolz ctable for the given asset.

        If a custom path function was supplied for the asset's type
        (future or equity), it is used to locate the file; otherwise the
        default "<root>/<sid>.bcolz" layout is assumed.
        """
        sid = int(asset)

        # pick the custom path function for this asset type, if any
        path_func = None
        if isinstance(asset, Future):
            path_func = self._futures_sid_path_func
        elif isinstance(asset, Equity):
            path_func = self._equity_sid_path_func

        if path_func is not None:
            path = path_func(self._minutes_equities_path, sid)
        else:
            path = "{0}/{1}.bcolz".format(self._minutes_equities_path, sid)

        return bcolz.open(path, mode='r')
253
254
    def get_previous_value(self, asset, field, dt):
255
        """
256
        Given an asset and a column and a dt, returns the previous value for
257
        the same asset/column pair.  If this data portal is in minute mode,
258
        it's the previous minute value, otherwise it's the previous day's
259
        value.
260
261
        Parameters
262
        ---------
263
        asset : Asset
264
            The asset whose data is desired.
265
266
        field: string
267
            The desired field of the asset.  Valid values are "open",
268
            "open_price", "high", "low", "close", "close_price", "volume", and
269
            "price".
270
271
        dt: pd.Timestamp
272
            The timestamp from which to go back in time one slot.
273
274
        Returns
275
        -------
276
        The value of the desired field at the desired time.
277
        """
278
        if self._data_frequency == 'daily':
279
            prev_dt = self.env.previous_trading_day(dt)
280
        elif self._data_frequency == 'minute':
281
            prev_dt = self.env.previous_market_minute(dt)
282
283
        return self.get_spot_value(asset, field, prev_dt)
284
285
    def _check_fetcher(self, asset, column, day):
        """
        Look up ``column`` for ``asset`` on ``day`` in the fetcher
        (extra-source) data.

        Returns the fetcher value if this column/asset pair should be
        served from fetcher data, or None (implicitly) if it should not.

        Raises
        ------
        KeyError
            If the column is a fetcher column but the lookup fails.
        """
        # if there is a fetcher column called "price", only look at it if
        # it's on something like palladium and not AAPL (since our own price
        # data always wins when dealing with assets)
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a fetcher field
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except Exception:
                # was a bare "except:", which also swallowed
                # KeyboardInterrupt/SystemExit; narrow it while keeping the
                # log-and-raise-KeyError contract for callers.
                log.error(
                    "Could not find value for asset={0}, current_day={1},"
                    "column={2}".format(
                        str(asset),
                        str(self.current_day),
                        str(column)))

                raise KeyError
306
307
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        # fetcher data wins over nothing, but never over our own pricing
        # data for real assets (see _check_fetcher)
        fetcher_val = self._check_fetcher(asset, field,
                                          (dt or self.current_dt))
        if fetcher_val is not None:
            return fetcher_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        column_to_use = BASE_FIELDS[field]

        # tolerate raw sids being passed in
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = normalize_date(dt or self.current_day)
            return self._get_daily_data(asset, column_to_use, day_to_use)

        # minute mode: futures and equities are stored differently
        dt_to_use = dt or self.current_dt
        if isinstance(asset, Future):
            return self._get_minute_spot_value_future(
                asset, column_to_use, dt_to_use)
        return self._get_minute_spot_value(
            asset, column_to_use, dt_to_use)
359
360
    def _get_minute_spot_value_future(self, asset, column, dt):
        """
        Spot value of ``column`` for a future at minute ``dt``.

        Futures bcolz files have 1440 bars per day (24 hours), 7 days a
        week.  The file attributes contain the "start_dt" and "last_dt"
        fields, which represent the time period for this bcolz file.  The
        start_dt is midnight of the first day that this future started
        trading.
        """
        # number of minutes between dt and this asset's start_dt
        start_date = self._get_asset_start_date(asset)
        minute_offset = int((dt - start_date).total_seconds() / 60)

        if minute_offset < 0:
            # dt precedes this asset's start date; nothing to return.
            return 0.0

        # index straight into the bcolz carray at that offset
        carray = self._open_minute_file(column, asset)
        value = carray[minute_offset]

        # walk backwards over missing (zero) bars until we run out of file
        while value == 0 and minute_offset > 0:
            minute_offset -= 1
            value = carray[minute_offset]

        if column == 'volume':
            return value
        return value * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
389
390
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value of ``column`` for an equity at minute ``dt``,
        forward-filling from earlier minutes when the requested bar has
        no data.

        All our minute bcolz files are written starting on 1/2/2002,
        with 390 minutes per day, regardless of when the security started
        trading. This lets us avoid doing an offset calculation related
        to the asset start date.  Hard-coding 390 minutes per day lets us
        ignore half days.
        """
        if dt == self.current_dt:
            # fast path: the simulation clock has already computed the
            # offset for the current minute.
            minute_offset_to_use = self.cur_data_offset
        else:
            minute_offset_to_use = self._calc_minute_offset(dt)

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # the requested minute has no data; forward-fill from the most
            # recent earlier minute that does.
            result = self._ffill_minute_value(
                asset, column, dt, carray, minute_offset_to_use)

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result

    def _calc_minute_offset(self, dt):
        # Slow path: dt was passed in, so calculate the offset.
        # = (390 * number of trading days since 1/2/2002) +
        #   (index of minute in day)
        # NOTE(review): this may duplicate _find_position_of_minute
        # (defined elsewhere in this file) -- confirm and consolidate.
        given_day = pd.Timestamp(dt.date(), tz='utc')
        day_index = tradingcalendar.trading_days.searchsorted(
            given_day) - INDEX_OF_FIRST_TRADING_DAY

        # if dt is before the first market minute, minute_index
        # will be 0.  if it's after the last market minute, it'll
        # be len(minutes_for_day)
        minute_index = self.env.market_minutes_for_day(given_day).\
            searchsorted(dt)

        return (day_index * 390) + minute_index

    def _ffill_minute_value(self, asset, column, dt, carray, minute_offset):
        # Seek backwards from minute_offset (whose bar the caller found to
        # be zero) until we find data, without looking before this asset's
        # start date.  Returns 0 if no data is found in that range.
        start_date = self._get_asset_start_date(asset)
        start_date_idx = tradingcalendar.trading_days.searchsorted(
            start_date) - INDEX_OF_FIRST_TRADING_DAY
        start_day_offset = start_date_idx * 390

        original_start = minute_offset
        result = 0

        while result == 0 and minute_offset > start_day_offset:
            minute_offset -= 1
            result = carray[minute_offset]

        if result == 0:
            return 0

        # once we've found data, we need to check whether it needs
        # to be adjusted.
        minutes = self.env.market_minute_window(
            start=(dt or self.current_dt),
            count=(original_start - minute_offset + 1),
            step=-1
        ).order()

        # only need to check for adjustments if we've gone back
        # far enough to cross the day boundary.
        if minutes[0].date() != minutes[-1].date():
            # create a np array of size minutes, fill it all with
            # the same value, and adjust the array.
            arr = np.array([result] * len(minutes),
                           dtype=np.float64)
            self._apply_all_adjustments(
                data=arr,
                asset=asset,
                dts=minutes,
                field=column
            )

            # The first value of the adjusted array is the value we want.
            result = arr[0]

        return result
467
468
    def _get_daily_data(self, asset, column, dt):
469
        while True:
470
            try:
471
                value = self._daily_bar_reader.spot_price(asset, dt, column)
472
                if value != -1:
473
                    return value
474
                else:
475
                    dt -= tradingcalendar.trading_day
476
            except NoDataOnDate:
477
                return 0
478
479
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if not assets:
            # no assets requested: empty frame over the window's dates
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        data = []
        for asset in assets:
            # futures and equities are aggregated differently
            if isinstance(asset, Future):
                window_fn = self._get_history_daily_window_future
            else:
                window_fn = self._get_history_daily_window_equity
            data.append(
                window_fn(asset, days_for_window, end_dt, field_to_use))

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
511
512
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Daily history bars for a future, aggregated from minute data.

        Since we don't have daily bcolz files for futures (yet), use
        minute bars to calculate the daily values.  Returns one
        aggregated value per day in ``days_for_window`` (days whose
        minute group comes back empty are skipped).
        """
        data = []
        data_groups = []

        # collect the minutes for every full day (NOT including today);
        # the per-minute lookup loop previously appeared twice, once here
        # and once for the partial last day -- extracted to a helper.
        for day in days_for_window[:-1]:
            minutes = self.env.market_minutes_for_day(day)
            data_groups.append(
                self._minute_values_for_future(asset, column, minutes))

        # the last day is partial: market open through end_dt
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )
        data_groups.append(
            self._minute_values_for_future(asset, column, last_day_minutes))

        # aggregate each day's minute values into a single daily value
        for group in data_groups:
            if len(group) == 0:
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data

    def _minute_values_for_future(self, asset, column, minutes):
        # One spot lookup per minute (futures are only exposed within
        # equity trading hours for now, so this stays small).
        values = np.zeros(len(minutes), dtype=np.float64)
        for idx, minute in enumerate(minutes):
            values[idx] = self._get_minute_spot_value_future(
                asset, column, minute
            )
        return values
568
569
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history bars for an equity over ``days_for_window``.

        Uses daily data for the whole range when the window ends at
        midnight UTC or past the asset's end date; otherwise uses daily
        data for all but the last day and aggregates minute data for the
        partial last day.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            # NOTE(review): this branch passes `asset` to
            # _get_daily_window_for_sid while the branch below passes
            # `sid` -- presumably both are accepted; confirm against that
            # helper's signature (not visible here).
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # aggregate the partial day's minutes into one daily value.
            # assumes field_to_use is one of the five fields below (the
            # BASE_FIELDS mapping normalizes 'price' to 'close' upstream).
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
627
628
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Minutes before FIRST_TRADING_MINUTE are represented as leading
        NaN rows rather than data lookups.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        # but then cut it down to only the minutes after
        # FIRST_TRADING_MINUTE
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(FIRST_TRADING_MINUTE)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # bug fix: the two string literals previously concatenated to
            # "endsbefore" (missing space between them).
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count and \
           (modified_minutes_for_window[0] == FIRST_TRADING_MINUTE):
            # the beginning of the window goes before our global trading
            # start date; pad each asset's data with leading NaNs so the
            # result still has bar_count rows.
            bars_to_prepend = bar_count - modified_minutes_length
            nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # index with the full (un-truncated) minute window; the prepended
        # NaNs line up with the pre-FIRST_TRADING_MINUTE rows.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
685
686
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # sanity check in case sids were passed in
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
737
738
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # tolerate raw sids being passed in
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        if isinstance(asset, Future):
            window_fn = self._get_minute_window_for_future
        else:
            window_fn = self._get_minute_window_for_equity
        return window_fn(asset, field, minutes_for_window)
771
772
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
773
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
774
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
775
        # do this is to simply do a spot lookup for each desired minute.
776
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
777
        for idx, minute in enumerate(minutes_for_window):
778
            return_data[idx] = \
779
                self._get_minute_spot_value_future(asset, field, minute)
780
781
        # Note: an improvement could be to find the consecutive runs within
782
        # minutes_for_window, and use them to read the underlying ctable
783
        # more efficiently.
784
785
        # Once futures are on 24-hour clock, then we can just grab all the
786
        # requested minutes in one shot from the ctable.
787
788
        # no adjustments for futures, yay.
789
        return return_data
790
791
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Build a window of adjusted minute-level values for an equity.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field : string
            The specific field to return.

        minutes_for_window : pd.DateTimeIndex
            The minutes representing the desired window.

        Returns
        -------
        np.array of float64 with one value per requested minute; filled
        with NaN when no data exists for the window.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        # _find_position_of_minute returns -1 for minutes before the first
        # trading day; clamp the start to 0 so the slice below stays valid.
        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        if end_idx == 0:
            # end_idx == 0 means the last requested minute mapped to -1,
            # i.e. the entire window predates the data.
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each previous deletion shifted later indices left by 180
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # apply splits/mergers/dividends and price scaling in place
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
866
                               price_adj_factor=1.0):
867
        """
868
        Internal method that applies all the necessary adjustments on the
869
        given data array.
870
871
        The adjustments are:
872
        - splits
873
        - if field != "volume":
874
            - mergers
875
            - dividends
876
            - * 0.001
877
            - any zero fields replaced with NaN
878
        - all values rounded to 3 digits after the decimal point.
879
880
        Parameters
881
        ----------
882
        data : np.array
883
            The data to be adjusted.
884
885
        asset: Asset
886
            The asset whose data is being adjusted.
887
888
        dts: pd.DateTimeIndex
889
            The list of minutes or days representing the desired window.
890
891
        field: string
892
            The field whose values are in the data array.
893
894
        price_adj_factor: float
895
            Factor with which to adjust OHLC values.
896
        Returns
897
        -------
898
        None.  The data array is modified in place.
899
        """
900
        self._apply_adjustments_to_window(
901
            self._get_adjustment_list(
902
                asset, self._splits_dict, "SPLITS"
903
            ),
904
            data,
905
            dts,
906
            field != 'volume'
907
        )
908
909
        if field != 'volume':
910
            self._apply_adjustments_to_window(
911
                self._get_adjustment_list(
912
                    asset, self._mergers_dict, "MERGERS"
913
                ),
914
                data,
915
                dts,
916
                True
917
            )
918
919
            self._apply_adjustments_to_window(
920
                self._get_adjustment_list(
921
                    asset, self._dividends_dict, "DIVIDENDS"
922
                ),
923
                data,
924
                dts,
925
                True
926
            )
927
928
            data *= price_adj_factor
929
930
            # if anything is zero, it's a missing bar, so replace it with NaN.
931
            # we only want to do this for non-volume fields, because a missing
932
            # volume should be 0.
933
            data[data == 0] = np.NaN
934
935
        np.around(data, 3, out=data)
936
937
    @staticmethod
    def _find_position_of_minute(minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        int: The position of the given minute in the list of all trading
        minutes since market open on 1/2/2002, or -1 if the minute predates
        the first trading day.
        """
        day = minute_dt.date()
        day_idx = tradingcalendar.trading_days.searchsorted(day) -\
            INDEX_OF_FIRST_TRADING_DAY
        if day_idx < 0:
            return -1

        # 9:31 am Eastern is the first bar of the day in the minute files.
        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # Use floor division so the offset stays an integer under Python 3,
        # where "/" is true division and would yield a float.
        minutes_offset = int((minute_dt - day_open).total_seconds()) // 60

        # day_idx may be a numpy integer from searchsorted; normalize to int.
        return int((390 * day_idx) + minutes_offset)
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DateTimeIndex
            The days representing the desired window.  The first and last
            entries bound the raw read from the daily bar reader.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.
        """
        column = US_EQUITY_COLUMNS[field]
        data = self._daily_bar_reader.load_raw_arrays([column],
                                                      days_in_window[0],
                                                      days_in_window[-1],
                                                      [asset])

        bar_count = len(days_in_window)

        # create an np.array of size bar_count (plus the optional extra
        # slot), pre-filled with NaN so missing days read as NaN
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.NAN
        sid = int(asset)

        # data is a list with one 2-D array (single column, single sid);
        # transpose to get that sid's column of values
        return_array[0:bar_count] = data[0].T[0]

        # apply splits/mergers/dividends in place (default price factor 1.0)
        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
        )

        return return_array
    @staticmethod
1040
    def _apply_adjustments_to_window(adjustments_list, window_data,
1041
                                     dts_in_window, multiply):
1042
        if len(adjustments_list) == 0:
1043
            return
1044
1045
        # advance idx to the correct spot in the adjustments list, based on
1046
        # when the window starts
1047
        idx = 0
1048
1049
        while idx < len(adjustments_list) and dts_in_window[0] >\
1050
                adjustments_list[idx][0]:
1051
            idx += 1
1052
1053
        # if we've advanced through all the adjustments, then there's nothing
1054
        # to do.
1055
        if idx == len(adjustments_list):
1056
            return
1057
1058
        while idx < len(adjustments_list):
1059
            adjustment_to_apply = adjustments_list[idx]
1060
1061
            if adjustment_to_apply[0] > dts_in_window[-1]:
1062
                break
1063
1064
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1065
            if multiply:
1066
                window_data[0:range_end] *= adjustment_to_apply[1]
1067
            else:
1068
                window_data[0:range_end] /= adjustment_to_apply[1]
1069
1070
            idx += 1
1071
1072
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1073
        """
1074
        Internal method that returns a list of adjustments for the given sid.
1075
1076
        Parameters
1077
        ----------
1078
        asset : Asset
1079
            The asset for which to return adjustments.
1080
1081
        adjustments_dict: dict
1082
            A dictionary of sid -> list that is used as a cache.
1083
1084
        table_name: string
1085
            The table that contains this data in the adjustments db.
1086
1087
        Returns
1088
        -------
1089
        adjustments: list
1090
            A list of [multiplier, pd.Timestamp], earliest first
1091
1092
        """
1093
        if self._adjustment_reader is None:
1094
            return []
1095
1096
        sid = int(asset)
1097
1098
        try:
1099
            adjustments = adjustments_dict[sid]
1100
        except KeyError:
1101
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1102
                get_adjustments_for_sid(table_name, sid)
1103
1104
        return adjustments
1105
1106
    def get_equity_price_view(self, asset):
        """
        Returns a DataPortalSidView for the given asset.  Used to support the
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.

        Parameters
        ----------
        asset : Asset
            Asset that is being queried.

        Returns
        -------
        DataPortalSidView: Accessor into the given asset's data.
        """
        # views are cached per asset; create lazily on first access
        if asset not in self.views:
            self.views[asset] = DataPortalSidView(asset, self)

        return self.views[asset]
    def _check_is_currently_alive(self, asset, dt):
1128
        if dt is None:
1129
            dt = self.current_day
1130
1131
        sid = int(asset)
1132
1133
        if sid not in self._asset_start_dates:
1134
            self._get_asset_start_date(asset)
1135
1136
        start_date = self._asset_start_dates[sid]
1137
        if self._asset_start_dates[sid] > dt:
1138
            raise NoTradeDataAvailableTooEarly(
1139
                sid=sid,
1140
                dt=dt,
1141
                start_dt=start_date
1142
            )
1143
1144
        end_date = self._asset_end_dates[sid]
1145
        if self._asset_end_dates[sid] < dt:
1146
            raise NoTradeDataAvailableTooLate(
1147
                sid=sid,
1148
                dt=dt,
1149
                end_dt=end_date
1150
            )
1151
1152
    def _get_asset_start_date(self, asset):
1153
        self._ensure_asset_dates(asset)
1154
        return self._asset_start_dates[asset]
1155
1156
    def _get_asset_end_date(self, asset):
1157
        self._ensure_asset_dates(asset)
1158
        return self._asset_end_dates[asset]
1159
1160
    def _ensure_asset_dates(self, asset):
1161
        sid = int(asset)
1162
1163
        if sid not in self._asset_start_dates:
1164
            self._asset_start_dates[sid] = asset.start_date
1165
            self._asset_end_dates[sid] = asset.end_date
1166
1167
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        if self._adjustment_reader is None or len(sids) == 0:
            # return a list so the empty case matches the documented return
            # type (previously returned {}, which is truthiness-compatible
            # but a different type than the non-empty path)
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # keep only the splits that belong to the requested sids
        sids_set = set(sids)
        return [split for split in splits if split[0] in sids_set]
    def get_stock_dividends(self, sid, trading_days):
1201
        """
1202
        Returns all the stock dividends for a specific sid that occur
1203
        in the given trading range.
1204
1205
        Parameters
1206
        ----------
1207
        sid: int
1208
            The asset whose stock dividends should be returned.
1209
1210
        trading_days: pd.DatetimeIndex
1211
            The trading range.
1212
1213
        Returns
1214
        -------
1215
        list: A list of objects with all relevant attributes populated.
1216
        All timestamp fields are converted to pd.Timestamps.
1217
        """
1218
1219
        if self._adjustment_reader is None:
1220
            return []
1221
1222
        if len(trading_days) == 0:
1223
            return []
1224
1225
        start_dt = trading_days[0].value / 1e9
1226
        end_dt = trading_days[-1].value / 1e9
1227
1228
        dividends = self._adjustment_reader.conn.execute(
1229
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1230
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1231
            fetchall()
1232
1233
        dividend_info = []
1234
        for dividend_tuple in dividends:
1235
            dividend_info.append({
1236
                "declared_date": dividend_tuple[1],
1237
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1238
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1239
                "payment_sid": dividend_tuple[4],
1240
                "ratio": dividend_tuple[5],
1241
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1242
                "sid": dividend_tuple[7]
1243
            })
1244
1245
        return dividend_info
1246
1247
    def contains(self, asset, field):
        """
        Return True if ``field`` is a base field, or an augmented (fetcher)
        field defined for ``asset``.
        """
        if field in BASE_FIELDS:
            return True
        augmented = self._augmented_sources_map
        return field in augmented and asset in augmented[field]
    def get_fetcher_assets(self):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        fetcher_df = self._fetcher_df
        if fetcher_df is None:
            return []

        if self.current_day in fetcher_df.index:
            date_to_use = self.current_day
        else:
            # current day isn't in the fetcher df; forward-fill from the
            # most recent earlier day, if any
            pos = fetcher_df.index.searchsorted(self.current_day)
            if pos == 0:
                return []

            date_to_use = fetcher_df.index[pos - 1]

        candidates = fetcher_df.loc[date_to_use]["sid"]

        # make sure they're actually assets
        return [candidate for candidate in candidates
                if isinstance(candidate, Asset)]
class DataPortalSidView(object):
    """
    Thin per-asset accessor over a DataPortal, supporting the data[sid(N)]
    public API via attribute, item, and membership access.
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # any attribute not set on the instance is treated as a field name
        # and resolved through the portal
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # item access deliberately routes through __getattr__ so that
        # view["asset"] still resolves via the portal rather than the
        # instance attribute
        return self.__getattr__(column)