Completed
Pull Request — master (#858)
by Eddie
01:35
created

_get_minute_spot_value_future()   B

Complexity

Conditions 5

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 29
rs 8.0896
1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
23
from zipline.assets import Asset, Future, Equity
24
25
from zipline.utils import tradingcalendar
26
from zipline.errors import (
27
    NoTradeDataAvailableTooEarly,
28
    NoTradeDataAvailableTooLate
29
)
30
31
# FIXME anything to do with 2002-01-02 probably belongs in qexec, right?
32
# First session covered by the bcolz data, and its first market minute
# (expressed in UTC).
FIRST_TRADING_DAY = pd.Timestamp("2002-01-02 00:00:00", tz='UTC')
FIRST_TRADING_MINUTE = pd.Timestamp("2002-01-02 14:31:00", tz='UTC')

# FIXME should this be passed in (is this qexec specific?)?
# Subtracted from positions in tradingcalendar.trading_days to obtain day
# offsets into the minute bcolz files (presumably the position of
# FIRST_TRADING_DAY in that calendar -- confirm).
INDEX_OF_FIRST_TRADING_DAY = 3028

log = Logger('DataPortal')

# History frequencies supported by this module: daily and minute.
HISTORY_FREQUENCIES = ["1d", "1m"]

# Public field name -> column stored in the bcolz files.  Aliases such as
# "price" and "close_price" both read the "close" column.
BASE_FIELDS = dict(
    open='open',
    open_price='open',
    high='high',
    low='low',
    close='close',
    close_price='close',
    volume='volume',
    price='close',
)
52
53
54
class DataPortal(object):
55
    def __init__(self,
56
                 env,
57
                 sim_params=None,
58
                 minutes_equities_path=None,
59
                 daily_equities_path=None,
60
                 adjustment_reader=None,
61
                 equity_sid_path_func=None,
62
                 futures_sid_path_func=None):
63
        self.env = env
64
65
        # Internal pointers to the current dt (can be minute) and current day.
66
        # In daily mode, they point to the same thing. In minute mode, it's
67
        # useful to have separate pointers to the current day and to the
68
        # current minute.  These pointers are updated by the
69
        # AlgorithmSimulator's transform loop.
70
        self.current_dt = None
71
        self.current_day = None
72
73
        # This is a bit ugly, but is here for performance reasons.  In minute
74
        # simulations, we need to very quickly go from dt -> (# of minutes
75
        # since Jan 1 2002 9:30 Eastern).
76
        #
77
        # The clock that heartbeats the simulation has all the necessary
78
        # information to do this calculation very quickly.  This value is
79
        # calculated there, and then set here
80
        self.cur_data_offset = 0
81
82
        self.views = {}
83
84
        if minutes_equities_path is None and daily_equities_path is None:
85
            raise ValueError("Must provide at least one of minute or "
86
                             "daily data path!")
87
88
        self._minutes_equities_path = minutes_equities_path
89
        self._daily_equities_path = daily_equities_path
90
        self._asset_finder = env.asset_finder
91
92
        self._carrays = {
93
            'open': {},
94
            'high': {},
95
            'low': {},
96
            'close': {},
97
            'volume': {},
98
            'sid': {},
99
            'dt': {},
100
        }
101
102
        self._adjustment_reader = adjustment_reader
103
104
        # caches of sid -> adjustment list
105
        self._splits_dict = {}
106
        self._mergers_dict = {}
107
        self._dividends_dict = {}
108
109
        # Pointer to the daily bcolz file.
110
        self._daily_equities_data = None
111
112
        # Cache of sid -> the first trading day of an asset, even if that day
113
        # is before 1/2/2002.
114
        self._asset_start_dates = {}
115
        self._asset_end_dates = {}
116
117
        # Fetcher state
118
        self._augmented_sources_map = {}
119
        self._fetcher_df = None
120
121
        self._sim_params = sim_params
122
        if self._sim_params is not None:
123
            self._data_frequency = self._sim_params.data_frequency
124
        else:
125
            self._data_frequency = "minute"
126
127
        self._equity_sid_path_func = equity_sid_path_func
128
        self._futures_sid_path_func = futures_sid_path_func
129
130
        self.DAILY_PRICE_ADJUSTMENT_FACTOR = 0.001
131
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001
132
133
    def handle_extra_source(self, source_df):
134
        """
135
        Extra sources always have a sid column.
136
137
        We expand the given data (by forward filling) to the full range of
138
        the simulation dates, so that lookup is fast during simulation.
139
        """
140
        if source_df is None:
141
            return
142
143
        self._fetcher_df = source_df
144
145
        # source_df's sid column can either consist of assets we know about
146
        # (such as sid(24)) or of assets we don't know about (such as
147
        # palladium).
148
        #
149
        # In both cases, we break up the dataframe into individual dfs
150
        # that only contain a single asset's information.  ie, if source_df
151
        # has data for PALLADIUM and GOLD, we split source_df into two
152
        # dataframes, one for each. (same applies if source_df has data for
153
        # AAPL and IBM).
154
        #
155
        # We then take each child df and reindex it to the simulation's date
156
        # range by forward-filling missing values. this makes reads simpler.
157
        #
158
        # Finally, we store the data. For each fetcher column, we store a
159
        # mapping in self.augmented_sources_map from it to a dictionary of
160
        # asset -> df.  In other words,
161
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
162
        # holding that data.
163
164
        if self._sim_params.emission_rate == "daily":
165
            fetcher_date_index = self.env.days_in_range(
166
                start=self._sim_params.period_start,
167
                end=self._sim_params.period_end
168
            )
169
        else:
170
            fetcher_date_index = self.env.minutes_for_days_in_range(
171
                start=self._sim_params.period_start,
172
                end=self._sim_params.period_end
173
            )
174
175
        # break the source_df up into one dataframe per sid.  this lets
176
        # us (more easily) calculate accurate start/end dates for each sid,
177
        # de-dup data, and expand the data to fit the backtest start/end date.
178
        grouped_by_sid = source_df.groupby(["sid"])
179
        group_names = grouped_by_sid.groups.keys()
180
        group_dict = {}
181
        for group_name in group_names:
182
            group_dict[group_name] = grouped_by_sid.get_group(group_name)
183
184
        for identifier, df in group_dict.iteritems():
185
            # before reindexing, save the earliest and latest dates
186
            earliest_date = df.index[0]
187
            latest_date = df.index[-1]
188
189
            # since we know this df only contains a single sid, we can safely
190
            # de-dupe by the index (dt)
191
            df = df.groupby(level=0).last()
192
193
            # reindex the dataframe based on the backtest start/end date.
194
            # this makes reads easier during the backtest.
195
            df = df.reindex(index=fetcher_date_index, method='ffill')
196
197
            if not isinstance(identifier, Asset):
198
                # for fake assets we need to store a start/end date
199
                self._asset_start_dates[identifier] = earliest_date
200
                self._asset_end_dates[identifier] = latest_date
201
202
            for col_name in df.columns.difference(['sid']):
203
                if col_name not in self._augmented_sources_map:
204
                    self._augmented_sources_map[col_name] = {}
205
206
                self._augmented_sources_map[col_name][identifier] = df
207
208
    def _open_daily_file(self):
209
        if self._daily_equities_data is None:
210
            self._daily_equities_data = bcolz.open(self._daily_equities_path)
211
            self.daily_equities_attrs = self._daily_equities_data.attrs
212
213
        return self._daily_equities_data, self.daily_equities_attrs
214
215
    def _open_minute_file(self, field, asset):
216
        sid_str = str(int(asset))
217
218
        try:
219
            carray = self._carrays[field][sid_str]
220
        except KeyError:
221
            carray = self._carrays[field][sid_str] = \
222
                self._get_ctable(asset)[field]
223
224
        return carray
225
226
    def _get_ctable(self, asset):
        """
        Open (read-only) the minute bcolz ctable holding `asset`'s data.

        Futures and equities may each have a custom path function; any asset
        without one uses ``<minutes_equities_path>/<sid>.bcolz``.
        """
        sid = int(asset)
        root = self._minutes_equities_path

        path_func = None
        if isinstance(asset, Future) and \
                self._futures_sid_path_func is not None:
            path_func = self._futures_sid_path_func
        elif isinstance(asset, Equity) and \
                self._equity_sid_path_func is not None:
            path_func = self._equity_sid_path_func

        if path_func is None:
            path = "{0}/{1}.bcolz".format(root, sid)
        else:
            path = path_func(root, sid)

        return bcolz.open(path, mode='r')
243
244
    def get_previous_value(self, asset, field, dt):
245
        """
246
        Given an asset and a column and a dt, returns the previous value for
247
        the same asset/column pair.  If this data portal is in minute mode,
248
        it's the previous minute value, otherwise it's the previous day's
249
        value.
250
251
        Parameters
252
        ---------
253
        asset : Asset
254
            The asset whose data is desired.
255
256
        field: string
257
            The desired field of the asset.  Valid values are "open",
258
            "open_price", "high", "low", "close", "close_price", "volume", and
259
            "price".
260
261
        dt: pd.Timestamp
262
            The timestamp from which to go back in time one slot.
263
264
        Returns
265
        -------
266
        The value of the desired field at the desired time.
267
        """
268
        if self._data_frequency == 'daily':
269
            prev_dt = self.env.previous_trading_day(dt)
270
        elif self._data_frequency == 'minute':
271
            prev_dt = self.env.previous_market_minute(dt)
272
273
        return self.get_spot_value(asset, field, prev_dt)
274
275
    def _check_fetcher(self, asset, column, day):
276
        # if there is a fetcher column called "price", only look at it if
277
        # it's on something like palladium and not AAPL (since our own price
278
        # data always wins when dealing with assets)
279
        look_in_augmented_sources = column in self._augmented_sources_map and \
280
            not (column in BASE_FIELDS and isinstance(asset, Asset))
281
282
        if look_in_augmented_sources:
283
            # we're being asked for a fetcher field
284
            try:
285
                return self._augmented_sources_map[column][asset].\
286
                    loc[day, column]
287
            except:
288
                log.error(
289
                    "Could not find value for asset={0}, current_day={1},"
290
                    "column={2}".format(
291
                        str(asset),
292
                        str(self.current_day),
293
                        str(column)))
294
295
                raise KeyError
296
297
    def get_spot_value(self, asset, field, dt=None):
298
        """
299
        Public API method that returns a scalar value representing the value
300
        of the desired asset's field at either the given dt, or this data
301
        portal's current_dt.
302
303
        Parameters
304
        ---------
305
        asset : Asset
306
            The asset whose data is desired.
307
308
        field: string
309
            The desired field of the asset.  Valid values are "open",
310
            "open_price", "high", "low", "close", "close_price", "volume", and
311
            "price".
312
313
        dt: pd.Timestamp
314
            (Optional) The timestamp for the desired value.
315
316
        Returns
317
        -------
318
        The value of the desired field at the desired time.
319
        """
320
        fetcher_val = self._check_fetcher(asset, field,
321
                                          (dt or self.current_dt))
322
323
        if fetcher_val is not None:
324
            return fetcher_val
325
326
        if field not in BASE_FIELDS:
327
            raise KeyError("Invalid column: " + str(field))
328
329
        column_to_use = BASE_FIELDS[field]
330
331
        if isinstance(asset, int):
332
            asset = self._asset_finder.retrieve_asset(asset)
333
334
        self._check_is_currently_alive(asset, dt)
335
336
        if self._data_frequency == "daily":
337
            day_to_use = dt or self.current_day
338
            return self._get_daily_data(asset, column_to_use, day_to_use)
339
        else:
340
            dt_to_use = dt or self.current_dt
341
342
            if isinstance(asset, Future):
343
                return self._get_minute_spot_value_future(
344
                    asset, column_to_use, dt_to_use)
345
            else:
346
                return self._get_minute_spot_value(
347
                    asset, column_to_use, dt_to_use)
348
349
    def _get_minute_spot_value_future(self, asset, column, dt):
350
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
351
        # The file attributes contain the "start_dt" and "last_dt" fields,
352
        # which represent the time period for this bcolz file.
353
354
        # The start_dt is midnight of the first day that this future started
355
        # trading.
356
357
        # figure out the # of minutes between dt and this asset's start_dt
358
        start_date = self._get_asset_start_date(asset)
359
        minute_offset = int((dt - start_date).total_seconds() / 60)
360
361
        if minute_offset < 0:
362
            # asking for a date that is before the asset's start date, no dice
363
            return 0.0
364
365
        # then just index into the bcolz carray at that offset
366
        carray = self._open_minute_file(column, asset)
367
        result = carray[minute_offset]
368
369
        # if there's missing data, go backwards until we run out of file
370
        while result == 0 and minute_offset > 0:
371
            minute_offset -= 1
372
            result = carray[minute_offset]
373
374
        if column != 'volume':
375
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
376
        else:
377
            return result
378
379
    def _get_minute_spot_value(self, asset, column, dt):
380
        # All our minute bcolz files are written starting on 1/2/2002,
381
        # with 390 minutes per day, regarding of when the security started
382
        # trading. This lets us avoid doing an offset calculation related
383
        # to the asset start date.  Hard-coding 390 minutes per day lets us
384
        # ignore half days.
385
386
        if dt == self.current_dt:
387
            minute_offset_to_use = self.cur_data_offset
388
        else:
389
            # this is the slow path.
390
            # dt was passed in, so calculate the offset.
391
            # = (390 * number of trading days since 1/2/2002) +
392
            #   (index of minute in day)
393
            given_day = pd.Timestamp(dt.date(), tz='utc')
394
            day_index = tradingcalendar.trading_days.searchsorted(
395
                given_day) - INDEX_OF_FIRST_TRADING_DAY
396
397
            # if dt is before the first market minute, minute_index
398
            # will be 0.  if it's after the last market minute, it'll
399
            # be len(minutes_for_day)
400
            minute_index = self.env.market_minutes_for_day(given_day).\
401
                searchsorted(dt)
402
403
            minute_offset_to_use = (day_index * 390) + minute_index
404
405
        carray = self._open_minute_file(column, asset)
406
        result = carray[minute_offset_to_use]
407
408
        if result == 0:
409
            # if the given minute doesn't have data, we need to seek
410
            # backwards until we find data. This makes the data
411
            # forward-filled.
412
413
            # get this asset's start date, so that we don't look before it.
414
            start_date = self._get_asset_start_date(asset)
415
            start_date_idx = tradingcalendar.trading_days.searchsorted(
416
                start_date) - INDEX_OF_FIRST_TRADING_DAY
417
            start_day_offset = start_date_idx * 390
418
419
            original_start = minute_offset_to_use
420
421
            while result == 0 and minute_offset_to_use > start_day_offset:
422
                minute_offset_to_use -= 1
423
                result = carray[minute_offset_to_use]
424
425
            # once we've found data, we need to check whether it needs
426
            # to be adjusted.
427
            if result != 0:
428
                minutes = self.env.market_minute_window(
429
                    start=(dt or self.current_dt),
430
                    count=(original_start - minute_offset_to_use + 1),
431
                    step=-1
432
                ).order()
433
434
                # only need to check for adjustments if we've gone back
435
                # far enough to cross the day boundary.
436
                if minutes[0].date() != minutes[-1].date():
437
                    # create a np array of size minutes, fill it all with
438
                    # the same value.  and adjust the array.
439
                    arr = np.array([result] * len(minutes),
440
                                   dtype=np.float64)
441
                    self._apply_all_adjustments(
442
                        data=arr,
443
                        asset=asset,
444
                        dts=minutes,
445
                        field=column
446
                    )
447
448
                    # The first value of the adjusted array is the value
449
                    # we want.
450
                    result = arr[0]
451
452
        if column != 'volume':
453
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
454
        else:
455
            return result
456
457
    def _get_daily_data(self, asset, column, dt):
        """
        Return the daily value of `column` for `asset` on `dt`'s date.

        The daily bcolz file stores each sid's rows contiguously between
        ``attrs['first_row'][sid]`` and ``attrs['last_row'][sid]``.  A zero
        value is treated as missing and forward-filled from earlier rows of
        the same asset.  Non-volume values are scaled by
        DAILY_PRICE_ADJUSTMENT_FACTOR.

        Raises
        ------
        ValueError
            If the computed row falls outside the range allowed for this
            asset (previously an ``assert``, which is stripped under ``-O``).
        """
        dt = pd.Timestamp(dt.date(), tz='utc')
        daily_data, daily_attrs = self._open_daily_file()

        sid = int(asset)
        sid_key = str(sid)

        # find the start index in the daily file for this asset
        asset_file_index = daily_attrs['first_row'][sid_key]

        # find when the asset started trading
        asset_data_start_date = max(self._get_asset_start_date(asset),
                                    FIRST_TRADING_DAY)

        tradingdays = tradingcalendar.trading_days

        # figure out how many days it's been between now and when this
        # asset starting trading
        # FIXME can cache tradingdays.searchsorted(asset_data_start_date)
        window_offset = tradingdays.searchsorted(dt) - \
            tradingdays.searchsorted(asset_data_start_date)

        # and use that offset to find our lookup index
        lookup_idx = asset_file_index + window_offset

        # sanity checks, raised explicitly so they survive `python -O`
        if lookup_idx < asset_file_index:
            raise ValueError(
                "Daily lookup index {0} for sid {1} precedes its first row "
                "{2}".format(lookup_idx, sid, asset_file_index))
        # NOTE(review): one row past last_row is permitted, matching the
        # original assert; confirm whether last_row is inclusive.
        last_allowed_idx = daily_attrs['last_row'][sid_key] + 1
        if lookup_idx > last_allowed_idx:
            raise ValueError(
                "Daily lookup index {0} for sid {1} is past its last row "
                "{2}".format(lookup_idx, sid, last_allowed_idx))

        ctable = daily_data[column]
        raw_value = ctable[lookup_idx]

        while raw_value == 0 and lookup_idx > asset_file_index:
            lookup_idx -= 1
            raw_value = ctable[lookup_idx]

        if column != 'volume':
            return raw_value * self.DAILY_PRICE_ADJUSTMENT_FACTOR
        return raw_value
496
497
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.

        Parameters
        ----------
        assets : list of Asset
            The assets whose data is desired.
        end_dt : pd.Timestamp
            End of the window; its date selects the final session.
        bar_count : int
            The number of daily bars desired.  If the trading calendar does
            not reach back that far, the window starts at its first day.
        field_to_use : string
            A bcolz column name (a value of BASE_FIELDS).

        Returns
        -------
        pd.DataFrame indexed by trading day with one column per asset.
        """
        trading_days = tradingcalendar.trading_days
        day_idx = trading_days.searchsorted(end_dt.date())

        # Clamp at the start of the calendar: a negative slice start would
        # wrap around to the end of trading_days and yield a wrong (empty)
        # window instead of the available prefix.
        first_idx = max(day_idx - bar_count + 1, 0)
        days_for_window = trading_days[first_idx:(day_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        data = []

        for asset in assets:
            if isinstance(asset, Future):
                data.append(self._get_history_daily_window_future(
                    asset, days_for_window, end_dt, field_to_use
                ))
            else:
                data.append(self._get_history_daily_window_equity(
                    asset, days_for_window, end_dt, field_to_use
                ))

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
529
530
    def _get_history_daily_window_future(self, asset, days_for_window,
531
                                         end_dt, column):
532
        # Since we don't have daily bcolz files for futures (yet), use minute
533
        # bars to calculate the daily values.
534
        data = []
535
        data_groups = []
536
537
        # get all the minutes for the days NOT including today
538
        for day in days_for_window[:-1]:
539
            minutes = self.env.market_minutes_for_day(day)
540
541
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
542
543
            for idx, minute in enumerate(minutes):
544
                minute_val = self._get_minute_spot_value_future(
545
                    asset, column, minute
546
                )
547
548
                values_for_day[idx] = minute_val
549
550
            data_groups.append(values_for_day)
551
552
        # get the minutes for today
553
        last_day_minutes = pd.date_range(
554
            start=self.env.get_open_and_close(end_dt)[0],
555
            end=end_dt,
556
            freq="T"
557
        )
558
559
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
560
561
        for idx, minute in enumerate(last_day_minutes):
562
            minute_val = self._get_minute_spot_value_future(
563
                asset, column, minute
564
            )
565
566
            values_for_last_day[idx] = minute_val
567
568
        data_groups.append(values_for_last_day)
569
570
        for group in data_groups:
571
            if len(group) == 0:
572
                continue
573
574
            if column == 'volume':
575
                data.append(np.sum(group))
576
            elif column == 'open':
577
                data.append(group[0])
578
            elif column == 'close':
579
                data.append(group[-1])
580
            elif column == 'high':
581
                data.append(np.amax(group))
582
            elif column == 'low':
583
                data.append(np.amin(group))
584
585
        return data
586
587
    def _get_history_daily_window_equity(self, asset, days_for_window,
588
                                         end_dt, field_to_use):
589
        sid = int(asset)
590
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0
591
592
        # get the start and end dates for this sid
593
        end_date = self._get_asset_end_date(asset)
594
595
        if ends_at_midnight or (days_for_window[-1] > end_date):
596
            # two cases where we use daily data for the whole range:
597
            # 1) the history window ends at midnight utc.
598
            # 2) the last desired day of the window is after the
599
            # last trading day, use daily data for the whole range.
600
            return self._get_daily_window_for_sid(
601
                asset,
602
                field_to_use,
603
                days_for_window,
604
                extra_slot=False
605
            )
606
        else:
607
            # for the last day of the desired window, use minute
608
            # data and aggregate it.
609
            all_minutes_for_day = self.env.market_minutes_for_day(
610
                pd.Timestamp(end_dt.date()))
611
612
            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)
613
614
            # these are the minutes for the partial day
615
            minutes_for_partial_day =\
616
                all_minutes_for_day[0:(last_minute_idx + 1)]
617
618
            daily_data = self._get_daily_window_for_sid(
619
                sid,
620
                field_to_use,
621
                days_for_window[0:-1]
622
            )
623
624
            minute_data = self._get_minute_window_for_equity(
625
                sid,
626
                field_to_use,
627
                minutes_for_partial_day
628
            )
629
630
            if field_to_use == 'volume':
631
                minute_value = np.sum(minute_data)
632
            elif field_to_use == 'open':
633
                minute_value = minute_data[0]
634
            elif field_to_use == 'close':
635
                minute_value = minute_data[-1]
636
            elif field_to_use == 'high':
637
                minute_value = np.amax(minute_data)
638
            elif field_to_use == 'low':
639
                minute_value = np.amin(minute_data)
640
641
            # append the partial day.
642
            daily_data[-1] = minute_value
643
644
            return daily_data
645
646
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Parameters
        ----------
        assets : list of Asset
            The assets whose data is desired.
        end_dt : pd.Timestamp
            The last minute of the window.
        bar_count : int
            The number of minute bars desired.
        field_to_use : string
            A bcolz column name (a value of BASE_FIELDS).

        Returns
        -------
        pd.DataFrame indexed by minute with one column per asset (int sid).
        Bars before FIRST_TRADING_MINUTE are NaN.

        Raises
        ------
        ValueError
            If the whole window lies before FIRST_TRADING_MINUTE.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        # but then cut it down to only the minutes after
        # FIRST_TRADING_MINUTE
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(FIRST_TRADING_MINUTE)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count and \
           (modified_minutes_for_window[0] == FIRST_TRADING_MINUTE):
            # the beginning of the window goes before our global trading
            # start date
            bars_to_prepend = bar_count - modified_minutes_length
            nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # A concrete list (not a lazy map object, as on Python 3) for the
        # column labels.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=[int(asset) for asset in assets]
        )
703
704
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.  Plain int sids are resolved
            through the asset finder.

        end_dt : pd.Timestamp
            The last dt of the window.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset (any key of BASE_FIELDS).

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If `field` or `frequency` is not supported.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = [(self.env.asset_finder.retrieve_asset(asset) if
                   isinstance(asset, int) else asset) for asset in assets]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed.  DataFrame.ffill is equivalent to
        # fillna(method='ffill'), whose `method` argument newer pandas
        # deprecates.
        if field == "price" and ffill:
            df.ffill(inplace=True)

        return df
755
756
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Return a numpy array of minute values of `field` for `asset` over
        `minutes_for_window`; used by the history API for minute bars.

        Futures are read through per-minute spot lookups and equities
        through the equity minute-window reader.

        Parameters
        ----------
        asset : Asset or int
            The asset whose data is desired; an int sid is resolved through
            the asset finder.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The minutes (pd.Timestamp values) making up the desired window.

        Returns
        -------
        A numpy array with requested values.
        """
        if isinstance(asset, int):
            # A bare sid was supplied; resolve it to its Asset object.
            asset = self.env.asset_finder.retrieve_asset(asset)

        if not isinstance(asset, Future):
            return self._get_minute_window_for_equity(asset, field,
                                                      minutes_for_window)

        return self._get_minute_window_for_future(asset, field,
                                                  minutes_for_window)
789
790
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
791
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
792
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
793
        # do this is to simply do a spot lookup for each desired minute.
794
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
795
        for idx, minute in enumerate(minutes_for_window):
796
            return_data[idx] = \
797
                self._get_minute_spot_value_future(asset, field, minute)
798
799
        # Note: an improvement could be to find the consecutive runs within
800
        # minutes_for_window, and use them to read the underlying ctable
801
        # more efficiently.
802
803
        # Once futures are on 24-hour clock, then we can just grab all the
804
        # requested minutes in one shot from the ctable.
805
806
        # no adjustments for futures, yay.
807
        return return_data
808
809
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
810
        # each sid's minutes are stored in a bcolz file
811
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
812
        # of when the asset started trading and regardless of half days.
813
        # for a half day, the second half is filled with zeroes.
814
815
        # find the position of start_dt in the entire timeline, go back
816
        # bar_count bars, and that's the unadjusted data
817
        raw_data = self._open_minute_file(field, asset)
818
819
        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
820
                        0)
821
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1
822
823
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
824
        data_to_copy = raw_data[start_idx:end_idx]
825
826
        num_minutes = len(minutes_for_window)
827
828
        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
829
        # close).  num_minutes is the number of actual trading minutes.  if
830
        # these two have different lengths, that means that we need to trim
831
        # away data due to early closes.
832
        if len(data_to_copy) != num_minutes:
833
            # get a copy of the minutes in Eastern time, since we depend on
834
            # an early close being at 1pm Eastern.
835
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")
836
837
            # accumulate a list of indices of the last minute of an early
838
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
839
            # there are five minutes of real data before 180 zeroes, we would
840
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
841
            # minute is the last "real" minute of the day.
842
            last_minute_idx_of_early_close_day = []
843
            for minute_idx, minute_dt in enumerate(eastern_minutes):
844
                if minute_idx == (num_minutes - 1):
845
                    break
846
847
                if minute_dt.hour == 13 and minute_dt.minute == 0:
848
                    next_minute = eastern_minutes[minute_idx + 1]
849
                    if next_minute.hour != 13:
850
                        # minute_dt is the last minute of an early close day
851
                        last_minute_idx_of_early_close_day.append(minute_idx)
852
853
            # spin through the list of early close markers, and use them to
854
            # chop off 180 minutes at a time from data_to_copy.
855
            for idx, early_close_minute_idx in \
856
                    enumerate(last_minute_idx_of_early_close_day):
857
                early_close_minute_idx -= (180 * idx)
858
                data_to_copy = np.delete(
859
                    data_to_copy,
860
                    range(
861
                        early_close_minute_idx + 1,
862
                        early_close_minute_idx + 181
863
                    )
864
                )
865
866
        return_data[0:len(data_to_copy)] = data_to_copy
867
868
        self._apply_all_adjustments(
869
            return_data,
870
            asset,
871
            minutes_for_window,
872
            field,
873
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
874
        )
875
876
        return return_data
877
878
    def _apply_all_adjustments(self, data, asset, dts, field,
879
                               price_adj_factor=1.0):
880
        """
881
        Internal method that applies all the necessary adjustments on the
882
        given data array.
883
884
        The adjustments are:
885
        - splits
886
        - if field != "volume":
887
            - mergers
888
            - dividends
889
            - * 0.001
890
            - any zero fields replaced with NaN
891
        - all values rounded to 3 digits after the decimal point.
892
893
        Parameters
894
        ----------
895
        data : np.array
896
            The data to be adjusted.
897
898
        asset: Asset
899
            The asset whose data is being adjusted.
900
901
        dts: pd.DateTimeIndex
902
            The list of minutes or days representing the desired window.
903
904
        field: string
905
            The field whose values are in the data array.
906
907
        price_adj_factor: float
908
            Factor with which to adjust OHLC values.
909
        Returns
910
        -------
911
        None.  The data array is modified in place.
912
        """
913
        self._apply_adjustments_to_window(
914
            self._get_adjustment_list(
915
                asset, self._splits_dict, "SPLITS"
916
            ),
917
            data,
918
            dts,
919
            field != 'volume'
920
        )
921
922
        if field != 'volume':
923
            self._apply_adjustments_to_window(
924
                self._get_adjustment_list(
925
                    asset, self._mergers_dict, "MERGERS"
926
                ),
927
                data,
928
                dts,
929
                True
930
            )
931
932
            self._apply_adjustments_to_window(
933
                self._get_adjustment_list(
934
                    asset, self._dividends_dict, "DIVIDENDS"
935
                ),
936
                data,
937
                dts,
938
                True
939
            )
940
941
            data *= price_adj_factor
942
943
            # if anything is zero, it's a missing bar, so replace it with NaN.
944
            # we only want to do this for non-volume fields, because a missing
945
            # volume should be 0.
946
            data[data == 0] = np.NaN
947
948
        np.around(data, 3, out=data)
949
950
    @staticmethod
    def _find_position_of_minute(minute_dt):
        """
        Map a minute to its row in the minute bcolz files, i.e. its offset in
        the sequence of every trading minute since the 9:31 AM Eastern open
        on 1/2/2002.

        IMPORTANT: every trading day is counted as 390 minutes long, early
        closes included, because that is how the minute bcolz files are laid
        out to support fast lookup.

        ex. 1/2/2002 9:32 AM Eastern maps to 1.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The integer position of minute_dt in that sequence.
        """
        session = minute_dt.date()
        sessions_elapsed = (
            tradingcalendar.trading_days.searchsorted(session) -
            INDEX_OF_FIRST_TRADING_DAY
        )

        # first bar of the session: 9:31 AM Eastern, expressed in UTC so it
        # can be subtracted from minute_dt
        session_open = pd.Timestamp(
            datetime(session.year, session.month, session.day, 9, 31),
            tz='US/Eastern'
        ).tz_convert('UTC')

        elapsed_seconds = int((minute_dt - session_open).total_seconds())

        return int(390 * sessions_elapsed + elapsed_seconds / 60)
988
989
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The trading days making up the desired window, earliest first.
            Its length is the number of bars returned (excluding the
            optional extra slot).

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.
        """
        daily_data, daily_attrs = self._open_daily_file()

        # the daily file stores each sid's daily OHLCV in a contiguous block.
        # the first row per sid is either 1/2/2002, or the sid's start_date if
        # it started after 1/2/2002.  once a sid stops trading, there are no
        # rows for it.

        bar_count = len(days_in_window)

        # NaN-filled result so any day without data stays NaN.  (np.nan: the
        # np.NAN alias was removed in NumPy 2.0.)
        return_array = np.full(bar_count + 1 if extra_slot else bar_count,
                               np.nan)

        sid = int(asset)

        # find the start index in the daily file for this asset
        asset_file_index = daily_attrs['first_row'][str(sid)]

        trading_days = tradingcalendar.trading_days

        # Calculate the starting day to use: either the asset's first trading
        # day, or 1/2/2002 (index INDEX_OF_FIRST_TRADING_DAY in the trading
        # calendar).  _get_asset_start_date fills the start-date cache on
        # first use instead of raising KeyError.
        first_trading_day_to_use = max(
            trading_days.searchsorted(self._get_asset_start_date(asset)),
            INDEX_OF_FIRST_TRADING_DAY)

        # find the # of trading days between max(asset's first trade date,
        # 2002-01-02) and the start of the window
        window_offset = (trading_days.searchsorted(days_in_window[0]) -
                         first_trading_day_to_use)

        start_index = max(asset_file_index, asset_file_index + window_offset)

        if window_offset < 0 and (abs(window_offset) > bar_count):
            # consumer is requesting a history window that starts AND ends
            # before this equity started trading, so there is nothing to read.
            return return_array

        if window_offset < 0:
            # The window starts before the asset's first row.  The first
            # abs(window_offset) slots represent days before this asset
            # started trading and stay NaN.
            leading_empty_days = abs(window_offset)
            end_index = min(start_index + window_offset + bar_count,
                            daily_attrs['last_row'][str(sid)] + 1)

            # get data from bcolz file
            data = daily_data[field][start_index:end_index]

            # Size the destination by the rows actually read.  If the asset
            # stopped trading inside the window, data is shorter than
            # bar_count - leading_empty_days, and a fixed-size destination
            # slice would raise a shape-mismatch ValueError.
            return_array[leading_empty_days:
                         leading_empty_days + len(data)] = data
        else:
            # end_index is an inclusive row index in this branch; make sure it
            # doesn't extend past the end of this asset's data.
            end_index = min(start_index + bar_count,
                            daily_attrs['last_row'][str(sid)])
            data = daily_data[field][start_index:(end_index + 1)]

            if len(data) > len(return_array):
                return_array[:] = data[0:len(return_array)]
            else:
                return_array[0:len(data)] = data

        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
            self.DAILY_PRICE_ADJUSTMENT_FACTOR
        )

        return return_array
1096
1097
    @staticmethod
1098
    def _apply_adjustments_to_window(adjustments_list, window_data,
1099
                                     dts_in_window, multiply):
1100
        if len(adjustments_list) == 0:
1101
            return
1102
1103
        # advance idx to the correct spot in the adjustments list, based on
1104
        # when the window starts
1105
        idx = 0
1106
1107
        while idx < len(adjustments_list) and dts_in_window[0] >\
1108
                adjustments_list[idx][0]:
1109
            idx += 1
1110
1111
        # if we've advanced through all the adjustments, then there's nothing
1112
        # to do.
1113
        if idx == len(adjustments_list):
1114
            return
1115
1116
        while idx < len(adjustments_list):
1117
            adjustment_to_apply = adjustments_list[idx]
1118
1119
            if adjustment_to_apply[0] > dts_in_window[-1]:
1120
                break
1121
1122
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1123
            if multiply:
1124
                window_data[0:range_end] *= adjustment_to_apply[1]
1125
            else:
1126
                window_data[0:range_end] /= adjustment_to_apply[1]
1127
1128
            idx += 1
1129
1130
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1131
        """
1132
        Internal method that returns a list of adjustments for the given sid.
1133
1134
        Parameters
1135
        ----------
1136
        asset : Asset
1137
            The asset for which to return adjustments.
1138
1139
        adjustments_dict: dict
1140
            A dictionary of sid -> list that is used as a cache.
1141
1142
        table_name: string
1143
            The table that contains this data in the adjustments db.
1144
1145
        Returns
1146
        -------
1147
        adjustments: list
1148
            A list of [multiplier, pd.Timestamp], earliest first
1149
1150
        """
1151
        if self._adjustment_reader is None:
1152
            return []
1153
1154
        sid = int(asset)
1155
1156
        try:
1157
            adjustments = adjustments_dict[sid]
1158
        except KeyError:
1159
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1160
                get_adjustments_for_sid(table_name, sid)
1161
1162
        return adjustments
1163
1164
    def get_equity_price_view(self, asset):
1165
        """
1166
        Returns a DataPortalSidView for the given asset.  Used to support the
1167
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.
1168
1169
        Parameters
1170
        ----------
1171
        asset : Asset
1172
            Asset that is being queried.
1173
1174
        Returns
1175
        -------
1176
        DataPortalSidView: Accessor into the given asset's data.
1177
        """
1178
        try:
1179
            view = self.views[asset]
1180
        except KeyError:
1181
            view = self.views[asset] = DataPortalSidView(asset, self)
1182
1183
        return view
1184
1185
    def _check_is_currently_alive(self, asset, dt):
1186
        if dt is None:
1187
            dt = self.current_day
1188
1189
        sid = int(asset)
1190
1191
        if sid not in self._asset_start_dates:
1192
            self._get_asset_start_date(asset)
1193
1194
        start_date = self._asset_start_dates[sid]
1195
        if self._asset_start_dates[sid] > dt:
1196
            raise NoTradeDataAvailableTooEarly(
1197
                sid=sid,
1198
                dt=dt,
1199
                start_dt=start_date
1200
            )
1201
1202
        end_date = self._asset_end_dates[sid]
1203
        if self._asset_end_dates[sid] < dt:
1204
            raise NoTradeDataAvailableTooLate(
1205
                sid=sid,
1206
                dt=dt,
1207
                end_dt=end_date
1208
            )
1209
1210
    def _get_asset_start_date(self, asset):
1211
        self._ensure_asset_dates(asset)
1212
        return self._asset_start_dates[asset]
1213
1214
    def _get_asset_end_date(self, asset):
1215
        self._ensure_asset_dates(asset)
1216
        return self._asset_end_dates[asset]
1217
1218
    def _ensure_asset_dates(self, asset):
1219
        sid = int(asset)
1220
1221
        if sid not in self._asset_start_dates:
1222
            self._asset_start_dates[sid] = asset.start_date
1223
            self._asset_end_dates[sid] = asset.end_date
1224
1225
    def get_splits(self, sids, dt):
1226
        """
1227
        Returns any splits for the given sids and the given dt.
1228
1229
        Parameters
1230
        ----------
1231
        sids : list
1232
            Sids for which we want splits.
1233
1234
        dt: pd.Timestamp
1235
            The date for which we are checking for splits.  Note: this is
1236
            expected to be midnight UTC.
1237
1238
        Returns
1239
        -------
1240
        list: List of splits, where each split is a (sid, ratio) tuple.
1241
        """
1242
        if self._adjustment_reader is None or len(sids) == 0:
1243
            return {}
1244
1245
        # convert dt to # of seconds since epoch, because that's what we use
1246
        # in the adjustments db
1247
        seconds = int(dt.value / 1e9)
1248
1249
        splits = self._adjustment_reader.conn.execute(
1250
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1251
            (seconds,)).fetchall()
1252
1253
        sids_set = set(sids)
1254
        splits = [split for split in splits if split[0] in sids_set]
1255
1256
        return splits
1257
1258
    def get_stock_dividends(self, sid, trading_days):
1259
        """
1260
        Returns all the stock dividends for a specific sid that occur
1261
        in the given trading range.
1262
1263
        Parameters
1264
        ----------
1265
        sid: int
1266
            The asset whose stock dividends should be returned.
1267
1268
        trading_days: pd.DatetimeIndex
1269
            The trading range.
1270
1271
        Returns
1272
        -------
1273
        list: A list of objects with all relevant attributes populated.
1274
        All timestamp fields are converted to pd.Timestamps.
1275
        """
1276
1277
        if self._adjustment_reader is None:
1278
            return []
1279
1280
        if len(trading_days) == 0:
1281
            return []
1282
1283
        start_dt = trading_days[0].value / 1e9
1284
        end_dt = trading_days[-1].value / 1e9
1285
1286
        dividends = self._adjustment_reader.conn.execute(
1287
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1288
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1289
            fetchall()
1290
1291
        dividend_info = []
1292
        for dividend_tuple in dividends:
1293
            dividend_info.append({
1294
                "declared_date": dividend_tuple[1],
1295
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1296
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1297
                "payment_sid": dividend_tuple[4],
1298
                "ratio": dividend_tuple[5],
1299
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1300
                "sid": dividend_tuple[7]
1301
            })
1302
1303
        return dividend_info
1304
1305
    def contains(self, asset, field):
        """
        Return True if ``field`` is available for ``asset``: it is either one
        of BASE_FIELDS, or an augmented source registered for that field
        covers this asset.
        """
        if field in BASE_FIELDS:
            return True

        augmented = self._augmented_sources_map
        return field in augmented and asset in augmented[field]
1309
1310
    def get_fetcher_assets(self):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.  Non-Asset sid values are dropped.
        """
        # return a list of assets for the current date, as defined by the
        # fetcher source
        if self._fetcher_df is None:
            return []

        if self.current_day in self._fetcher_df.index:
            date_to_use = self.current_day
        else:
            # current day isn't in the fetcher df, go back the last
            # available day
            idx = self._fetcher_df.index.searchsorted(self.current_day)
            if idx == 0:
                return []

            date_to_use = self._fetcher_df.index[idx - 1]

        sids = self._fetcher_df.loc[date_to_use]["sid"]

        # With several rows for date_to_use, .loc yields a DataFrame and
        # "sid" is a Series.  With exactly one row, .loc yields that row as a
        # Series and "sid" is a single scalar, which cannot be iterated.
        if isinstance(sids, pd.Series):
            # make sure they're actually assets
            return [asset for asset in sids if isinstance(asset, Asset)]

        return [sids] if isinstance(sids, Asset) else []
1347
1348
1349
class DataPortalSidView(object):
    """
    Accessor into one asset's current values, e.g. ``view.price`` or
    ``view['price']``; every lookup is forwarded to the portal's
    get_spot_value, and ``in`` is forwarded to the portal's contains.
    """

    def __init__(self, asset, portal):
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # __getattr__ only runs when normal attribute lookup fails.  Dunder
        # names are protocol probes (copy/pickle's __getstate__ and
        # __setstate__, numpy's __array_interface__, ...), never data
        # columns.  Forwarding them to the portal breaks those protocols.
        # On an instance built without __init__ (as copy/pickle do), it also
        # recursed forever through the missing self.portal attribute.
        if column.startswith('__') and column.endswith('__'):
            raise AttributeError(column)
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # item access forwards any key unchanged, dunder-looking or not
        return self.portal.get_spot_value(self.asset, column)
1362