Completed
Pull Request — master (#858)
by
unknown
01:50
created

zipline.data.BcolzMinuteBarReader.__init__()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %
| Metric | Value  |
|--------|--------|
| cc     | 1      |
| dl     | 0      |
| loc    | 10     |
| rs     | 9.4286 |
1
from abc import (
2
    ABCMeta,
3
    abstractmethod,
4
)
5
import json
6
import os
7
from bcolz import ctable
8
from datetime import datetime
9
import numpy as np
10
from numpy import float64
11
from os.path import join
12
import pandas as pd
13
from pandas import read_csv
14
from six import with_metaclass
15
16
from zipline.finance.trading import TradingEnvironment
17
from zipline.utils import tradingcalendar
18
19
# Minutes in one regular session: 6.5 hours, i.e. 09:31 through 16:00
# US/Eastern inclusive -- the same count as the per-day minute grid that
# BcolzMinuteBarWriter builds.
MINUTES_PER_DAY = 6 * 60 + 30

# File name of the JSON metadata written by write_metadata and read back by
# BcolzMinuteBarReader.
METADATA_FILENAME = 'metadata.json'

# TradingEnvironment constructed once, at import time; BcolzMinuteBarWriter
# uses it to enumerate trading days.
_writer_env = TradingEnvironment()
24
25
26
def write_metadata(directory, first_trading_day):
    """
    Write the writer metadata file into ``directory``.

    The file is named by ``METADATA_FILENAME`` and contains a JSON object
    with one key, ``'first_trading_day'``, whose value is
    ``str(first_trading_day.date())`` (a ``YYYY-MM-DD`` string).

    Parameters
    ----------
    directory : str
        Directory to write into. It must already exist (``open`` does not
        create parent directories); an existing metadata file is overwritten.
    first_trading_day : datetime-like
        Any object with a ``.date()`` method, e.g. a ``pd.Timestamp``.
    """
    payload = {'first_trading_day': str(first_trading_day.date())}
    target = os.path.join(directory, METADATA_FILENAME)
    with open(target, 'w') as out:
        json.dump(payload, out)
35
36
37
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses implement ``gen_frames`` and set ``self._first_trading_day``
    (read back through the ``first_trading_day`` property). ``write`` lays
    each asset's bars out on a dense grid of ``MINUTES_PER_DAY`` minutes per
    trading day. The grid starts at 09:31 US/Eastern on ``first_trading_day``
    and runs through the day of the asset's last bar. The result is stored as
    one bcolz ctable per asset.

    Every column is uint32. ``dt`` holds epoch seconds, and grid minutes with
    no bar stay 0 in every column.
    """

    # Price/volume columns read from each frame, in on-disk order; the
    # ``dt`` column is written after them.
    _OHLCV = ("open", "high", "low", "close", "volume")

    @property
    def first_trading_day(self):
        """First day of the minute grid, as set by the subclass."""
        return self._first_trading_day

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write the metadata file and one bcolz table per asset.

        Parameters
        ----------
        directory : str
            Output directory. It must already exist; the metadata file is
            written directly into it.
        assets : object
            Passed unchanged to ``gen_frames``.
        sid_path_func : callable, optional
            ``sid_path_func(directory, asset_id)`` returns the table path for
            an asset. Defaults to ``<directory>/<asset_id>.bcolz``.
        """
        return self._write_internal(directory, self.gen_frames(assets),
                                    sid_path_func=sid_path_func)

    @staticmethod
    def _market_open_utc(day):
        """Return 09:31 US/Eastern on ``day``'s calendar date, in UTC."""
        return pd.Timestamp(
            datetime(
                year=day.year,
                month=day.month,
                day=day.day,
                hour=9,
                minute=31,
            ),
            tz='US/Eastern',
        ).tz_convert('UTC')

    @staticmethod
    def full_minutes_for_days(env, dt1, dt2):
        """
        Return the grid minutes of every trading day between dt1 and dt2.

        Every trading day contributes the same ``MINUTES_PER_DAY``
        consecutive minutes starting at 09:31 US/Eastern. Early closes are
        not special-cased.

        Parameters
        ----------
        env : TradingEnvironment
            Provides ``normalize_date`` and ``days_in_range``.
        dt1, dt2 : datetime-like
            Range endpoints; both are normalized to dates first.

        Returns
        -------
        pd.DatetimeIndex
            UTC minutes, day by day in the order ``env.days_in_range``
            yields days. The index is empty (instead of raising) when no
            trading days fall in the range.
        """
        start_date = env.normalize_date(dt1)
        end_date = env.normalize_date(dt2)

        day_minutes = [
            pd.date_range(
                start=BcolzMinuteBarWriter._market_open_utc(day),
                periods=MINUTES_PER_DAY,
                freq="min",
            )
            for day in env.days_in_range(start_date, end_date)
        ]

        if not day_minutes:
            return pd.DatetimeIndex([], tz='UTC')
        # Index.append keeps the shared UTC timezone of the pieces.
        return day_minutes[0].append(day_minutes[1:])

    @classmethod
    def _to_minute_grid(cls, df, minutes):
        """
        Scatter ``df``'s bars onto ``minutes``; return uint32 columns by name.

        Each bar is placed at ``minutes.searchsorted(bar_time)``, so a bar
        that is not exactly on the grid lands in the next grid slot. When
        several bars map to the same slot, the last one wins.

        Raises
        ------
        ValueError
            If an OHLCV column contains missing values.
        IndexError
            If a bar falls after the last grid minute.
        """
        bar_times = pd.DatetimeIndex(df.index)
        positions = minutes.searchsorted(bar_times)

        # numpy leaves unspecified which value survives a fancy-index
        # assignment with repeated indices. Keep only the last bar for each
        # slot explicitly -- the same result a row-by-row write gives.
        _, last_from_end = np.unique(positions[::-1], return_index=True)
        keep = len(positions) - 1 - last_from_end
        slots = positions[keep]

        columns = {}
        for name in cls._OHLCV:
            values = np.asarray(df[name])
            # uint32 cannot represent a missing value; fail loudly instead of
            # letting the cast write an arbitrary number.
            if pd.isnull(values).any():
                raise ValueError(
                    "column {0!r} contains missing values, which cannot be "
                    "stored as uint32".format(name))
            column = np.zeros(len(minutes), dtype=np.uint32)
            # NOTE(review): prices are cast to uint32 with no scaling, so
            # fractional prices are truncated (e.g. 10.75 -> 10). Kept as-is
            # because readers of these tables depend on the on-disk format;
            # confirm whether a price scale factor was intended.
            column[slots] = values[keep]
            columns[name] = column

        dt_col = np.zeros(len(minutes), dtype=np.uint32)
        # asi8 is nanoseconds since the epoch; store whole seconds.
        dt_col[slots] = bar_times.asi8[keep] // 10 ** 9
        columns["dt"] = dt_col
        return columns

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write the metadata file and one ctable per ``(asset_id, df)`` pair.

        Parameters
        ----------
        directory : str
            Existing output directory.
        iterator : iterable of (asset_id, pd.DataFrame)
            Frames indexed by bar timestamp, with columns 'open', 'high',
            'low', 'close' and 'volume'.
        sid_path_func : callable, optional
            See ``write``.

        Raises
        ------
        OSError
            If an asset's table path already exists (``os.makedirs``).
        ValueError
            If an OHLCV column contains missing values.
        """
        first_trading_day = self.first_trading_day

        write_metadata(directory, first_trading_day)

        first_open = self._market_open_utc(first_trading_day)

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            os.makedirs(path)

            minutes = self.full_minutes_for_days(_writer_env,
                                                 first_open, df.index[-1])
            columns = self._to_minute_grid(df, minutes)

            names = list(self._OHLCV) + ["dt"]
            ctable(
                columns=[columns[name] for name in names],
                names=names,
                rootdir=path,
                mode='w'
            )
150
151
152
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter fed from in-memory DataFrames.

    Parameters
    ----------
    first_trading_day : datetime-like
        First day of the minute grid, exposed via ``first_trading_day``.

    ``gen_frames`` expects ``assets`` to be mapping-like: it is iterated for
    asset ids, and each ``assets[asset_id]`` must be a DataFrame with a
    'minute' column plus 'open', 'high', 'low', 'close' and 'volume'.
    """

    # Same mapping as MinuteBarWriterFromCSVs._csv_dtypes; nothing in this
    # class reads it.
    _csv_dtypes = dict.fromkeys(
        ('open', 'high', 'low', 'close', 'volume'), float64)

    def __init__(self, first_trading_day):
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """Yield ``(asset_id, frame)`` with each frame indexed by 'minute'."""
        for asset_id in assets:
            frame = assets[asset_id]
            indexed = frame.set_index("minute")
            yield asset_id, indexed
168
169
170
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter constructed from a map of CSVs to assets.

    Parameters
    ----------
    asset_map: dict
        A map from asset_id -> path to csv with data for that asset.
    first_trading_day : datetime-like
        First day of the minute grid; exposed through ``first_trading_day``
        and written to the output metadata.

    CSVs should have the following columns:
        minute : datetime64 (parsed, then localized to UTC)
        open : float64
        high : float64
        low : float64
        close : float64
        volume : numeric (read as float64)
    """
    # dtypes passed to read_csv for the OHLCV columns.
    _csv_dtypes = {
        'open': float64,
        'high': float64,
        'low': float64,
        'close': float64,
        'volume': float64,
    }

    def __init__(self, asset_map, first_trading_day):
        self._asset_map = asset_map
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Read CSVs as DataFrames from our asset map.

        Lazily yields ``(asset, frame)`` for each asset in ``assets``. Each
        frame is that asset's CSV, indexed by its 'minute' column and
        localized to UTC.

        Raises
        ------
        KeyError
            While iterating, for an asset that has no path (or a ``None``
            path) in the asset map.
        """
        dtypes = self._csv_dtypes

        for asset in assets:
            path = self._asset_map.get(asset)
            if path is None:
                # Wrap in a 1-tuple so a tuple-valued asset id is formatted
                # as a single value rather than raising TypeError.
                raise KeyError("No path supplied for asset %s" % (asset,))
            df = read_csv(path, parse_dates=['minute'], dtype=dtypes)
            df = df.set_index("minute").tz_localize("UTC")

            yield asset, df
213
214
215
class BcolzMinuteBarReader(object):
    """
    Loads the metadata stored in a minute-bar bcolz directory.

    On construction it reads the JSON metadata file (``METADATA_FILENAME``)
    from ``rootdir``. From that it derives the trading days that start at the
    recorded first trading day.

    Attributes
    ----------
    rootdir : str
        Directory containing the metadata file.
    first_trading_day : pd.Timestamp
        The stored 'first_trading_day' value, as a UTC timestamp.
    trading_days
        ``tradingcalendar.trading_days`` sliced from ``first_trading_day``
        onward.
    """

    def __init__(self, rootdir):
        self.rootdir = rootdir

        stored_day = self._get_metadata()['first_trading_day']
        self.first_trading_day = pd.Timestamp(stored_day, tz='UTC')

        calendar = tradingcalendar.trading_days
        # slice_indexer returns a slice covering first_trading_day onward.
        self.trading_days = calendar[
            calendar.slice_indexer(self.first_trading_day)
        ]

    def _get_metadata(self):
        """Return the parsed JSON metadata dict stored in ``rootdir``."""
        metadata_path = os.path.join(self.rootdir, METADATA_FILENAME)
        with open(metadata_path) as fp:
            return json.load(fp)
231