Completed
Pull Request — master (#858)
by Eddie
01:43
created

gen_frames()   A

Complexity

Conditions 2

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 4
rs 10
1
from abc import (
2
    ABCMeta,
3
    abstractmethod,
4
)
5
import bcolz
6
import json
7
import os
8
from bcolz import ctable
9
from datetime import datetime
10
import numpy as np
11
from numpy import float64
12
from os.path import join
13
import pandas as pd
14
from pandas import read_csv
15
from six import with_metaclass
16
17
from zipline.finance.trading import TradingEnvironment
18
from zipline.utils import tradingcalendar
19
20
MINUTES_PER_DAY = 390
21
22
_writer_env = TradingEnvironment()
23
24
METADATA_FILENAME = 'metadata.json'
25
26
27
def write_metadata(directory, first_trading_day):
    """
    Write the minute-bar metadata file into ``directory``.

    The file is named by ``METADATA_FILENAME`` and contains a JSON object
    with a single key, ``'first_trading_day'``, whose value is
    ``str(first_trading_day.date())``.

    Parameters
    ----------
    directory : str
        Directory in which the metadata file is created; an existing file
        of the same name is overwritten (it is opened with mode ``'w'``).
    first_trading_day : datetime-like
        Any object exposing a ``date()`` method, e.g. a ``pd.Timestamp``.
    """
    metadata_path = os.path.join(directory, METADATA_FILENAME)

    metadata = {
        'first_trading_day': str(first_trading_day.date()),
    }

    with open(metadata_path, 'w') as fp:
        json.dump(metadata, fp)
36
37
38
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses implement ``gen_frames`` and are expected to assign
    ``self._first_trading_day``, which backs the ``first_trading_day``
    property.

    One bcolz ctable is written per asset.  Every trading day occupies
    exactly ``MINUTES_PER_DAY`` rows starting at 9:31 US/Eastern; rows for
    which the input frame has no bar are left as zeros.
    """
    @property
    def first_trading_day(self):
        """The value stored in ``self._first_trading_day``."""
        return self._first_trading_day

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write the frames produced by ``gen_frames(assets)`` to ``directory``.

        Parameters
        ----------
        directory : str
            Root directory receiving the metadata file and per-asset tables.
        assets
            Passed through unchanged to ``gen_frames``.
        sid_path_func : callable, optional
            Called as ``sid_path_func(directory, asset_id)`` to obtain the
            table path for an asset.  When omitted, the path is
            ``<directory>/<asset_id>.bcolz``.
        """
        frames = self.gen_frames(assets)

        return self._write_internal(directory, frames,
                                    sid_path_func=sid_path_func)

    @staticmethod
    def _market_open_utc(day):
        """
        Return 9:31 US/Eastern on the calendar date of ``day``, in UTC.

        Only ``day.year``, ``day.month`` and ``day.day`` are read.
        """
        naive_open = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31,
        )
        return pd.Timestamp(naive_open, tz='US/Eastern').tz_convert('UTC')

    @staticmethod
    def full_minutes_for_days(env, dt1, dt2):
        """
        Build the padded minute index covering the days from dt1 to dt2.

        For each day yielded by ``env.days_in_range`` between the normalized
        ``dt1`` and ``dt2``, ``MINUTES_PER_DAY`` consecutive minutes starting
        at 9:31 US/Eastern are generated.

        Parameters
        ----------
        env
            Object providing ``normalize_date`` and ``days_in_range``.
        dt1, dt2
            Range endpoints; each is passed through ``env.normalize_date``.

        Returns
        -------
        pd.DatetimeIndex
            UTC minutes for all days in the range; empty when the range
            contains no days.
        """
        start_date = env.normalize_date(dt1)
        end_date = env.normalize_date(dt2)

        all_minutes = [
            pd.date_range(
                start=BcolzMinuteBarWriter._market_open_utc(day),
                periods=MINUTES_PER_DAY,
                freq="min",
            )
            for day in env.days_in_range(start_date, end_date)
        ]

        if not all_minutes:
            # np.concatenate refuses an empty sequence; give callers an
            # empty index instead of an unrelated-looking ValueError.
            return pd.DatetimeIndex([], tz='UTC')

        # flatten
        return pd.DatetimeIndex(
            np.concatenate(all_minutes), copy=False, tz='UTC'
        )

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write the metadata file and one bcolz ctable per (asset_id, frame).

        Parameters
        ----------
        directory : str
            Root output directory.
        iterator : iterable of (asset_id, pd.DataFrame)
            Each frame is indexed by minute and is read for the columns
            "open", "high", "low", "close" and "volume".
        sid_path_func : callable, optional
            See ``write``.

        Notes
        -----
        All columns are stored in ``uint32`` arrays, so values are converted
        to unsigned integers on assignment.  ``os.makedirs`` is called on
        each asset path and raises if that directory already exists.
        """
        first_trading_day = self.first_trading_day

        write_metadata(directory, first_trading_day)

        first_open = self._market_open_utc(first_trading_day)

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            os.makedirs(path)

            # Padded index from the first trading day through the day of the
            # frame's last bar; each bar is placed at its slot in this index.
            minutes = self.full_minutes_for_days(_writer_env,
                                                 first_open, df.index[-1])
            minutes_count = len(minutes)

            dt_col = np.zeros(minutes_count, dtype=np.uint32)
            open_col = np.zeros(minutes_count, dtype=np.uint32)
            high_col = np.zeros(minutes_count, dtype=np.uint32)
            low_col = np.zeros(minutes_count, dtype=np.uint32)
            close_col = np.zeros(minutes_count, dtype=np.uint32)
            vol_col = np.zeros(minutes_count, dtype=np.uint32)

            for dt, bar in df.iterrows():
                idx = minutes.searchsorted(dt)

                # dt.value is nanoseconds since the epoch; store seconds.
                dt_col[idx] = dt.value // 10 ** 9
                open_col[idx] = bar.loc["open"]
                high_col[idx] = bar.loc["high"]
                low_col[idx] = bar.loc["low"]
                close_col[idx] = bar.loc["close"]
                vol_col[idx] = bar.loc["volume"]

            ctable(
                columns=[
                    open_col,
                    high_col,
                    low_col,
                    close_col,
                    vol_col,
                    dt_col,
                ],
                names=[
                    "open",
                    "high",
                    "low",
                    "close",
                    "volume",
                    "dt",
                ],
                rootdir=path,
                mode='w'
            )
151
152
153
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter fed from DataFrames that are already in memory.

    Parameters
    ----------
    first_trading_day
        Stored as ``_first_trading_day`` for the inherited
        ``first_trading_day`` property.
    """
    # Not referenced by the methods of this class; kept so the attribute
    # remains available to any external code that reads it.
    _csv_dtypes = {
        'open': float64,
        'high': float64,
        'low': float64,
        'close': float64,
        'volume': float64,
    }

    def __init__(self, first_trading_day):
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Yield ``(asset, frame)`` pairs, each frame re-indexed by "minute".

        ``assets`` is iterated for its keys and indexed by each key, so it
        is expected to behave like a mapping from asset to DataFrame with a
        "minute" column.
        """
        for asset in assets:
            yield asset, assets[asset].set_index("minute")
169
170
171
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter constructed from a map of CSVs to assets.

    Parameters
    ----------
    asset_map: dict
        A map from asset_id -> path to csv with data for that asset.
    first_trading_day
        Stored as ``_first_trading_day`` for the inherited
        ``first_trading_day`` property.

    CSVs should have the following columns:
        minute : datetime64
        open : float64
        high : float64
        low : float64
        close : float64
        volume : parsed as float64 (see ``_csv_dtypes``)
    """
    _csv_dtypes = {
        'open': float64,
        'high': float64,
        'low': float64,
        'close': float64,
        'volume': float64,
    }

    def __init__(self, asset_map, first_trading_day):
        self._asset_map = asset_map
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Read CSVs as DataFrames from our asset map.

        Yields ``(asset, frame)`` pairs where the frame is indexed by the
        parsed "minute" column, localized to UTC.

        Raises
        ------
        KeyError
            If an asset has no path in the asset map (or maps to ``None``).
        """
        dtypes = self._csv_dtypes

        for asset in assets:
            path = self._asset_map.get(asset)
            if path is None:
                raise KeyError("No path supplied for asset %s" % asset)

            df = read_csv(path, parse_dates=['minute'], dtype=dtypes)
            df = df.set_index("minute").tz_localize("UTC")

            yield asset, df
214
215
216
class BcolzMinuteBarReader(object):
    """
    Reader for per-asset bcolz minute-bar tables stored under ``rootdir``.

    Parameters
    ----------
    rootdir : str
        Directory containing the metadata file and the per-asset tables.
    sid_path_func : callable, optional
        Called as ``sid_path_func(rootdir, sid)`` to obtain the table path
        for an asset.  When omitted, the path is ``<rootdir>/<sid>.bcolz``.
    """

    def __init__(self, rootdir, sid_path_func=None):
        self.rootdir = rootdir

        metadata = self._get_metadata()

        self.first_trading_day = pd.Timestamp(
            metadata['first_trading_day'], tz='UTC')
        mask = tradingcalendar.trading_days.slice_indexer(
            self.first_trading_day)
        # TODO: Read/write calendar to match daily, so that calendar is not
        # 'hardcoded'.
        self.trading_days = tradingcalendar.trading_days[mask]
        self._sid_path_func = sid_path_func

        # Per-field cache of opened columns, keyed by str(sid).
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

    def _get_metadata(self):
        """Load and return the JSON metadata stored in ``rootdir``."""
        with open(os.path.join(self.rootdir, METADATA_FILENAME)) as fp:
            return json.load(fp)

    def _get_ctable(self, asset):
        """Open (read-only) the bcolz table holding ``asset``'s bars."""
        sid = int(asset)
        if self._sid_path_func is not None:
            path = self._sid_path_func(self.rootdir, sid)
        else:
            path = os.path.join(self.rootdir, "{0}.bcolz".format(sid))

        return bcolz.open(path, mode='r')

    def _find_position_of_minute(self, minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open (9:31 AM Eastern) on
        the first day of ``self.trading_days``.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method returns 0 for 9:31 AM Eastern and 1 for 9:32 AM
        Eastern on the first trading day, and 390 for 9:31 AM Eastern on the
        second trading day.

        No check is made that ``minute_dt`` falls on a trading day or inside
        market hours; the day is located with ``searchsorted``, which returns
        an insertion point for dates absent from ``self.trading_days``.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.  Must be
            timezone-aware, since it is subtracted from a UTC timestamp.

        Returns
        -------
        int
            The position of the given minute in the list of all trading
            minutes since market open on the first trading day.
        """
        day = minute_dt.date()

        # searchsorted is given a Timestamp in the index's own timezone
        # rather than a bare datetime.date, which a DatetimeIndex does not
        # reliably accept as a search key.
        day_key = pd.Timestamp(day, tz=self.trading_days.tz)
        day_idx = self.trading_days.searchsorted(day_key)

        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # Floor division keeps the offset an integer on Python 2 and 3.
        minutes_offset = int((minute_dt - day_open).total_seconds()) // 60

        # 390 rows per day; must match the padded day length used on write.
        return int((390 * day_idx) + minutes_offset)

    def _open_minute_file(self, field, asset):
        """
        Return the column ``field`` of ``asset``'s table, opening the table
        on first use and caching the column in ``self._carrays``.
        """
        sid_str = str(int(asset))

        try:
            carray = self._carrays[field][sid_str]
        except KeyError:
            carray = self._carrays[field][sid_str] = \
                self._get_ctable(asset)[field]

        return carray
304