Completed
Pull Request — master (#906)
by Eddie
01:25
created

zipline.data.MinuteBarWriterFromCSVs.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
# Copyright 2015 Quantopian, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from abc import (
15
    ABCMeta,
16
    abstractmethod,
17
)
18
import bcolz
19
import json
20
import os
21
from bcolz import ctable
22
from datetime import datetime
23
import numpy as np
24
from numpy import float64
25
from os.path import join
26
import pandas as pd
27
from pandas import read_csv
28
from six import with_metaclass
29
30
from zipline.finance.trading import TradingEnvironment
31
from zipline.utils import tradingcalendar
32
33
# Number of minute slots stored per trading day: 09:31 through 16:00
# US/Eastern inclusive.  Every day is laid out with this many slots, even
# early closes, so a minute's position can be computed arithmetically.
MINUTES_PER_DAY = 390
34
35
# Module-level trading environment, created at import time.  The writer
# passes it to ``full_minutes_for_days`` to normalize dates and enumerate
# the trading days between two timestamps.
_writer_env = TradingEnvironment()
36
37
# Name of the JSON file, stored in the root of the output directory, that
# records the first trading day of the written data.
METADATA_FILENAME = 'metadata.json'
38
39
40
def write_metadata(directory, first_trading_day):
    """
    Write the minute-bar metadata file into ``directory``.

    Parameters
    ----------
    directory : str
        Directory that receives the metadata file (named by
        ``METADATA_FILENAME``).  An existing file is overwritten.
    first_trading_day : datetime-like
        Object exposing ``.date()``; the string form of that date is
        stored under the ``'first_trading_day'`` key.
    """
    target = os.path.join(directory, METADATA_FILENAME)
    payload = dict(first_trading_day=str(first_trading_day.date()))

    with open(target, 'w') as out:
        out.write(json.dumps(payload))
49
50
51
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses provide the data by implementing ``gen_frames`` and must set
    ``self._first_trading_day`` (exposed through ``first_trading_day``).

    Each asset is written as its own bcolz ctable.  The table has one row
    for every minute slot (``MINUTES_PER_DAY`` per trading day) from the
    open of the first trading day through the day of the last index entry
    of that asset's frame; slots without data stay zero.
    """
    @property
    def first_trading_day(self):
        """The first trading day of the output, as set by the subclass."""
        return self._first_trading_day

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write the frames produced by ``gen_frames(assets)`` to ``directory``.

        Parameters
        ----------
        directory : str
            Root output directory; also receives the metadata file.
        assets : object
            Passed through unchanged to ``gen_frames``.
        sid_path_func : callable, optional
            ``f(directory, asset_id) -> path`` giving the table location for
            an asset.  Defaults to ``<directory>/<asset_id>.bcolz``.
        """
        _iterator = self.gen_frames(assets)

        return self._write_internal(directory, _iterator,
                                    sid_path_func=sid_path_func)

    @staticmethod
    def _market_open(day):
        """Return 9:31 AM US/Eastern on ``day`` as a UTC pd.Timestamp."""
        return pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

    @staticmethod
    def full_minutes_for_days(env, dt1, dt2):
        """
        Build the index of all minute slots for the trading days in a range.

        Parameters
        ----------
        env : object
            Must provide ``normalize_date(dt)`` and
            ``days_in_range(start, end)``.
        dt1, dt2 : pd.Timestamp
            Bounds of the range; both are normalized with
            ``env.normalize_date`` before the days are enumerated.

        Returns
        -------
        pd.DatetimeIndex
            UTC index with ``MINUTES_PER_DAY`` entries per trading day,
            each day starting at 9:31 AM US/Eastern.  Empty when the range
            contains no trading days.
        """
        start_date = env.normalize_date(dt1)
        end_date = env.normalize_date(dt2)

        all_minutes = [
            pd.date_range(
                start=BcolzMinuteBarWriter._market_open(day),
                periods=MINUTES_PER_DAY,
                freq="min"
            )
            for day in env.days_in_range(start_date, end_date)
        ]

        # np.concatenate raises on an empty list, so handle "no trading
        # days in range" explicitly.
        if not all_minutes:
            return pd.DatetimeIndex([], tz='UTC')

        # flatten
        return pd.DatetimeIndex(
            np.concatenate(all_minutes), copy=False, tz='UTC'
        )

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write the metadata file and one bcolz ctable per (asset_id, frame).

        Parameters
        ----------
        directory : str
            Root output directory.
        iterator : iterable of (asset_id, pd.DataFrame)
            Frames indexed by minute with ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns.  ``df.index[-1]`` is read, so
            an empty frame raises IndexError.
        sid_path_func : callable, optional
            ``f(directory, asset_id) -> path``; defaults to
            ``<directory>/<asset_id>.bcolz``.
        """
        first_trading_day = self.first_trading_day

        write_metadata(directory, first_trading_day)

        first_open = self._market_open(first_trading_day)

        fields = ("open", "high", "low", "close", "volume")

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            # Creates intermediate directories as well; raises OSError if
            # ``path`` already exists.
            os.makedirs(path)

            minutes = self.full_minutes_for_days(_writer_env,
                                                 first_open, df.index[-1])
            minutes_count = len(minutes)

            # All columns are uint32, so fractional parts are dropped on
            # assignment.
            # NOTE(review): callers presumably pre-scale prices - confirm.
            dt_col = np.zeros(minutes_count, dtype=np.uint32)
            cols = dict(
                (field, np.zeros(minutes_count, dtype=np.uint32))
                for field in fields
            )

            for dt, bar in df.iterrows():
                # Slot of this bar within the padded minute index.
                idx = minutes.searchsorted(dt)

                # Nanoseconds since the epoch -> whole seconds.
                dt_col[idx] = dt.value // 10 ** 9
                for field in fields:
                    cols[field][idx] = bar.loc[field]

            ctable(
                columns=[cols[field] for field in fields] + [dt_col],
                names=list(fields) + ["dt"],
                rootdir=path,
                mode='w'
            )
164
165
166
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter whose data comes from in-memory DataFrames.

    Parameters
    ----------
    first_trading_day : datetime-like
        Stored and exposed through ``first_trading_day``.
    """
    _csv_dtypes = dict(
        open=float64,
        high=float64,
        low=float64,
        close=float64,
        volume=float64,
    )

    def __init__(self, first_trading_day):
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Yield ``(asset, frame)`` for every key of ``assets``.

        ``assets`` is iterated for its keys and indexed by each key; the
        resulting frame is re-indexed by its ``minute`` column.
        """
        for sid in assets:
            frame = assets[sid].set_index("minute")
            yield sid, frame
182
183
184
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    Minute bar writer that loads each asset's data from a CSV file.

    Parameters
    ----------
    asset_map: dict
        Mapping of asset_id -> path of the CSV holding that asset's bars.
    first_trading_day: datetime-like
        Stored and exposed through ``first_trading_day``.

    Every CSV is expected to provide these columns:
        minute : datetime64
        open : float64
        high : float64
        low : float64
        close : float64
        volume : int64 (parsed as float64, see ``_csv_dtypes``)
    """
    _csv_dtypes = dict(
        open=float64,
        high=float64,
        low=float64,
        close=float64,
        volume=float64,
    )

    def __init__(self, asset_map, first_trading_day):
        self._first_trading_day = first_trading_day
        self._asset_map = asset_map

    def gen_frames(self, assets):
        """
        Yield ``(asset, frame)`` for each requested asset.

        The CSV registered for the asset is parsed, indexed by its
        ``minute`` column and localized to UTC.  A KeyError is raised for
        an asset that has no path in the asset map.
        """
        column_types = self._csv_dtypes

        for sid in assets:
            csv_path = self._asset_map.get(sid)
            if csv_path is None:
                raise KeyError("No path supplied for asset %s" % sid)

            frame = read_csv(
                csv_path,
                parse_dates=['minute'],
                dtype=column_types,
            )
            yield sid, frame.set_index("minute").tz_localize("UTC")
227
228
229
class BcolzMinuteBarReader(object):
    """
    Reader for per-asset minute bar bcolz tables stored under a root
    directory.

    Parameters
    ----------
    rootdir : str
        Directory holding the metadata file (``METADATA_FILENAME``) and the
        per-asset tables.
    sid_path_func : callable, optional
        ``f(rootdir, sid) -> path`` giving the table location for an asset.
        Defaults to ``<rootdir>/<sid>.bcolz``.
    """

    def __init__(self, rootdir, sid_path_func=None):
        self.rootdir = rootdir

        metadata = self._get_metadata()

        self.first_trading_day = pd.Timestamp(
            metadata['first_trading_day'], tz='UTC')
        mask = tradingcalendar.trading_days.slice_indexer(
            self.first_trading_day)
        # TODO: Read/write calendar to match daily, so that calendar is not
        # 'hardcoded'.
        self.trading_days = tradingcalendar.trading_days[mask]
        self._sid_path_func = sid_path_func

        # Cache of opened columns: field name -> {sid string -> column}.
        self._carrays = {
            'open': {},
            'high': {},
            'low': {},
            'close': {},
            'volume': {},
            'sid': {},
            'dt': {},
        }

    def _get_metadata(self):
        """Load and return the parsed JSON metadata stored in ``rootdir``."""
        with open(os.path.join(self.rootdir, METADATA_FILENAME)) as fp:
            return json.load(fp)

    def _get_ctable(self, asset):
        """Open, read-only, the bcolz table holding ``asset``'s minute bars."""
        sid = int(asset)
        if self._sid_path_func is not None:
            path = self._sid_path_func(self.rootdir, sid)
        else:
            path = os.path.join(self.rootdir, "{0}.bcolz".format(sid))

        return bcolz.open(path, mode='r')

    def _find_position_of_minute(self, minute_dt):
        """
        Internal method that returns the position of the given minute in the
        list of every trading minute since market open (9:31 AM US/Eastern)
        on the first day of ``self.trading_days``.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        ex. this method would return 1 for 9:32 AM Eastern on the first
        trading day, and 390 for 9:31 AM Eastern on the second.

        The day is located with ``searchsorted`` and is not validated: a
        day missing from ``self.trading_days`` resolves to the index of the
        next trading day after it.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The timezone-aware minute whose position should be calculated.

        Returns
        -------
        int
            The position of the given minute in the list of all trading
            minutes since the first market open.
        """
        day = minute_dt.date()

        # Search with a Timestamp in the index's own timezone; a bare
        # ``datetime.date`` is not accepted by every pandas version.
        day_key = pd.Timestamp(
            datetime(year=day.year, month=day.month, day=day.day),
            tz=self.trading_days.tz)
        day_idx = self.trading_days.searchsorted(day_key)

        day_open = pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # Floor division keeps the result an integer on Python 2 and 3.
        minutes_offset = int((minute_dt - day_open).total_seconds()) // 60

        return int((390 * day_idx) + minutes_offset)

    def _open_minute_file(self, field, asset):
        """
        Return the column ``field`` of ``asset``'s table, opening the table
        on first use and caching the column for later calls.
        """
        sid_str = str(int(asset))

        try:
            carray = self._carrays[field][sid_str]
        except KeyError:
            carray = self._carrays[field][sid_str] = \
                self._get_ctable(asset)[field]

        return carray
317