Completed
Pull Request — master (#858)
by Eddie
01:46
created

zipline.data.DataPortal.get_stock_dividends()   B

Complexity

Conditions 4

Size

Total Lines 46

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 46
rs 8.6316
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import bcolz
17
from logbook import Logger
18
19
import numpy as np
20
import pandas as pd
21
from pandas.tslib import normalize_date
22
from six import iteritems
23
24
from zipline.assets import Asset, Future, Equity
25
from zipline.data.us_equity_pricing import NoDataOnDate
26
27
from zipline.utils import tradingcalendar
28
from zipline.errors import (
29
    NoTradeDataAvailableTooEarly,
30
    NoTradeDataAvailableTooLate
31
)
32
33
# Module-level logger for data-portal diagnostics.
log = Logger('DataPortal')

# Maps every user-facing field name accepted by the API to the canonical
# OHLCV column stored on disk.  'price' and the '*_price' aliases all
# resolve to the stored 'close' column.
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}

# Bar frequencies accepted by the history API ("1m" = minute, "1d" = daily).
HISTORY_FREQUENCIES = set(["1m", "1d"])
class DataPortal(object):
50
    def __init__(self,
51
                 env,
52
                 equity_daily_reader=None,
53
                 equity_minute_reader=None,
54
                 future_daily_reader=None,
55
                 future_minute_reader=None,
56
                 adjustment_reader=None):
57
        self.env = env
58
59
        self.views = {}
60
61
        self._asset_finder = env.asset_finder
62
63
        self._carrays = {
64
            'open': {},
65
            'high': {},
66
            'low': {},
67
            'close': {},
68
            'volume': {},
69
            'sid': {},
70
            'dt': {},
71
        }
72
73
        self._adjustment_reader = adjustment_reader
74
75
        # caches of sid -> adjustment list
76
        self._splits_dict = {}
77
        self._mergers_dict = {}
78
        self._dividends_dict = {}
79
80
        # Cache of sid -> the first trading day of an asset, even if that day
81
        # is before 1/2/2002.
82
        self._asset_start_dates = {}
83
        self._asset_end_dates = {}
84
85
        # Handle extra sources, like Fetcher.
86
        self._augmented_sources_map = {}
87
        self._extra_source_df = None
88
89
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
90
91
        self._equity_daily_reader = equity_daily_reader
92
        self._equity_minute_reader = equity_minute_reader
93
        self._future_daily_reader = future_daily_reader
94
        self._future_minute_reader = future_minute_reader
95
96
    def handle_extra_source(self, source_df, sim_params):
        """
        Pre-process extra (e.g. Fetcher) data for fast lookup later.

        Extra sources always have a sid column.

        We expand the given data (by forward filling) to the full range of
        the simulation dates, so that lookup is fast during simulation.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # The sid column may hold assets we know about (such as sid(24)) or
        # arbitrary identifiers we don't (such as "palladium").  Either way
        # we split the frame into one child frame per identifier, de-dupe
        # each child by dt, forward-fill it onto the simulation's date
        # index, and finally file each column away under
        # self._augmented_sources_map[column][identifier], so that e.g.
        # self._augmented_sources_map['days_to_cover']['AAPL'] yields the
        # frame holding that data.

        if sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )

        # One child dataframe per sid; this makes accurate start/end dates,
        # de-duping, and reindexing to the backtest range straightforward.
        grouped_by_sid = source_df.groupby(["sid"])
        group_dict = {name: grouped_by_sid.get_group(name)
                      for name in grouped_by_sid.groups.keys()}

        for identifier, df in iteritems(group_dict):
            # Remember the raw date range before reindexing.
            # NOTE(review): assumes df.index is sorted by dt — confirm.
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # A single sid per frame, so de-duping on the index (dt) is safe.
            df = df.groupby(level=0).last()

            # Align to the backtest's dates so reads are simple lookups.
            df = df.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # for fake assets we need to store a start/end date
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in df.columns.difference(['sid']):
                self._augmented_sources_map.setdefault(
                    col_name, {})[identifier] = df
    def _open_minute_file(self, field, asset):
172
        sid_str = str(int(asset))
173
174
        try:
175
            carray = self._carrays[field][sid_str]
176
        except KeyError:
177
            carray = self._carrays[field][sid_str] = \
178
                self._get_ctable(asset)[field]
179
180
        return carray
181
182
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute-bar bcolz ctable for ``asset``.

        Futures resolve their file path through the future minute reader;
        everything else goes through the equity minute reader.  The original
        code had three branches, of which the Equity branch and the
        catch-all else branch were byte-identical — they are consolidated
        here without changing behavior.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            reader = self._future_minute_reader
        else:
            # Equities, plus any unexpected asset type.
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            reader = self._equity_minute_reader

        # A reader-supplied path function wins; otherwise fall back to the
        # conventional "<rootdir>/<sid>.bcolz" layout.
        if reader.sid_path_func is not None:
            path = reader.sid_path_func(reader.rootdir, sid)
        else:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
    def get_last_traded_dt(self, asset, dt, data_frequency):
216
        """
217
        Given an asset and dt, returns the last traded dt from the viewpoint
218
        of the given dt.
219
220
        If there is a trade on the dt, the answer is dt provided.
221
        """
222
        if data_frequency == 'minute':
223
            return self._equity_minute_reader.get_last_traded_dt(asset, dt)
224
        elif data_frequency == 'daily':
225
            return self._equity_daily_reader.get_last_traded_dt(asset, dt)
226
227
    def get_previous_value(self, asset, field, dt, data_frequency):
228
        """
229
        Given an asset and a column and a dt, returns the previous value for
230
        the same asset/column pair.  If this data portal is in minute mode,
231
        it's the previous minute value, otherwise it's the previous day's
232
        value.
233
234
        Parameters
235
        ---------
236
        asset : Asset
237
            The asset whose data is desired.
238
239
        field: string
240
            The desired field of the asset.  Valid values are "open",
241
            "open_price", "high", "low", "close", "close_price", "volume", and
242
            "price".
243
244
        dt: pd.Timestamp
245
            The timestamp from which to go back in time one slot.
246
247
        data_frequency: string
248
            The frequency of the data to query; i.e. whether the data is
249
            'daily' or 'minute' bars
250
251
        Returns
252
        -------
253
        The value of the desired field at the desired time.
254
        """
255
        if data_frequency == 'daily':
256
            prev_dt = self.env.previous_trading_day(dt)
257
        elif data_frequency == 'minute':
258
            prev_dt = self.env.previous_market_minute(dt)
259
260
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
261
262
    def _check_extra_sources(self, asset, column, day):
        """
        Look up ``column`` for ``asset`` on ``day`` in the extra
        (e.g. Fetcher) sources.

        Returns None (implicitly) when the column should not be served from
        an extra source, letting the caller fall through to regular pricing.

        Raises
        ------
        KeyError
            If the column is an extra-source column but no value exists for
            this asset/day pair.
        """
        # If we have an extra source with a column called "price", only look
        # at it if it's on something like palladium and not AAPL (since our
        # own price data always wins when dealing with assets).
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a field in an extra source
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except Exception:
                # Was a bare ``except:`` re-raising an empty KeyError;
                # narrowed so KeyboardInterrupt/SystemExit pass through, and
                # the KeyError now carries a useful message.
                log.error(
                    "Could not find value for asset={0}, day={1},"
                    "column={2}".format(
                        str(asset),
                        str(day),
                        str(column)))

                raise KeyError(
                    "No extra-source value for asset={0}, day={1}, "
                    "column={2}".format(asset, day, column))
    def get_spot_value(self, asset, field, dt, data_frequency):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at the given dt.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp for the desired value.

        data_frequency: string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If ``field`` is not one of the recognized columns.
        """
        # Extra (e.g. Fetcher) sources are consulted first; a non-None hit
        # short-circuits the regular pricing path.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            dt,
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        # Translate API aliases (e.g. "price") to the stored column.
        column_to_use = BASE_FIELDS[field]

        # Sids may be passed in instead of Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if data_frequency == "daily":
            day_to_use = dt
            day_to_use = normalize_date(day_to_use)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            # Minute data: futures and equities live in different files.
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt)
    def _get_minute_spot_value_future(self, asset, column, dt):
342
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
343
        # The file attributes contain the "start_dt" and "last_dt" fields,
344
        # which represent the time period for this bcolz file.
345
346
        # The start_dt is midnight of the first day that this future started
347
        # trading.
348
349
        # figure out the # of minutes between dt and this asset's start_dt
350
        start_date = self._get_asset_start_date(asset)
351
        minute_offset = int((dt - start_date).total_seconds() / 60)
352
353
        if minute_offset < 0:
354
            # asking for a date that is before the asset's start date, no dice
355
            return 0.0
356
357
        # then just index into the bcolz carray at that offset
358
        carray = self._open_minute_file(column, asset)
359
        result = carray[minute_offset]
360
361
        # if there's missing data, go backwards until we run out of file
362
        while result == 0 and minute_offset > 0:
363
            minute_offset -= 1
364
            result = carray[minute_offset]
365
366
        if column != 'volume':
367
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
368
        else:
369
            return result
370
371
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value for an equity at a specific minute.

        Missing (zero) bars are forward-filled by walking backwards; when
        the fill crosses a day boundary, split/merger/dividend adjustments
        are applied to the found value before it is returned.
        """
        # if dt is before the first market minute, minute_index
        # will be 0.  if it's after the last market minute, it'll
        # be len(minutes_for_day)
        minute_offset_to_use = \
            self._equity_minute_reader._find_position_of_minute(dt)

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            # 390 minutes per trading day in the bcolz layout.
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                # NOTE(review): Index.order() only exists in old pandas
                # (it is sort_values() today) — presumably fine for the
                # version this file pins; confirm.
                minutes = self.env.market_minute_window(
                    start=dt,
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Prices are stored as scaled ints; volume is returned raw.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
    def _get_daily_data(self, asset, column, dt):
        """
        Daily value for asset/column at dt, stepping backwards one trading
        day at a time past missing (-1) values; returns 0 once the walk
        falls off the start of the data (NoDataOnDate).
        """
        spot_price = self._equity_daily_reader.spot_price
        while True:
            try:
                value = spot_price(asset, dt, column)
            except NoDataOnDate:
                return 0
            if value != -1:
                return value
            dt -= tradingcalendar.trading_day
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        # No assets: an empty frame over the window's dates.
        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures have no daily bcolz files (yet) and take a separate path.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
    def _get_history_daily_window_future(self, asset, days_for_window,
475
                                         end_dt, column):
476
        # Since we don't have daily bcolz files for futures (yet), use minute
477
        # bars to calculate the daily values.
478
        data = []
479
        data_groups = []
480
481
        # get all the minutes for the days NOT including today
482
        for day in days_for_window[:-1]:
483
            minutes = self.env.market_minutes_for_day(day)
484
485
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
486
487
            for idx, minute in enumerate(minutes):
488
                minute_val = self._get_minute_spot_value_future(
489
                    asset, column, minute
490
                )
491
492
                values_for_day[idx] = minute_val
493
494
            data_groups.append(values_for_day)
495
496
        # get the minutes for today
497
        last_day_minutes = pd.date_range(
498
            start=self.env.get_open_and_close(end_dt)[0],
499
            end=end_dt,
500
            freq="T"
501
        )
502
503
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
504
505
        for idx, minute in enumerate(last_day_minutes):
506
            minute_val = self._get_minute_spot_value_future(
507
                asset, column, minute
508
            )
509
510
            values_for_last_day[idx] = minute_val
511
512
        data_groups.append(values_for_last_day)
513
514
        for group in data_groups:
515
            if len(group) == 0:
516
                continue
517
518
            if column == 'volume':
519
                data.append(np.sum(group))
520
            elif column == 'open':
521
                data.append(group[0])
522
            elif column == 'close':
523
                data.append(group[-1])
524
            elif column == 'high':
525
                data.append(np.amax(group))
526
            elif column == 'low':
527
                data.append(np.amin(group))
528
529
        return data
530
531
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history values for one equity.

        Uses daily data for the whole window when the window ends at
        midnight UTC or runs past the asset's end date; otherwise the final
        (partial) day is aggregated from minute data.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # NOTE(review): this branch passes the raw ``sid`` where the
            # branch above passes ``asset``, and omits ``extra_slot`` —
            # presumably the default leaves a trailing slot for the partial
            # day written below; confirm against _get_daily_window_for_sid.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Collapse the partial day's minutes into one daily value.
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Raises ValueError when the requested window ends before the first
        available trading minute.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # (message is missing a space between "ends" and "before")
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date, so pad the front of each column with NaNs
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # NOTE(review): the frame is indexed by the full minutes_for_window
        # while each column was computed over modified_minutes_for_window
        # (plus any NaN padding); the lengths agree only when padding was
        # added or nothing was trimmed — confirm upstream guarantees this.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=assets
        )
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        # Pick the window builder for the requested frequency.
        if frequency == "1d":
            window_fetcher = self._get_history_daily_window
        elif frequency == "1m":
            window_fetcher = self._get_history_minute_window
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        df = window_fetcher(assets, end_dt, bar_count, field_to_use)

        # forward-fill if needed
        if ffill and field == "price":
            df.fillna(method='ffill', inplace=True)

        return df
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # Sids may be passed in instead of Asset objects.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Dispatch on asset type: futures and equities are stored
        # differently.
        handler = (self._get_minute_window_for_future
                   if isinstance(asset, Future)
                   else self._get_minute_window_for_equity)
        return handler(asset, field, minutes_for_window)
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
738
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
739
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
740
        # do this is to simply do a spot lookup for each desired minute.
741
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
742
        for idx, minute in enumerate(minutes_for_window):
743
            return_data[idx] = \
744
                self._get_minute_spot_value_future(asset, field, minute)
745
746
        # Note: an improvement could be to find the consecutive runs within
747
        # minutes_for_window, and use them to read the underlying ctable
748
        # more efficiently.
749
750
        # Once futures are on 24-hour clock, then we can just grab all the
751
        # requested minutes in one shot from the ctable.
752
753
        # no adjustments for futures, yay.
754
        return return_data
755
756
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Window of adjusted minute values for an equity.

        Each sid's minutes are stored in a bcolz file with 390 bars per day,
        starting at 1/2/2002, regardless of when the asset started trading
        and regardless of half days; for a half day, the second half is
        filled with zeroes.
        """
        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        try:
            start_idx = self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0])
        except KeyError:
            start_idx = 0

        try:
            end_idx = self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[-1]) + 1
        except KeyError:
            end_idx = 0

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each earlier deletion shifted later markers left by 180
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # Apply splits/mergers/dividends in place and scale stored-int
        # prices by the minute price factor.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
839
                               price_adj_factor=1.0):
840
        """
841
        Internal method that applies all the necessary adjustments on the
842
        given data array.
843
844
        The adjustments are:
845
        - splits
846
        - if field != "volume":
847
            - mergers
848
            - dividends
849
            - * 0.001
850
            - any zero fields replaced with NaN
851
        - all values rounded to 3 digits after the decimal point.
852
853
        Parameters
854
        ----------
855
        data : np.array
856
            The data to be adjusted.
857
858
        asset: Asset
859
            The asset whose data is being adjusted.
860
861
        dts: pd.DateTimeIndex
862
            The list of minutes or days representing the desired window.
863
864
        field: string
865
            The field whose values are in the data array.
866
867
        price_adj_factor: float
868
            Factor with which to adjust OHLC values.
869
        Returns
870
        -------
871
        None.  The data array is modified in place.
872
        """
873
        self._apply_adjustments_to_window(
874
            self._get_adjustment_list(
875
                asset, self._splits_dict, "SPLITS"
876
            ),
877
            data,
878
            dts,
879
            field != 'volume'
880
        )
881
882
        if field != 'volume':
883
            self._apply_adjustments_to_window(
884
                self._get_adjustment_list(
885
                    asset, self._mergers_dict, "MERGERS"
886
                ),
887
                data,
888
                dts,
889
                True
890
            )
891
892
            self._apply_adjustments_to_window(
893
                self._get_adjustment_list(
894
                    asset, self._dividends_dict, "DIVIDENDS"
895
                ),
896
                data,
897
                dts,
898
                True
899
            )
900
901
            data *= price_adj_factor
902
903
            # if anything is zero, it's a missing bar, so replace it with NaN.
904
            # we only want to do this for non-volume fields, because a missing
905
            # volume should be 0.
906
            data[data == 0] = np.NaN
907
908
        np.around(data, 3, out=data)
909
910
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.

        """
        bar_count = len(days_in_window)
        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.NAN

        # clamp the requested window to the asset's lifetime; days outside
        # [start_date, end_date] stay NaN.
        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if active_days.shape[0]:
            data = self._equity_daily_reader.history_window(field,
                                                            active_days[0],
                                                            active_days[-1],
                                                            asset)
            return_array[day_slice] = data
            # adjust only the populated slice; adjustments are keyed off the
            # active days, not the full requested window.
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array
971
972
    @staticmethod
973
    def _apply_adjustments_to_window(adjustments_list, window_data,
974
                                     dts_in_window, multiply):
975
        if len(adjustments_list) == 0:
976
            return
977
978
        # advance idx to the correct spot in the adjustments list, based on
979
        # when the window starts
980
        idx = 0
981
982
        while idx < len(adjustments_list) and dts_in_window[0] >\
983
                adjustments_list[idx][0]:
984
            idx += 1
985
986
        # if we've advanced through all the adjustments, then there's nothing
987
        # to do.
988
        if idx == len(adjustments_list):
989
            return
990
991
        while idx < len(adjustments_list):
992
            adjustment_to_apply = adjustments_list[idx]
993
994
            if adjustment_to_apply[0] > dts_in_window[-1]:
995
                break
996
997
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
998
            if multiply:
999
                window_data[0:range_end] *= adjustment_to_apply[1]
1000
            else:
1001
                window_data[0:range_end] /= adjustment_to_apply[1]
1002
1003
            idx += 1
1004
1005
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1006
        """
1007
        Internal method that returns a list of adjustments for the given sid.
1008
1009
        Parameters
1010
        ----------
1011
        asset : Asset
1012
            The asset for which to return adjustments.
1013
1014
        adjustments_dict: dict
1015
            A dictionary of sid -> list that is used as a cache.
1016
1017
        table_name: string
1018
            The table that contains this data in the adjustments db.
1019
1020
        Returns
1021
        -------
1022
        adjustments: list
1023
            A list of [multiplier, pd.Timestamp], earliest first
1024
1025
        """
1026
        if self._adjustment_reader is None:
1027
            return []
1028
1029
        sid = int(asset)
1030
1031
        try:
1032
            adjustments = adjustments_dict[sid]
1033
        except KeyError:
1034
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1035
                get_adjustments_for_sid(table_name, sid)
1036
1037
        return adjustments
1038
1039
    def _check_is_currently_alive(self, asset, dt):
1040
        sid = int(asset)
1041
1042
        if sid not in self._asset_start_dates:
1043
            self._get_asset_start_date(asset)
1044
1045
        start_date = self._asset_start_dates[sid]
1046
        if self._asset_start_dates[sid] > dt:
1047
            raise NoTradeDataAvailableTooEarly(
1048
                sid=sid,
1049
                dt=normalize_date(dt),
1050
                start_dt=start_date
1051
            )
1052
1053
        end_date = self._asset_end_dates[sid]
1054
        if self._asset_end_dates[sid] < dt:
1055
            raise NoTradeDataAvailableTooLate(
1056
                sid=sid,
1057
                dt=normalize_date(dt),
1058
                end_dt=end_date
1059
            )
1060
1061
    def _get_asset_start_date(self, asset):
        # Return the asset's start date, populating the per-sid date caches
        # on first use.
        # NOTE(review): _ensure_asset_dates stores entries under int(asset),
        # but this looks up with the asset object itself -- assumes the asset
        # hashes/compares equal to its integer sid; confirm.
        self._ensure_asset_dates(asset)
        return self._asset_start_dates[asset]
1064
1065
    def _get_asset_end_date(self, asset):
        # Return the asset's end date, populating the per-sid date caches
        # on first use.
        # NOTE(review): _ensure_asset_dates stores entries under int(asset),
        # but this looks up with the asset object itself -- assumes the asset
        # hashes/compares equal to its integer sid; confirm.
        self._ensure_asset_dates(asset)
        return self._asset_end_dates[asset]
1068
1069
    def _ensure_asset_dates(self, asset):
1070
        sid = int(asset)
1071
1072
        if sid not in self._asset_start_dates:
1073
            self._asset_start_dates[sid] = asset.start_date
1074
            self._asset_end_dates[sid] = asset.end_date
1075
1076
    def get_splits(self, sids, dt):
1077
        """
1078
        Returns any splits for the given sids and the given dt.
1079
1080
        Parameters
1081
        ----------
1082
        sids : list
1083
            Sids for which we want splits.
1084
1085
        dt: pd.Timestamp
1086
            The date for which we are checking for splits.  Note: this is
1087
            expected to be midnight UTC.
1088
1089
        Returns
1090
        -------
1091
        list: List of splits, where each split is a (sid, ratio) tuple.
1092
        """
1093
        if self._adjustment_reader is None or len(sids) == 0:
1094
            return {}
1095
1096
        # convert dt to # of seconds since epoch, because that's what we use
1097
        # in the adjustments db
1098
        seconds = int(dt.value / 1e9)
1099
1100
        splits = self._adjustment_reader.conn.execute(
1101
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1102
            (seconds,)).fetchall()
1103
1104
        sids_set = set(sids)
1105
        splits = [split for split in splits if split[0] in sids_set]
1106
1107
        return splits
1108
1109
    def get_stock_dividends(self, sid, trading_days):
1110
        """
1111
        Returns all the stock dividends for a specific sid that occur
1112
        in the given trading range.
1113
1114
        Parameters
1115
        ----------
1116
        sid: int
1117
            The asset whose stock dividends should be returned.
1118
1119
        trading_days: pd.DatetimeIndex
1120
            The trading range.
1121
1122
        Returns
1123
        -------
1124
        list: A list of objects with all relevant attributes populated.
1125
        All timestamp fields are converted to pd.Timestamps.
1126
        """
1127
1128
        if self._adjustment_reader is None:
1129
            return []
1130
1131
        if len(trading_days) == 0:
1132
            return []
1133
1134
        start_dt = trading_days[0].value / 1e9
1135
        end_dt = trading_days[-1].value / 1e9
1136
1137
        dividends = self._adjustment_reader.conn.execute(
1138
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1139
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1140
            fetchall()
1141
1142
        dividend_info = []
1143
        for dividend_tuple in dividends:
1144
            dividend_info.append({
1145
                "declared_date": dividend_tuple[1],
1146
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1147
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1148
                "payment_sid": dividend_tuple[4],
1149
                "ratio": dividend_tuple[5],
1150
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1151
                "sid": dividend_tuple[7]
1152
            })
1153
1154
        return dividend_info
1155
1156
    def contains(self, asset, field):
        """
        Return True if ``field`` is a base OHLCV field, or an augmented
        (fetcher) field that covers ``asset``.
        """
        if field in BASE_FIELDS:
            return True

        sources = self._augmented_sources_map
        return field in sources and asset in sources[field]
1160
1161
    def get_fetcher_assets(self, day):
1162
        """
1163
        Returns a list of assets for the current date, as defined by the
1164
        fetcher data.
1165
1166
        Notes
1167
        -----
1168
        Data is forward-filled.  If there is no fetcher data defined for day
1169
        N, we use day N-1's data (if available, otherwise we keep going back).
1170
1171
        Returns
1172
        -------
1173
        list: a list of Asset objects.
1174
        """
1175
        # return a list of assets for the current date, as defined by the
1176
        # fetcher source
1177
        if self._extra_source_df is None:
1178
            return []
1179
1180
        if day in self._extra_source_df.index:
1181
            date_to_use = day
1182
        else:
1183
            # current day isn't in the fetcher df, go back the last
1184
            # available day
1185
            idx = self._extra_source_df.index.searchsorted(day)
1186
            if idx == 0:
1187
                return []
1188
1189
            date_to_use = self._extra_source_df.index[idx - 1]
1190
1191
        asset_list = self._extra_source_df.loc[date_to_use]["sid"]
1192
1193
        # make sure they're actually assets
1194
        asset_list = [asset for asset in asset_list
1195
                      if isinstance(asset, Asset)]
1196
1197
        return asset_list
1198