Completed
Pull Request — master (#858)
by Eddie
01:35
created

zipline.data.BcolzMinuteBarWriter.write()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 5
rs 9.4286
1
from abc import (
2
    ABCMeta,
3
    abstractmethod,
4
)
5
import os
6
from bcolz import ctable
7
from datetime import datetime
8
import numpy as np
9
from numpy import float64
10
from os.path import join
11
import pandas as pd
12
from pandas import read_csv
13
from six import with_metaclass
14
15
from zipline.finance.trading import TradingEnvironment
16
17
# Regular-session minutes per trading day: six and a half hours, i.e. the
# 09:31 .. 16:00 US/Eastern minute grid built by BcolzMinuteBarWriter.
MINUTES_PER_DAY = 6 * 60 + 30

# Module-level TradingEnvironment shared by every writer; its
# normalize_date/days_in_range methods are used to build the minute grid.
_writer_env = TradingEnvironment()
20
21
22
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses supply the data by implementing ``gen_frames``. ``write``
    then lays each asset's frame out on a dense minute grid and stores it
    as a bcolz ctable. The grid has ``MINUTES_PER_DAY`` slots per trading
    day, starting 2002-01-02 09:31 US/Eastern. The ctable has the columns
    open, high, low, close, volume and dt.
    """

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write minute bars for ``assets`` under ``directory``.

        Parameters
        ----------
        directory : str
            Root directory in which the per-asset bcolz tables are created.
        assets : object
            Passed unchanged to ``gen_frames``; its meaning is defined by
            the subclass.
        sid_path_func : callable, optional
            ``sid_path_func(directory, asset_id) -> path``. Defaults to
            ``<directory>/<asset_id>.bcolz``.
        """
        return self._write_internal(directory, self.gen_frames(assets),
                                    sid_path_func=sid_path_func)

    @staticmethod
    def full_minutes_for_days(env, dt1, dt2):
        """
        Return the minute grid for every trading day from ``dt1``'s day
        through ``dt2``'s day, as a UTC DatetimeIndex.

        Each day contributes ``MINUTES_PER_DAY`` consecutive minutes
        starting at 09:31 US/Eastern. Every day gets the same number of
        minutes; shortened sessions are not modelled here.
        """
        start_date = env.normalize_date(dt1)
        end_date = env.normalize_date(dt2)

        all_minutes = []
        for day in env.days_in_range(start_date, end_date):
            session_open = pd.Timestamp(
                datetime(year=day.year, month=day.month, day=day.day,
                         hour=9, minute=31),
                tz='US/Eastern').tz_convert('UTC')
            all_minutes.append(pd.date_range(start=session_open,
                                             periods=MINUTES_PER_DAY,
                                             freq="min"))

        # flatten the per-day ranges into a single index
        return pd.DatetimeIndex(
            np.concatenate(all_minutes), copy=False, tz='UTC'
        )

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write each (asset_id, frame) pair from ``iterator`` to its own
        bcolz ctable.

        Each frame is expected to be indexed by minute timestamps and to
        have ``open``, ``high``, ``low``, ``close`` and ``volume`` columns.
        Grid minutes with no corresponding row are left as zeros.
        """
        first_open = pd.Timestamp(
            datetime(year=2002, month=1, day=2, hour=9, minute=31),
            tz='US/Eastern').tz_convert('UTC')

        # Column order of the resulting ctable.
        names = ["open", "high", "low", "close", "volume", "dt"]

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            # Tolerate a pre-existing directory so that re-running a write
            # does not fail here; the ctable below is opened with mode='w'.
            if not os.path.isdir(path):
                os.makedirs(path)

            minutes = self.full_minutes_for_days(_writer_env,
                                                 first_open, df.index[-1])
            minutes_count = len(minutes)

            cols = dict(
                (name, np.zeros(minutes_count, dtype=np.uint32))
                for name in names
            )

            frame_index = pd.DatetimeIndex(df.index)
            # Slot of every row on the dense minute grid (vectorized form
            # of calling minutes.searchsorted(dt) once per row).
            positions = minutes.searchsorted(frame_index)

            # Seconds since the epoch for each row's own timestamp.
            cols["dt"][positions] = frame_index.asi8 // 10 ** 9
            for name in ("open", "high", "low", "close", "volume"):
                # NOTE(review): values are cast into uint32, so fractional
                # prices are truncated. Confirm this matches what the reader
                # expects before changing the on-disk dtype.
                cols[name][positions] = df[name].values

            ctable(
                columns=[cols[name] for name in names],
                names=names,
                rootdir=path,
                mode='w'
            )
127
128
129
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter fed from in-memory DataFrames.

    The ``assets`` argument given to ``write`` must support iteration over
    asset ids and ``assets[asset_id]`` lookup, e.g. a dict mapping
    asset_id -> DataFrame. Each DataFrame must have a ``minute`` column,
    which becomes the frame's index.
    """
    # NOTE(review): not read by gen_frames in this class; kept so the class
    # exposes the same attribute as MinuteBarWriterFromCSVs.
    _csv_dtypes = dict.fromkeys(
        ('open', 'high', 'low', 'close', 'volume'), float64)

    def gen_frames(self, assets):
        """
        Yield (asset_id, frame) pairs, with each frame indexed by "minute".
        """
        for sid in assets:
            frame = assets[sid]
            yield sid, frame.set_index("minute")
142
143
144
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter constructed from a map of CSVs to assets.

    Parameters
    ----------
    asset_map : dict
        A map from asset_id -> path to csv with data for that asset.

    CSVs should have the following columns:
        minute : datetime64
        open : float64
        high : float64
        low : float64
        close : float64
        volume : int64

    The price and volume columns are parsed as float64 (``_csv_dtypes``).
    The ``minute`` column becomes the index and is localized to UTC.
    """
    _csv_dtypes = dict.fromkeys(
        ('open', 'high', 'low', 'close', 'volume'), float64)

    def __init__(self, asset_map):
        self._asset_map = asset_map

    def gen_frames(self, assets):
        """
        Read CSVs as DataFrames from our asset map.

        Yields (asset_id, frame) pairs in the order of ``assets``. Raises
        KeyError when an asset has no entry (or a None entry) in the map.
        """
        column_dtypes = self._csv_dtypes

        for sid in assets:
            csv_path = self._asset_map.get(sid)
            if csv_path is None:
                raise KeyError("No path supplied for asset %s" % sid)
            frame = read_csv(csv_path, parse_dates=['minute'],
                             dtype=column_dtypes)
            yield sid, frame.set_index("minute").tz_localize("UTC")
186