Completed
Pull Request — master (#858)
by Eddie
01:32
created

zipline.data.BcolzMinuteBarWriter.gen_frames()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 6
rs 9.4286
1
# Copyright 2015 Quantopian, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from abc import (
16
    ABCMeta,
17
    abstractmethod,
18
)
19
import bcolz
20
import json
21
import os
22
from bcolz import ctable
23
from datetime import datetime
24
import numpy as np
25
from numpy import float64
26
from os.path import join
27
import pandas as pd
28
from pandas import read_csv
29
from six import with_metaclass
30
31
from zipline.finance.trading import TradingEnvironment
32
from zipline.utils import tradingcalendar
33
34
# Presumably the number of minute bars allotted to one trading day.  The
# writer and reader code visible in this file spell the value out as a
# literal 390 rather than referencing this name.
MINUTES_PER_DAY = 390
35
36
# Module-level trading environment; BcolzMinuteBarWriter._write_internal
# passes it to full_minutes_for_days to enumerate the trading days.
_writer_env = TradingEnvironment()
37
38
# Name of the JSON file, stored directly in the data root directory, that
# records the first trading day of the written dataset.
METADATA_FILENAME = 'metadata.json'
39
40
41
def write_metadata(directory, first_trading_day):
    """
    Write the dataset metadata file into ``directory``.

    Parameters
    ----------
    directory : str
        Directory that receives the ``METADATA_FILENAME`` file.
    first_trading_day : object with a ``date()`` method
        Its calendar date, rendered with ``str``, is stored under the
        ``'first_trading_day'`` key.
    """
    target = os.path.join(directory, METADATA_FILENAME)
    payload = dict(first_trading_day=str(first_trading_day.date()))

    with open(target, 'w') as out:
        out.write(json.dumps(payload))
50
51
52
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses implement ``gen_frames`` and set ``_first_trading_day``.

    Each asset is written to its own ctable with the columns ``open``,
    ``high``, ``low``, ``close``, ``volume`` and ``dt``, all stored as
    uint32.  Every trading day from the first trading day through the day
    of the asset's last bar occupies ``MINUTES_PER_DAY`` rows, starting at
    9:31 US/Eastern; rows for which no bar was supplied stay zero.
    """
    @property
    def first_trading_day(self):
        """The first trading day of the dataset, as set by the subclass."""
        return self._first_trading_day

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).

        Each frame is expected to be indexed by minute and to carry the
        columns ``open``, ``high``, ``low``, ``close`` and ``volume``,
        which is what ``_write_internal`` reads from it.
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write the bars of ``assets`` below ``directory``.

        Parameters
        ----------
        directory : str
            Root directory; also receives the metadata file.
        assets
            Passed through unchanged to ``gen_frames``.
        sid_path_func : callable, optional
            ``sid_path_func(directory, asset_id)`` returns the ctable
            directory for an asset.  Defaults to
            ``<directory>/<asset_id>.bcolz``.
        """
        return self._write_internal(directory, self.gen_frames(assets),
                                    sid_path_func=sid_path_func)

    @staticmethod
    def _market_open(day):
        """Return 9:31 US/Eastern on ``day``, converted to UTC."""
        return pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

    @staticmethod
    def full_minutes_for_days(env, dt1, dt2):
        """
        Return a UTC DatetimeIndex holding ``MINUTES_PER_DAY`` consecutive
        minutes, starting at 9:31 US/Eastern, for every day that
        ``env.days_in_range`` yields between the normalized ``dt1`` and
        ``dt2``.
        """
        start_date = env.normalize_date(dt1)
        end_date = env.normalize_date(dt2)

        all_minutes = [
            pd.date_range(
                start=BcolzMinuteBarWriter._market_open(day),
                periods=MINUTES_PER_DAY,
                freq="min"
            )
            for day in env.days_in_range(start_date, end_date)
        ]

        # flatten
        return pd.DatetimeIndex(
            np.concatenate(all_minutes), copy=False, tz='UTC'
        )

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write the metadata file, then one ctable per (asset_id, frame)
        pair produced by ``iterator``.
        """
        first_trading_day = self.first_trading_day

        write_metadata(directory, first_trading_day)

        first_open = self._market_open(first_trading_day)

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            # The ctable below is opened with mode='w', so an already
            # existing directory is not treated as an error here.
            if not os.path.isdir(path):
                os.makedirs(path)

            minutes = self.full_minutes_for_days(_writer_env,
                                                 first_open, df.index[-1])
            minutes_count = len(minutes)

            dt_col = np.zeros(minutes_count, dtype=np.uint32)
            open_col = np.zeros(minutes_count, dtype=np.uint32)
            high_col = np.zeros(minutes_count, dtype=np.uint32)
            low_col = np.zeros(minutes_count, dtype=np.uint32)
            close_col = np.zeros(minutes_count, dtype=np.uint32)
            vol_col = np.zeros(minutes_count, dtype=np.uint32)

            for dt, bar in df.iterrows():
                # Row position of this bar within the full minute grid.
                idx = minutes.searchsorted(dt)

                # Nanoseconds since the epoch -> whole seconds, using
                # integer arithmetic to avoid a float round trip.
                dt_col[idx] = dt.value // 10 ** 9
                open_col[idx] = bar.loc["open"]
                high_col[idx] = bar.loc["high"]
                low_col[idx] = bar.loc["low"]
                close_col[idx] = bar.loc["close"]
                vol_col[idx] = bar.loc["volume"]

            ctable(
                columns=[
                    open_col,
                    high_col,
                    low_col,
                    close_col,
                    vol_col,
                    dt_col
                ],
                names=[
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume",
                    "dt"
                ],
                rootdir=path,
                mode='w'
            )
165
166
167
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter that takes its bars from in-memory DataFrames.

    Parameters
    ----------
    first_trading_day
        Stored as ``_first_trading_day`` and exposed through the
        ``first_trading_day`` property of the base class.
    """
    # Not referenced by this class in the visible code.
    _csv_dtypes = dict(
        open=float64,
        high=float64,
        low=float64,
        close=float64,
        volume=float64,
    )

    def __init__(self, first_trading_day):
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Yield ``(asset, frame)`` pairs, each frame re-indexed by its
        "minute" column.

        ``assets`` is iterated for its keys and subscripted by them, i.e.
        it is used as a mapping of asset -> DataFrame.
        """
        for sid in assets:
            frame = assets[sid]
            yield sid, frame.set_index("minute")
183
184
185
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter that reads its bars from one CSV file per asset.

    Parameters
    ----------
    asset_map: dict
        A map from asset_id -> path to csv with data for that asset.
    first_trading_day
        Stored as ``_first_trading_day`` and exposed through the
        ``first_trading_day`` property of the base class.

    Each CSV needs a ``minute`` column, which is parsed as dates, plus the
    columns ``open``, ``high``, ``low``, ``close`` and ``volume``, all of
    which are read as float64 (see ``_csv_dtypes``).
    """
    _csv_dtypes = dict(
        open=float64,
        high=float64,
        low=float64,
        close=float64,
        volume=float64,
    )

    def __init__(self, asset_map, first_trading_day):
        self._first_trading_day = first_trading_day
        self._asset_map = asset_map

    def gen_frames(self, assets):
        """
        Yield ``(asset, frame)`` for every requested asset, loading each
        frame from the CSV registered for it in the asset map.

        The frame is indexed by its "minute" column, localized to UTC.
        Raises KeyError when the map has no path for an asset.
        """
        column_types = self._csv_dtypes

        for asset in assets:
            csv_path = self._asset_map.get(asset)
            if csv_path is None:
                raise KeyError("No path supplied for asset %s" % asset)

            frame = read_csv(csv_path, parse_dates=['minute'],
                             dtype=column_types)
            frame = frame.set_index("minute")

            yield asset, frame.tz_localize("UTC")
228
229
230
class BcolzMinuteBarReader(object):
231
232
    def __init__(self, rootdir, sid_path_func=None):
        """
        Parameters
        ----------
        rootdir : str
            Directory holding the metadata file and the per-asset ctables.
        sid_path_func : callable, optional
            ``sid_path_func(rootdir, sid)`` returns the ctable path of an
            asset; when omitted ``_get_ctable`` uses its default layout.
        """
        self.rootdir = rootdir
        self._sid_path_func = sid_path_func

        first_day = self._get_metadata()['first_trading_day']
        self.first_trading_day = pd.Timestamp(first_day, tz='UTC')

        # TODO: Read/write calendar to match daily, so that calendar is not
        # 'hardcoded'.
        calendar_days = tradingcalendar.trading_days
        from_first_day = calendar_days.slice_indexer(self.first_trading_day)
        self.trading_days = calendar_days[from_first_day]

        # Per-field cache: field name -> {sid string -> opened column}.
        self._carrays = {
            field: {}
            for field in ('open', 'high', 'low', 'close',
                          'volume', 'sid', 'dt')
        }
255
256
    def _get_metadata(self):
        """Return the parsed contents of the metadata file in rootdir."""
        metadata_path = os.path.join(self.rootdir, METADATA_FILENAME)
        with open(metadata_path) as source:
            return json.loads(source.read())
259
260
    def _get_ctable(self, asset):
        """
        Open, read-only, the bcolz table holding the bars of ``asset``.

        The table path comes from the ``sid_path_func`` given to the
        constructor when there is one; otherwise it is
        ``<rootdir>/<sid>.bcolz``, built with ``os.path.join`` in the same
        way the writer builds its default path.
        """
        sid = int(asset)
        if self._sid_path_func is not None:
            path = self._sid_path_func(self.rootdir, sid)
        else:
            path = os.path.join(self.rootdir, "{0}.bcolz".format(sid))

        return bcolz.open(path, mode='r')
268
269
    def _find_position_of_minute(self, minute_dt):
270
        """
271
        Internal method that returns the position of the given minute in the
272
        list of every trading minute since market open of the first trading
273
        day.
274
275
        IMPORTANT: This method assumes every day is 390 minutes long, even
276
        early closes.  Our minute bcolz files are generated like this to
277
        support fast lookup.
278
279
        ex. this method would return 2 for 1/2/2002 9:32 AM Eastern, if
280
        1/2/2002 is the first trading day of the dataset.
281
282
        Parameters
283
        ----------
284
        minute_dt: pd.Timestamp
285
            The minute whose position should be calculated.
286
287
        Returns
288
        -------
289
        The position of the given minute in the list of all trading minutes
290
        since market open on the first trading day.
291
        """
292
        day = minute_dt.date()
293
        day_idx = self.trading_days.searchsorted(day)
294
        if day_idx < 0:
295
            return -1
296
297
        day_open = pd.Timestamp(
298
            datetime(
299
                year=day.year,
300
                month=day.month,
301
                day=day.day,
302
                hour=9,
303
                minute=31),
304
            tz='US/Eastern').tz_convert('UTC')
305
306
        minutes_offset = int((minute_dt - day_open).total_seconds()) / 60
307
308
        return int((390 * day_idx) + minutes_offset)
309
310
    def _open_minute_file(self, field, asset):
311
        sid_str = str(int(asset))
312
313
        try:
314
            carray = self._carrays[field][sid_str]
315
        except KeyError:
316
            carray = self._carrays[field][sid_str] = \
317
                self._get_ctable(asset)[field]
318
319
        return carray
320