Completed
Pull Request — master (#858)
by
unknown
02:43 queued 37s
created

zipline.data.DataPortal._get_asset_start_date()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
31
from zipline.utils import tradingcalendar
32
from zipline.errors import (
33
    NoTradeDataAvailableTooEarly,
34
    NoTradeDataAvailableTooLate
35
)
36
37
# FIXME anything to do with 2002-01-02 probably belongs in qexec, right/
# Minute-level equity bcolz files are written starting at this day/minute
# (see _get_minute_spot_value), so these mark the earliest point for which
# minute data can be served.
FIRST_TRADING_DAY = pd.Timestamp("2002-01-02 00:00:00", tz='UTC')
FIRST_TRADING_MINUTE = pd.Timestamp("2002-01-02 14:31:00", tz='UTC')

# FIXME should this be passed in (is this qexec specific?)?
# Position of FIRST_TRADING_DAY within tradingcalendar.trading_days; used to
# turn a calendar position into an offset into the minute bcolz files.
INDEX_OF_FIRST_TRADING_DAY = 3028

log = Logger('DataPortal')

HISTORY_FREQUENCIES = ["1d", "1m"]

# Maps every field name accepted by the public API onto the column actually
# stored in the bcolz files (e.g. 'price', 'close_price' and 'close' all
# resolve to the stored 'close' column).
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}
58
59
60
class DataPortal(object):
61
    def __init__(self,
                 env,
                 sim_params=None,
                 minutes_equities_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 equity_sid_path_func=None,
                 futures_sid_path_func=None):
        """
        Interface to the pricing data needed by a simulation.

        Parameters
        ----------
        env : TradingEnvironment
            Supplies the asset finder and calendar helpers.
        sim_params : SimulationParameters, optional
            Source of the data frequency and (in handle_extra_source) the
            simulation date range.  If None, data frequency defaults to
            "minute".
        minutes_equities_path : str, optional
            Root directory holding the per-sid minute bcolz files.
        daily_equities_path : str, optional
            Path to the daily bcolz file; when given, a BcolzDailyBarReader
            is opened over it.
        adjustment_reader : optional
            Reader used for split/merger/dividend adjustments.
        equity_sid_path_func : callable, optional
            (root, sid) -> path override for equity minute files.
        futures_sid_path_func : callable, optional
            (root, sid) -> path override for futures minute files.
        """
        self.env = env

        # Internal pointers to the current dt (can be minute) and current day.
        # In daily mode, they point to the same thing. In minute mode, it's
        # useful to have separate pointers to the current day and to the
        # current minute.  These pointers are updated by the
        # AlgorithmSimulator's transform loop.
        self.current_dt = None
        self.current_day = None

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here
        self.cur_data_offset = 0

        self.views = {}

        self._minutes_equities_path = minutes_equities_path
        self._daily_equities_path = daily_equities_path
        self._asset_finder = env.asset_finder

        # Per-field cache of sid -> bcolz carray (see _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Pointer to the daily bcolz file.
        self._daily_equities_data = None

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Fetcher state
        self._augmented_sources_map = {}
        self._fetcher_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            self._data_frequency = "minute"

        self._equity_sid_path_func = equity_sid_path_func
        self._futures_sid_path_func = futures_sid_path_func

        # Stored prices are scaled integers; multiply by these to recover
        # the real values (see _get_daily_data / _get_minute_spot_value).
        self.DAILY_PRICE_ADJUSTMENT_FACTOR = 0.001
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None
139
140
    def handle_extra_source(self, source_df):
141
        """
142
        Extra sources always have a sid column.
143
144
        We expand the given data (by forward filling) to the full range of
145
        the simulation dates, so that lookup is fast during simulation.
146
        """
147
        if source_df is None:
148
            return
149
150
        self._fetcher_df = source_df
151
152
        # source_df's sid column can either consist of assets we know about
153
        # (such as sid(24)) or of assets we don't know about (such as
154
        # palladium).
155
        #
156
        # In both cases, we break up the dataframe into individual dfs
157
        # that only contain a single asset's information.  ie, if source_df
158
        # has data for PALLADIUM and GOLD, we split source_df into two
159
        # dataframes, one for each. (same applies if source_df has data for
160
        # AAPL and IBM).
161
        #
162
        # We then take each child df and reindex it to the simulation's date
163
        # range by forward-filling missing values. this makes reads simpler.
164
        #
165
        # Finally, we store the data. For each fetcher column, we store a
166
        # mapping in self.augmented_sources_map from it to a dictionary of
167
        # asset -> df.  In other words,
168
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
169
        # holding that data.
170
171
        if self._sim_params.emission_rate == "daily":
172
            fetcher_date_index = self.env.days_in_range(
173
                start=self._sim_params.period_start,
174
                end=self._sim_params.period_end
175
            )
176
        else:
177
            fetcher_date_index = self.env.minutes_for_days_in_range(
178
                start=self._sim_params.period_start,
179
                end=self._sim_params.period_end
180
            )
181
182
        # break the source_df up into one dataframe per sid.  this lets
183
        # us (more easily) calculate accurate start/end dates for each sid,
184
        # de-dup data, and expand the data to fit the backtest start/end date.
185
        grouped_by_sid = source_df.groupby(["sid"])
186
        group_names = grouped_by_sid.groups.keys()
187
        group_dict = {}
188
        for group_name in group_names:
189
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
190
191
        for identifier, df in iteritems(group_dict):
192
            # before reindexing, save the earliest and latest dates
193
            earliest_date = df.index[0]
194
            latest_date = df.index[-1]
195
196
            # since we know this df only contains a single sid, we can safely
197
            # de-dupe by the index (dt)
198
            df = df.groupby(level=0).last()
199
200
            # reindex the dataframe based on the backtest start/end date.
201
            # this makes reads easier during the backtest.
202
            df = df.reindex(index=fetcher_date_index, method='ffill')
203
204
            if not isinstance(identifier, Asset):
205
                # for fake assets we need to store a start/end date
206
                self._asset_start_dates[identifier] = earliest_date
207
                self._asset_end_dates[identifier] = latest_date
208
209
            for col_name in df.columns.difference(['sid']):
210
                if col_name not in self._augmented_sources_map:
211
                    self._augmented_sources_map[col_name] = {}
212
213
                self._augmented_sources_map[col_name][identifier] = df
214
215
    def _open_daily_file(self):
216
        if self._daily_equities_data is None:
217
            self._daily_equities_data = bcolz.open(self._daily_equities_path)
218
            self.daily_equities_attrs = self._daily_equities_data.attrs
219
220
        return self._daily_equities_data, self.daily_equities_attrs
221
222
    def _open_minute_file(self, field, asset):
223
        sid_str = str(int(asset))
224
225
        try:
226
            carray = self._carrays[field][sid_str]
227
        except KeyError:
228
            carray = self._carrays[field][sid_str] = \
229
                self._get_ctable(asset)[field]
230
231
        return carray
232
233
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute-level bcolz ctable for `asset`,
        honoring any sid -> path override functions supplied at
        construction time.
        """
        sid = int(asset)
        root = self._minutes_equities_path

        if isinstance(asset, Future) and \
                self._futures_sid_path_func is not None:
            path = self._futures_sid_path_func(root, sid)
        elif isinstance(asset, Equity) and \
                self._equity_sid_path_func is not None:
            path = self._equity_sid_path_func(root, sid)
        else:
            path = "{0}/{1}.bcolz".format(root, sid)

        return bcolz.open(path, mode='r')
250
251
    def get_previous_value(self, asset, field, dt):
252
        """
253
        Given an asset and a column and a dt, returns the previous value for
254
        the same asset/column pair.  If this data portal is in minute mode,
255
        it's the previous minute value, otherwise it's the previous day's
256
        value.
257
258
        Parameters
259
        ---------
260
        asset : Asset
261
            The asset whose data is desired.
262
263
        field: string
264
            The desired field of the asset.  Valid values are "open",
265
            "open_price", "high", "low", "close", "close_price", "volume", and
266
            "price".
267
268
        dt: pd.Timestamp
269
            The timestamp from which to go back in time one slot.
270
271
        Returns
272
        -------
273
        The value of the desired field at the desired time.
274
        """
275
        if self._data_frequency == 'daily':
276
            prev_dt = self.env.previous_trading_day(dt)
277
        elif self._data_frequency == 'minute':
278
            prev_dt = self.env.previous_market_minute(dt)
279
280
        return self.get_spot_value(asset, field, prev_dt)
281
282
    def _check_fetcher(self, asset, column, day):
283
        # if there is a fetcher column called "price", only look at it if
284
        # it's on something like palladium and not AAPL (since our own price
285
        # data always wins when dealing with assets)
286
        look_in_augmented_sources = column in self._augmented_sources_map and \
287
            not (column in BASE_FIELDS and isinstance(asset, Asset))
288
289
        if look_in_augmented_sources:
290
            # we're being asked for a fetcher field
291
            try:
292
                return self._augmented_sources_map[column][asset].\
293
                    loc[day, column]
294
            except:
295
                log.error(
296
                    "Could not find value for asset={0}, current_day={1},"
297
                    "column={2}".format(
298
                        str(asset),
299
                        str(self.current_day),
300
                        str(column)))
301
302
                raise KeyError
303
304
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        # fetcher data takes precedence when present
        fetcher_val = self._check_fetcher(asset, field,
                                          (dt or self.current_dt))
        if fetcher_val is not None:
            return fetcher_val

        try:
            column_to_use = BASE_FIELDS[field]
        except KeyError:
            raise KeyError("Invalid column: " + str(field))

        # sids are accepted too; resolve them to Asset objects
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = normalize_date(dt or self.current_day)
            return self._get_daily_data(asset, column_to_use, day_to_use)

        # minute mode; futures and equities are stored differently
        dt_to_use = dt or self.current_dt
        if isinstance(asset, Future):
            return self._get_minute_spot_value_future(
                asset, column_to_use, dt_to_use)
        return self._get_minute_spot_value(
            asset, column_to_use, dt_to_use)
356
357
    def _get_minute_spot_value_future(self, asset, column, dt):
        """
        Spot value for a future at a given minute.

        Futures bcolz files hold 1440 bars per day (24 hours), 7 days a
        week, and their attrs carry "start_dt"/"last_dt"; start_dt is
        midnight of the first day the future traded.  The bar index is
        therefore just the number of minutes between that start and `dt`.
        Missing (zero) bars are forward-filled by scanning backwards.
        """
        start_date = self._get_asset_start_date(asset)
        offset = int((dt - start_date).total_seconds() / 60)

        if offset < 0:
            # dt precedes this asset's start date, no dice
            return 0.0

        carray = self._open_minute_file(column, asset)
        value = carray[offset]

        # walk backwards over missing bars until we run out of file
        while value == 0 and offset > 0:
            offset -= 1
            value = carray[offset]

        if column == 'volume':
            return value
        return value * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
386
387
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value for an equity at a given minute.

        Computes an offset into the asset's minute bcolz file, reads the
        bar, and forward-fills zero ("missing") bars by scanning backwards
        — never past the asset's start date — applying adjustments when
        the scan crosses a day boundary.  Non-volume values are scaled by
        MINUTE_PRICE_ADJUSTMENT_FACTOR.
        """
        # All our minute bcolz files are written starting on 1/2/2002,
        # with 390 minutes per day, regarding of when the security started
        # trading. This lets us avoid doing an offset calculation related
        # to the asset start date.  Hard-coding 390 minutes per day lets us
        # ignore half days.

        if dt == self.current_dt:
            # fast path: the simulation clock precomputed this offset
            minute_offset_to_use = self.cur_data_offset
        else:
            # this is the slow path.
            # dt was passed in, so calculate the offset.
            # = (390 * number of trading days since 1/2/2002) +
            #   (index of minute in day)
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = tradingcalendar.trading_days.searchsorted(
                given_day) - INDEX_OF_FIRST_TRADING_DAY

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = tradingcalendar.trading_days.searchsorted(
                start_date) - INDEX_OF_FIRST_TRADING_DAY
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
464
465
    def _get_daily_data(self, asset, column, dt):
466
        while True:
467
            try:
468
                value = self._daily_bar_reader.spot_price(asset, dt, column)
469
                if value != -1:
470
                    return value
471
                else:
472
                    dt -= tradingcalendar.trading_day
473
            except NoDataOnDate:
474
                return 0
475
476
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        end_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (end_idx - bar_count + 1):(end_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # futures have no daily bcolz files yet, so they take a different
        # path than equities.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use
            ) if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use
            )
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
508
509
    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        """
        Build daily history bars for a future by aggregating minute data,
        since we don't have daily bcolz files for futures (yet).  Full
        days use all market minutes; the final (possibly partial) day uses
        the minutes from the open up to end_dt.
        """
        def minute_values(minutes):
            # one spot lookup per minute for this asset/column
            out = np.zeros(len(minutes), dtype=np.float64)
            for i, minute in enumerate(minutes):
                out[i] = self._get_minute_spot_value_future(
                    asset, column, minute
                )
            return out

        # all the minutes for the days NOT including today
        data_groups = [
            minute_values(self.env.market_minutes_for_day(day))
            for day in days_for_window[:-1]
        ]

        # the minutes for today, open through end_dt
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )
        data_groups.append(minute_values(last_day_minutes))

        data = []
        for group in data_groups:
            if len(group) == 0:
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data
565
566
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Build daily history bars for an equity.  If the window ends at
        midnight UTC, or extends past the asset's end date, daily data is
        used for the whole range; otherwise the final (partial) day is
        aggregated from minute data.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            # NOTE(review): this branch passes `asset` to
            # _get_daily_window_for_sid while the branch below passes the
            # int `sid` — confirm the callee accepts both.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # aggregate the partial day's minutes into a single daily bar
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
624
625
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.
        """
        # all the minutes in this window, oldest first
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        # clip away any minutes before FIRST_TRADING_MINUTE
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(FIRST_TRADING_MINUTE)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count and \
           (modified_minutes_for_window[0] == FIRST_TRADING_MINUTE):
            # the window started before the global trading start date, so
            # pad the front with NaNs to preserve bar_count rows.
            bars_to_prepend = bar_count - modified_minutes_length
            nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        def window_for(asset):
            values = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )
            if bars_to_prepend != 0:
                values = np.insert(values, 0, nans_to_prepend)
            return values

        return pd.DataFrame(
            np.array([window_for(asset) for asset in assets]).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
682
683
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        # sanity check in case sids were passed in
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
734
735
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars; missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # sids are accepted too; resolve them to Asset objects
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        fetch = (self._get_minute_window_for_future
                 if isinstance(asset, Future)
                 else self._get_minute_window_for_equity)
        return fetch(asset, field, minutes_for_window)
768
769
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        """
        Window of minute data for a future.

        THIS IS TEMPORARY: for now futures are only exposed within equity
        trading hours (9:30 am to 4pm, Eastern), so the easiest approach
        is a spot lookup per desired minute.  No adjustments apply to
        futures.
        """
        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.  Once futures are on a 24-hour clock, all the
        # requested minutes could be grabbed in one shot from the ctable.
        return np.array(
            [self._get_minute_spot_value_future(asset, field, minute)
             for minute in minutes_for_window],
            dtype=np.float64
        )
787
788
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Internal method that returns a window of adjusted minute data for an
        equity.  Returns a float64 np.array with one entry per minute in
        `minutes_for_window`; splits/mergers/dividends and
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR are applied in place by
        _apply_all_adjustments before returning.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field : string
            The OHLCV field to read from the minute bcolz file.

        minutes_for_window : pd.DatetimeIndex
            The trading minutes (UTC) for which data is desired.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        # clamp to 0 in case the window starts before the first recorded
        # minute (1/2/2002 9:31 Eastern)
        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each previous deletion shifted the remaining indices left
                # by 180, so compensate before deleting the next run of
                # padding zeroes
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
856
857
    def _apply_all_adjustments(self, data, asset, dts, field,
858
                               price_adj_factor=1.0):
859
        """
860
        Internal method that applies all the necessary adjustments on the
861
        given data array.
862
863
        The adjustments are:
864
        - splits
865
        - if field != "volume":
866
            - mergers
867
            - dividends
868
            - * 0.001
869
            - any zero fields replaced with NaN
870
        - all values rounded to 3 digits after the decimal point.
871
872
        Parameters
873
        ----------
874
        data : np.array
875
            The data to be adjusted.
876
877
        asset: Asset
878
            The asset whose data is being adjusted.
879
880
        dts: pd.DateTimeIndex
881
            The list of minutes or days representing the desired window.
882
883
        field: string
884
            The field whose values are in the data array.
885
886
        price_adj_factor: float
887
            Factor with which to adjust OHLC values.
888
        Returns
889
        -------
890
        None.  The data array is modified in place.
891
        """
892
        self._apply_adjustments_to_window(
893
            self._get_adjustment_list(
894
                asset, self._splits_dict, "SPLITS"
895
            ),
896
            data,
897
            dts,
898
            field != 'volume'
899
        )
900
901
        if field != 'volume':
902
            self._apply_adjustments_to_window(
903
                self._get_adjustment_list(
904
                    asset, self._mergers_dict, "MERGERS"
905
                ),
906
                data,
907
                dts,
908
                True
909
            )
910
911
            self._apply_adjustments_to_window(
912
                self._get_adjustment_list(
913
                    asset, self._dividends_dict, "DIVIDENDS"
914
                ),
915
                data,
916
                dts,
917
                True
918
            )
919
920
            data *= price_adj_factor
921
922
            # if anything is zero, it's a missing bar, so replace it with NaN.
923
            # we only want to do this for non-volume fields, because a missing
924
            # volume should be 0.
925
            data[data == 0] = np.NaN
926
927
        np.around(data, 3, out=data)
928
929
    @staticmethod
    def _find_position_of_minute(minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The position of the given minute in the list of all trading minutes
        since market open on 1/2/2002.
        """
        day = minute_dt.date()

        # offset of this day relative to the first day in the minute files
        # (1/2/2002, the INDEX_OF_FIRST_TRADING_DAY-th calendar entry)
        day_idx = tradingcalendar.trading_days.searchsorted(day) - \
            INDEX_OF_FIRST_TRADING_DAY

        # market open (9:31 AM Eastern) on that day, expressed in UTC
        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # Use floor division so this is an integer under both Python 2 and 3.
        # The previous `int(...) / 60` produced a float on Python 3 (and
        # would truncate toward zero, not floor, for pre-open minutes).
        minutes_offset = int((minute_dt - day_open).total_seconds()) // 60

        return int((390 * day_idx) + minutes_offset)
967
968
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired.  One value per day is returned,
            in the same order.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.
        """
        daily_data, daily_attrs = self._open_daily_file()

        # the daily file stores each sid's daily OHLCV in a contiguous block.
        # the first row per sid is either 1/2/2002, or the sid's start_date if
        # it started after 1/2/2002.  once a sid stops trading, there are no
        # rows for it.

        bar_count = len(days_in_window)

        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.NAN
        sid = int(asset)

        # find the start index in the daily file for this asset
        asset_file_index = daily_attrs['first_row'][str(sid)]

        trading_days = tradingcalendar.trading_days

        # Calculate the starting day to use (either the asset's first trading
        # day, or 1/1/2002 (which is the 3028th day in the trading calendar).
        first_trading_day_to_use = max(trading_days.searchsorted(
            self._asset_start_dates[asset]), INDEX_OF_FIRST_TRADING_DAY)

        # find the # of trading days between max(asset's first trade date,
        # 2002-01-02) and start_dt
        window_offset = (trading_days.searchsorted(days_in_window[0]) -
                         first_trading_day_to_use)

        # a negative window_offset means the window starts before the asset's
        # first row in the file; never read before the asset's first row
        start_index = max(asset_file_index, asset_file_index + window_offset)

        if window_offset < 0 and (abs(window_offset) > bar_count):
            # consumer is requesting a history window that starts AND ends
            # before this equity started trading, so there is no data at all;
            # return the all-NaN array.
            return return_array

        # find the end index in the daily file. make sure it doesn't extend
        # past the end of this asset's data in the daily file.
        if window_offset < 0:
            # if the window_offset is negative, we need to decrease the
            # end_index accordingly.
            end_index = min(start_index + window_offset + bar_count,
                            daily_attrs['last_row'][str(sid)] + 1)

            # get data from bcolz file
            data = daily_data[field][start_index:end_index]

            # have to leave a bunch of empty slots at the beginning of
            # return_array, since they represent days before this asset
            # started trading.
            return_array[abs(window_offset):bar_count] = data
        else:
            end_index = min(start_index + bar_count,
                            daily_attrs['last_row'][str(sid)])
            data = daily_data[field][start_index:(end_index + 1)]

            if len(data) > len(return_array):
                return_array[:] = data[0:len(return_array)]
            else:
                return_array[0:len(data)] = data

        # apply splits/mergers/dividends and the daily price factor in place
        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
            self.DAILY_PRICE_ADJUSTMENT_FACTOR
        )

        return return_array
1075
1076
    @staticmethod
1077
    def _apply_adjustments_to_window(adjustments_list, window_data,
1078
                                     dts_in_window, multiply):
1079
        if len(adjustments_list) == 0:
1080
            return
1081
1082
        # advance idx to the correct spot in the adjustments list, based on
1083
        # when the window starts
1084
        idx = 0
1085
1086
        while idx < len(adjustments_list) and dts_in_window[0] >\
1087
                adjustments_list[idx][0]:
1088
            idx += 1
1089
1090
        # if we've advanced through all the adjustments, then there's nothing
1091
        # to do.
1092
        if idx == len(adjustments_list):
1093
            return
1094
1095
        while idx < len(adjustments_list):
1096
            adjustment_to_apply = adjustments_list[idx]
1097
1098
            if adjustment_to_apply[0] > dts_in_window[-1]:
1099
                break
1100
1101
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1102
            if multiply:
1103
                window_data[0:range_end] *= adjustment_to_apply[1]
1104
            else:
1105
                window_data[0:range_end] /= adjustment_to_apply[1]
1106
1107
            idx += 1
1108
1109
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1110
        """
1111
        Internal method that returns a list of adjustments for the given sid.
1112
1113
        Parameters
1114
        ----------
1115
        asset : Asset
1116
            The asset for which to return adjustments.
1117
1118
        adjustments_dict: dict
1119
            A dictionary of sid -> list that is used as a cache.
1120
1121
        table_name: string
1122
            The table that contains this data in the adjustments db.
1123
1124
        Returns
1125
        -------
1126
        adjustments: list
1127
            A list of [multiplier, pd.Timestamp], earliest first
1128
1129
        """
1130
        if self._adjustment_reader is None:
1131
            return []
1132
1133
        sid = int(asset)
1134
1135
        try:
1136
            adjustments = adjustments_dict[sid]
1137
        except KeyError:
1138
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1139
                get_adjustments_for_sid(table_name, sid)
1140
1141
        return adjustments
1142
1143
    def get_equity_price_view(self, asset):
1144
        """
1145
        Returns a DataPortalSidView for the given asset.  Used to support the
1146
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.
1147
1148
        Parameters
1149
        ----------
1150
        asset : Asset
1151
            Asset that is being queried.
1152
1153
        Returns
1154
        -------
1155
        DataPortalSidView: Accessor into the given asset's data.
1156
        """
1157
        try:
1158
            view = self.views[asset]
1159
        except KeyError:
1160
            view = self.views[asset] = DataPortalSidView(asset, self)
1161
1162
        return view
1163
1164
    def _check_is_currently_alive(self, asset, dt):
1165
        if dt is None:
1166
            dt = self.current_day
1167
1168
        sid = int(asset)
1169
1170
        if sid not in self._asset_start_dates:
1171
            self._get_asset_start_date(asset)
1172
1173
        start_date = self._asset_start_dates[sid]
1174
        if self._asset_start_dates[sid] > dt:
1175
            raise NoTradeDataAvailableTooEarly(
1176
                sid=sid,
1177
                dt=dt,
1178
                start_dt=start_date
1179
            )
1180
1181
        end_date = self._asset_end_dates[sid]
1182
        if self._asset_end_dates[sid] < dt:
1183
            raise NoTradeDataAvailableTooLate(
1184
                sid=sid,
1185
                dt=dt,
1186
                end_dt=end_date
1187
            )
1188
1189
    def _get_asset_start_date(self, asset):
1190
        self._ensure_asset_dates(asset)
1191
        return self._asset_start_dates[asset]
1192
1193
    def _get_asset_end_date(self, asset):
1194
        self._ensure_asset_dates(asset)
1195
        return self._asset_end_dates[asset]
1196
1197
    def _ensure_asset_dates(self, asset):
1198
        sid = int(asset)
1199
1200
        if sid not in self._asset_start_dates:
1201
            self._asset_start_dates[sid] = asset.start_date
1202
            self._asset_end_dates[sid] = asset.end_date
1203
1204
    def get_splits(self, sids, dt):
1205
        """
1206
        Returns any splits for the given sids and the given dt.
1207
1208
        Parameters
1209
        ----------
1210
        sids : list
1211
            Sids for which we want splits.
1212
1213
        dt: pd.Timestamp
1214
            The date for which we are checking for splits.  Note: this is
1215
            expected to be midnight UTC.
1216
1217
        Returns
1218
        -------
1219
        list: List of splits, where each split is a (sid, ratio) tuple.
1220
        """
1221
        if self._adjustment_reader is None or len(sids) == 0:
1222
            return {}
1223
1224
        # convert dt to # of seconds since epoch, because that's what we use
1225
        # in the adjustments db
1226
        seconds = int(dt.value / 1e9)
1227
1228
        splits = self._adjustment_reader.conn.execute(
1229
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1230
            (seconds,)).fetchall()
1231
1232
        sids_set = set(sids)
1233
        splits = [split for split in splits if split[0] in sids_set]
1234
1235
        return splits
1236
1237
    def get_stock_dividends(self, sid, trading_days):
1238
        """
1239
        Returns all the stock dividends for a specific sid that occur
1240
        in the given trading range.
1241
1242
        Parameters
1243
        ----------
1244
        sid: int
1245
            The asset whose stock dividends should be returned.
1246
1247
        trading_days: pd.DatetimeIndex
1248
            The trading range.
1249
1250
        Returns
1251
        -------
1252
        list: A list of objects with all relevant attributes populated.
1253
        All timestamp fields are converted to pd.Timestamps.
1254
        """
1255
1256
        if self._adjustment_reader is None:
1257
            return []
1258
1259
        if len(trading_days) == 0:
1260
            return []
1261
1262
        start_dt = trading_days[0].value / 1e9
1263
        end_dt = trading_days[-1].value / 1e9
1264
1265
        dividends = self._adjustment_reader.conn.execute(
1266
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1267
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1268
            fetchall()
1269
1270
        dividend_info = []
1271
        for dividend_tuple in dividends:
1272
            dividend_info.append({
1273
                "declared_date": dividend_tuple[1],
1274
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1275
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1276
                "payment_sid": dividend_tuple[4],
1277
                "ratio": dividend_tuple[5],
1278
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1279
                "sid": dividend_tuple[7]
1280
            })
1281
1282
        return dividend_info
1283
1284
    def contains(self, asset, field):
        """
        True if `field` can be queried for `asset`: either one of the
        standard OHLCV fields, or a fetcher (augmented) field defined for
        this asset.
        """
        if field in BASE_FIELDS:
            return True

        try:
            return asset in self._augmented_sources_map[field]
        except KeyError:
            return False
1288
1289
    def get_fetcher_assets(self):
1290
        """
1291
        Returns a list of assets for the current date, as defined by the
1292
        fetcher data.
1293
1294
        Notes
1295
        -----
1296
        Data is forward-filled.  If there is no fetcher data defined for day
1297
        N, we use day N-1's data (if available, otherwise we keep going back).
1298
1299
        Returns
1300
        -------
1301
        list: a list of Asset objects.
1302
        """
1303
        # return a list of assets for the current date, as defined by the
1304
        # fetcher source
1305
        if self._fetcher_df is None:
1306
            return []
1307
1308
        if self.current_day in self._fetcher_df.index:
1309
            date_to_use = self.current_day
1310
        else:
1311
            # current day isn't in the fetcher df, go back the last
1312
            # available day
1313
            idx = self._fetcher_df.index.searchsorted(self.current_day)
1314
            if idx == 0:
1315
                return []
1316
1317
            date_to_use = self._fetcher_df.index[idx - 1]
1318
1319
        asset_list = self._fetcher_df.loc[date_to_use]["sid"]
1320
1321
        # make sure they're actually assets
1322
        asset_list = [asset for asset in asset_list
1323
                      if isinstance(asset, Asset)]
1324
1325
        return asset_list
1326
1327
1328
class DataPortalSidView(object):
    """
    Lightweight per-asset view into a DataPortal.

    Supports the data[sid(N)] access style: attribute lookup, item lookup,
    and `in` checks all delegate to the underlying portal.
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, name):
        # only invoked when `name` isn't a real instance attribute; treat
        # it as a field lookup against the portal.
        return self.portal.get_spot_value(self.asset, name)

    def __contains__(self, name):
        return self.portal.contains(self.asset, name)

    def __getitem__(self, name):
        # item access behaves exactly like attribute access
        return self.__getattr__(name)