Completed
Pull Request — master (#858)
by Eddie
01:35
created

zipline.data.DataPortal   F

Complexity

Total Complexity 151

Size/Duplication

Total Lines 1268
Duplicated Lines 0 %
Metric Value
dl 0
loc 1268
rs 0.6316
wmc 151

35 Methods

Rating   Name   Duplication   Size   Complexity  
C get_history_window() 0 51 8
C _get_minute_spot_value() 0 70 8
A minute_bar_reader() 0 6 2
B _check_is_currently_alive() 0 23 5
A setup_offset_cache() 0 16 3
B __init__() 0 92 3
A _get_asset_start_date() 0 3 1
C _get_history_daily_window_equity() 0 58 8
B get_stock_dividends() 0 46 4
A _get_asset_end_date() 0 3 1
D _get_minute_window_for_equity() 0 73 9
B get_splits() 0 32 5
B _get_minute_spot_value_future() 0 29 5
A get_equity_price_view() 0 20 2
A contains() 0 4 1
A _ensure_asset_dates() 0 6 2
B _get_minute_window_for_asset() 0 33 3
C _get_history_minute_window() 0 59 7
A _check_extra_sources() 0 21 3
B _get_ctable() 0 22 5
D _apply_adjustments_to_window() 0 32 8
A _minute_offset() 0 8 3
A _apply_all_adjustments() 0 71 2
B get_previous_value() 0 30 3
B _get_history_daily_window() 0 31 4
A _get_minute_window_for_future() 0 18 2
B _get_adjustment_list() 0 33 3
D _get_history_daily_window_future() 0 56 11
A _open_minute_file() 0 10 2
B _find_position_of_minute() 0 38 2
A _get_daily_window_for_sid() 0 60 2
A _get_daily_data() 0 10 4
B get_spot_value() 0 55 6
D handle_extra_source() 0 74 8
B get_fetcher_assets() 0 37 6

How to fix   Complexity   

Complex Class

Complex classes like zipline.data.DataPortal often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import bcolz
18
from logbook import Logger
19
20
import numpy as np
21
import pandas as pd
22
from pandas.tslib import normalize_date
23
from six import iteritems
24
25
from zipline.assets import Asset, Future, Equity
26
from zipline.data.us_equity_pricing import (
27
    BcolzDailyBarReader,
28
    NoDataOnDate
29
)
30
from zipline.data.us_equity_minutes import BcolzMinuteBarReader
31
from zipline.pipeline.data.equity_pricing import USEquityPricing
32
33
from zipline.utils import tradingcalendar
34
from zipline.errors import (
35
    NoTradeDataAvailableTooEarly,
36
    NoTradeDataAvailableTooLate
37
)
38
39
log = Logger('DataPortal')

# Frequency strings accepted by get_history_window.
HISTORY_FREQUENCIES = ["1d", "1m"]

# Maps every user-facing field name (including the legacy "*_price"
# aliases and "price") to the canonical OHLCV column that backs it.
BASE_FIELDS = {
    'open': 'open',
    'open_price': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'close_price': 'close',
    'volume': 'volume',
    'price': 'close'
}


# Same aliasing as BASE_FIELDS, but resolving to USEquityPricing pipeline
# column objects instead of plain column-name strings.
US_EQUITY_COLUMNS = {
    'open': USEquityPricing.open,
    'open_price': USEquityPricing.open,
    'high': USEquityPricing.high,
    'low': USEquityPricing.low,
    'close': USEquityPricing.close,
    'close_price': USEquityPricing.close,
    'volume': USEquityPricing.volume,
    'price': USEquityPricing.close,
}
65
66
67
class DataPortal(object):
68
    def __init__(self,
                 env,
                 sim_params=None,
                 minutes_equities_path=None,
                 minutes_futures_path=None,
                 daily_equities_path=None,
                 adjustment_reader=None,
                 equity_sid_path_func=None,
                 futures_sid_path_func=None):
        """
        Interface to all of the pricing data needed by a simulation.

        Parameters
        ----------
        env : TradingEnvironment
            The environment whose asset_finder resolves sids to Assets.
        sim_params : SimulationParameters, optional
            Supplies data_frequency, emission_rate and the trading-day
            range.  When None, data_frequency defaults to "minute".
        minutes_equities_path : str, optional
            Root directory of the per-sid equity minute bcolz files.
        minutes_futures_path : str, optional
            Root directory of the per-sid futures minute bcolz files.
        daily_equities_path : str, optional
            Path of the daily equity bcolz file; when given, a
            BcolzDailyBarReader is opened eagerly.
        adjustment_reader : SQLiteAdjustmentReader, optional
            Source of splits/mergers/dividends.
        equity_sid_path_func, futures_sid_path_func : callable, optional
            Overrides for mapping (root_path, sid) -> minute-file path.
        """
        self.env = env

        # Internal pointers to the current dt (can be minute) and current day.
        # In daily mode, they point to the same thing. In minute mode, it's
        # useful to have separate pointers to the current day and to the
        # current minute.  These pointers are updated by the
        # AlgorithmSimulator's transform loop.
        self.current_dt = None
        self.current_day = None

        # This is a bit ugly, but is here for performance reasons.  In minute
        # simulations, we need to very quickly go from dt -> (# of minutes
        # since Jan 1 2002 9:30 Eastern).
        #
        # The clock that heartbeats the simulation has all the necessary
        # information to do this calculation very quickly.  This value is
        # calculated there, and then set here.
        self.cur_data_offset = 0

        # Per-consumer cached views of the data (see get_equity_price_view).
        self.views = {}

        self._daily_equities_path = daily_equities_path
        self._minutes_equities_path = minutes_equities_path
        self._minutes_futures_path = minutes_futures_path

        self._asset_finder = env.asset_finder

        # Cache of open bcolz carrays, keyed first by field then by sid
        # string (see _open_minute_file).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

        self._adjustment_reader = adjustment_reader

        # Caches of sid -> adjustment list.
        self._splits_dict = {}
        self._mergers_dict = {}
        self._dividends_dict = {}

        # Cache of sid -> the first trading day of an asset, even if that day
        # is before 1/2/2002.
        self._asset_start_dates = {}
        self._asset_end_dates = {}

        # Handle extra sources, like Fetcher.
        # Maps column name -> {identifier -> expanded DataFrame}.
        self._augmented_sources_map = {}
        self._extra_source_df = None

        self._sim_params = sim_params
        if self._sim_params is not None:
            self._data_frequency = self._sim_params.data_frequency
        else:
            self._data_frequency = "minute"

        self._equity_sid_path_func = equity_sid_path_func
        self._futures_sid_path_func = futures_sid_path_func

        # Minute prices are stored as scaled integers; multiply by this
        # factor to recover the real price.
        self.MINUTE_PRICE_ADJUSTMENT_FACTOR = 0.001

        if daily_equities_path is not None:
            self._daily_bar_reader = BcolzDailyBarReader(daily_equities_path)
        else:
            self._daily_bar_reader = None

        # Lazily created by the minute_bar_reader property.
        self._minute_bar_reader = None

        # The following values are used by _minute_offset to calculate the
        # index into the minute bcolz data.

        # A lookup table from every minute to its corresponding day, to avoid
        # calling `.date()` on every lookup.
        self._minutes_to_day = {}
        # A map of days (keyed by midnight) to a DatetimeIndex of market
        # minutes for that day.
        self._minutes_by_day = {}
        # A dict of day to the offset into the minute bcolz on which that
        # day's data starts.  Populated by setup_offset_cache.
        self._day_offsets = None
160
161
    @property
162
    def minute_bar_reader(self):
163
        if self._minute_bar_reader is None:
164
            self._minute_bar_reader = BcolzMinuteBarReader(
165
                self._minutes_equities_path)
166
        return self._minute_bar_reader
167
168
    def handle_extra_source(self, source_df):
        """
        Ingest an extra data source (e.g. Fetcher output).

        Extra sources always have a sid column.

        We expand the given data (by forward filling) to the full range of
        the simulation dates, so that lookup is fast during simulation.

        Parameters
        ----------
        source_df : pd.DataFrame or None
            Datetime-indexed frame with a 'sid' column whose values are
            either known Assets or arbitrary identifiers (e.g. "PALLADIUM").
            No-op when None.
        """
        if source_df is None:
            return

        self._extra_source_df = source_df

        # source_df's sid column can either consist of assets we know about
        # (such as sid(24)) or of assets we don't know about (such as
        # palladium).
        #
        # In both cases, we break up the dataframe into individual dfs
        # that only contain a single asset's information.  ie, if source_df
        # has data for PALLADIUM and GOLD, we split source_df into two
        # dataframes, one for each. (same applies if source_df has data for
        # AAPL and IBM).
        #
        # We then take each child df and reindex it to the simulation's date
        # range by forward-filling missing values. this makes reads simpler.
        #
        # Finally, we store the data. For each column, we store a mapping in
        # self.augmented_sources_map from the column to a dictionary of
        # asset -> df.  In other words,
        # self.augmented_sources_map['days_to_cover']['AAPL'] gives us the df
        # holding that data.

        # NOTE(review): assumes self._sim_params is not None here, even
        # though __init__ permits None -- confirm all callers set it.
        if self._sim_params.emission_rate == "daily":
            source_date_index = self.env.days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )
        else:
            source_date_index = self.env.minutes_for_days_in_range(
                start=self._sim_params.period_start,
                end=self._sim_params.period_end
            )

        # Break the source_df up into one dataframe per sid.  This lets
        # us (more easily) calculate accurate start/end dates for each sid,
        # de-dup data, and expand the data to fit the backtest start/end date.
        grouped_by_sid = source_df.groupby(["sid"])
        group_names = grouped_by_sid.groups.keys()
        group_dict = {}
        for group_name in group_names:
            group_dict[group_name] = grouped_by_sid.get_group(group_name)

        for identifier, df in iteritems(group_dict):
            # Before reindexing, save the earliest and latest dates.
            earliest_date = df.index[0]
            latest_date = df.index[-1]

            # Since we know this df only contains a single sid, we can safely
            # de-dupe by the index (dt); the last row wins for each dt.
            df = df.groupby(level=0).last()

            # Reindex the dataframe based on the backtest start/end date.
            # This makes reads easier during the backtest.
            df = df.reindex(index=source_date_index, method='ffill')

            if not isinstance(identifier, Asset):
                # For fake (non-Asset) identifiers we need to store a
                # start/end date so alive-checks have something to use.
                self._asset_start_dates[identifier] = earliest_date
                self._asset_end_dates[identifier] = latest_date

            # Register every non-sid column under this identifier.
            for col_name in df.columns.difference(['sid']):
                if col_name not in self._augmented_sources_map:
                    self._augmented_sources_map[col_name] = {}

                self._augmented_sources_map[col_name][identifier] = df
242
243
    def _open_minute_file(self, field, asset):
244
        sid_str = str(int(asset))
245
246
        try:
247
            carray = self._carrays[field][sid_str]
248
        except KeyError:
249
            carray = self._carrays[field][sid_str] = \
250
                self._get_ctable(asset)[field]
251
252
        return carray
253
254
    def _get_ctable(self, asset):
        """Open (read-only) the minute bcolz ctable backing ``asset``.

        Futures use the futures root (with optional path-func override);
        equities use the equities root (with their own optional override);
        anything else falls back to the plain equities-root layout.
        """
        sid = int(asset)

        if isinstance(asset, Future):
            if self._futures_sid_path_func is not None:
                path = self._futures_sid_path_func(
                    self._minutes_futures_path, sid
                )
            else:
                path = "{0}/{1}.bcolz".format(self._minutes_futures_path, sid)
        elif isinstance(asset, Equity) and \
                self._equity_sid_path_func is not None:
            path = self._equity_sid_path_func(
                self._minutes_equities_path, sid
            )
        else:
            # Equities without a path func, and any other asset type.
            path = "{0}/{1}.bcolz".format(self._minutes_equities_path, sid)

        return bcolz.open(path, mode='r')
276
277
    def get_previous_value(self, asset, field, dt):
278
        """
279
        Given an asset and a column and a dt, returns the previous value for
280
        the same asset/column pair.  If this data portal is in minute mode,
281
        it's the previous minute value, otherwise it's the previous day's
282
        value.
283
284
        Parameters
285
        ---------
286
        asset : Asset
287
            The asset whose data is desired.
288
289
        field: string
290
            The desired field of the asset.  Valid values are "open",
291
            "open_price", "high", "low", "close", "close_price", "volume", and
292
            "price".
293
294
        dt: pd.Timestamp
295
            The timestamp from which to go back in time one slot.
296
297
        Returns
298
        -------
299
        The value of the desired field at the desired time.
300
        """
301
        if self._data_frequency == 'daily':
302
            prev_dt = self.env.previous_trading_day(dt)
303
        elif self._data_frequency == 'minute':
304
            prev_dt = self.env.previous_market_minute(dt)
305
306
        return self.get_spot_value(asset, field, prev_dt)
307
308
    def _check_extra_sources(self, asset, column, day):
309
        # If we have an extra source with a column called "price", only look
310
        # at it if it's on something like palladium and not AAPL (since our
311
        # own price data always wins when dealing with assets).
312
        look_in_augmented_sources = column in self._augmented_sources_map and \
313
            not (column in BASE_FIELDS and isinstance(asset, Asset))
314
315
        if look_in_augmented_sources:
316
            # we're being asked for a field in an extra source
317
            try:
318
                return self._augmented_sources_map[column][asset].\
319
                    loc[day, column]
320
            except:
321
                log.error(
322
                    "Could not find value for asset={0}, current_day={1},"
323
                    "column={2}".format(
324
                        str(asset),
325
                        str(self.current_day),
326
                        str(column)))
327
328
                raise KeyError
329
330
    def get_spot_value(self, asset, field, dt=None):
        """
        Public API method that returns a scalar value representing the value
        of the desired asset's field at either the given dt, or this data
        portal's current_dt.

        Parameters
        ---------
        asset : Asset
            The asset whose data is desired.

        field: string
            The desired field of the asset.  Valid values are "open",
            "open_price", "high", "low", "close", "close_price", "volume", and
            "price".

        dt: pd.Timestamp
            (Optional) The timestamp for the desired value.

        Returns
        -------
        The value of the desired field at the desired time.

        Raises
        ------
        KeyError
            If ``field`` is not one of the BASE_FIELDS.
        """
        # Extra (Fetcher) sources win for non-price columns; returns None
        # when this field/asset pair is not served by an extra source.
        extra_source_val = self._check_extra_sources(
            asset,
            field,
            (dt or self.current_dt)
        )

        if extra_source_val is not None:
            return extra_source_val

        if field not in BASE_FIELDS:
            raise KeyError("Invalid column: " + str(field))

        # Resolve field aliases ("price" -> "close", etc).
        column_to_use = BASE_FIELDS[field]

        # Tolerate raw sids by resolving them to Asset objects.
        if isinstance(asset, int):
            asset = self._asset_finder.retrieve_asset(asset)

        # Raises if the asset is not alive at dt.
        self._check_is_currently_alive(asset, dt)

        if self._data_frequency == "daily":
            day_to_use = dt or self.current_day
            day_to_use = normalize_date(day_to_use)
            return self._get_daily_data(asset, column_to_use, day_to_use)
        else:
            dt_to_use = dt or self.current_dt

            # Futures and equities have different minute-file layouts.
            if isinstance(asset, Future):
                return self._get_minute_spot_value_future(
                    asset, column_to_use, dt_to_use)
            else:
                return self._get_minute_spot_value(
                    asset, column_to_use, dt_to_use)
385
386
    def _get_minute_spot_value_future(self, asset, column, dt):
387
        # Futures bcolz files have 1440 bars per day (24 hours), 7 days a week.
388
        # The file attributes contain the "start_dt" and "last_dt" fields,
389
        # which represent the time period for this bcolz file.
390
391
        # The start_dt is midnight of the first day that this future started
392
        # trading.
393
394
        # figure out the # of minutes between dt and this asset's start_dt
395
        start_date = self._get_asset_start_date(asset)
396
        minute_offset = int((dt - start_date).total_seconds() / 60)
397
398
        if minute_offset < 0:
399
            # asking for a date that is before the asset's start date, no dice
400
            return 0.0
401
402
        # then just index into the bcolz carray at that offset
403
        carray = self._open_minute_file(column, asset)
404
        result = carray[minute_offset]
405
406
        # if there's missing data, go backwards until we run out of file
407
        while result == 0 and minute_offset > 0:
408
            minute_offset -= 1
409
            result = carray[minute_offset]
410
411
        if column != 'volume':
412
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
413
        else:
414
            return result
415
416
    def setup_offset_cache(self, minutes_by_day, minutes_to_day):
417
        # All our minute bcolz files are written starting on 1/2/2002,
418
        # with 390 minutes per day, regarding of when the security started
419
        # trading. This lets us avoid doing an offset calculation related
420
        # to the asset start date.  Hard-coding 390 minutes per day lets us
421
        # ignore half days.
422
        self._minutes_to_day = minutes_to_day
423
        self._minutes_by_day = minutes_by_day
424
        if self._sim_params is not None:
425
            start = self._sim_params.trading_days[0]
426
            first_trading_day_idx = self.minute_bar_reader.trading_days.\
427
                searchsorted(start)
428
            self._day_offsets = {
429
                day: (i + first_trading_day_idx) * 390
430
                for i, day in enumerate(
431
                    self._sim_params.trading_days)}
432
433
    def _minute_offset(self, dt):
434
        if self._day_offsets is not None:
435
            try:
436
                day = self._minutes_to_day[dt]
437
                minutes = self._minutes_by_day[day]
438
                return self._day_offsets[day] + minutes.get_loc(dt)
439
            except KeyError:
440
                return None
441
442
    def _get_minute_spot_value(self, asset, column, dt):
        """
        Return ``column`` for an equity at minute ``dt``, forward-filling
        (and adjusting) backwards over missing bars.

        Relies on the fixed minute-file layout of 390 bars per trading day.
        Prices are de-scaled by MINUTE_PRICE_ADJUSTMENT_FACTOR; volume is
        returned as stored.
        """
        # Fast path: the precomputed offset cache (set up by
        # setup_offset_cache); None means dt is not in the cache.
        minute_offset_to_use = self._minute_offset(dt)

        if minute_offset_to_use is None:
            # Slow path: compute the offset from the trading calendar.
            given_day = pd.Timestamp(dt.date(), tz='utc')
            day_index = self.minute_bar_reader.trading_days.searchsorted(
                given_day)

            # if dt is before the first market minute, minute_index
            # will be 0.  if it's after the last market minute, it'll
            # be len(minutes_for_day)
            minute_index = self.env.market_minutes_for_day(given_day).\
                searchsorted(dt)

            minute_offset_to_use = (day_index * 390) + minute_index

        carray = self._open_minute_file(column, asset)
        result = carray[minute_offset_to_use]

        if result == 0:
            # if the given minute doesn't have data, we need to seek
            # backwards until we find data. This makes the data
            # forward-filled.

            # get this asset's start date, so that we don't look before it.
            start_date = self._get_asset_start_date(asset)
            start_date_idx = self.minute_bar_reader.trading_days.searchsorted(
                start_date)
            start_day_offset = start_date_idx * 390

            original_start = minute_offset_to_use

            while result == 0 and minute_offset_to_use > start_day_offset:
                minute_offset_to_use -= 1
                result = carray[minute_offset_to_use]

            # once we've found data, we need to check whether it needs
            # to be adjusted.
            if result != 0:
                # The minutes we walked back over, in ascending order.
                # NOTE: `.order()` is the pre-0.17 pandas spelling of
                # sort_values().
                minutes = self.env.market_minute_window(
                    start=(dt or self.current_dt),
                    count=(original_start - minute_offset_to_use + 1),
                    step=-1
                ).order()

                # only need to check for adjustments if we've gone back
                # far enough to cross the day boundary.
                if minutes[0].date() != minutes[-1].date():
                    # create a np array of size minutes, fill it all with
                    # the same value.  and adjust the array.
                    arr = np.array([result] * len(minutes),
                                   dtype=np.float64)
                    self._apply_all_adjustments(
                        data=arr,
                        asset=asset,
                        dts=minutes,
                        field=column
                    )

                    # The first value of the adjusted array is the value
                    # we want.
                    result = arr[0]

        if column != 'volume':
            return result * self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        else:
            return result
512
513
    def _get_daily_data(self, asset, column, dt):
514
        while True:
515
            try:
516
                value = self._daily_bar_reader.spot_price(asset, dt, column)
517
                if value != -1:
518
                    return value
519
                else:
520
                    dt -= tradingcalendar.trading_day
521
            except NoDataOnDate:
522
                return 0
523
524
    def _get_history_daily_window(self, assets, end_dt, bar_count,
                                  field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of daily frequency for the given sids, indexed by trading day.
        """
        # The bar_count trading days ending at end_dt (inclusive).
        day_idx = tradingcalendar.trading_days.searchsorted(end_dt.date())
        days_for_window = tradingcalendar.trading_days[
            (day_idx - bar_count + 1):(day_idx + 1)]

        if len(assets) == 0:
            # No assets: an empty, correctly-indexed frame.
            return pd.DataFrame(None,
                                index=days_for_window,
                                columns=None)

        # Futures have no daily files and are aggregated from minutes.
        data = [
            self._get_history_daily_window_future(
                asset, days_for_window, end_dt, field_to_use)
            if isinstance(asset, Future)
            else self._get_history_daily_window_equity(
                asset, days_for_window, end_dt, field_to_use)
            for asset in assets
        ]

        return pd.DataFrame(
            np.array(data).T,
            index=days_for_window,
            columns=assets
        )
556
557
    def _get_history_daily_window_future(self, asset, days_for_window,
558
                                         end_dt, column):
559
        # Since we don't have daily bcolz files for futures (yet), use minute
560
        # bars to calculate the daily values.
561
        data = []
562
        data_groups = []
563
564
        # get all the minutes for the days NOT including today
565
        for day in days_for_window[:-1]:
566
            minutes = self.env.market_minutes_for_day(day)
567
568
            values_for_day = np.zeros(len(minutes), dtype=np.float64)
569
570
            for idx, minute in enumerate(minutes):
571
                minute_val = self._get_minute_spot_value_future(
572
                    asset, column, minute
573
                )
574
575
                values_for_day[idx] = minute_val
576
577
            data_groups.append(values_for_day)
578
579
        # get the minutes for today
580
        last_day_minutes = pd.date_range(
581
            start=self.env.get_open_and_close(end_dt)[0],
582
            end=end_dt,
583
            freq="T"
584
        )
585
586
        values_for_last_day = np.zeros(len(last_day_minutes), dtype=np.float64)
587
588
        for idx, minute in enumerate(last_day_minutes):
589
            minute_val = self._get_minute_spot_value_future(
590
                asset, column, minute
591
            )
592
593
            values_for_last_day[idx] = minute_val
594
595
        data_groups.append(values_for_last_day)
596
597
        for group in data_groups:
598
            if len(group) == 0:
599
                continue
600
601
            if column == 'volume':
602
                data.append(np.sum(group))
603
            elif column == 'open':
604
                data.append(group[0])
605
            elif column == 'close':
606
                data.append(group[-1])
607
            elif column == 'high':
608
                data.append(np.amax(group))
609
            elif column == 'low':
610
                data.append(np.amin(group))
611
612
        return data
613
614
    def _get_history_daily_window_equity(self, asset, days_for_window,
                                         end_dt, field_to_use):
        """
        Build daily history bars for an equity.

        Whole days come from the daily files; when the window ends intraday
        (end_dt is not midnight) and the asset is still trading, the final
        bar is aggregated from that day's minutes up to end_dt.
        """
        sid = int(asset)
        ends_at_midnight = end_dt.hour == 0 and end_dt.minute == 0

        # get the start and end dates for this sid
        end_date = self._get_asset_end_date(asset)

        if ends_at_midnight or (days_for_window[-1] > end_date):
            # two cases where we use daily data for the whole range:
            # 1) the history window ends at midnight utc.
            # 2) the last desired day of the window is after the
            # last trading day, use daily data for the whole range.
            return self._get_daily_window_for_sid(
                asset,
                field_to_use,
                days_for_window,
                extra_slot=False
            )
        else:
            # for the last day of the desired window, use minute
            # data and aggregate it.
            all_minutes_for_day = self.env.market_minutes_for_day(
                pd.Timestamp(end_dt.date()))

            last_minute_idx = all_minutes_for_day.searchsorted(end_dt)

            # these are the minutes for the partial day
            minutes_for_partial_day =\
                all_minutes_for_day[0:(last_minute_idx + 1)]

            # NOTE(review): this branch passes the raw sid while the branch
            # above passes the Asset -- confirm _get_daily_window_for_sid
            # accepts both.
            daily_data = self._get_daily_window_for_sid(
                sid,
                field_to_use,
                days_for_window[0:-1]
            )

            minute_data = self._get_minute_window_for_equity(
                sid,
                field_to_use,
                minutes_for_partial_day
            )

            # Collapse the partial day's minutes into a single daily value.
            # (field_to_use is always one of these five after BASE_FIELDS
            # aliasing.)
            if field_to_use == 'volume':
                minute_value = np.sum(minute_data)
            elif field_to_use == 'open':
                minute_value = minute_data[0]
            elif field_to_use == 'close':
                minute_value = minute_data[-1]
            elif field_to_use == 'high':
                minute_value = np.amax(minute_data)
            elif field_to_use == 'low':
                minute_value = np.amin(minute_data)

            # append the partial day.
            daily_data[-1] = minute_value

            return daily_data
672
673
    def _get_history_minute_window(self, assets, end_dt, bar_count,
                                   field_to_use):
        """
        Internal method that returns a dataframe containing history bars
        of minute frequency for the given sids.

        Minutes that fall before the minute data's first trading day are
        returned as NaN.
        """
        # get all the minutes for this window
        minutes_for_window = self.env.market_minute_window(
            end_dt, bar_count, step=-1)[::-1]

        first_trading_day = self.minute_bar_reader.first_trading_day

        # but then cut it down to only the minutes after
        # the first trading day.
        modified_minutes_for_window = minutes_for_window[
            minutes_for_window.slice_indexer(first_trading_day)]

        modified_minutes_length = len(modified_minutes_for_window)

        if modified_minutes_length == 0:
            raise ValueError("Cannot calculate history window that ends"
                             "before 2002-01-02 14:31 UTC!")

        data = []
        bars_to_prepend = 0
        nans_to_prepend = None

        if modified_minutes_length < bar_count:
            first_trading_date = first_trading_day.date()
            if modified_minutes_for_window[0].date() == first_trading_date:
                # the beginning of the window goes before our global trading
                # start date; pad the front of each column with NaN so the
                # data stays bar_count long.
                bars_to_prepend = bar_count - modified_minutes_length
                nans_to_prepend = np.repeat(np.nan, bars_to_prepend)

        if len(assets) == 0:
            return pd.DataFrame(
                None,
                index=modified_minutes_for_window,
                columns=None
            )

        for asset in assets:
            asset_minute_data = self._get_minute_window_for_asset(
                asset,
                field_to_use,
                modified_minutes_for_window
            )

            if bars_to_prepend != 0:
                asset_minute_data = np.insert(asset_minute_data, 0,
                                              nans_to_prepend)

            data.append(asset_minute_data)

        # NOTE(review): the index is the full (uncut) minute window -- the
        # NaN prepending above keeps the lengths equal -- and columns are
        # ints here, while _get_history_daily_window uses Asset objects;
        # confirm this asymmetry is intentional.
        return pd.DataFrame(
            np.array(data).T,
            index=minutes_for_window,
            columns=map(int, assets)
        )
733
734
    def get_history_window(self, assets, end_dt, bar_count, frequency, field,
                           ffill=True):
        """
        Public API method that returns a dataframe containing the requested
        history window.  Data is fully adjusted.

        Parameters
        ---------
        assets : list of zipline.data.Asset objects
            The assets whose data is desired.

        bar_count: int
            The number of bars desired.

        frequency: string
            "1d" or "1m"

        field: string
            The desired field of the asset.

        ffill: boolean
            Forward-fill missing values. Only has effect if field
            is 'price'.

        Returns
        -------
        A dataframe containing the requested data.

        Raises
        ------
        ValueError
            If ``field`` or ``frequency`` is not recognized.
        """
        try:
            field_to_use = BASE_FIELDS[field]
        except KeyError:
            raise ValueError("Invalid history field: " + str(field))

        # Tolerate raw sids by resolving them to Asset objects.
        assets = [
            self.env.asset_finder.retrieve_asset(asset)
            if isinstance(asset, int) else asset
            for asset in assets
        ]

        if frequency == "1d":
            window = self._get_history_daily_window(
                assets, end_dt, bar_count, field_to_use)
        elif frequency == "1m":
            window = self._get_history_minute_window(
                assets, end_dt, bar_count, field_to_use)
        else:
            raise ValueError("Invalid frequency: {0}".format(frequency))

        # "price" is the only field that forward-fills gaps.
        if ffill and field == "price":
            window.fillna(method='ffill', inplace=True)

        return window
785
786
    def _get_minute_window_for_asset(self, asset, field, minutes_for_window):
        """
        Internal method that gets a window of adjusted minute data for an
        asset and specified date range.  Used to support the history API
        method for minute bars.

        Missing bars are filled with NaN.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        minutes_for_window: pd.DateTimeIndex
            The list of minutes representing the desired window.  Each minute
            is a pd.Timestamp.

        Returns
        -------
        A numpy array with requested values.
        """
        # A raw sid may have been passed in; resolve it to an Asset first.
        if isinstance(asset, int):
            asset = self.env.asset_finder.retrieve_asset(asset)

        # Futures and equities are read from different stores, so dispatch
        # to the appropriate handler.
        if isinstance(asset, Future):
            handler = self._get_minute_window_for_future
        else:
            handler = self._get_minute_window_for_equity

        return handler(asset, field, minutes_for_window)
819
820
    def _get_minute_window_for_future(self, asset, field, minutes_for_window):
821
        # THIS IS TEMPORARY.  For now, we are only exposing futures within
822
        # equity trading hours (9:30 am to 4pm, Eastern).  The easiest way to
823
        # do this is to simply do a spot lookup for each desired minute.
824
        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)
825
        for idx, minute in enumerate(minutes_for_window):
826
            return_data[idx] = \
827
                self._get_minute_spot_value_future(asset, field, minute)
828
829
        # Note: an improvement could be to find the consecutive runs within
830
        # minutes_for_window, and use them to read the underlying ctable
831
        # more efficiently.
832
833
        # Once futures are on 24-hour clock, then we can just grab all the
834
        # requested minutes in one shot from the ctable.
835
836
        # no adjustments for futures, yay.
837
        return return_data
838
839
    def _get_minute_window_for_equity(self, asset, field, minutes_for_window):
        """
        Return a window of adjusted minute bars for an equity.

        Reads the raw values for ``field`` from the asset's bcolz minute
        file, trims away the zero-padded afternoon of any early-close day,
        then applies split/merger/dividend adjustments in place.
        """
        # each sid's minutes are stored in a bcolz file
        # the bcolz file has 390 bars per day, starting at 1/2/2002, regardless
        # of when the asset started trading and regardless of half days.
        # for a half day, the second half is filled with zeroes.

        # find the position of start_dt in the entire timeline, go back
        # bar_count bars, and that's the unadjusted data
        raw_data = self._open_minute_file(field, asset)

        # NOTE(review): end_idx == 0 below requires _find_position_of_minute
        # to return -1, but its negative-index guard appears unreachable
        # (pandas searchsorted never returns a negative value) -- confirm.
        start_idx = max(self._find_position_of_minute(minutes_for_window[0]),
                        0)
        end_idx = self._find_position_of_minute(minutes_for_window[-1]) + 1

        if end_idx == 0:
            # No data to return for minute window.
            return np.full(len(minutes_for_window), np.nan)

        return_data = np.zeros(len(minutes_for_window), dtype=np.float64)

        data_to_copy = raw_data[start_idx:end_idx]

        num_minutes = len(minutes_for_window)

        # data_to_copy contains all the zeros (from 1pm to 4pm of an early
        # close).  num_minutes is the number of actual trading minutes.  if
        # these two have different lengths, that means that we need to trim
        # away data due to early closes.
        if len(data_to_copy) != num_minutes:
            # get a copy of the minutes in Eastern time, since we depend on
            # an early close being at 1pm Eastern.
            eastern_minutes = minutes_for_window.tz_convert("US/Eastern")

            # accumulate a list of indices of the last minute of an early
            # close day.  For example, if data_to_copy starts at 12:55 pm, and
            # there are five minutes of real data before 180 zeroes, we would
            # put 5 into last_minute_idx_of_early_close_day, because the fifth
            # minute is the last "real" minute of the day.
            last_minute_idx_of_early_close_day = []
            for minute_idx, minute_dt in enumerate(eastern_minutes):
                if minute_idx == (num_minutes - 1):
                    break

                if minute_dt.hour == 13 and minute_dt.minute == 0:
                    next_minute = eastern_minutes[minute_idx + 1]
                    if next_minute.hour != 13:
                        # minute_dt is the last minute of an early close day
                        last_minute_idx_of_early_close_day.append(minute_idx)

            # spin through the list of early close markers, and use them to
            # chop off 180 minutes at a time from data_to_copy.
            for idx, early_close_minute_idx in \
                    enumerate(last_minute_idx_of_early_close_day):
                # each earlier deletion shifted the remaining indices left by
                # 180 slots, so compensate before deleting.
                early_close_minute_idx -= (180 * idx)
                data_to_copy = np.delete(
                    data_to_copy,
                    range(
                        early_close_minute_idx + 1,
                        early_close_minute_idx + 181
                    )
                )

        return_data[0:len(data_to_copy)] = data_to_copy

        # Apply splits/mergers/dividends and the minute price scaling factor
        # in place.
        self._apply_all_adjustments(
            return_data,
            asset,
            minutes_for_window,
            field,
            self.MINUTE_PRICE_ADJUSTMENT_FACTOR
        )

        return return_data
912
913
    def _apply_all_adjustments(self, data, asset, dts, field,
914
                               price_adj_factor=1.0):
915
        """
916
        Internal method that applies all the necessary adjustments on the
917
        given data array.
918
919
        The adjustments are:
920
        - splits
921
        - if field != "volume":
922
            - mergers
923
            - dividends
924
            - * 0.001
925
            - any zero fields replaced with NaN
926
        - all values rounded to 3 digits after the decimal point.
927
928
        Parameters
929
        ----------
930
        data : np.array
931
            The data to be adjusted.
932
933
        asset: Asset
934
            The asset whose data is being adjusted.
935
936
        dts: pd.DateTimeIndex
937
            The list of minutes or days representing the desired window.
938
939
        field: string
940
            The field whose values are in the data array.
941
942
        price_adj_factor: float
943
            Factor with which to adjust OHLC values.
944
        Returns
945
        -------
946
        None.  The data array is modified in place.
947
        """
948
        self._apply_adjustments_to_window(
949
            self._get_adjustment_list(
950
                asset, self._splits_dict, "SPLITS"
951
            ),
952
            data,
953
            dts,
954
            field != 'volume'
955
        )
956
957
        if field != 'volume':
958
            self._apply_adjustments_to_window(
959
                self._get_adjustment_list(
960
                    asset, self._mergers_dict, "MERGERS"
961
                ),
962
                data,
963
                dts,
964
                True
965
            )
966
967
            self._apply_adjustments_to_window(
968
                self._get_adjustment_list(
969
                    asset, self._dividends_dict, "DIVIDENDS"
970
                ),
971
                data,
972
                dts,
973
                True
974
            )
975
976
            data *= price_adj_factor
977
978
            # if anything is zero, it's a missing bar, so replace it with NaN.
979
            # we only want to do this for non-volume fields, because a missing
980
            # volume should be 0.
981
            data[data == 0] = np.NaN
982
983
        np.around(data, 3, out=data)
984
985
    def _find_position_of_minute(self, minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open on 1/2/2002.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The position of the given minute in the list of all trading minutes
        since market open on 1/2/2002.
        """
        day = minute_dt.date()
        day_idx = self.minute_bar_reader.trading_days.searchsorted(day)
        # NOTE(review): pandas searchsorted returns the insertion point
        # (0 for values before the index start) and never a negative value,
        # so this -1 early-out appears unreachable -- confirm the intended
        # handling of days outside the trading calendar.
        if day_idx < 0:
            return -1

        # Market open for the day in UTC; the first bar of a session is
        # labeled 9:31 AM Eastern.
        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # Whole minutes since the open.  True division yields a float here;
        # the int() at the return truncates it toward zero.
        minutes_offset = int((minute_dt - day_open).total_seconds()) / 60

        return int((390 * day_idx) + minutes_offset)
1023
1024
    def _get_daily_window_for_sid(self, asset, field, days_in_window,
                                  extra_slot=True):
        """
        Internal method that gets a window of adjusted daily data for a sid
        and specified date range.  Used to support the history API method for
        daily bars.

        Parameters
        ----------
        asset : Asset
            The asset whose data is desired.

        field: string
            The specific field to return.  "open", "high", "close_price", etc.

        days_in_window: pd.DatetimeIndex
            The days whose data is desired, earliest first.

        extra_slot: boolean
            Whether to allocate an extra slot in the returned numpy array.
            This extra slot will hold the data for the last partial day.  It's
            much better to create it here than to create a copy of the array
            later just to add a slot.

        Returns
        -------
        A numpy array with requested values.  Any missing slots filled with
        nan.

        """
        column = US_EQUITY_COLUMNS[field]
        data = self._daily_bar_reader.load_raw_arrays([column],
                                                      days_in_window[0],
                                                      days_in_window[-1],
                                                      [asset])

        bar_count = len(days_in_window)

        # create an np.array of size bar_count
        if extra_slot:
            return_array = np.zeros((bar_count + 1,))
        else:
            return_array = np.zeros((bar_count,))

        return_array[:] = np.NAN
        sid = int(asset)

        # load_raw_arrays returns one (days x sids) array per requested
        # column; take the single requested sid's column from the first.
        return_array[0:bar_count] = data[0].T[0]

        # Apply splits/mergers/dividends in place.  The extra slot, if
        # allocated, remains NaN.
        self._apply_all_adjustments(
            return_array,
            sid,
            days_in_window,
            field,
        )

        return return_array
1084
1085
    @staticmethod
1086
    def _apply_adjustments_to_window(adjustments_list, window_data,
1087
                                     dts_in_window, multiply):
1088
        if len(adjustments_list) == 0:
1089
            return
1090
1091
        # advance idx to the correct spot in the adjustments list, based on
1092
        # when the window starts
1093
        idx = 0
1094
1095
        while idx < len(adjustments_list) and dts_in_window[0] >\
1096
                adjustments_list[idx][0]:
1097
            idx += 1
1098
1099
        # if we've advanced through all the adjustments, then there's nothing
1100
        # to do.
1101
        if idx == len(adjustments_list):
1102
            return
1103
1104
        while idx < len(adjustments_list):
1105
            adjustment_to_apply = adjustments_list[idx]
1106
1107
            if adjustment_to_apply[0] > dts_in_window[-1]:
1108
                break
1109
1110
            range_end = dts_in_window.searchsorted(adjustment_to_apply[0])
1111
            if multiply:
1112
                window_data[0:range_end] *= adjustment_to_apply[1]
1113
            else:
1114
                window_data[0:range_end] /= adjustment_to_apply[1]
1115
1116
            idx += 1
1117
1118
    def _get_adjustment_list(self, asset, adjustments_dict, table_name):
1119
        """
1120
        Internal method that returns a list of adjustments for the given sid.
1121
1122
        Parameters
1123
        ----------
1124
        asset : Asset
1125
            The asset for which to return adjustments.
1126
1127
        adjustments_dict: dict
1128
            A dictionary of sid -> list that is used as a cache.
1129
1130
        table_name: string
1131
            The table that contains this data in the adjustments db.
1132
1133
        Returns
1134
        -------
1135
        adjustments: list
1136
            A list of [multiplier, pd.Timestamp], earliest first
1137
1138
        """
1139
        if self._adjustment_reader is None:
1140
            return []
1141
1142
        sid = int(asset)
1143
1144
        try:
1145
            adjustments = adjustments_dict[sid]
1146
        except KeyError:
1147
            adjustments = adjustments_dict[sid] = self._adjustment_reader.\
1148
                get_adjustments_for_sid(table_name, sid)
1149
1150
        return adjustments
1151
1152
    def get_equity_price_view(self, asset):
        """
        Returns a DataPortalSidView for the given asset.  Used to support the
        data[sid(N)] public API.  Not needed if DataPortal is used standalone.

        Parameters
        ----------
        asset : Asset
            Asset that is being queried.

        Returns
        -------
        DataPortalSidView: Accessor into the given asset's data.
        """
        # Views are cached per asset so repeated lookups share one accessor.
        if asset not in self.views:
            self.views[asset] = DataPortalSidView(asset, self)

        return self.views[asset]
1172
1173
    def _check_is_currently_alive(self, asset, dt):
1174
        if dt is None:
1175
            dt = self.current_day
1176
1177
        sid = int(asset)
1178
1179
        if sid not in self._asset_start_dates:
1180
            self._get_asset_start_date(asset)
1181
1182
        start_date = self._asset_start_dates[sid]
1183
        if self._asset_start_dates[sid] > dt:
1184
            raise NoTradeDataAvailableTooEarly(
1185
                sid=sid,
1186
                dt=dt,
1187
                start_dt=start_date
1188
            )
1189
1190
        end_date = self._asset_end_dates[sid]
1191
        if self._asset_end_dates[sid] < dt:
1192
            raise NoTradeDataAvailableTooLate(
1193
                sid=sid,
1194
                dt=dt,
1195
                end_dt=end_date
1196
            )
1197
1198
    def _get_asset_start_date(self, asset):
        """
        Return the start date for *asset*, populating the date caches first.

        NOTE(review): _ensure_asset_dates stores entries under int(asset),
        but this lookup uses the asset itself as the key; it relies on the
        asset hashing/comparing equal to its sid -- confirm.
        """
        self._ensure_asset_dates(asset)
        return self._asset_start_dates[asset]
1201
1202
    def _get_asset_end_date(self, asset):
        """
        Return the end date for *asset*, populating the date caches first.

        NOTE(review): _ensure_asset_dates stores entries under int(asset),
        but this lookup uses the asset itself as the key; it relies on the
        asset hashing/comparing equal to its sid -- confirm.
        """
        self._ensure_asset_dates(asset)
        return self._asset_end_dates[asset]
1205
1206
    def _ensure_asset_dates(self, asset):
1207
        sid = int(asset)
1208
1209
        if sid not in self._asset_start_dates:
1210
            self._asset_start_dates[sid] = asset.start_date
1211
            self._asset_end_dates[sid] = asset.end_date
1212
1213
    def get_splits(self, sids, dt):
        """
        Returns any splits for the given sids and the given dt.

        Parameters
        ----------
        sids : list
            Sids for which we want splits.

        dt: pd.Timestamp
            The date for which we are checking for splits.  Note: this is
            expected to be midnight UTC.

        Returns
        -------
        list: List of splits, where each split is a (sid, ratio) tuple.
        """
        # Return an empty list (not a dict) so the return type always
        # matches the documented contract.
        if self._adjustment_reader is None or len(sids) == 0:
            return []

        # convert dt to # of seconds since epoch, because that's what we use
        # in the adjustments db.  Integer floor division is used because
        # going through float (dt.value / 1e9) can lose precision on large
        # nanosecond values.
        seconds = dt.value // 10 ** 9

        splits = self._adjustment_reader.conn.execute(
            "SELECT sid, ratio FROM SPLITS WHERE effective_date = ?",
            (seconds,)).fetchall()

        # Only keep splits for the sids the caller asked about.
        sids_set = set(sids)
        return [split for split in splits if split[0] in sids_set]
1245
1246
    def get_stock_dividends(self, sid, trading_days):
        """
        Returns all the stock dividends for a specific sid that occur
        in the given trading range.

        Parameters
        ----------
        sid: int
            The asset whose stock dividends should be returned.

        trading_days: pd.DatetimeIndex
            The trading range.

        Returns
        -------
        list: A list of objects with all relevant attributes populated.
        All timestamp fields are converted to pd.Timestamps.
        """

        if self._adjustment_reader is None:
            return []

        if len(trading_days) == 0:
            return []

        # Convert the range bounds to seconds since the epoch, which is how
        # dates are stored in the adjustments db.  Integer floor division is
        # used because going through float (value / 1e9) can lose precision
        # on large nanosecond values.
        start_dt = trading_days[0].value // 10 ** 9
        end_dt = trading_days[-1].value // 10 ** 9

        dividends = self._adjustment_reader.conn.execute(
            "SELECT * FROM stock_dividend_payouts WHERE sid = ? AND "
            "ex_date > ? AND pay_date < ?", (int(sid), start_dt, end_dt,)).\
            fetchall()

        # NOTE(review): declared_date is returned exactly as stored, unlike
        # the other date fields -- confirm whether it should also be
        # converted to a pd.Timestamp.
        return [
            {
                "declared_date": row[1],
                "ex_date": pd.Timestamp(row[2], unit="s"),
                "pay_date": pd.Timestamp(row[3], unit="s"),
                "payment_sid": row[4],
                "ratio": row[5],
                "record_date": pd.Timestamp(row[6], unit="s"),
                "sid": row[7],
            }
            for row in dividends
        ]
1292
1293
    def contains(self, asset, field):
        """
        Whether ``field`` can be looked up for ``asset``: either one of the
        base OHLCV fields, or an extra (fetcher) column registered for this
        asset.
        """
        if field in BASE_FIELDS:
            return True
        if field in self._augmented_sources_map:
            return asset in self._augmented_sources_map[field]
        return False
1297
1298
    def get_fetcher_assets(self):
        """
        Returns a list of assets for the current date, as defined by the
        fetcher data.

        Notes
        -----
        Data is forward-filled.  If there is no fetcher data defined for day
        N, we use day N-1's data (if available, otherwise we keep going back).

        Returns
        -------
        list: a list of Asset objects.
        """
        # return a list of assets for the current date, as defined by the
        # fetcher source
        source_df = self._extra_source_df

        # No fetcher source was registered at all.
        if source_df is None:
            return []

        if self.current_day in source_df.index:
            date_to_use = self.current_day
        else:
            # current day isn't in the fetcher df; forward-fill by falling
            # back to the most recent day that is.
            idx = source_df.index.searchsorted(self.current_day)
            if idx == 0:
                # No earlier data exists either.
                return []

            date_to_use = source_df.index[idx - 1]

        asset_list = source_df.loc[date_to_use]["sid"]

        # make sure they're actually assets
        return [asset for asset in asset_list if isinstance(asset, Asset)]
1335
1336
1337
class DataPortalSidView(object):
    """
    Thin accessor bound to a single asset, exposing its spot data through
    attribute, item, and ``in`` lookups by delegating to a DataPortal.
    """

    def __init__(self, asset, portal):
        # The asset this view serves, and the portal that resolves lookups.
        self.asset = asset
        self.portal = portal

    def __getattr__(self, column):
        # Unknown attributes are treated as field names and resolved by the
        # portal.
        return self.portal.get_spot_value(self.asset, column)

    def __contains__(self, column):
        return self.portal.contains(self.asset, column)

    def __getitem__(self, column):
        # Item access reuses the __getattr__ delegation path.  Calling it
        # directly bypasses real instance attributes, so even names like
        # 'asset' go through the portal when subscripted.
        return self.__getattr__(column)
1350