Completed
Pull Request — master (#858)
by Eddie
02:03
created

zipline.data.DataPortal   F

Complexity

Total Complexity 160

Size/Duplication

Total Lines 1254
Duplicated Lines 0 %
Metric Value
dl 0
loc 1254
rs 0.6316
wmc 160

32 Methods

Rating   Name   Duplication   Size   Complexity  
D _apply_adjustments_to_window() 0 32 8
B _get_adjustment_list() 0 33 3
C _get_minute_spot_value() 0 58 7
F _get_adjusted_value() 0 73 14
B __init__() 0 45 1
C _get_history_daily_window_equity() 0 58 8
B _get_minute_spot_value_future() 0 29 5
C _get_history_minute_window() 0 59 7
A _check_extra_sources() 0 21 3
B _get_ctable() 0 32 6
B get_previous_value() 0 34 3
B _get_history_daily_window() 0 31 4
D _get_history_daily_window_future() 0 56 11
A _open_minute_file() 0 10 2
A get_last_traded_dt() 0 11 3
A _get_daily_data() 0 10 4
B get_spot_value() 0 56 6
D handle_extra_source() 0 74 8
F get_history_window() 0 82 12
A _check_is_currently_alive() 0 20 4
A _get_asset_start_date() 0 3 1
B get_stock_dividends() 0 46 4
A _get_asset_end_date() 0 3 1
F _get_minute_window_for_equity() 0 81 11
B get_splits() 0 32 5
A contains() 0 4 1
A _ensure_asset_dates() 0 6 2
B _get_minute_window_for_asset() 0 33 3
A _apply_all_adjustments() 0 71 2
A _get_minute_window_for_future() 0 18 2
A _get_daily_window_for_sid() 0 61 3
B get_fetcher_assets() 0 37 6

How to fix   Complexity   

Complex Class

Complex classes like zipline.data.DataPortal often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from operator import mul
16
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
from six.moves import reduce
25
26
from zipline.assets import Asset, Future, Equity
27
from zipline.data.us_equity_pricing import NoDataOnDate
28
29
from zipline.utils import tradingcalendar
30
from zipline.errors import (
31
    NoTradeDataAvailableTooEarly,
32
    NoTradeDataAvailableTooLate
33
)
34
# Module-level logger for this data portal.
log = Logger('DataPortal')

# Maps every user-facing field name to the OHLCV column actually stored on
# disk.  'price' is an alias for 'close'; the *_price names are legacy
# aliases kept for backward compatibility.
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}

# The only bar frequencies the history API understands.
# (set literal instead of set([...]) -- same value, idiomatic form.)
HISTORY_FREQUENCIES = {"1m", "1d"}
51
class DataPortal(object):
52
    def __init__(self,
                 env,
                 equity_daily_reader=None,
                 equity_minute_reader=None,
                 future_daily_reader=None,
                 future_minute_reader=None,
                 adjustment_reader=None):
        """
        Build a DataPortal around the given trading environment and the
        optional daily/minute bar readers and adjustment reader.
        """
        self.env = env
        self._asset_finder = env.asset_finder

        self.views = {}

        # Per-field caches of open bcolz carrays, keyed by stringified sid.
        self._carrays = {
            field: {}
            for field in ('open', 'high', 'low', 'close', 'volume',
                          'sid', 'dt')
        }

        # Adjustment data source plus per-sid caches of adjustment lists.
        self._adjustment_reader = adjustment_reader
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first/last trading day of an asset, even if
        # that day is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # State for extra data sources, like Fetcher.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        # Minute prices are stored scaled; multiply by this to un-scale.
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        self._equity_daily_reader = equity_daily_reader
        self._equity_minute_reader = equity_minute_reader
        self._future_daily_reader = future_daily_reader
        self._future_minute_reader = future_minute_reader
    def handle_extra_source(self, source_df, sim_params):
        """
        Expand and store extra-source data (e.g. Fetcher).

        Extra sources always have a sid column.

        We expand the given data (by forward filling) to the full range of
        the simulation dates, so that lookup is fast during simulation.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # The sid column may hold real assets (such as sid(24)) or arbitrary
        # identifiers (such as 'palladium').  Either way we split source_df
        # into one dataframe per identifier, reindex each onto the
        # simulation's date range (forward-filling gaps), and store a
        # mapping column -> {identifier -> df} in
        # self._augmented_sources_map, so e.g.
        # self._augmented_sources_map['days_to_cover']['AAPL'] gives the df
        # holding that data.

        if sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=sim_params.period_start,
                end=sim_params.period_end
            )

        # One sub-frame per sid makes it easy to compute accurate start/end
        # dates, de-dup rows, and stretch the data over the backtest range.
        grouped_by_sid = source_df.groupby(["sid"])
        group_dict = {
            name: grouped_by_sid.get_group(name)
            for name in grouped_by_sid.groups.keys()
        }

        for identifier, df in iteritems(group_dict):
            # Remember the raw date extent before reindexing.
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # A single sid per frame means de-duping on the index (dt) is
            # safe.
            df = df.groupby(level=0).last()

            # Stretch onto the backtest's dates so reads are trivial later.
            df = df.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # Fake assets carry no metadata, so record their date range
                # here.
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            for col_name in df.columns.difference(['sid']):
                self._augmented_sources_map.setdefault(
                    col_name, {})[identifier] = df
    def _open_minute_file(self, field, asset):
174
        sid_str = str(int(asset))
175
176
        try:
177
            carray = self._carrays[field][sid_str]
178
        except KeyError:
179
            carray = self._carrays[field][sid_str] = \
180
                self._get_ctable(asset)[field]
181
182
        return carray
183
184
    def _get_ctable(self, asset):
        """
        Open and return the read-only bcolz ctable holding minute data for
        ``asset``.

        The path comes from the reader's ``sid_path_func`` when one is set,
        otherwise from the "<rootdir>/<sid>.bcolz" convention.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            reader = self._future_minute_reader
        else:
            # Equities use the equity reader; so did the original fallback
            # branch for assets that are neither Future nor Equity (the two
            # branches were byte-identical, so they are merged here).
            # TODO: Figure out if assets should be allowed if neither, and
            # why this code path is being hit.
            reader = self._equity_minute_reader

        if reader.sid_path_func is not None:
            path = reader.sid_path_func(reader.rootdir, sid)
        else:
            path = "{0}/{1}.bcolz".format(reader.rootdir, sid)

        return bcolz.open(path, mode='r')
    def get_last_traded_dt(self, asset, dt, data_frequency):
218
        """
219
        Given an asset and dt, returns the last traded dt from the viewpoint
220
        of the given dt.
221
222
        If there is a trade on the dt, the answer is dt provided.
223
        """
224
        if data_frequency == 'minute':
225
            return self._equity_minute_reader.get_last_traded_dt(asset, dt)
226
        elif data_frequency == 'daily':
227
            return self._equity_daily_reader.get_last_traded_dt(asset, dt)
228
229
    def get_previous_value(self, asset, field, dt, data_frequency):
230
        """
231
        Given an asset and a column and a dt, returns the previous value for
232
        the same asset/column pair.  If this data portal is in minute mode,
233
        it's the previous minute value, otherwise it's the previous day's
234
        value.
235
236
        Parameters
237
        ---------
238
        asset : Asset
239
            The asset whose data is desired.
240
241
        field: string
242
            The desired field of the asset.  Valid values are "open",
243
            "open_price", "high", "low", "close", "close_price", "volume", and
244
            "price".
245
246
        dt: pd.Timestamp
247
            The timestamp from which to go back in time one slot.
248
249
        data_frequency: string
250
            The frequency of the data to query; i.e. whether the data is
251
            'daily' or 'minute' bars
252
253
        Returns
254
        -------
255
        The value of the desired field at the desired time.
256
        """
257
        if data_frequency == 'daily':
258
            prev_dt = self.env.previous_trading_day(dt)
259
        elif data_frequency == 'minute':
260
            prev_dt = self.env.previous_market_minute(dt)
261
262
        return self.get_spot_value(asset, field, prev_dt, data_frequency)
263
264
    def _check_extra_sources(self, asset, column, day):
        """
        Look ``column`` up in the extra (Fetcher-style) sources for
        ``asset`` on ``day``.

        Returns the stored value when the column should be served from an
        extra source, or None when it should be resolved against our own
        pricing data instead.

        Raises
        ------
        KeyError
            If the column is served from an extra source but the lookup
            fails.
        """
        # If we have an extra source with a column called "price", only look
        # at it if it's on something like palladium and not AAPL (since our
        # own price data always wins when dealing with assets).
        look_in_augmented_sources = column in self._augmented_sources_map and \
            not (column in BASE_FIELDS and isinstance(asset, Asset))

        if look_in_augmented_sources:
            # we're being asked for a field in an extra source
            try:
                return self._augmented_sources_map[column][asset].\
                    loc[day, column]
            except Exception:
                # Was a bare ``except:``; narrowed so SystemExit and
                # KeyboardInterrupt are no longer swallowed.
                log.error(
                    "Could not find value for asset={0}, day={1},"
                    "column={2}".format(
                        str(asset),
                        str(day),
                        str(column)))

                raise KeyError
    def get_spot_value(self, asset, field, dt, data_frequency):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at the given dt.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp for the desired value.

        data_frequency: string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time.
        """
        # Extra (Fetcher-style) sources take precedence when they apply.
        extra_source_val = self._check_extra_sources(asset, field, dt)
        if extra_source_val is not None:
            return extra_source_val

        try:
            column_to_use = BASE_FIELDS[field]
        except KeyError:
            raise KeyError("Invalid column: " + str(field))

        # Tolerate raw sids by resolving them to Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        self._check_is_currently_alive(asset, dt)

        if data_frequency == "daily":
            return self._get_daily_data(
                asset, column_to_use, normalize_date(dt))

        if isinstance(asset, Future):
            return self._get_minute_spot_value_future(
                asset, column_to_use, dt)

        return self._get_minute_spot_value(asset, column_to_use, dt)
    def _get_adjusted_value(self, asset, field, dt,
                            perspective_dt,
                            data_frequency):
        """
        Private method that returns a scalar value representing the value
        of the desired asset's field at the given dt with adjustments applied.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            The timestamp for the desired value.

        perspective_dt : pd.Timestamp
            The timestamp from which the data is being viewed back from.

        data_frequency: string
            The frequency of the data to query; i.e. whether the data is
            'daily' or 'minute' bars

        Returns
        -------
        The value of the desired field at the desired time.
        """
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        # Start from the raw spot value; for equities we then multiply in
        # the ratio of each adjustment dated before dt (stopping once an
        # adjustment's date reaches perspective_dt).
        spot_value = self.get_spot_value(asset, field, dt, data_frequency)

        if isinstance(asset, Equity):
            adjs = []
            split_adjustments = self._get_adjustment_list(
                asset, self._splits_dict, "SPLITS"
            )
            for adj_dt, adj in split_adjustments:
                if adj_dt < dt:
                    # Splits scale price and volume in opposite directions,
                    # so volume uses the reciprocal ratio.
                    if field != 'volume':
                        adjs.append(adj)
                    else:
                        adjs.append(1.0 / adj)
                # NOTE(review): this early break assumes the adjustment
                # list is date-ordered -- confirm in _get_adjustment_list.
                if adj_dt >= perspective_dt:
                    break

            # Mergers and dividends only adjust prices; volume is
            # unaffected, so both lists are skipped for the volume field.
            if field != 'volume':
                merger_adjustments = self._get_adjustment_list(
                    asset, self._mergers_dict, "MERGERS"
                )
                for adj_dt, adj in merger_adjustments:
                    if adj_dt < dt:
                        adjs.append(adj)
                    if adj_dt >= perspective_dt:
                        break
                div_adjustments = self._get_adjustment_list(
                    asset, self._dividends_dict, "DIVIDENDS",
                )
                for adj_dt, adj in div_adjustments:
                    if adj_dt < dt:
                        adjs.append(adj)
                    if adj_dt >= perspective_dt:
                        break

            # Collapse every applicable ratio into one multiplier.
            ratio = reduce(mul, adjs, 1.0)

            spot_value *= ratio

        return spot_value
    def _get_minute_spot_value_future(self, asset, column, dt):
418
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
419
        # The file attributes contain the "start_dt" and "last_dt" fields,
420
        # which represent the time period for this bcolz file.
421
422
        # The start_dt is midnight of the first day that this future started
423
        # trading.
424
425
        # figure out the # of minutes between dt and this asset's start_dt
426
        start_date = self._get_asset_start_date(asset)
427
        minute_offset = int((dt - start_date).total_seconds() / 60)
428
429
        if minute_offset < 0:
430
            # asking for a date that is before the asset's start date, no dice
431
            return 0.0
432
433
        # then just index into the bcolz carray at that offset
434
        carray = self._open_minute_file(column, asset)
435
        result = carray[minute_offset]
436
437
        # if there's missing data, go backwards until we run out of file
438
        while result == 0 and minute_offset > 0:
439
            minute_offset -= 1
440
            result = carray[minute_offset]
441
442
        if column != 'volume':
443
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
444
        else:
445
            return result
446
447
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Spot value for an equity from minute data, seeking backwards over
        missing bars (forward-fill semantics) and applying adjustments when
        the seek crosses a day boundary.
        """
        # if dt is before the first market minute, minute_index
        # will be 0.  if it's after the last market minute, it'll
        # be len(minutes_for_day)
        minute_offset_to_use = \
            self._equity_minute_reader._find_position_of_minute(dt)

        carray = self._equity_minute_reader._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self._equity_minute_reader.trading_days.\
                searchsorted(start_date)
            # 390 minutes per US equity trading session.
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                minutes = self.env.market_minute_window(
                    start=dt,
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        # Prices are stored as scaled ints; volume is stored unscaled.
        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
    def _get_daily_data(self, asset, column, dt):
507
        while True:
508
            try:
509
                value = self._equity_daily_reader.spot_price(asset, dt, column)
510
                if value != -1:
511
                    return value
512
                else:
513
                    dt -= tradingcalendar.trading_day
514
            except NoDataOnDate:
515
                return 0
516
517
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids.
        """
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if len(assets) == 0:
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures have no daily bcolz files (yet), so they take a
        # minute-aggregation path; equities read daily data directly.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use
            ) if isinstance(asset, Future) else
            self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use
            )
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
    def _get_history_daily_window_future(self, asset, days_for_window,
551
                                         end_dt, column):
552
        # Since we don't have daily bcolz files for futures (yet), use minute
553
        # bars to calculate the daily values.
554
        data = []
555
        data_groups = []
556
557
        # get all the minutes for the days NOT including today
558
        for day in days_for_window[:-1]:
559
            minutes = self.env.market_minutes_for_day(day)
560
561
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
562
563
            for idx, minute in enumerate(minutes):
564
                minute_val = self._get_minute_spot_value_future(
565
                    asset, column, minute
566
                )
567
568
                values_for_day[idx] = minute_val
569
570
            data_groups.append(values_for_day)
571
572
        # get the minutes for today
573
        last_day_minutes = pd.date_range(
574
            start=self.env.get_open_and_close(end_dt)[0],
575
            end=end_dt,
576
            freq="T"
577
        )
578
579
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
580
581
        for idx, minute in enumerate(last_day_minutes):
582
            minute_val = self._get_minute_spot_value_future(
583
                asset, column, minute
584
            )
585
586
            values_for_last_day[idx] = minute_val
587
588
        data_groups.append(values_for_last_day)
589
590
        for group in data_groups:
591
            if len(group) == 0:
592
                continue
593
594
            if column == 'volume':
595
                data.append(np.sum(group))
596
            elif column == 'open':
597
                data.append(group[0])
598
            elif column == 'close':
599
                data.append(group[-1])
600
            elif column == 'high':
601
                data.append(np.amax(group))
602
            elif column == 'low':
603
                data.append(np.amin(group))
604
605
        return data
606
607
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Daily history bars for an equity.  Uses daily data for the whole
        window except when the final day is partial, in which case that day
        is aggregated from minute data.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )

        # Otherwise the last day is partial: aggregate it from minute data.
        all_minutes_for_day = self.env.market_minutes_for_day(
            pd.Timestamp(end_dt.date()))
        last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

        # these are the minutes for the partial day
        minutes_for_partial_day = \
            all_minutes_for_day[0:(last_minute_idx + 1)]

        daily_data = self._get_daily_window_for_sid(
            sid,
            field_to_use,
            days_for_window[0:-1]
        )

        minute_data = self._get_minute_window_for_equity(
            sid,
            field_to_use,
            minutes_for_partial_day
        )

        # Collapse the partial day's minutes into a single daily value and
        # splice it in as the final bar.
        aggregators = {
            'volume': np.sum,
            'open': lambda data: data[0],
            'close': lambda data: data[-1],
            'high': np.amax,
            'low': np.amin,
        }
        daily_data[-1] = aggregators[field_to_use](minute_data)

        return daily_data
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Raises
        ------
        ValueError
            If the requested window ends before the first trading day of
            the minute data.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self._equity_minute_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            # Fixed: the two string literals previously concatenated to
            # "endsbefore" (missing space).
            raise ValueError("Cannot calculate history window that ends "
                             "before 2002-01-02 14:31 UTC!")

        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date, so pad the front of the result with NaNs.
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        data = []
        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=assets
        )
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If field or frequency is not recognized.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # sanity check in case sids were passed in
        assets = np.array([
            (self.env.asset_finder.retrieve_asset(asset) if
             isinstance(asset, int) else asset) for asset in assets])

        # data_frequency is recorded alongside the branch so we don't need
        # to re-validate frequency below (the old second check, and its
        # unreachable raise, are gone).
        if frequency == "1d":
            df = self._get_history_daily_window(assets, end_dt, bar_count,
                                                field_to_use)
            data_frequency = 'daily'
        elif frequency == "1m":
            df = self._get_history_minute_window(assets, end_dt, bar_count,
                                                 field_to_use)
            data_frequency = 'minute'
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # forward-fill if needed
        if field == "price" and ffill:
            # Removed dead code: assets_with_nan_index /
            # assets_with_leading_nan were computed here and then
            # unconditionally recomputed below before first use.
            dt_to_fill = df.index[0]
            perspective_dt = df.index[-1]

            # Assets whose first bar is NaN get their last traded value,
            # adjusted up to the end of the window, spliced in before the
            # forward fill so the fill has something to propagate.
            assets_with_leading_nan = np.where(pd.isnull(df.iloc[0]))[0]
            for missing_loc in assets_with_leading_nan:
                asset = assets[missing_loc]
                previous_dt = self.get_last_traded_dt(
                    asset, dt_to_fill, data_frequency)
                if pd.isnull(previous_dt):
                    continue
                previous_value = self._get_adjusted_value(
                    asset,
                    field,
                    previous_dt,
                    perspective_dt,
                    data_frequency,
                )
                df.iloc[0, missing_loc] = previous_value

            df.fillna(method='ffill', inplace=True)

        return df
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # allow raw sids to be passed in
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # futures and equities are stored differently, so dispatch to the
        # appropriate reader.
        if isinstance(asset, Future):
            window_fetcher = self._get_minute_window_for_future
        else:
            window_fetcher = self._get_minute_window_for_equity

        return window_fetcher(asset, field, minutes_for_window)
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
        """
        Build a minute window for a future by doing a spot lookup for each
        requested minute.  No adjustments are applied to futures.
        """
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
        # do this is to simply do a spot lookup for each desired minute.
        window = np.zeros(len(minutes_for_window), dtype=np.float64)
        for position, minute_dt in enumerate(minutes_for_window):
            window[position] = self._get_minute_spot_value_future(
                asset, field, minute_dt)

        # Note: an improvement could be to find the consecutive runs within
        # minutes_for_window, and use them to read the underlying ctable
        # more efficiently.

        # Once futures are on 24-hour clock, then we can just grab all the
        # requested minutes in one shot from the ctable.

        # no adjustments for futures, yay.
        return window
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        equity over the given minutes.  Used by the history API.

        Parameters
        ----------
        asset : Asset
            The equity whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.

        Returns
        -------
        A numpy array with the requested values, with splits, mergers, and
        dividends applied; missing bars are NaN.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._equity_minute_reader._open_minute_file(field, asset)

        # NOTE(review): a missing first minute falls back to index 0, i.e.
        # the very beginning of the minute file -- presumably the requested
        # window starts before recorded data; confirm this is intended.
        try:
            start_idx = self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[0])
        except KeyError:
            start_idx = 0

        try:
            end_idx = self._equity_minute_reader._find_position_of_minute(
                minutes_for_window[-1]) + 1
        except KeyError:
            end_idx = 0

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each prior deletion shifted the remaining indices left by
                # 180, so compensate before deleting this run of zeroes.
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # apply splits/mergers/dividends and the minute price factor in place
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
    def _apply_all_adjustments(self, data, asset, dts, field,
                               price_adj_factor=1.0):
        """
        Internal method that applies all the necessary adjustments on the
        given data array.

        The adjustments are:
        - splits
        - if field != "volume":
            - mergers
            - dividends
            - * price_adj_factor
            - any zero fields replaced with NaN
        - all values rounded to 3 digits after the decimal point.

        Parameters
        ----------
        data : np.array
            The data to be adjusted.

        asset: Asset
            The asset whose data is being adjusted.

        dts: pd.DateTimeIndex
            The list of minutes or days representing the desired window.

        field: string
            The field whose values are in the data array.

        price_adj_factor: float
            Factor with which to adjust OHLC values.

        Returns
        -------
        None.  The data array is modified in place.
        """
        is_price_field = (field != 'volume')

        # splits apply to every field; volume is divided by the split ratio
        # rather than multiplied.
        self._apply_adjustments_to_window(
            self._get_adjustment_list(asset, self._splits_dict, "SPLITS"),
            data,
            dts,
            is_price_field,
        )

        if is_price_field:
            for cache, table in ((self._mergers_dict, "MERGERS"),
                                 (self._dividends_dict, "DIVIDENDS")):
                self._apply_adjustments_to_window(
                    self._get_adjustment_list(asset, cache, table),
                    data,
                    dts,
                    True,
                )

            data *= price_adj_factor

            # if anything is zero, it's a missing bar, so replace it with NaN.
            # we only want to do this for non-volume fields, because a missing
            # volume should be 0.
            data[data == 0] = np.NaN

        np.around(data, 3, out=data)
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.
        """
        bar_count = len(days_in_window)

        # allocate the NaN-filled output; the optional extra slot is for the
        # current (partial) day.
        out_size = bar_count + 1 if extra_slot else bar_count
        return_array = np.full(out_size, np.NAN)

        # restrict the window to the days the asset was actually alive
        start_date = self._get_asset_start_date(asset)
        end_date = self._get_asset_end_date(asset)
        day_slice = days_in_window.slice_indexer(start_date, end_date)
        active_days = days_in_window[day_slice]

        if len(active_days):
            data = self._equity_daily_reader.history_window(field,
                                                            active_days[0],
                                                            active_days[-1],
                                                            asset)
            return_array[day_slice] = data
            # splits/mergers/dividends are applied in place
            self._apply_all_adjustments(
                return_array,
                asset,
                active_days,
                field,
            )

        return return_array
    @staticmethod
1080
    def _apply_adjustments_to_window(adjustments_list, window_data,
1081
                                     dts_in_window, multiply):
1082
        if len(adjustments_list) == 0:
1083
            return
1084
1085
        # advance idx to the correct spot in the adjustments list, based on
1086
        # when the window starts
1087
        idx = 0
1088
1089
        while idx < len(adjustments_list) and dts_in_window[0] >\
1090
                adjustments_list[idx][0]:
1091
            idx += 1
1092
1093
        # if we've advanced through all the adjustments, then there's nothing
1094
        # to do.
1095
        if idx == len(adjustments_list):
1096
            return
1097
1098
        while idx < len(adjustments_list):
1099
            adjustment_to_apply = adjustments_list[idx]
1100
1101
            if adjustment_to_apply[0] > dts_in_window[-1]:
1102
                break
1103
1104
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1105
            if multiply:
1106
                window_data[0:range_end] *= adjustment_to_apply[1]
1107
            else:
1108
                window_data[0:range_end] /= adjustment_to_apply[1]
1109
1110
            idx += 1
1111
1112
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1113
        """
1114
        Internal method that returns a list of adjustments for the given sid.
1115
1116
        Parameters
1117
        ----------
1118
        asset : Asset
1119
            The asset for which to return adjustments.
1120
1121
        adjustments_dict: dict
1122
            A dictionary of sid -> list that is used as a cache.
1123
1124
        table_name: string
1125
            The table that contains this data in the adjustments db.
1126
1127
        Returns
1128
        -------
1129
        adjustments: list
1130
            A list of [multiplier, pd.Timestamp], earliest first
1131
1132
        """
1133
        if self._adjustment_reader is None:
1134
            return []
1135
1136
        sid = int(asset)
1137
1138
        try:
1139
            adjustments = adjustments_dict[sid]
1140
        except KeyError:
1141
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1142
                get_adjustments_for_sid(table_name, sid)
1143
1144
        return adjustments
1145
1146
    def _check_is_currently_alive(self, asset, dt):
1147
        sid = int(asset)
1148
1149
        if sid not in self._asset_start_dates:
1150
            self._get_asset_start_date(asset)
1151
1152
        start_date = self._asset_start_dates[sid]
1153
        if self._asset_start_dates[sid] > dt:
1154
            raise NoTradeDataAvailableTooEarly(
1155
                sid=sid,
1156
                dt=normalize_date(dt),
1157
                start_dt=start_date
1158
            )
1159
1160
        end_date = self._asset_end_dates[sid]
1161
        if self._asset_end_dates[sid] < dt:
1162
            raise NoTradeDataAvailableTooLate(
1163
                sid=sid,
1164
                dt=normalize_date(dt),
1165
                end_dt=end_date
1166
            )
1167
1168
    def _get_asset_start_date(self, asset):
1169
        self._ensure_asset_dates(asset)
1170
        return self._asset_start_dates[asset]
1171
1172
    def _get_asset_end_date(self, asset):
1173
        self._ensure_asset_dates(asset)
1174
        return self._asset_end_dates[asset]
1175
1176
    def _ensure_asset_dates(self, asset):
1177
        sid = int(asset)
1178
1179
        if sid not in self._asset_start_dates:
1180
            self._asset_start_dates[sid] = asset.start_date
1181
            self._asset_end_dates[sid] = asset.end_date
1182
1183
    def get_splits(self, sids, dt):
1184
        """
1185
        Returns any splits for the given sids and the given dt.
1186
1187
        Parameters
1188
        ----------
1189
        sids : list
1190
            Sids for which we want splits.
1191
1192
        dt: pd.Timestamp
1193
            The date for which we are checking for splits.  Note: this is
1194
            expected to be midnight UTC.
1195
1196
        Returns
1197
        -------
1198
        list: List of splits, where each split is a (sid, ratio) tuple.
1199
        """
1200
        if self._adjustment_reader is None or len(sids) == 0:
1201
            return {}
1202
1203
        # convert dt to # of seconds since epoch, because that's what we use
1204
        # in the adjustments db
1205
        seconds = int(dt.value / 1e9)
1206
1207
        splits = self._adjustment_reader.conn.execute(
1208
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
1209
            (seconds,)).fetchall()
1210
1211
        sids_set = set(sids)
1212
        splits = [split for split in splits if split[0] in sids_set]
1213
1214
        return splits
1215
1216
    def get_stock_dividends(self, sid, trading_days):
1217
        """
1218
        Returns all the stock dividends for a specific sid that occur
1219
        in the given trading range.
1220
1221
        Parameters
1222
        ----------
1223
        sid: int
1224
            The asset whose stock dividends should be returned.
1225
1226
        trading_days: pd.DatetimeIndex
1227
            The trading range.
1228
1229
        Returns
1230
        -------
1231
        list: A list of objects with all relevant attributes populated.
1232
        All timestamp fields are converted to pd.Timestamps.
1233
        """
1234
1235
        if self._adjustment_reader is None:
1236
            return []
1237
1238
        if len(trading_days) == 0:
1239
            return []
1240
1241
        start_dt = trading_days[0].value / 1e9
1242
        end_dt = trading_days[-1].value / 1e9
1243
1244
        dividends = self._adjustment_reader.conn.execute(
1245
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
1246
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
1247
            fetchall()
1248
1249
        dividend_info = []
1250
        for dividend_tuple in dividends:
1251
            dividend_info.append({
1252
                "declared_date": dividend_tuple[1],
1253
                "ex_date": pd.Timestamp(dividend_tuple[2], unit="s"),
1254
                "pay_date": pd.Timestamp(dividend_tuple[3], unit="s"),
1255
                "payment_sid": dividend_tuple[4],
1256
                "ratio": dividend_tuple[5],
1257
                "record_date": pd.Timestamp(dividend_tuple[6], unit="s"),
1258
                "sid": dividend_tuple[7]
1259
            })
1260
1261
        return dividend_info
1262
1263
    def contains(self, asset, field):
        """Return True if `field` is queryable for `asset`."""
        if field in BASE_FIELDS:
            return True
        # otherwise, the field must come from an extra (fetcher) source that
        # covers this asset.
        return (field in self._augmented_sources_map and
                asset in self._augmented_sources_map[field])
    def get_fetcher_assets(self, day):
1269
        """
1270
        Returns a list of assets for the current date, as defined by the
1271
        fetcher data.
1272
1273
        Notes
1274
        -----
1275
        Data is forward-filled.  If there is no fetcher data defined for day
1276
        N, we use day N-1's data (if available, otherwise we keep going back).
1277
1278
        Returns
1279
        -------
1280
        list: a list of Asset objects.
1281
        """
1282
        # return a list of assets for the current date, as defined by the
1283
        # fetcher source
1284
        if self._extra_source_df is None:
1285
            return []
1286
1287
        if day in self._extra_source_df.index:
1288
            date_to_use = day
1289
        else:
1290
            # current day isn't in the fetcher df, go back the last
1291
            # available day
1292
            idx = self._extra_source_df.index.searchsorted(day)
1293
            if idx == 0:
1294
                return []
1295
1296
            date_to_use = self._extra_source_df.index[idx - 1]
1297
1298
        asset_list = self._extra_source_df.loc[date_to_use]["sid"]
1299
1300
        # make sure they're actually assets
1301
        asset_list = [asset for asset in asset_list
1302
                      if isinstance(asset, Asset)]
1303
1304
        return asset_list
1305