Completed
Pull Request — master (#858)
by Eddie
created 01:43

zipline.data.DataPortal   F

Complexity

Total Complexity 147

Size/Duplication

Total Lines 1197
Duplicated Lines 0 %
Metric   Value
dl       0
loc      1197
rs       0.6316
wmc      147

32 Methods

Rating   Name   Duplication   Size   Complexity  
C get_history_window() 0 51 8
B _get_minute_window_for_asset() 0 33 3
A _get_minute_window_for_future() 0 18 2
C _get_minute_spot_value() 0 70 8
A _check_is_currently_alive() 0 20 4
A setup_offset_cache() 0 16 4
B __init__() 0 81 3
A _get_asset_start_date() 0 3 1
C _get_history_daily_window_equity() 0 58 8
B get_stock_dividends() 0 46 4
A _get_asset_end_date() 0 3 1
F _get_minute_window_for_equity() 0 76 9
B get_splits() 0 32 5
B _get_minute_spot_value_future() 0 29 5
A contains() 0 4 1
A _ensure_asset_dates() 0 6 2
C _get_history_minute_window() 0 59 7
A _check_extra_sources() 0 21 3
B _get_ctable() 0 31 6
D _apply_adjustments_to_window() 0 32 8
A _minute_offset() 0 8 3
A _apply_all_adjustments() 0 71 2
B get_previous_value() 0 30 3
B _get_history_daily_window() 0 31 4
B _get_adjustment_list() 0 33 3
D _get_history_daily_window_future() 0 56 11
A _open_minute_file() 0 10 2
A _get_daily_window_for_sid() 0 61 3
A _get_daily_data() 0 10 4
B get_spot_value() 0 52 6
D handle_extra_source() 0 74 8
B get_fetcher_assets() 0 37 6

How to fix: Complexity

Complex Class

Complex classes like zipline.data.DataPortal often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
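As a concrete sketch of that advice (illustrative only, not code from this PR): the offset-bookkeeping fields that share a prefix in the class below (`_day_offsets`, `_minutes_by_day`, `_minute_offset`) could be pulled into their own cohesive class. All names in this example are hypothetical.

```python
# Hypothetical Extract Class refactoring: methods and fields that share the
# "offset" vocabulary move out of the large portal class into one helper.

class MinuteOffsetCache(object):
    """Owns the day/minute offset bookkeeping previously inlined in the
    big class. Names are illustrative, not zipline's actual API."""

    def __init__(self, day_offsets, minutes_by_day):
        self._day_offsets = day_offsets        # day -> index of day's first bar
        self._minutes_by_day = minutes_by_day  # day -> ordered list of minutes

    def offset(self, day, minute):
        # Overall bar index = start-of-day offset + position within the day.
        try:
            return (self._day_offsets[day] +
                    self._minutes_by_day[day].index(minute))
        except (KeyError, ValueError):
            # Unknown day or minute: caller falls back to a slower path.
            return None


class DataPortalSlim(object):
    """The original class now delegates offset math to the extracted class."""

    def __init__(self, offset_cache):
        self._offsets = offset_cache

    def minute_offset(self, day, minute):
        return self._offsets.offset(day, minute)


cache = MinuteOffsetCache(
    day_offsets={"2015-07-01": 0, "2015-07-02": 390},
    minutes_by_day={"2015-07-01": ["09:31", "09:32"],
                    "2015-07-02": ["09:31", "09:32"]},
)
portal = DataPortalSlim(cache)
print(portal.minute_offset("2015-07-02", "09:32"))  # → 391
```

The payoff is that the offset logic becomes independently testable, and the big class's constructor no longer needs to know how offsets are computed.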

#
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bcolz
from logbook import Logger

import numpy as np
import pandas as pd
from pandas.tslib import normalize_date
from six import iteritems

from zipline.assets import Asset, Future, Equity
from zipline.data.us_equity_pricing import (
    BcolzDailyBarReader,
    NoDataOnDate
)
from zipline.pipeline.data.equity_pricing import USEquityPricing

from zipline.utils import tradingcalendar
from zipline.errors import (
    NoTradeDataAvailableTooEarly,
    NoTradeDataAvailableTooLate
)

log = Logger('DataPortal')

HISTORY_FREQUENCIES = ["1d", "1m"]

BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}


US_EQUITY_COLUMNS = {
    'open': USEquityPricing.open,
    'open_price': USEquityPricing.open,
    'high': USEquityPricing.high,
    'low': USEquityPricing.low,
    'close': USEquityPricing.close,
    'close_price': USEquityPricing.close,
    'volume': USEquityPricing.volume,
    'price': USEquityPricing.close,
}


class DataPortal(object):
    def __init__(self,
                 env,
                 sim_params=None,
                 equity_minute_reader=None,
                 minutes_futures_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 futures_sid_path_func=None):
        self.env = env

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here.
        self.cur_data_offset = 0

        self.views = {}

        self._daily_equities_path = daily_equities_path
        self._minutes_futures_path = minutes_futures_path

        self._asset_finder = env.asset_finder

        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # caches of sid -> adjustment list
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            self._data_frequency = "minute"

        self._futures_sid_path_func = futures_sid_path_func

        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None

        self._equity_minute_reader = equity_minute_reader

        # The following values are used by _minute_offset to calculate the
        # index into the minute bcolz data.

        # A lookup table from every minute to the corresponding day, to avoid
        # calling `.date()` on every lookup.
        self._minutes_to_day = {}
        # A map of days (keyed by midnight) to a DatetimeIndex of market
        # minutes for that day.
        self._minutes_by_day = {}
        # A dict of day to the offset into the minute bcolz on which that
        # day's data starts.
        self._day_offsets = None

    def handle_extra_source(self, source_df):
        """
        Extra sources always have a sid column.

        We expand the given data (by forward filling) to the full range of
        the simulation dates, so that lookup is fast during simulation.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # source_df's sid column can either consist of assets we know about
        # (such as sid(24)) or of assets we don't know about (such as
        # palladium).
        #
        # In both cases, we break up the dataframe into individual dfs
        # that only contain a single asset's information.  i.e., if source_df
        # has data for PALLADIUM and GOLD, we split source_df into two
        # dataframes, one for each. (The same applies if source_df has data
        # for AAPL and IBM.)
        #
        # We then take each child df and reindex it to the simulation's date
        # range by forward-filling missing values. This makes reads simpler.
        #
        # Finally, we store the data. For each column, we store a mapping in
        # self.augmented_sources_map from the column to a dictionary of
        # asset -> df.  In other words,
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
        # holding that data.

        if self._sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )

        # Break the source_df up into one dataframe per sid.  This lets
        # us (more easily) calculate accurate start/end dates for each sid,
        # de-dup data, and expand the data to fit the backtest start/end date.
        grouped_by_sid = source_df.groupby(["sid"])
        group_names = grouped_by_sid.groups.keys()
        group_dict = {}
        for group_name in group_names:
            group_dict[group_name] = grouped_by_sid.get_group(group_name)

        for identifier, df in iteritems(group_dict):
            # Before reindexing, save the earliest and latest dates
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # Since we know this df only contains a single sid, we can safely
            # de-dupe by the index (dt)
            df = df.groupby(level=0).last()

            # Reindex the dataframe based on the backtest start/end date.
            # This makes reads easier during the backtest.
            df = df.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # for fake assets we need to store a start/end date
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in df.columns.difference(['sid']):
                if col_name not in self._augmented_sources_map:
                    self._augmented_sources_map[col_name] = {}

                self._augmented_sources_map[col_name][identifier] = df

    def _open_minute_file(self, field, asset):
        sid_str = str(int(asset))

        try:
            carray = self._carrays[field][sid_str]
        except KeyError:
            carray = self._carrays[field][sid_str] = \
                self._get_ctable(asset)[field]

        return carray

    def _get_ctable(self, asset):
        sid = int(asset)

        if isinstance(asset, Future):
            if self._futures_sid_path_func is not None:
                path = self._futures_sid_path_func(
                    self._minutes_futures_path, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(self._minutes_futures_path, sid)
        elif isinstance(asset, Equity):
            if self._equity_minute_reader.sid_path_func is not None:
                path = self._equity_minute_reader.sid_path_func(
                    self._equity_minute_reader.rootdir, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(
                    self._equity_minute_reader.rootdir, sid)

        else:
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            if self._equity_minute_reader.sid_path_func is not None:
                path = self._equity_minute_reader.sid_path_func(
                    self._equity_minute_reader.rootdir, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(
                    self._equity_minute_reader.rootdir, sid)

        return bcolz.open(path, mode='r')

    def get_previous_value(self, asset, field, dt):
        """
        Given an asset, a column, and a dt, returns the previous value for
        the same asset/column pair.  If this data portal is in minute mode,
        it's the previous minute value, otherwise it's the previous day's
        value.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp from which to go back in time one slot.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        if self._data_frequency == 'daily':
            prev_dt = self.env.previous_trading_day(dt)
        elif self._data_frequency == 'minute':
            prev_dt = self.env.previous_market_minute(dt)

        return self.get_spot_value(asset, field, prev_dt)

    def _check_extra_sources(self, asset, column, day):
        # If we have an extra source with a column called "price", only look
        # at it if it's on something like palladium and not AAPL (since our
        # own price data always wins when dealing with assets).
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a field in an extra source
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except:
                log.error(
                    "Could not find value for asset={0}, day={1}, "
                    "column={2}".format(
                        str(asset),
                        str(day),
                        str(column)))

                raise KeyError

    def get_spot_value(self, asset, field, dt):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at the given dt.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.
        """
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            dt,
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        column_to_use = BASE_FIELDS[field]

        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = normalize_date(dt)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt)

    def _get_minute_spot_value_future(self, asset, column, dt):
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
        # The file attributes contain the "start_dt" and "last_dt" fields,
        # which represent the time period for this bcolz file.

        # The start_dt is midnight of the first day that this future started
        # trading.

        # figure out the # of minutes between dt and this asset's start_dt
        start_date = self._get_asset_start_date(asset)
        minute_offset = int((dt - start_date).total_seconds() / 60)

        if minute_offset < 0:
            # asking for a date that is before the asset's start date, no dice
            return 0.0

        # then just index into the bcolz carray at that offset
        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset]

        # if there's missing data, go backwards until we run out of file
        while result == 0 and minute_offset > 0:
            minute_offset -= 1
            result = carray[minute_offset]

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result

    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
        # TODO: This case should not be hit, but is when tests are set up
        # with data_frequency of daily, but run with minutely.
        if self._equity_minute_reader is None:
            return

        self._minutes_to_day = minutes_to_day
        self._minutes_by_day = minutes_by_day
        if self._sim_params is not None:
            start = self._sim_params.trading_days[0]
            first_trading_day_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start)
            self._day_offsets = {
                day: (i + first_trading_day_idx) * 390
                for i, day in enumerate(
                    self._sim_params.trading_days)}

    def _minute_offset(self, dt):
        if self._day_offsets is not None:
            try:
                day = self._minutes_to_day[dt]
                minutes = self._minutes_by_day[day]
                return self._day_offsets[day] + minutes.get_loc(dt)
            except KeyError:
                return None

    def _get_minute_spot_value(self, asset, column, dt):
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self._equity_minute_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=dt,
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value, and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result

    def _get_daily_data(self, asset, column, dt):
        while True:
            try:
                value = self._daily_bar_reader.spot_price(asset, dt, column)
                if value != -1:
                    return value
                else:
                    dt -= tradingcalendar.trading_day
            except NoDataOnDate:
                return 0

    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        data = []

        for asset in assets:
            if isinstance(asset, Future):
                data.append(self._get_history_daily_window_future(
                    asset, days_for_window, end_dt, field_to_use
                ))
            else:
                data.append(self._get_history_daily_window_equity(
                    asset, days_for_window, end_dt, field_to_use
                ))

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )

    def _get_history_daily_window_future(self, asset, days_for_window,
                                         end_dt, column):
        # Since we don't have daily bcolz files for futures (yet), use minute
        # bars to calculate the daily values.
        data = []
        data_groups = []

        # get all the minutes for the days NOT including today
        for day in days_for_window[:-1]:
            minutes = self.env.market_minutes_for_day(day)

            values_for_day = np.zeros(len(minutes), dtype=np.float64)

            for idx, minute in enumerate(minutes):
                minute_val = self._get_minute_spot_value_future(
                    asset, column, minute
                )

                values_for_day[idx] = minute_val

            data_groups.append(values_for_day)

        # get the minutes for today
        last_day_minutes = pd.date_range(
            start=self.env.get_open_and_close(end_dt)[0],
            end=end_dt,
            freq="T"
        )

        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)

        for idx, minute in enumerate(last_day_minutes):
            minute_val = self._get_minute_spot_value_future(
                asset, column, minute
            )

            values_for_last_day[idx] = minute_val

        data_groups.append(values_for_last_day)

        for group in data_groups:
            if len(group) == 0:
                continue

            if column == 'volume':
                data.append(np.sum(group))
            elif column == 'open':
                data.append(group[0])
            elif column == 'close':
                data.append(group[-1])
            elif column == 'high':
                data.append(np.amax(group))
            elif column == 'low':
                data.append(np.amin(group))

        return data

    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc, or
            # 2) the last desired day of the window is after the asset's
            # last trading day.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data

    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ----------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        end_dt: pd.Timestamp
            The last bar of the desired window.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values.  Only has an effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = [(self.env.asset_finder.retrieve_asset(asset) if
                   isinstance(asset, int) else asset) for asset in assets]

        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if field == "price" and ffill:
            df.fillna(method='ffill', inplace=True)

        return df

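The forward-fill step above is plain pandas; a minimal sketch with made-up prices (the sid, minutes, and values are all hypothetical), using `DataFrame.ffill`, the modern spelling of `fillna(method='ffill')`:

```python
import numpy as np
import pandas as pd

# Hypothetical one-asset price window with two missing bars (NaN).
minutes = pd.date_range("2015-01-02 14:31", periods=5, freq="min", tz="UTC")
df = pd.DataFrame({1: [10.0, np.nan, np.nan, 10.3, 10.4]}, index=minutes)

# Only 'price' windows get forward-filled; missing bars inherit the
# last known price.
df = df.ffill()

print(df[1].tolist())  # [10.0, 10.0, 10.0, 10.3, 10.4]
```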
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an asset
        and specified date range.  Used to support the history API method for
        minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DatetimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        if isinstance(asset, Future):
            return self._get_minute_window_for_future(asset, field,
                                                      minutes_for_window)
        else:
            return self._get_minute_window_for_equity(asset, field,
                                                      minutes_for_window)

    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
        # equity trading hours (9:30 am to 4 pm, Eastern).  The easiest way to
        # do this is to simply do a spot lookup for each desired minute.
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
        for idx, minute in enumerate(minutes_for_window):
            return_data[idx] = \
                self._get_minute_spot_value_future(asset, field, minute)

        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.

        # Once futures are on a 24-hour clock, we can just grab all the
        # requested minutes in one shot from the ctable.

        # no adjustments for futures, yay.
        return return_data

    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        start_idx = max(
            self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0]),
            0)
        end_idx = self._equity_minute_reader._find_position_of_minute(
            minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data

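The early-close trimming above can be exercised on its own; a small sketch with made-up data (five real bars, then the 180 zero-padded afternoon bars, then the next session):

```python
import numpy as np

# Hypothetical raw minute data read from the bcolz file.
data_to_copy = np.concatenate([
    np.arange(1.0, 6.0),        # last 5 real minutes of an early-close day
    np.zeros(180),              # zero padding for the closed afternoon
    np.array([7.0, 8.0, 9.0]),  # start of the next session
])

# Index 4 is the last "real" minute of the early-close day; delete the
# 180 padded bars that follow it, as the loop above does.
last_minute_idx_of_early_close_day = [4]
for idx, early_close_minute_idx in \
        enumerate(last_minute_idx_of_early_close_day):
    early_close_minute_idx -= (180 * idx)
    data_to_copy = np.delete(
        data_to_copy,
        range(early_close_minute_idx + 1, early_close_minute_idx + 181)
    )

print(data_to_copy.tolist())  # [1.0, 2.0, 3.0, 4.0, 5.0, 7.0, 8.0, 9.0]
```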
    def _apply_all_adjustments(self, data, asset, dts, field,
                               price_adj_factor=1.0):
        """
        Internal method that applies all the necessary adjustments on the
        given data array.

        The adjustments are:
        - splits
        - if field != "volume":
            - mergers
            - dividends
            - multiplication by price_adj_factor
            - any zero fields replaced with NaN
        - all values rounded to 3 digits after the decimal point.

        Parameters
        ----------
        data : np.array
            The data to be adjusted.

        asset: Asset
            The asset whose data is being adjusted.

        dts: pd.DatetimeIndex
            The list of minutes or days representing the desired window.

        field: string
            The field whose values are in the data array.

        price_adj_factor: float
            Factor with which to adjust OHLC values.

        Returns
        -------
        None.  The data array is modified in place.
        """
        self._apply_adjustments_to_window(
            self._get_adjustment_list(
                asset, self._splits_dict, "SPLITS"
            ),
            data,
            dts,
            field != 'volume'
        )

        if field != 'volume':
            self._apply_adjustments_to_window(
                self._get_adjustment_list(
                    asset, self._mergers_dict, "MERGERS"
                ),
                data,
                dts,
                True
            )

            self._apply_adjustments_to_window(
                self._get_adjustment_list(
                    asset, self._dividends_dict, "DIVIDENDS"
                ),
                data,
                dts,
                True
            )

            data *= price_adj_factor

            # if anything is zero, it's a missing bar, so replace it with NaN.
            # we only want to do this for non-volume fields, because a missing
            # volume should be 0.
            data[data == 0] = np.nan

        np.around(data, 3, out=data)

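The tail of the method (scale, NaN out missing bars, round in place) is easy to check in isolation; a sketch with hypothetical thousandths-scaled prices, where a zero marks a missing bar:

```python
import numpy as np

# Hypothetical close prices stored as integer thousandths.
data = np.array([123456.0, 0.0, 123789.4567])
price_adj_factor = 0.001  # e.g. a minute-bar price adjustment factor

data *= price_adj_factor
data[data == 0] = np.nan        # missing (non-volume) bars become NaN
np.around(data, 3, out=data)    # round in place to 3 decimal places

print(data.tolist())  # [123.456, nan, 123.789]
```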
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots are filled
        with nan.
        """
        bar_count = len(days_in_window)
        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.nan

        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if active_days.shape[0]:
            data = self._daily_bar_reader.history_window(field,
                                                         active_days[0],
                                                         active_days[-1],
                                                         asset)
            return_array[day_slice] = data
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array

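The `slice_indexer` trick above yields one slice that both selects the asset's active days and addresses the matching region of the output array; a sketch with a hypothetical five-day window where the asset only traded in the middle three days:

```python
import numpy as np
import pandas as pd

days_in_window = pd.date_range("2015-01-05", periods=5, freq="B", tz="UTC")
start_date = days_in_window[1]  # hypothetical asset lifetime
end_date = days_in_window[3]

day_slice = days_in_window.slice_indexer(start_date, end_date)
active_days = days_in_window[day_slice]

return_array = np.full(len(days_in_window), np.nan)
return_array[day_slice] = [1.0, 2.0, 3.0]  # stand-in for the reader's data

print(len(active_days))  # 3 -- the two inactive days stay NaN
```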
    @staticmethod
    def _apply_adjustments_to_window(adjustments_list, window_data,
                                     dts_in_window, multiply):
        if len(adjustments_list) == 0:
            return

        # advance idx to the correct spot in the adjustments list, based on
        # when the window starts
        idx = 0

        while idx < len(adjustments_list) and dts_in_window[0] > \
                adjustments_list[idx][0]:
            idx += 1

        # if we've advanced through all the adjustments, then there's nothing
        # to do.
        if idx == len(adjustments_list):
            return

        while idx < len(adjustments_list):
            adjustment_to_apply = adjustments_list[idx]

            if adjustment_to_apply[0] > dts_in_window[-1]:
                break

            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
            if multiply:
                window_data[0:range_end] *= adjustment_to_apply[1]
            else:
                window_data[0:range_end] /= adjustment_to_apply[1]

            idx += 1

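Adjustments apply retroactively: every bar strictly before an adjustment's effective date is scaled by its ratio, with `searchsorted` locating the cut-off. A sketch with a hypothetical 2:1 split (ratio 0.5) effective on the third day of the window:

```python
import numpy as np
import pandas as pd

dts_in_window = pd.date_range("2015-01-05", periods=4, tz="UTC")
window_data = np.array([10.0, 10.2, 5.1, 5.0])
adjustments_list = [(dts_in_window[2], 0.5)]  # (effective_date, ratio)

for adjustment_to_apply in adjustments_list:
    # number of bars that precede the effective date
    range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
    window_data[0:range_end] *= adjustment_to_apply[1]  # multiply=True path

print(window_data.tolist())  # [5.0, 5.1, 5.1, 5.0]
```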
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
        """
        Internal method that returns a list of adjustments for the given sid.

        Parameters
        ----------
        asset : Asset
            The asset for which to return adjustments.

        adjustments_dict: dict
            A dictionary of sid -> list that is used as a cache.

        table_name: string
            The table that contains this data in the adjustments db.

        Returns
        -------
        adjustments: list
            A list of (pd.Timestamp, multiplier) pairs, earliest first.
        """
        if self._adjustment_reader is None:
            return []

        sid = int(asset)

        try:
            adjustments = adjustments_dict[sid]
        except KeyError:
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
                get_adjustments_for_sid(table_name, sid)

        return adjustments

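The `try`/`except KeyError` above is a small memoization idiom: the first lookup misses and populates `adjustments_dict`; later lookups never touch the database. A standalone sketch with a hypothetical stand-in for the reader call:

```python
calls = []

def get_adjustments_for_sid(table_name, sid):
    # hypothetical stand-in for the adjustment reader's db query
    calls.append(sid)
    return [("2015-01-07", 0.5)]

adjustments_dict = {}

def get_adjustment_list(sid, table_name="SPLITS"):
    try:
        adjustments = adjustments_dict[sid]
    except KeyError:
        adjustments = adjustments_dict[sid] = \
            get_adjustments_for_sid(table_name, sid)
    return adjustments

get_adjustment_list(24)
get_adjustment_list(24)
print(len(calls))  # 1 -- the second call was served from the cache
```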
    def _check_is_currently_alive(self, asset, dt):
        sid = int(asset)

        if sid not in self._asset_start_dates:
            self._get_asset_start_date(asset)

        start_date = self._asset_start_dates[sid]
        if start_date > dt:
            raise NoTradeDataAvailableTooEarly(
                sid=sid,
                dt=dt,
                start_dt=start_date
            )

        end_date = self._asset_end_dates[sid]
        if end_date < dt:
            raise NoTradeDataAvailableTooLate(
                sid=sid,
                dt=dt,
                end_dt=end_date
            )

    def _get_asset_start_date(self, asset):
        self._ensure_asset_dates(asset)
        return self._asset_start_dates[int(asset)]

    def _get_asset_end_date(self, asset):
        self._ensure_asset_dates(asset)
        return self._asset_end_dates[int(asset)]

    def _ensure_asset_dates(self, asset):
        sid = int(asset)

        if sid not in self._asset_start_dates:
            self._asset_start_dates[sid] = asset.start_date
            self._asset_end_dates[sid] = asset.end_date

    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        if self._adjustment_reader is None or len(sids) == 0:
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db
        seconds = int(dt.value / 1e9)

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        sids_set = set(sids)
        splits = [split for split in splits if split[0] in sids_set]

        return splits

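The adjustments db keys `effective_date` by Unix seconds, so the midnight-UTC timestamp is converted via `.value` (nanoseconds since the epoch) divided by 1e9. A quick check of the round trip, with a hypothetical date:

```python
import pandas as pd

dt = pd.Timestamp("2015-01-06", tz="UTC")  # hypothetical midnight-UTC dt
seconds = int(dt.value / 1e9)

print(seconds)  # 1420502400
assert pd.Timestamp(seconds, unit="s", tz="UTC") == dt
```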
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of dicts with all relevant attributes populated.
        The ex_date, pay_date, and record_date fields are converted to
        pd.Timestamps.
        """

        if self._adjustment_reader is None:
            return []

        if len(trading_days) == 0:
            return []

        start_dt = trading_days[0].value / 1e9
        end_dt = trading_days[-1].value / 1e9

        dividends = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
            fetchall()

        dividend_info = []
        for dividend_tuple in dividends:
            dividend_info.append({
                "declared_date": dividend_tuple[1],
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
                "payment_sid": dividend_tuple[4],
                "ratio": dividend_tuple[5],
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
                "sid": dividend_tuple[7]
            })

        return dividend_info

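Dates in `stock_dividend_payouts` come back from SQLite as Unix seconds; `pd.Timestamp(..., unit="s")` converts them back, as the loop above does for ex_date, pay_date, and record_date. A sketch with a hypothetical stored value:

```python
import pandas as pd

ex_date_seconds = 1420502400  # hypothetical value read from the db

ex_date = pd.Timestamp(ex_date_seconds, unit="s")
print(ex_date)  # 2015-01-06 00:00:00
```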
    def contains(self, asset, field):
        return field in BASE_FIELDS or \
            (field in self._augmented_sources_map and
             asset in self._augmented_sources_map[field])

    def get_fetcher_assets(self, day):
        """
        Returns a list of assets for the given date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available; otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        # return a list of assets for the current date, as defined by the
        # fetcher source
        if self._extra_source_df is None:
            return []

        if day in self._extra_source_df.index:
            date_to_use = day
        else:
            # the current day isn't in the fetcher df, so go back to the
            # last available day
            idx = self._extra_source_df.index.searchsorted(day)
            if idx == 0:
                return []

            date_to_use = self._extra_source_df.index[idx - 1]

        asset_list = self._extra_source_df.loc[date_to_use]["sid"]

        # make sure they're actually assets
        asset_list = [asset for asset in asset_list
                      if isinstance(asset, Asset)]

        return asset_list
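The fallback logic above is a classic `searchsorted` forward-fill: if the requested day is missing, use the most recent earlier day, and give up only when the day precedes all fetcher data. A standalone sketch with hypothetical dates:

```python
import pandas as pd

index = pd.DatetimeIndex(["2015-01-05", "2015-01-06", "2015-01-09"])

def date_to_use_for(day):
    if day in index:
        return day
    idx = index.searchsorted(day)
    if idx == 0:
        return None  # no earlier fetcher data at all
    return index[idx - 1]

print(date_to_use_for(pd.Timestamp("2015-01-08")))  # 2015-01-06 00:00:00
print(date_to_use_for(pd.Timestamp("2015-01-02")))  # None
```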