Completed
Pull Request — master (#858)
by Eddie
02:03
created

_apply_adjustments_to_window()   D

Complexity

Conditions 8

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 32
rs 4
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
23
from zipline.assets import Asset
24
25
from zipline.utils import tradingcalendar
26
from zipline.errors import (
27
    NoTradeDataAvailableTooEarly,
28
    NoTradeDataAvailableTooLate
29
)
30
31
# FIXME anything to do with 2002-01-02 probably belongs in qexec, right?
32
FIRST_TRADING_DAY = pd.Timestamp("2002-01-02 00:00:00", tz='UTC')
33
FIRST_TRADING_MINUTE = pd.Timestamp("2002-01-02 14:31:00", tz='UTC')
34
35
# FIXME should this be passed in (is this qexec specific?)?
36
INDEX_OF_FIRST_TRADING_DAY = 3028
37
38
log = Logger('DataPortal')
39
40
# Bar frequencies accepted by the history API.
HISTORY_FREQUENCIES = ["1d", "1m"]

# Maps every field name accepted by the public API (including aliases such
# as 'price' and 'open_price') to the name of the underlying data column.
BASE_FIELDS = dict([
    ('open', 'open'),
    ('open_price', 'open'),
    ('high', 'high'),
    ('low', 'low'),
    ('close', 'close'),
    ('close_price', 'close'),
    ('volume', 'volume'),
    ('price', 'close'),
])
52
53
54
class DataPortal(object):
55
    def __init__(self,
56
                 env,
57
                 sim_params=None,
58
                 minutes_equities_path=None,
59
                 daily_equities_path=None,
60
                 adjustment_reader=None,
61
                 sid_path_func=None):
62
        self.env = env
63
64
        # Internal pointers to the current dt (can be minute) and current day.
65
        # In daily mode, they point to the same thing. In minute mode, it's
66
        # useful to have separate pointers to the current day and to the
67
        # current minute.  These pointers are updated by the
68
        # AlgorithmSimulator's transform loop.
69
        self.current_dt = None
70
        self.current_day = None
71
72
        # This is a bit ugly, but is here for performance reasons.  In minute
73
        # simulations, we need to very quickly go from dt -> (# of minutes
74
        # since Jan 1 2002 9:30 Eastern).
75
        #
76
        # The clock that heartbeats the simulation has all the necessary
77
        # information to do this calculation very quickly.  This value is
78
        # calculated there, and then set here
79
        self.cur_data_offset = 0
80
81
        self.views = {}
82
83
        if minutes_equities_path is None and daily_equities_path is None:
84
            raise ValueError("Must provide at least one of minute or "
85
                             "daily data path!")
86
87
        self._minutes_equities_path = minutes_equities_path
88
        self._daily_equities_path = daily_equities_path
89
        self._asset_finder = env.asset_finder
90
91
        self._carrays = {
92
            'open': {},
93
            'high': {},
94
            'low': {},
95
            'close': {},
96
            'volume': {},
97
            'sid': {},
98
            'dt': {},
99
        }
100
101
        self._adjustment_reader = adjustment_reader
102
103
        # caches of sid -> adjustment list
104
        self._splits_dict = {}
105
        self._mergers_dict = {}
106
        self._dividends_dict = {}
107
108
        # Pointer to the daily bcolz file.
109
        self._daily_equities_data = None
110
111
        # Cache of sid -> the first trading day of an asset, even if that day
112
        # is before 1/2/2002.
113
        self._asset_start_dates = {}
114
        self._asset_end_dates = {}
115
116
        # Fetcher state
117
        self._augmented_sources_map = {}
118
        self._fetcher_df = None
119
120
        self._sim_params = sim_params
121
        if self._sim_params is not None:
122
            self._data_frequency = self._sim_params.data_frequency
123
124
        self._sid_path_func = sid_path_func
125
126
        self.DAILY_PRICE_ADJUSTMENT_FACTOR = 0.001
127
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
128
129
    def handle_extra_source(self, source_df):
130
        """
131
        Extra sources always have a sid column.
132
133
        We expand the given data (by forward filling) to the full range of
134
        the simulation dates, so that lookup is fast during simulation.
135
        """
136
        if source_df is None:
137
            return
138
139
        self._fetcher_df = source_df
140
141
        # source_df's sid column can either consist of assets we know about
142
        # (such as sid(24)) or of assets we don't know about (such as
143
        # palladium).
144
        #
145
        # In both cases, we break up the dataframe into individual dfs
146
        # that only contain a single asset's information.  ie, if source_df
147
        # has data for PALLADIUM and GOLD, we split source_df into two
148
        # dataframes, one for each. (same applies if source_df has data for
149
        # AAPL and IBM).
150
        #
151
        # We then take each child df and reindex it to the simulation's date
152
        # range by forward-filling missing values. this makes reads simpler.
153
        #
154
        # Finally, we store the data. For each fetcher column, we store a
155
        # mapping in self.augmented_sources_map from it to a dictionary of
156
        # asset -> df.  In other words,
157
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
158
        # holding that data.
159
160
        if self._sim_params.emission_rate == "daily":
161
            fetcher_date_index = self.env.days_in_range(
162
                start=self._sim_params.period_start,
163
                end=self._sim_params.period_end
164
            )
165
        else:
166
            fetcher_date_index = self.env.minutes_for_days_in_range(
167
                start=self._sim_params.period_start,
168
                end=self._sim_params.period_end
169
            )
170
171
        # break the source_df up into one dataframe per sid.  this lets
172
        # us (more easily) calculate accurate start/end dates for each sid,
173
        # de-dup data, and expand the data to fit the backtest start/end date.
174
        grouped_by_sid = source_df.groupby(["sid"])
175
        group_names = grouped_by_sid.groups.keys()
176
        group_dict = {}
177
        for group_name in group_names:
178
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
179
180
        for identifier, df in group_dict.iteritems():
181
            # before reindexing, save the earliest and latest dates
182
            earliest_date = df.index[0]
183
            latest_date = df.index[-1]
184
185
            # since we know this df only contains a single sid, we can safely
186
            # de-dupe by the index (dt)
187
            df = df.groupby(level=0).last()
188
189
            # reindex the dataframe based on the backtest start/end date.
190
            # this makes reads easier during the backtest.
191
            df = df.reindex(index=fetcher_date_index, method='ffill')
192
193
            if not isinstance(identifier, Asset):
194
                # for fake assets we need to store a start/end date
195
                self._asset_start_dates[identifier] = earliest_date
196
                self._asset_end_dates[identifier] = latest_date
197
198
            for col_name in df.columns.difference(['sid']):
199
                if col_name not in self._augmented_sources_map:
200
                    self._augmented_sources_map[col_name] = {}
201
202
                self._augmented_sources_map[col_name][identifier] = df
203
204
    def _open_daily_file(self):
205
        if self._daily_equities_data is None:
206
            self._daily_equities_data = bcolz.open(self._daily_equities_path)
207
            self.daily_equities_attrs = self._daily_equities_data.attrs
208
209
        return self._daily_equities_data, self.daily_equities_attrs
210
211
    def _open_minute_file(self, field, sid):
212
        if self._sid_path_func is None:
213
            path = "{0}/{1}.bcolz".format(self._minutes_equities_path, sid)
214
        else:
215
            path = self._sid_path_func(self._minutes_equities_path, sid)
216
217
        try:
218
            carray = self._carrays[field][path]
219
        except KeyError:
220
            carray = self._carrays[field][path] = bcolz.carray(
221
                rootdir=path + "/" + field, mode='r')
222
223
        return carray
224
225
    def get_previous_value(self, asset, field, dt):
226
        """
227
        Given an asset and a column and a dt, returns the previous value for
228
        the same asset/column pair.  If this data portal is in minute mode,
229
        it's the previous minute value, otherwise it's the previous day's
230
        value.
231
232
        Parameters
233
        ---------
234
        asset : Asset
235
            The asset whose data is desired.
236
237
        field: string
238
            The desired field of the asset.  Valid values are "open",
239
            "open_price", "high", "low", "close", "close_price", "volume", and
240
            "price".
241
242
        dt: pd.Timestamp
243
            The timestamp from which to go back in time one slot.
244
245
        Returns
246
        -------
247
        The value of the desired field at the desired time.
248
        """
249
        if self._data_frequency == 'daily':
250
            prev_dt = self.env.previous_trading_day(dt)
251
        elif self._data_frequency == 'minute':
252
            prev_dt = self.env.previous_market_minute(dt)
253
254
        return self.get_spot_value(asset, field, prev_dt)
255
256
    def _check_fetcher(self, asset, column, day):
257
        # if there is a fetcher column called "price", only look at it if
258
        # it's on something like palladium and not AAPL (since our own price
259
        # data always wins when dealing with assets)
260
        look_in_augmented_sources = column in self._augmented_sources_map and \
261
            not (column in BASE_FIELDS and isinstance(asset, Asset))
262
263
        if look_in_augmented_sources:
264
            # we're being asked for a fetcher field
265
            try:
266
                return self._augmented_sources_map[column][asset].\
267
                    loc[day, column]
268
            except:
269
                log.error(
270
                    "Could not find value for asset={0}, current_day={1},"
271
                    "column={2}".format(
272
                        str(asset),
273
                        str(self.current_day),
274
                        str(column)))
275
276
                raise KeyError
277
278
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Fetcher data is consulted first; otherwise the value comes from the
        daily or minute price data, forward-filled from the most recent
        non-zero bar.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If `field` is neither a fetcher column nor a base field.
        """
        fetcher_val = self._check_fetcher(asset, field,
                                          (dt or self.current_dt))

        if fetcher_val is not None:
            return fetcher_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        asset_int = int(asset)
        column_to_use = BASE_FIELDS[field]

        self._check_is_currently_alive(asset_int, dt)

        if self._data_frequency == "daily":
            day_to_use = dt or self.current_day
            return self._get_daily_data(asset_int, column_to_use, day_to_use)

        # keeping minute data logic in-lined to avoid the cost of calling
        # another method.

        # all our minute bcolz files are written starting on 1/2/2002,
        # with 390 minutes per day, regardless of when the security started
        # trading. This lets us avoid doing an offset calculation related
        # to the asset start date.  Hard-coding 390 minutes per day lets us
        # ignore half days.
        carray = self._open_minute_file(column_to_use, asset_int)

        if dt is None or dt == self.current_dt:
            minute_offset_to_use = self.cur_data_offset
        else:
            # this is the slow path.
            # dt was passed in, so calculate the offset.
            # = (390 * number of trading days since 1/2/2002) +
            #   (index of minute in day)
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = tradingcalendar.trading_days.searchsorted(
                given_day) - INDEX_OF_FIRST_TRADING_DAY

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        result = carray[minute_offset_to_use]
        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset_int)
            start_date_idx = tradingcalendar.trading_days.searchsorted(
                start_date) - INDEX_OF_FIRST_TRADING_DAY

            # The minute files begin on 1/2/2002, so an asset that started
            # trading earlier has a negative index here.  Clamp to 0;
            # otherwise the search below could step to a negative offset,
            # which indexes from the END of the array.
            start_day_offset = max(start_date_idx, 0) * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        sid=asset_int,
                        dts=minutes,
                        field=column_to_use
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column_to_use != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
397
398
    def _get_daily_data(self, asset_int, column, dt):
        """
        Read one value from the daily file for the given sid, column and
        day, stepping back over zero entries to the most recent non-zero
        row of that asset.

        Non-volume values are scaled by DAILY_PRICE_ADJUSTMENT_FACTOR.
        """
        dt = pd.Timestamp(dt.date(), tz='utc')
        daily_data, daily_attrs = self._open_daily_file()

        sid_key = str(asset_int)

        # row in the daily file where this asset's data begins
        first_row = daily_attrs['first_row'][sid_key]

        # the asset's data starts on its start date, but never before the
        # first trading day covered by the file
        data_start = max(self._get_asset_start_date(asset_int),
                         FIRST_TRADING_DAY)

        calendar_days = tradingcalendar.trading_days

        # number of trading days between the data start and dt
        # FIXME can cache calendar_days.searchsorted(data_start)
        days_since_start = (calendar_days.searchsorted(dt) -
                            calendar_days.searchsorted(data_start))

        row = first_row + days_since_start

        # sanity check
        assert row >= first_row
        assert row <= daily_attrs['last_row'][sid_key] + 1

        column_values = daily_data[column]
        value = column_values[row]

        # a zero is treated as a missing bar: walk back, but never before
        # the asset's first row
        while value == 0 and row > first_row:
            row -= 1
            value = column_values[row]

        if column == 'volume':
            return value

        return value * self.DAILY_PRICE_ADJUSTMENT_FACTOR
435
436
    def _get_history_daily_window(self, sids, end_dt, bar_count, field_to_use):
        """
        Internal method that builds a dataframe of daily history bars for
        the given sids: one row per day in the window, one column per sid.

        Unless the window ends at midnight UTC or runs past the asset's end
        date, the last bar is aggregated from that day's minute data up to
        end_dt.
        """
        day = end_dt.date()
        calendar_days = tradingcalendar.trading_days
        day_idx = calendar_days.searchsorted(day)
        days_for_window = calendar_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        ends_at_midnight = (end_dt.hour, end_dt.minute) == (0, 0)

        if len(sids) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        per_sid_values = []

        for sid in sids:
            sid = int(sid)

            # make sure the start and end dates for this sid are cached
            if sid not in self._asset_start_dates:
                asset = self._asset_finder.retrieve_asset(sid)
                self._asset_start_dates[sid] = asset.start_date
                self._asset_end_dates[sid] = asset.end_date

            # daily data covers the whole range in two cases:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the asset's
            #    last trading day.
            if ends_at_midnight or \
                    (days_for_window[-1] > self._asset_end_dates[sid]):
                per_sid_values.append(self._get_daily_window_for_sid(
                    sid,
                    field_to_use,
                    days_for_window,
                    extra_slot=False
                ))
                continue

            # otherwise the last day of the window is built by aggregating
            # minute data up to end_dt.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(day))
            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)
            minutes_for_partial_day = \
                all_minutes_for_day[0:(last_minute_idx + 1)]

            daily_values = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_values = self._get_minute_window_for_sid(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            if field_to_use == 'volume':
                partial_day_value = np.sum(minute_values)
            elif field_to_use == 'open':
                partial_day_value = minute_values[0]
            elif field_to_use == 'close':
                partial_day_value = minute_values[-1]
            elif field_to_use == 'high':
                partial_day_value = np.amax(minute_values)
            elif field_to_use == 'low':
                partial_day_value = np.amin(minute_values)

            # the final slot of the daily window holds the partial day.
            daily_values[-1] = partial_day_value

            per_sid_values.append(daily_values)

        return pd.DataFrame(np.array(per_sid_values).T,
                            index=days_for_window,
                            columns=sids)
519
520
    def _get_history_minute_window(self, sids, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Minutes before FIRST_TRADING_MINUTE are not read from the data; the
        corresponding leading bars are filled with NaN instead.

        Raises
        ------
        ValueError
            If the whole window lies before FIRST_TRADING_MINUTE.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        # but then cut it down to only the minutes after
        # FIRST_TRADING_MINUTE
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(FIRST_TRADING_MINUTE)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # note the trailing space: the two literals are concatenated.
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count and \
           (modified_minutes_for_window[0] == FIRST_TRADING_MINUTE):
            # the beginning of the window goes before our global trading
            # start date
            bars_to_prepend = bar_count - modified_minutes_length
            nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(sids) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for sid in sids:
            sid_minute_data = self._get_minute_window_for_sid(
                int(sid),
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                # pad the front so every column has bar_count entries
                sid_minute_data = np.insert(sid_minute_data, 0,
                                            nans_to_prepend)

            data.append(sid_minute_data)

        return pd.DataFrame(np.array(data).T,
                            index=minutes_for_window,
                            columns=sids)
575
576
    def get_history_window(self, sids, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        sids : list
            The sids whose data is desired.

        end_dt: pd.Timestamp
            The dt at which the window ends.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If `field` or `frequency` is not recognised.
        """
        if field not in BASE_FIELDS:
            raise ValueError("Invalid history field: " + str(field))
        field_to_use = BASE_FIELDS[field]

        if frequency == "1d":
            build_window = self._get_history_daily_window
        elif frequency == "1m":
            build_window = self._get_history_minute_window
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        df = build_window(sids, end_dt, bar_count, field_to_use)

        # forward-fill if needed
        if field == "price" and ffill:
            df.fillna(method='ffill', inplace=True)

        return df
623
624
    def _get_minute_window_for_sid(self, sid, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for a sid
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        sid : int
            The sid whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of the first and last minutes of the window in
        # the entire timeline; the slice between them is the unadjusted data
        raw_data = self._open_minute_file(field, sid)

        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        num_minutes = len(minutes_for_window)
        return_data = np.zeros(num_minutes, dtype=np.float64)
        data_to_copy = raw_data[start_idx:end_idx]

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate the window positions of the last minute of each
            # early close day: a 1:00 pm minute whose successor is not in
            # the 1 pm hour.  The final minute of the window has no
            # successor and needs no trimming after it.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # chop the 180 padding bars that follow each early close.
            #
            # The markers are positions in minutes_for_window, which has no
            # padding.  Once the padding of all earlier early closes has been
            # removed, data_to_copy lines up with the window up to the
            # current marker, so the marker is used as-is.  (Shifting it back
            # by 180 per previous early close would delete real bars.)
            #
            # A slice is used so that a range running past the end of the
            # data is clipped rather than indexing out of bounds.
            for early_close_minute_idx in last_minute_idx_of_early_close_day:
                data_to_copy = np.delete(
                    data_to_copy,
                    slice(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        self._apply_all_adjustments(
            return_data,
            sid,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
715
716
    def _apply_all_adjustments(self, data, sid, dts, field,
717
                               price_adj_factor=1.0):
718
        """
719
        Internal method that applies all the necessary adjustments on the
720
        given data array.
721
722
        The adjustments are:
723
        - splits
724
        - if field != "volume":
725
            - mergers
726
            - dividends
727
            - * 0.001
728
            - any zero fields replaced with NaN
729
        - all values rounded to 3 digits after the decimal point.
730
731
        Parameters
732
        ----------
733
        data : np.array
734
            The data to be adjusted.
735
736
        sid: integer
737
            The sid whose data is being adjusted.
738
739
        dts: pd.DateTimeIndex
740
            The list of minutes or days representing the desired window.
741
742
        field: string
743
            The field whose values are in the data array.
744
745
        price_adj_factor: float
746
            Factor with which to adjust OHLC values.
747
        Returns
748
        -------
749
        None.  The data array is modified in place.
750
        """
751
        self._apply_adjustments_to_window(
752
            self._get_adjustment_list(
753
                sid, self._splits_dict, "SPLITS"
754
            ),
755
            data,
756
            dts,
757
            field != 'volume'
758
        )
759
760
        if field != 'volume':
761
            self._apply_adjustments_to_window(
762
                self._get_adjustment_list(
763
                    sid, self._mergers_dict, "MERGERS"
764
                ),
765
                data,
766
                dts,
767
                True
768
            )
769
770
            self._apply_adjustments_to_window(
771
                self._get_adjustment_list(
772
                    sid, self._dividends_dict, "DIVIDENDS"
773
                ),
774
                data,
775
                dts,
776
                True
777
            )
778
779
            data *= price_adj_factor
780
781
            # if anything is zero, it's a missing bar, so replace it with NaN.
782
            # we only want to do this for non-volume fields, because a missing
783
            # volume should be 0.
784
            data[data == 0] = np.NaN
785
786
        np.around(data, 3, out=data)
787
788
    @staticmethod
    def _find_position_of_minute(minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        Positions are zero-based and counted from the 9:31 AM Eastern bar of
        each day.  ex. assuming INDEX_OF_FIRST_TRADING_DAY is the calendar
        index of 1/2/2002, this method returns 0 for 1/2/2002 9:31 AM Eastern
        and 1 for 1/2/2002 9:32 AM Eastern.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        int
            The position of the given minute in the list of all trading
            minutes since market open on 1/2/2002.
        """
        day = minute_dt.date()

        # number of trading days between the first trading day and `day`
        day_idx = tradingcalendar.trading_days.searchsorted(day) -\
            INDEX_OF_FIRST_TRADING_DAY

        # the first minute bar of the day is 9:31 AM Eastern; build it in
        # local time and convert so that DST is handled by the tz database.
        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # explicit integer division so the offset is a whole number of
        # minutes on both Python 2 and Python 3.
        minutes_offset = int((minute_dt - day_open).total_seconds()) // 60

        return int((390 * day_idx) + minutes_offset)
826
827
    def _get_daily_window_for_sid(self, sid, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        sid : int
            The sid whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close", etc.
            Used directly as the column name in the daily file.

        days_in_window: pd.DatetimeIndex
            The trading days making up the desired window, earliest first.
            Its length is the number of daily bars requested.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values, after
        ``_apply_all_adjustments`` has been applied in place.  Any missing
        slots are filled with nan.
        """
        daily_data, daily_attrs = self._open_daily_file()

        # the daily file stores each sid's daily OHLCV in a contiguous block.
        # the first row per sid is either 1/2/2002, or the sid's start_date if
        # it started after 1/2/2002.  once a sid stops trading, there are no
        # rows for it.

        bar_count = len(days_in_window)

        # NaN-filled result; one slot longer than the window when the caller
        # asked for the extra slot.
        slot_count = bar_count + 1 if extra_slot else bar_count
        return_array = np.full((slot_count,), np.nan)

        # find the start index in the daily file for this asset
        asset_file_index = daily_attrs['first_row'][str(sid)]

        trading_days = tradingcalendar.trading_days

        # Calculate the starting day to use (either the asset's first trading
        # day, or the calendar position given by INDEX_OF_FIRST_TRADING_DAY,
        # whichever is later).
        first_trading_day_to_use = max(trading_days.searchsorted(
            self._asset_start_dates[sid]), INDEX_OF_FIRST_TRADING_DAY)

        # find the # of trading days between that starting day and the first
        # day of the window.  negative means the window starts before the
        # asset has any rows in the daily file.
        window_offset = (trading_days.searchsorted(days_in_window[0]) -
                         first_trading_day_to_use)

        if window_offset < 0 and (abs(window_offset) > bar_count):
            # consumer is requesting a history window that starts AND ends
            # before this equity started trading: nothing to read.
            return return_array

        last_row = daily_attrs['last_row'][str(sid)]

        if window_offset < 0:
            # the first `missing` days of the window pre-date the asset, so
            # reading starts at the asset's first row and those leading slots
            # stay NaN.
            missing = -window_offset

            # don't read past the end of this asset's data in the daily file.
            end_index = min(asset_file_index + bar_count - missing,
                            last_row + 1)

            # get data from bcolz file
            data = daily_data[field][asset_file_index:end_index]

            # size the destination by what was actually read: if the asset's
            # rows end inside the window the read is short, and the trailing
            # slots must stay NaN rather than fail (or broadcast one row).
            return_array[missing:missing + len(data)] = data
        else:
            start_index = asset_file_index + window_offset
            end_index = min(start_index + bar_count, last_row)

            # end_index is inclusive here, so up to bar_count + 1 rows are
            # read; copy only as many as fit in return_array.
            data = daily_data[field][start_index:(end_index + 1)]

            usable = min(len(data), len(return_array))
            return_array[0:usable] = data[0:usable]

        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
            self.DAILY_PRICE_ADJUSTMENT_FACTOR
        )

        return return_array
933
934
    @staticmethod
935
    def _apply_adjustments_to_window(adjustments_list, window_data,
936
                                     dts_in_window, multiply):
937
        if len(adjustments_list) == 0:
938
            return
939
940
        # advance idx to the correct spot in the adjustments list, based on
941
        # when the window starts
942
        idx = 0
943
944
        while idx < len(adjustments_list) and dts_in_window[0] >\
945
                adjustments_list[idx][0]:
946
            idx += 1
947
948
        # if we've advanced through all the adjustments, then there's nothing
949
        # to do.
950
        if idx == len(adjustments_list):
951
            return
952
953
        while idx < len(adjustments_list):
954
            adjustment_to_apply = adjustments_list[idx]
955
956
            if adjustment_to_apply[0] > dts_in_window[-1]:
957
                break
958
959
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
960
            if multiply:
961
                window_data[0:range_end] *= adjustment_to_apply[1]
962
            else:
963
                window_data[0:range_end] /= adjustment_to_apply[1]
964
965
            idx += 1
966
967
    def _get_adjustment_list(self, sid, adjustments_dict, table_name):
968
        """
969
        Internal method that returns a list of adjustments for the given sid.
970
971
        Parameters
972
        ----------
973
        sid : int
974
            The asset for which to return adjustments.
975
976
        adjustments_dict: dict
977
            A dictionary of sid -> list that is used as a cache.
978
979
        table_name: string
980
            The table that contains this data in the adjustments db.
981
982
        Returns
983
        -------
984
        adjustments: list
985
            A list of [multiplier, pd.Timestamp], earliest first
986
987
        """
988
        if self._adjustment_reader is None:
989
            return []
990
991
        if sid not in adjustments_dict:
992
            adjustments_for_sid = self._adjustment_reader.\
993
                get_adjustments_for_sid(table_name, sid)
994
            adjustments_dict[sid] = adjustments_for_sid
995
996
        return adjustments_dict[sid]
997
998
    def get_equity_price_view(self, asset):
999
        """
1000
        Returns a DataPortalSidView for the given asset.  Used to support the
1001
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.
1002
1003
        Parameters
1004
        ----------
1005
        asset : Asset
1006
            Asset that is being queried.
1007
1008
        Returns
1009
        -------
1010
        DataPortalSidView: Accessor into the given asset's data.
1011
        """
1012
        try:
1013
            view = self.views[asset]
1014
        except KeyError:
1015
            view = self.views[asset] = DataPortalSidView(asset, self)
1016
1017
        return view
1018
1019
    def _check_is_currently_alive(self, name, dt):
1020
        if dt is None:
1021
            dt = self.current_day
1022
1023
        if name not in self._asset_start_dates:
1024
            self._get_asset_start_date(name)
1025
1026
        start_date = self._asset_start_dates[name]
1027
        if self._asset_start_dates[name] > dt:
1028
            raise NoTradeDataAvailableTooEarly(
1029
                sid=name,
1030
                dt=dt,
1031
                start_dt=start_date
1032
            )
1033
1034
        end_date = self._asset_end_dates[name]
1035
        if self._asset_end_dates[name] < dt:
1036
            raise NoTradeDataAvailableTooLate(
1037
                sid=name,
1038
                dt=dt,
1039
                end_dt=end_date
1040
            )
1041
1042
    def _get_asset_start_date(self, sid):
1043
        if sid not in self._asset_start_dates:
1044
            asset = self._asset_finder.retrieve_asset(sid)
1045
            self._asset_start_dates[sid] = asset.start_date
1046
            self._asset_end_dates[sid] = asset.end_date
1047
1048
        return self._asset_start_dates[sid]
1049
1050
    def get_splits(self, sids, dt):
1051
        """
1052
        Returns any splits for the given sids and the given dt.
1053
1054
        Parameters
1055
        ----------
1056
        sids : list
1057
            Sids for which we want splits.
1058
1059
        dt: pd.Timestamp
1060
            The date for which we are checking for splits.  Note: this is
1061
            expected to be midnight UTC.
1062
1063
        Returns
1064
        -------
1065
        list: List of splits, where each split is a (sid, ratio) tuple.
1066
        """
1067
        if self._adjustment_reader is None or len(sids) == 0:
1068
            return {}
1069
1070
        # convert dt to # of seconds since epoch, because that's what we use
1071
        # in the adjustments db
1072
        seconds = int(dt.value / 1e9)
1073
1074
        splits = self._adjustment_reader.conn.execute(
1075
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1076
            (seconds,)).fetchall()
1077
1078
        sids_set = set(sids)
1079
        splits = [split for split in splits if split[0] in sids_set]
1080
1081
        return splits
1082
1083
    def get_stock_dividends(self, sid, trading_days):
1084
        """
1085
        Returns all the stock dividends for a specific sid that occur
1086
        in the given trading range.
1087
1088
        Parameters
1089
        ----------
1090
        sid: int
1091
            The asset whose stock dividends should be returned.
1092
1093
        trading_days: pd.DatetimeIndex
1094
            The trading range.
1095
1096
        Returns
1097
        -------
1098
        list: A list of objects with all relevant attributes populated.
1099
        All timestamp fields are converted to pd.Timestamps.
1100
        """
1101
1102
        if self._adjustment_reader is None:
1103
            return []
1104
1105
        if len(trading_days) == 0:
1106
            return []
1107
1108
        start_dt = trading_days[0].value / 1e9
1109
        end_dt = trading_days[-1].value / 1e9
1110
1111
        dividends = self._adjustment_reader.conn.execute(
1112
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1113
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1114
            fetchall()
1115
1116
        dividend_info = []
1117
        for dividend_tuple in dividends:
1118
            dividend_info.append({
1119
                "declared_date": dividend_tuple[1],
1120
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1121
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1122
                "payment_sid": dividend_tuple[4],
1123
                "ratio": dividend_tuple[5],
1124
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1125
                "sid": dividend_tuple[7]
1126
            })
1127
1128
        return dividend_info
1129
1130
    def contains(self, asset, field):
        """
        Returns whether the given field can be looked up for the given asset.

        Parameters
        ----------
        asset : Asset
            Asset that is being queried.

        field : string
            The field being asked about.

        Returns
        -------
        boolean: True if field is one of BASE_FIELDS, or if the augmented
        sources map has an entry for field that contains asset.
        """
        # base fields are available regardless of the asset
        if field in BASE_FIELDS:
            return True

        augmented = self._augmented_sources_map
        if field not in augmented:
            return False

        return asset in augmented[field]
1134
1135
    def get_fetcher_assets(self):
1136
        """
1137
        Returns a list of assets for the current date, as defined by the
1138
        fetcher data.
1139
1140
        Notes
1141
        -----
1142
        Data is forward-filled.  If there is no fetcher data defined for day
1143
        N, we use day N-1's data (if available, otherwise we keep going back).
1144
1145
        Returns
1146
        -------
1147
        list: a list of Asset objects.
1148
        """
1149
        # return a list of assets for the current date, as defined by the
1150
        # fetcher source
1151
        if self._fetcher_df is None:
1152
            return []
1153
1154
        if self.current_day in self._fetcher_df.index:
1155
            date_to_use = self.current_day
1156
        else:
1157
            # current day isn't in the fetcher df, go back the last
1158
            # available day
1159
            idx = self._fetcher_df.index.searchsorted(self.current_day)
1160
            if idx == 0:
1161
                return []
1162
1163
            date_to_use = self._fetcher_df.index[idx - 1]
1164
1165
        asset_list = self._fetcher_df.loc[date_to_use]["sid"]
1166
1167
        # make sure they're actually assets
1168
        asset_list = [asset for asset in asset_list
1169
                      if isinstance(asset, Asset)]
1170
1171
        return asset_list
1172
1173
1174
class DataPortalSidView(object):
    """
    Accessor into a single asset's data, backed by a DataPortal.

    Both attribute access (view.price) and item access (view["price"]) are
    forwarded to the portal's get_spot_value for the wrapped asset, and
    ``"price" in view`` is forwarded to the portal's contains.
    """

    def __init__(self, asset, portal):
        self.portal = portal
        self.asset = asset

    def __getattr__(self, column):
        # only reached for names that aren't real attributes of the view,
        # i.e. anything other than `asset` and `portal`.
        portal = self.portal
        return portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        portal = self.portal
        return portal.contains(self.asset, column)

    def __getitem__(self, column):
        # same lookup as attribute access; unlike attribute access this also
        # works for columns named "asset" or "portal".
        portal = self.portal
        return portal.get_spot_value(self.asset, column)
1187