Completed
Pull Request — master (#858)
by Eddie
10:07 queued 01:13
created

zipline.data.BcolzMinuteBarWriter.gen_frames()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 6
rs 9.4286
1
# Copyright 2015 Quantopian, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from abc import (
16
    ABCMeta,
17
    abstractmethod,
18
)
19
import bcolz
20
import json
21
import os
22
from bcolz import ctable
23
from datetime import datetime
24
import numpy as np
25
from numpy import float64
26
from os.path import join
27
import pandas as pd
28
from pandas import read_csv
29
from six import with_metaclass
30
31
from zipline.finance.trading import TradingEnvironment
32
from zipline.utils import tradingcalendar
33
34
# Number of minute slots reserved for every trading day in the minute index.
MINUTES_PER_DAY = 390

# Module-level trading environment used by the writer for date normalization
# and trading-calendar lookups.
_writer_env = TradingEnvironment()

# Nanoseconds in one minute.
_NANOS_IN_MINUTE = 60 * 1000 * 1000 * 1000

# Name of the JSON metadata file stored at the root of the data directory.
METADATA_FILENAME = 'metadata.json'
41
42
43
def write_metadata(directory, first_trading_day):
    """
    Write the minute bar metadata file into ``directory``.

    Parameters
    ----------
    directory : str
        Directory in which the ``METADATA_FILENAME`` file is created.
    first_trading_day : datetime-like
        Object exposing a ``date()`` method.  The string form of that date
        is stored under the ``'first_trading_day'`` key.
    """
    target = os.path.join(directory, METADATA_FILENAME)
    payload = {'first_trading_day': str(first_trading_day.date())}

    with open(target, 'w') as metadata_file:
        metadata_file.write(json.dumps(payload))
52
53
54
def _bcolz_minute_index(trading_days):
    """
    Build the index of minutes covered by the given trading days.

    Every market open found between the first and last entry of
    ``trading_days`` contributes ``MINUTES_PER_DAY`` consecutive minutes,
    starting at that open.

    Parameters
    ----------
    trading_days : sequence of datetime-like
        Non-empty, ordered trading days (presumably a ``pd.DatetimeIndex``;
        confirm against callers).  Its first and last entries bound the
        slice taken from ``tradingcalendar.open_and_closes.market_open``.

    Returns
    -------
    The result of ``pd.to_datetime(..., utc=True)`` on an array of
    ``len(trading_days) * MINUTES_PER_DAY`` minute values.
    """
    minutes = np.zeros(len(trading_days) * MINUTES_PER_DAY,
                       dtype='datetime64[ns]')

    market_opens = tradingcalendar.open_and_closes.market_open
    mask = market_opens.index.slice_indexer(start=trading_days[0],
                                            end=trading_days[-1])
    opens = market_opens[mask]

    # Offsets 0..MINUTES_PER_DAY-1 minutes, added to each day's open.
    deltas = np.arange(0, MINUTES_PER_DAY, dtype='timedelta64[m]')
    for i, market_open in enumerate(opens):
        start_ix = MINUTES_PER_DAY * i
        end_ix = start_ix + MINUTES_PER_DAY
        minutes[start_ix:end_ix] = market_open.asm8 + deltas

    # ``box=True`` was the default of ``pd.to_datetime`` and the keyword has
    # since been removed from pandas; omitting it yields the same result on
    # old versions and keeps the call valid on new ones.
    return pd.to_datetime(minutes, utc=True)
70
71
72
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses implement ``gen_frames`` and must set
    ``self._first_trading_day``, which backs the ``first_trading_day``
    property.
    """
    @property
    def first_trading_day(self):
        """
        The first trading day of the data being written.
        """
        return self._first_trading_day

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write the minute data of ``assets`` below ``directory``.

        Parameters
        ----------
        directory : str
            Root directory; the metadata file is written directly into it.
        assets
            Passed unchanged to ``gen_frames``.
        sid_path_func : callable, optional
            Called as ``sid_path_func(directory, asset_id)`` to obtain the
            output path for an asset.  When omitted, the path is
            ``<directory>/<asset_id>.bcolz``.

        Returns
        -------
        The return value of ``_write_internal`` (currently ``None``).
        """
        _iterator = self.gen_frames(assets)

        return self._write_internal(directory, _iterator,
                                    sid_path_func=sid_path_func)

    def full_minutes_for_days(self, dt1, dt2):
        """
        Return the minute index spanning the trading days from the day of
        ``dt1`` through the day of ``dt2``.

        Both arguments are normalized with the writer's trading environment
        before the trading days in that range are looked up.
        """
        start_date = _writer_env.normalize_date(dt1)
        end_date = _writer_env.normalize_date(dt2)

        trading_days = _writer_env.days_in_range(start_date, end_date)
        return _bcolz_minute_index(trading_days)

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write the metadata file and one bcolz ctable per asset.

        Parameters
        ----------
        directory : str
            Root output directory.
        iterator : iterable of (asset_id, pd.DataFrame)
            Each frame is indexed by minute and provides ``open``, ``high``,
            ``low``, ``close`` and ``volume`` columns.  Values are cast to
            ``uint32`` before being stored.
        sid_path_func : callable, optional
            See ``write``.

        Notes
        -----
        Every column covers all minutes from the first trading day's open
        onward; minutes for which the frame has no row are left at zero.
        """
        first_trading_day = self.first_trading_day

        write_metadata(directory, first_trading_day)

        # 9:31 US/Eastern on the first trading day, expressed in UTC.
        first_open = pd.Timestamp(
            datetime(
                year=first_trading_day.year,
                month=first_trading_day.month,
                day=first_trading_day.day,
                hour=9,
                minute=31
            ), tz='US/Eastern').tz_convert('UTC')

        # Minute index shared across assets; built for the first asset and
        # extended whenever a later asset has data past its end.
        all_minutes = None

        names = ["open", "high", "low", "close", "volume"]

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            # The ctable below is opened with mode='w', so an existing
            # directory (e.g. from an earlier run) must not abort the write
            # the way an unconditional os.makedirs would.
            if not os.path.exists(path):
                os.makedirs(path)

            last_dt = df.index[-1]

            if all_minutes is None:
                all_minutes = self.full_minutes_for_days(first_open, last_dt)
                minutes = all_minutes
            elif last_dt in all_minutes:
                mask = all_minutes.slice_indexer(end=last_dt)
                minutes = all_minutes[mask]
            else:
                # Need to extend all minutes from open after last value
                # in all_minutes to the last_dt.
                next_open, _ = _writer_env.next_open_and_close(
                    all_minutes[-1])
                to_append = self.full_minutes_for_days(next_open, last_dt)
                all_minutes = all_minutes.append(to_append)
                minutes = all_minutes

            minutes_count = len(minutes)

            # Position of every row of the frame within the minute index.
            dt_ixs = np.searchsorted(minutes.values, df.index.values)

            columns = []
            for name in names:
                column = np.zeros(minutes_count, dtype=np.uint32)
                column[dt_ixs] = df[name].values.astype(np.uint32)
                columns.append(column)

            ctable(
                columns=columns,
                names=names,
                rootdir=path,
                mode='w'
            )
177
178
179
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter fed from frames that are already in memory.

    Parameters
    ----------
    first_trading_day
        Stored as ``_first_trading_day`` and exposed through the inherited
        ``first_trading_day`` property.
    """
    _csv_dtypes = dict(
        open=float64,
        high=float64,
        low=float64,
        close=float64,
        volume=float64,
    )

    def __init__(self, first_trading_day):
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Yield ``(asset, frame)`` pairs for every key of ``assets``.

        ``assets`` is iterated and indexed by the yielded keys; each value
        must offer ``set_index`` and have a ``minute`` column, which becomes
        the index of the yielded frame.
        """
        for asset in assets:
            frame = assets[asset]
            yield asset, frame.set_index("minute")
195
196
197
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter constructed from a map of CSVs to assets.

    Parameters
    ----------
    asset_map: dict
        A map from asset_id -> path to csv with data for that asset.
    first_trading_day
        Stored as ``_first_trading_day`` and exposed through the inherited
        ``first_trading_day`` property.

    CSVs should have the following columns:
        minute : parsed as a date and used as the index
        open, high, low, close, volume : each parsed as float64
    """
    _csv_dtypes = dict(
        open=float64,
        high=float64,
        low=float64,
        close=float64,
        volume=float64,
    )

    def __init__(self, asset_map, first_trading_day):
        self._asset_map = asset_map
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Read CSVs as DataFrames from our asset map.

        Yields ``(asset, frame)`` pairs in the order of ``assets``; every
        frame is indexed by its ``minute`` column, localized to UTC.

        Raises
        ------
        KeyError
            When the asset map has no path (or a ``None`` path) for an
            asset.
        """
        column_types = self._csv_dtypes

        for asset in assets:
            csv_path = self._asset_map.get(asset)
            if csv_path is None:
                raise KeyError("No path supplied for asset %s" % asset)

            raw = read_csv(csv_path, parse_dates=['minute'],
                           dtype=column_types)
            yield asset, raw.set_index("minute").tz_localize("UTC")
240
241
242
class BcolzMinuteBarReader(object):
243
244
    def __init__(self, rootdir, sid_path_func=None):
        """
        Parameters
        ----------
        rootdir : str
            Directory holding the metadata file and the per-asset data.
        sid_path_func : callable, optional
            Called as ``sid_path_func(rootdir, sid)`` to locate the data of
            an asset.  When omitted, ``<rootdir>/<sid>.bcolz`` is used.
        """
        self.rootdir = rootdir

        stored = self._get_metadata()

        self.first_trading_day = pd.Timestamp(
            stored['first_trading_day'], tz='UTC')

        # TODO: Read/write calendar to match daily, so that calendar is not
        # 'hardcoded'.
        all_days = tradingcalendar.trading_days
        self.trading_days = all_days[
            all_days.slice_indexer(self.first_trading_day)]
        self._sid_path_func = sid_path_func

        # Per-field caches of opened columns, keyed by sid string.
        self._carrays = {
            field: {}
            for field in ('open', 'high', 'low', 'close', 'volume',
                          'sid', 'dt')
        }

        self._minute_index = _bcolz_minute_index(self.trading_days)
269
270
    def _get_metadata(self):
        """
        Load and return the parsed JSON metadata stored in ``rootdir``.
        """
        metadata_path = os.path.join(self.rootdir, METADATA_FILENAME)
        with open(metadata_path) as metadata_file:
            return json.load(metadata_file)
273
274
    def _get_ctable(self, asset):
        """
        Open, read-only, the bcolz data stored for ``asset``.

        The location comes from ``sid_path_func`` when one was supplied to
        the reader, otherwise it is ``<rootdir>/<sid>.bcolz``.
        """
        sid = int(asset)
        locate = self._sid_path_func
        if locate is None:
            path = "{0}/{1}.bcolz".format(self.rootdir, sid)
        else:
            path = locate(self.rootdir, sid)

        return bcolz.open(path, mode='r')
282
283
    def get_last_traded_dt(self, asset, dt):
284
        minute_pos = self._find_last_traded_position(asset, dt)
285
        if minute_pos == -1:
286
            return pd.NaT
287
        return self._minute_index[minute_pos]
288
289
    def _find_last_traded_position(self, asset, dt):
290
        volumes = self._open_minute_file('volume', asset)
291
        start_date = asset.start_date
292
        _minute_index = self._minute_index
293
294
        minute_pos = self._find_position_of_minute(dt)
295
296
        while True:
297
            dt = _minute_index[minute_pos]
298
            if dt < start_date:
299
                return -1
300
            if minute_pos == 0 or volumes[minute_pos] != 0:
301
                return minute_pos
302
            minute_pos -= 1
303
304
    def _find_position_of_minute(self, minute_dt):
305
        """
306
        Internal method that returns the position of the given minute in the
307
        list of every trading minute since market open of the first trading
308
        day.
309
310
        IMPORTANT: This method assumes every day is 390 minutes long, even
311
        early closes.  Our minute bcolz files are generated like this to
312
        support fast lookup.
313
314
        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern, if
315
        1/2/2002 is the first trading day of the dataset.
316
317
        Parameters
318
        ----------
319
        minute_dt: pd.Timestamp
320
            The minute whose position should be calculated.
321
322
        Returns
323
        -------
324
        The position of the given minute in the list of all trading minutes
325
        since market open on the first trading day.
326
        """
327
        return self._minute_index.get_loc(minute_dt)
328
329
    def _open_minute_file(self, field, asset):
330
        sid_str = str(int(asset))
331
332
        try:
333
            carray = self._carrays[field][sid_str]
334
        except KeyError:
335
            carray = self._carrays[field][sid_str] = \
336
                self._get_ctable(asset)[field]
337
338
        return carray
339