Completed
Pull Request — master (#858)
by Eddie
02:39
created

first_trading_day()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
# Copyright 2015 Quantopian, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from abc import (
16
    ABCMeta,
17
    abstractmethod,
18
)
19
import bcolz
20
import json
21
import os
22
from bcolz import ctable
23
from datetime import datetime
24
import numpy as np
25
from numpy import float64
26
from os.path import join
27
import pandas as pd
28
from pandas import read_csv
29
from six import with_metaclass
30
31
from zipline.finance.trading import TradingEnvironment
32
from zipline.utils import tradingcalendar
33
34
MINUTES_PER_DAY = 390
35
36
_writer_env = TradingEnvironment()
37
38
_NANOS_IN_MINUTE = 60000000000
39
40
METADATA_FILENAME = 'metadata.json'
41
42
43
def write_metadata(directory, first_trading_day):
    """
    Store the dataset's first trading day as a JSON file in `directory`.

    Parameters
    ----------
    directory : str
        Directory in which the file named by METADATA_FILENAME is
        created (or overwritten, since it is opened with mode 'w').
    first_trading_day : object with a ``date()`` method
        Only ``str(first_trading_day.date())`` is persisted, under the
        'first_trading_day' key.
    """
    target = os.path.join(directory, METADATA_FILENAME)
    payload = {'first_trading_day': str(first_trading_day.date())}

    with open(target, 'w') as out:
        json.dump(payload, out)
52
53
54
def _bcolz_minute_index(trading_days):
    """
    Build the minute index used to lay out bcolz minute bars.

    Every entry of `trading_days` is given exactly MINUTES_PER_DAY
    consecutive minute slots, beginning at that day's market open as
    listed in ``tradingcalendar.open_and_closes.market_open``.

    Parameters
    ----------
    trading_days : sequence of day labels
        Must be non-empty; its first and last entries bound the slice of
        market opens that is used.

    Returns
    -------
    The result of ``pd.to_datetime(..., utc=True, box=True)`` over the
    generated minutes.
    """
    # Slots that are never filled keep the zero value they start with.
    # NOTE(review): assumes the calendar yields one open per entry of
    # `trading_days` -- confirm.
    slots = np.zeros(len(trading_days) * MINUTES_PER_DAY,
                     dtype='datetime64[ns]')

    calendar_opens = tradingcalendar.open_and_closes.market_open
    window = calendar_opens.index.slice_indexer(start=trading_days[0],
                                                end=trading_days[-1])
    session_opens = calendar_opens[window]

    # Offsets 0..MINUTES_PER_DAY-1 minutes, added to each session's open.
    offsets = np.arange(0, MINUTES_PER_DAY, dtype='timedelta64[m]')

    lo = 0
    for session_open in session_opens:
        hi = lo + MINUTES_PER_DAY
        slots[lo:hi] = session_open.asm8 + offsets
        lo = hi

    return pd.to_datetime(slots, utc=True, box=True)
70
71
72
class BcolzMinuteBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing minute OHLCV data to disk into bcolz format.

    Subclasses implement `gen_frames` and must set
    ``self._first_trading_day``, which backs the `first_trading_day`
    property.

    One bcolz ctable is written per asset, with uint32 columns named
    open, high, low, close and volume.  Rows are positioned on a minute
    index that starts at 9:31 US/Eastern on the first trading day; rows
    for which the frame supplies no data stay zero.
    """
    @property
    def first_trading_day(self):
        """The first trading day of the dataset, as set by the subclass."""
        return self._first_trading_day

    @abstractmethod
    def gen_frames(self, assets):
        """
        Return an iterator of pairs of (asset_id, pd.dataframe).

        Each frame is consumed by `_write_internal`, which reads its
        index (minute timestamps) and its open, high, low, close and
        volume columns.
        """
        raise NotImplementedError()

    def write(self, directory, assets, sid_path_func=None):
        """
        Write the minute bars of `assets` beneath `directory`.

        Parameters
        ----------
        directory : str
            Output directory; the metadata file is written directly
            into it.
        assets
            Passed unchanged to `gen_frames`.
        sid_path_func : callable, optional
            Called as ``sid_path_func(directory, asset_id)`` to obtain
            the ctable path for an asset.  When omitted the path is
            ``<directory>/<asset_id>.bcolz``.
        """
        frames = self.gen_frames(assets)

        return self._write_internal(directory, frames,
                                    sid_path_func=sid_path_func)

    def full_minutes_for_days(self, dt1, dt2):
        """
        Return the minute index covering the trading days from `dt1`
        through `dt2`.

        Both arguments are normalized with the writer environment and
        the trading days in that range are expanded by
        `_bcolz_minute_index`.
        """
        start_date = _writer_env.normalize_date(dt1)
        end_date = _writer_env.normalize_date(dt2)

        trading_days = _writer_env.days_in_range(start_date, end_date)
        return _bcolz_minute_index(trading_days)

    def _write_internal(self, directory, iterator, sid_path_func=None):
        """
        Write the metadata file, then one ctable per (asset_id, frame)
        pair produced by `iterator`.

        Notes
        -----
        - ``os.makedirs`` is called on every asset path, so an asset
          path that already exists raises OSError.
        - Every frame must be non-empty: its last index entry is used to
          decide how far the minute index has to extend.
        - Values are cast with ``astype(np.uint32)``.
          NOTE(review): presumably prices arrive pre-scaled to integers
          -- confirm with the callers.
        """
        first_trading_day = self.first_trading_day

        write_metadata(directory, first_trading_day)

        first_open = pd.Timestamp(
            datetime(
                year=first_trading_day.year,
                month=first_trading_day.month,
                day=first_trading_day.day,
                hour=9,
                minute=31
            ), tz='US/Eastern').tz_convert('UTC')

        fields = ("open", "high", "low", "close", "volume")

        # Minute index shared across assets; grown lazily whenever a
        # frame ends after the range covered so far.
        all_minutes = None

        for asset_id, df in iterator:
            if sid_path_func is None:
                path = join(directory, "{0}.bcolz".format(asset_id))
            else:
                path = sid_path_func(directory, asset_id)

            os.makedirs(path)

            last_dt = df.index[-1]

            if all_minutes is None:
                all_minutes = self.full_minutes_for_days(first_open, last_dt)
                minutes = all_minutes
            elif last_dt in all_minutes:
                mask = all_minutes.slice_indexer(end=last_dt)
                minutes = all_minutes[mask]
            else:
                # Need to extend all minutes from open after last value
                # in all_minutes to the last_dt.
                next_open, _ = _writer_env.next_open_and_close(
                    all_minutes[-1])
                to_append = self.full_minutes_for_days(next_open, last_dt)
                all_minutes = all_minutes.append(to_append)
                minutes = all_minutes

            minutes_count = len(minutes)

            # Position of every frame row within `minutes`.
            # NOTE(review): searchsorted returns an insertion point, so a
            # timestamp that is absent from `minutes` is written to the
            # next later slot instead of raising -- confirm that inputs
            # always lie on the minute grid.
            dt_ixs = np.searchsorted(minutes.values, df.index.values)

            columns = []
            for field in fields:
                column = np.zeros(minutes_count, dtype=np.uint32)
                # Scatter the whole column with one indexed assignment
                # rather than a Python-level loop over the rows.
                # NOTE(review): assumes the frame has no duplicate
                # minutes.
                column[dt_ixs] = df[field].values.astype(np.uint32)
                columns.append(column)

            ctable(
                columns=columns,
                names=list(fields),
                rootdir=path,
                mode='w'
            )
193
194
195
class MinuteBarWriterFromDataFrames(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter whose frames are supplied in memory.

    Parameters
    ----------
    first_trading_day
        Stored as ``_first_trading_day``.
    """
    # Column name -> dtype map.  Not referenced by the methods defined
    # on this class.
    _csv_dtypes = dict.fromkeys(
        ('open', 'high', 'low', 'close', 'volume'), float64)

    def __init__(self, first_trading_day):
        self._first_trading_day = first_trading_day

    def gen_frames(self, assets):
        """
        Yield (asset, frame) pairs for every key of `assets`.

        `assets` is iterated for its keys and indexed by each key; the
        frame found there is yielded re-indexed on its "minute" column.
        """
        for asset_id in assets:
            frame = assets[asset_id]
            yield asset_id, frame.set_index("minute")
211
212
213
class MinuteBarWriterFromCSVs(BcolzMinuteBarWriter):
    """
    BcolzMinuteBarWriter constructed from a map of CSVs to assets.

    Parameters
    ----------
    asset_map: dict
        A map from asset_id -> path to csv with data for that asset.
    first_trading_day
        Stored as ``_first_trading_day``.

    CSVs should have the following columns:
        minute : datetime64
        open : float64
        high : float64
        low : float64
        close : float64
        volume : int64 (parsed as float64 by this class)
    """
    # dtypes handed to read_csv for the value columns.
    _csv_dtypes = dict.fromkeys(
        ('open', 'high', 'low', 'close', 'volume'), float64)

    def __init__(self, asset_map, first_trading_day):
        self._first_trading_day = first_trading_day
        self._asset_map = asset_map

    def gen_frames(self, assets):
        """
        Lazily load one DataFrame per requested asset from its CSV.

        Each frame is indexed by its "minute" column, localized to UTC.

        Raises
        ------
        KeyError
            When the asset map holds no path (or holds None) for a
            requested asset.
        """
        column_types = self._csv_dtypes

        for asset in assets:
            csv_path = self._asset_map.get(asset)
            if csv_path is None:
                raise KeyError("No path supplied for asset %s" % asset)

            frame = read_csv(csv_path,
                             parse_dates=['minute'],
                             dtype=column_types)
            yield asset, frame.set_index("minute").tz_localize("UTC")
256
257
258
class BcolzMinuteBarReader(object):
    """
    Reader for per-asset minute bars stored as bcolz ctables.

    Parameters
    ----------
    rootdir : str
        Directory holding the metadata file and the per-asset ctables.
    sid_path_func : callable, optional
        Called as ``sid_path_func(rootdir, sid)`` to locate an asset's
        ctable.  When omitted the path is ``<rootdir>/<sid>.bcolz``.
    """

    def __init__(self, rootdir, sid_path_func=None):
        self.rootdir = rootdir
        self._sid_path_func = sid_path_func

        metadata = self._get_metadata()
        self.first_trading_day = pd.Timestamp(
            metadata['first_trading_day'], tz='UTC')

        # TODO: Read/write calendar to match daily, so that calendar is not
        # 'hardcoded'.
        calendar_days = tradingcalendar.trading_days
        from_first_day = calendar_days.slice_indexer(self.first_trading_day)
        self.trading_days = calendar_days[from_first_day]

        # Per-field cache of opened columns, keyed by the sid as a string.
        self._carrays = {
            field: {}
            for field in ('open', 'high', 'low', 'close', 'volume',
                          'sid', 'dt')
        }

        self._minute_index = _bcolz_minute_index(self.trading_days)

    def _get_metadata(self):
        """Load and return the JSON metadata stored in `rootdir`."""
        metadata_path = os.path.join(self.rootdir, METADATA_FILENAME)
        with open(metadata_path) as fp:
            return json.load(fp)

    def _get_ctable(self, asset):
        """Open, read-only, the ctable holding the bars of `asset`."""
        sid = int(asset)
        if self._sid_path_func is None:
            path = "{0}/{1}.bcolz".format(self.rootdir, sid)
        else:
            path = self._sid_path_func(self.rootdir, sid)

        return bcolz.open(path, mode='r')

    def get_last_traded_dt(self, asset, dt):
        """
        Return the minute found by `_find_last_traded_position` for
        `asset` at `dt`, looked up in the minute index.
        """
        position = self._find_last_traded_position(asset, dt)
        return self._minute_index[position]

    def _find_last_traded_position(self, asset, dt):
        """
        Walk backwards from the position of `dt` to the nearest position
        whose volume is non-zero.

        Position 0 is returned when no earlier non-zero volume exists,
        whatever the volume stored there.
        """
        volumes = self._open_minute_file('volume', asset)
        position = self._find_position_of_minute(dt)
        while position != 0 and volumes[position] == 0:
            position -= 1
        return position

    def _find_position_of_minute(self, minute_dt):
        """
        Internal method that returns the position of the given minute in
        the reader's minute index, which lists every minute slot from the
        market open of the first trading day onwards.

        IMPORTANT: This method assumes every day is 390 minutes long, even
        early closes.  Our minute bcolz files are generated like this to
        support fast lookup.

        Positions are zero-based: the first minute in the index is at
        position 0 and the minute after it at position 1.

        Parameters
        ----------
        minute_dt: pd.Timestamp
            The minute whose position should be calculated.

        Returns
        -------
        The position of the given minute in the list of all trading minutes
        since market open on the first trading day.
        """
        return self._minute_index.get_loc(minute_dt)

    def _open_minute_file(self, field, asset):
        """
        Return the column `field` for `asset`, opening the asset's ctable
        on first use and caching the column afterwards.
        """
        cache_key = str(int(asset))

        try:
            return self._carrays[field][cache_key]
        except KeyError:
            column = self._get_ctable(asset)[field]
            self._carrays[field][cache_key] = column
            return column
346