#
# Copyright 2014 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from bisect import insort_left
from collections import namedtuple
from itertools import groupby, product

import logbook
import numpy as np
import pandas as pd
from six import itervalues, iteritems, iterkeys

from .history import HistorySpec

from zipline.utils.data import RollingPanel, _ensure_index
from zipline.utils.munge import ffill, bfill

logger = logbook.Logger('History Container')


# The closing price is referred to by multiple names,
# allow both for price rollover logic etc.
CLOSING_PRICE_FIELDS = frozenset({'price', 'close_price'})


def ffill_buffer_from_prior_values(freq,
                                   field,
                                   buffer_frame,
                                   digest_frame,
                                   pv_frame,
                                   raw=False):
    """
    Forward-fill a buffer frame, falling back to the end-of-period values of
    a digest frame if the buffer frame has leading NaNs.
    """
    # convert to ndarray if necessary
    digest_values = digest_frame
    if raw and isinstance(digest_frame, pd.DataFrame):
        digest_values = digest_frame.values

    buffer_values = buffer_frame
    if raw and isinstance(buffer_frame, pd.DataFrame):
        buffer_values = buffer_frame.values

    nan_sids = pd.isnull(buffer_values[0])
    if np.any(nan_sids) and len(digest_values):
        # If we have any leading nans in the buffer and we have a non-empty
        # digest frame, use the most recent digest values as the initial
        # buffer values.
        buffer_values[0, nan_sids] = digest_values[-1, nan_sids]

    nan_sids = pd.isnull(buffer_values[0])
    if np.any(nan_sids):
        # If we still have leading nans, fall back to the last known values
        # from before the digest.
        key_loc = pv_frame.index.get_loc((freq.freq_str, field))
        filler = pv_frame.values[key_loc, nan_sids]
        buffer_values[0, nan_sids] = filler

    if raw:
        filled = ffill(buffer_values)
        return filled

    return buffer_frame.ffill()
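
# Illustrative sketch of the fill order above (hypothetical numbers; assumes
# a pv_frame indexed by (freq_str, field) tuples, as built by
# HistoryContainer.prior_values_index): with
#
#     buffer_values = np.array([[np.nan, 101.0],
#                               [np.nan, 102.0]])
#     digest_values = np.array([[100.0, np.nan]])
#
# step 1 seeds buffer_values[0, 0] with 100.0 (the digest's newest row),
# step 2 consults pv_frame only for sids that are still NaN, and the final
# ffill propagates forward, yielding [[100.0, 101.0], [100.0, 102.0]].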


def ffill_digest_frame_from_prior_values(freq,
                                         field,
                                         digest_frame,
                                         pv_frame,
                                         raw=False):
    """
    Forward-fill a digest frame, falling back to the last known prior values
    if necessary.
    """
    # convert to ndarray if necessary
    values = digest_frame
    if raw and isinstance(digest_frame, pd.DataFrame):
        values = digest_frame.values

    nan_sids = pd.isnull(values[0])
    if np.any(nan_sids):
        # If we have any leading nans in the frame, use values from pv_frame
        # to seed values for those sids.
        key_loc = pv_frame.index.get_loc((freq.freq_str, field))
        filler = pv_frame.values[key_loc, nan_sids]
        values[0, nan_sids] = filler

    if raw:
        filled = ffill(values)
        return filled

    return digest_frame.ffill()


def freq_str_and_bar_count(history_spec):
    """
    Helper for getting the frequency string and bar count from a history
    spec.
    """
    return (history_spec.frequency.freq_str, history_spec.bar_count)


def next_bar(spec, env):
    """
    Returns a function that will return the next bar for a given datetime.
    """
    if spec.frequency.unit_str == 'd':
        if spec.frequency.data_frequency == 'minute':
            return lambda dt: env.get_open_and_close(
                env.next_trading_day(dt),
            )[1]
        else:
            return env.next_trading_day
    else:
        return env.next_market_minute
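
# Dispatch summary (illustrative; assumes a trading environment `env` and a
# spec like those used elsewhere in this module): for a daily spec driven by
# minute data, the returned callable maps a dt to the *close* of the next
# trading day; for daily data it is env.next_trading_day itself; for minute
# specs it is env.next_market_minute. E.g. (hypothetical spec):
#
#     bar_fn = next_bar(daily_minute_spec, env)
#     bar_fn(dt)  # -> the next trading day's market close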


def compute_largest_specs(history_specs):
    """
    Maps a Frequency to the largest HistorySpec at that frequency from an
    iterable of HistorySpecs.
    """
    return {key: max(group, key=lambda f: f.bar_count)
            for key, group in groupby(
                sorted(history_specs, key=freq_str_and_bar_count),
                key=lambda spec: spec.frequency)}
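
# Worked example (hypothetical specs): given three specs at ('1d', 5 bars),
# ('1d', 20 bars), and ('1m', 390 bars), the dict comprehension above keeps
# the widest spec per frequency:
#
#     {<Frequency 1d>: <20-bar spec>, <Frequency 1m>: <390-bar spec>}
#
# Sorting by freq_str_and_bar_count first makes each groupby group
# contiguous, so max sees every spec at its frequency.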


# Tuples to store a change to the shape of a HistoryContainer.

FrequencyDelta = namedtuple(
    'FrequencyDelta',
    ['freq', 'buffer_delta'],
)


LengthDelta = namedtuple(
    'LengthDelta',
    ['freq', 'delta'],
)


HistoryContainerDeltaSuper = namedtuple(
    'HistoryContainerDelta',
    ['field', 'frequency_delta', 'length_delta'],
)


class HistoryContainerDelta(HistoryContainerDeltaSuper):
    """
    A class representing a resize of the history container.
    """
    def __new__(cls, field=None, frequency_delta=None, length_delta=None):
        """
        field is a new field that was added.
        frequency_delta is a FrequencyDelta representing that a new
        frequency was added.
        length_delta is a LengthDelta holding a frequency and the change in
        bar_count at that frequency.
        If any of these is None, then no change of that type occurred.
        """
        return super(HistoryContainerDelta, cls).__new__(
            cls, field, frequency_delta, length_delta,
        )

    @property
    def empty(self):
        """
        Checks if the delta is empty.
        """
        return (self.field is None and
                self.frequency_delta is None and
                self.length_delta is None)
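
# Usage sketch (hypothetical): HistoryContainer.ensure_spec returns one of
# these, so callers can cheaply test whether anything changed:
#
#     delta = container.ensure_spec(spec, dt, bar_data)
#     if not delta.empty:
#         logger.info('container resized: {0}'.format(delta))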


def normalize_to_data_freq(data_frequency, dt):
    if data_frequency == 'minute':
        return dt
    return pd.tslib.normalize_date(dt)
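
# Example: minute dts pass through untouched; daily dts are snapped to
# midnight, e.g. (assuming a tz-aware input):
#
#     normalize_to_data_freq('daily',
#                            pd.Timestamp('2014-07-07 14:31', tz='UTC'))
#     # -> 2014-07-07 00:00:00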


class HistoryContainer(object):
    """
    Container for all history panels and frames used by an algoscript.

    To be used internally by TradingAlgorithm, but *not* passed directly to
    the algorithm.

    Entry point for the algoscript is the result of `get_history`.
    """
    VALID_FIELDS = {
        'price', 'open_price', 'volume', 'high', 'low', 'close_price',
    }

    def __init__(self,
                 history_specs,
                 initial_sids,
                 initial_dt,
                 data_frequency,
                 env,
                 bar_data=None):
        """
        A container to hold a rolling window of historical data within a
        user's algorithm.

        Args:
            history_specs (dict[Frequency:HistorySpec]): The starting
                history specs that this container should be able to service.

            initial_sids (set[Asset or Int]): The starting sids to watch.

            initial_dt (datetime): The datetime to start collecting history
                from.

            bar_data (BarData): If this container is being constructed
                during handle_data, this is the BarData for the current bar
                to fill the buffer with. If this is constructed elsewhere,
                it is None.

        Returns:
            An instance of a new HistoryContainer.
        """

        # Store a reference to the env.
        self.env = env

        # History specs to be served by this container.
        self.history_specs = history_specs
        self.largest_specs = compute_largest_specs(
            itervalues(self.history_specs)
        )

        # The set of fields specified by all history specs.
        self.fields = pd.Index(
            sorted(set(spec.field for spec in itervalues(history_specs)))
        )
        self.sids = pd.Index(
            sorted(set(initial_sids or []))
        )

        self.data_frequency = data_frequency

        initial_dt = normalize_to_data_freq(self.data_frequency, initial_dt)

        # This panel contains raw minutes for periods that haven't been
        # fully completed. When a frequency period rolls over, these minutes
        # are digested using some sort of aggregation call on the panel
        # (e.g. `sum` for volume, `max` for high, `min` for low, etc.).
        self.buffer_panel = self.create_buffer_panel(initial_dt, bar_data)

        # Dictionaries with Frequency objects as keys.
        self.digest_panels, self.cur_window_starts, self.cur_window_closes = \
            self.create_digest_panels(initial_sids, initial_dt)

        # Helps prop up the prior day panel against having a nan, when the
        # data has been seen.
        self.last_known_prior_values = pd.DataFrame(
            data=None,
            index=self.prior_values_index,
            columns=self.prior_values_columns,
            # Note: For bizarre "intricacies of the spaghetti that is pandas
            # indexing logic" reasons, setting this dtype prevents indexing
            # errors in update_last_known_values. This is safe for the time
            # being because our only forward-fillable fields are floats. If
            # we need to add a non-float-typed forward-fillable field, then
            # we may find ourselves having to track down and fix a pandas
            # bug.
            dtype=np.float64,
        )

    _ffillable_fields = None

    @property
    def ffillable_fields(self):
        if self._ffillable_fields is None:
            fillables = self.fields.intersection(HistorySpec.FORWARD_FILLABLE)
            self._ffillable_fields = fillables
        return self._ffillable_fields

    @property
    def prior_values_index(self):
        index_values = list(
            product(
                (freq.freq_str for freq in self.unique_frequencies),
                # Only store prior values for forward-fillable fields.
                self.ffillable_fields,
            )
        )
        if index_values:
            return pd.MultiIndex.from_tuples(index_values)
        else:
            # MultiIndex doesn't gracefully support empty input, so we
            # return an empty regular Index if we have no values.
            return pd.Index(index_values)
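
    # Layout sketch for last_known_prior_values (hypothetical frequencies,
    # fields, and sids): rows are (freq_str, field) pairs, columns are sids:
    #
    #                          24    8554
    #     ('1d', 'price')    99.0    15.5
    #     ('1m', 'price')    99.1    15.6
    #
    # update_last_known_values writes into this frame; the module-level
    # ffill_* helpers read from it via index.get_loc((freq_str, field)).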

    @property
    def prior_values_columns(self):
        return self.sids

    @property
    def all_panels(self):
        yield self.buffer_panel
        for panel in self.digest_panels.values():
            yield panel

    @property
    def unique_frequencies(self):
        """
        Return an iterator over all the unique frequencies serviced by this
        container.
        """
        return iterkeys(self.largest_specs)

    def _add_frequency(self, spec, dt, data):
        """
        Adds a new frequency to the container. This reshapes the
        buffer_panel if needed.
        """
        freq = spec.frequency
        self.largest_specs[freq] = spec
        new_buffer_len = 0

        if freq.max_bars > self.buffer_panel.window_length:
            # More bars need to be held in the buffer_panel to support this
            # freq.
            if freq.data_frequency \
                    != self.buffer_spec.frequency.data_frequency:
                # If the data_frequencies are not the same, then we need to
                # create a fresh buffer.
                self.buffer_panel = self.create_buffer_panel(
                    dt, bar_data=data,
                )
                new_buffer_len = None
            else:
                # The frequencies are the same, we just need to add more
                # bars.
                self._resize_panel(
                    self.buffer_panel,
                    freq.max_bars,
                    dt,
                    self.buffer_spec.frequency,
                )
                new_buffer_len = freq.max_minutes
                # Update the current buffer_spec to reflect the new length.
                self.buffer_spec.bar_count = new_buffer_len + 1

        if spec.bar_count > 1:
            # This spec has more than one bar, construct a digest panel for
            # it.
            self.digest_panels[freq] = self._create_digest_panel(dt, spec=spec)
        else:
            self.cur_window_starts[freq] = dt
            self.cur_window_closes[freq] = freq.window_close(
                self.cur_window_starts[freq]
            )

        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )

        return FrequencyDelta(freq, new_buffer_len)

    def _add_field(self, field):
        """
        Adds a new field to the container.
        """
        # self.fields is already sorted, so we just need to insert the new
        # field in the correct index.
        ls = list(self.fields)
        insort_left(ls, field)
        self.fields = pd.Index(ls)
        # Unset the fillable fields cache.
        self._ffillable_fields = None

        self._realign_fields()
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        return field

    def _add_length(self, spec, dt):
        """
        Increases the length of the digest panel for spec.frequency. If this
        frequency does not yet have a digest panel and one is needed, a
        digest panel will be constructed.
        """
        old_count = self.largest_specs[spec.frequency].bar_count
        self.largest_specs[spec.frequency] = spec
        delta = spec.bar_count - old_count

        panel = self.digest_panels.get(spec.frequency)

        if panel is None:
            # The old length for this frequency was 1 bar, meaning no digest
            # panel was held. We must construct a new one here.
            panel = self._create_digest_panel(dt, spec=spec)

        else:
            self._resize_panel(panel, spec.bar_count - 1, dt,
                               freq=spec.frequency)

        self.digest_panels[spec.frequency] = panel

        return LengthDelta(spec.frequency, delta)

    def _resize_panel(self, panel, size, dt, freq):
        """
        Resizes a panel, fills the date_buf with the correct values.
        """
        # This is the oldest datetime that will be shown in the current
        # window of the panel.
        oldest_dt = pd.Timestamp(panel.start_date, tz='utc')
        delta = size - panel.window_length

        # Construct the missing dates.
        missing_dts = self._create_window_date_buf(
            delta, freq.unit_str, freq.data_frequency, oldest_dt,
        )

        panel.extend_back(missing_dts)

    def _create_window_date_buf(self,
                                window,
                                unit_str,
                                data_frequency,
                                dt):
        """
        Creates a window length date_buf looking backwards from dt.
        """
        if unit_str == 'd':
            # Get the properly key'd datetime64 out of the pandas Timestamp.
            if data_frequency != 'daily':
                arr = self.env.open_close_window(
                    dt,
                    window,
                    offset=-window,
                ).market_close.astype('datetime64[ns]').values
            else:
                arr = self.env.open_close_window(
                    dt,
                    window,
                    offset=-window,
                ).index.values

            return arr
        else:
            return self.env.market_minute_window(
                self.env.previous_market_minute(dt),
                window,
                step=-1,
            )[::-1].values

    def _create_panel(self, dt, spec):
        """
        Constructs a rolling panel with a properly aligned date_buf.
        """
        dt = normalize_to_data_freq(spec.frequency.data_frequency, dt)

        window = spec.bar_count - 1

        date_buf = self._create_window_date_buf(
            window,
            spec.frequency.unit_str,
            spec.frequency.data_frequency,
            dt,
        )

        panel = RollingPanel(
            window=window,
            items=self.fields,
            sids=self.sids,
            initial_dates=date_buf,
        )

        return panel

    def _create_digest_panel(self,
                             dt,
                             spec,
                             window_starts=None,
                             window_closes=None):
        """
        Creates a digest panel, setting the window_starts and window_closes.
        If window_starts or window_closes are None, then
        self.cur_window_starts or self.cur_window_closes will be used.
        """
        freq = spec.frequency

        window_starts = window_starts if window_starts is not None \
            else self.cur_window_starts
        window_closes = window_closes if window_closes is not None \
            else self.cur_window_closes

        window_starts[freq] = freq.normalize(dt)
        window_closes[freq] = freq.window_close(window_starts[freq])

        return self._create_panel(dt, spec)

    def ensure_spec(self, spec, dt, bar_data):
        """
        Ensure that this container has enough space to hold the data for the
        given spec. This returns a HistoryContainerDelta to represent the
        changes in shape that the container made to support the new
        HistorySpec.
        """
        updated = {}
        if spec.field not in self.fields:
            updated['field'] = self._add_field(spec.field)
        if spec.frequency not in self.largest_specs:
            updated['frequency_delta'] = self._add_frequency(
                spec, dt, bar_data,
            )
        if spec.bar_count > self.largest_specs[spec.frequency].bar_count:
            updated['length_delta'] = self._add_length(spec, dt)
        return HistoryContainerDelta(**updated)
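
    # Resize sketch (hypothetical): if the container currently serves a
    # 5-bar '1d' spec and a new 20-bar '1d' spec arrives, only the length
    # branch fires, and the returned delta is roughly
    #
    #     HistoryContainerDelta(field=None,
    #                           frequency_delta=None,
    #                           length_delta=LengthDelta(<Frequency 1d>, 15))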

    def add_sids(self, to_add):
        """
        Add new sids to the container.
        """
        self.sids = pd.Index(
            sorted(self.sids.union(_ensure_index(to_add))),
        )
        self._realign_sids()

    def drop_sids(self, to_drop):
        """
        Remove sids from the container.
        """
        self.sids = pd.Index(
            sorted(self.sids.difference(_ensure_index(to_drop))),
        )
        self._realign_sids()

    def _realign_sids(self):
        """
        Realign our constituent panels after adding or removing sids.
        """
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            columns=self.sids,
        )
        for panel in self.all_panels:
            panel.set_minor_axis(self.sids)

    def _realign_fields(self):
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        for panel in self.all_panels:
            panel.set_items(self.fields)

    def create_digest_panels(self,
                             initial_sids,
                             initial_dt):
        """
        Initialize a RollingPanel for each unique panel frequency being
        stored by this container. Each RollingPanel pre-allocates enough
        storage space to service the highest bar-count of any history call
        that it serves.
        """
        # Map from frequency -> first/last minute of the next digest to be
        # rolled for that frequency.
        first_window_starts = {}
        first_window_closes = {}

        # Map from frequency -> digest_panels.
        panels = {}
        for freq, largest_spec in iteritems(self.largest_specs):
            if largest_spec.bar_count == 1:
                # No need to allocate a digest panel; this frequency will
                # only ever use data drawn from self.buffer_panel.
                first_window_starts[freq] = freq.normalize(initial_dt)
                first_window_closes[freq] = freq.window_close(
                    first_window_starts[freq]
                )

                continue

            dt = initial_dt

            rp = self._create_digest_panel(
                dt,
                spec=largest_spec,
                window_starts=first_window_starts,
                window_closes=first_window_closes,
            )

            panels[freq] = rp

        return panels, first_window_starts, first_window_closes

    def create_buffer_panel(self, initial_dt, bar_data):
        """
        Initialize a RollingPanel containing enough minutes to service all
        our frequencies.
        """
        max_bars_needed = max(
            freq.max_bars for freq in self.unique_frequencies
        )
        freq = '1m' if self.data_frequency == 'minute' else '1d'
        spec = HistorySpec(
            max_bars_needed + 1, freq, None, None, self.env,
            self.data_frequency,
        )

        rp = self._create_panel(
            initial_dt, spec,
        )
        self.buffer_spec = spec

        if bar_data is not None:
            frame = self.frame_from_bardata(bar_data, initial_dt)
            rp.add_frame(initial_dt, frame)

        return rp

    def convert_columns(self, values):
        """
        If columns have a specific type you want to enforce, overwrite this
        method and return the transformed values.
        """
        return values

    def digest_bars(self, history_spec, do_ffill):
        """
        Get the last (history_spec.bar_count - 1) bars from
        self.digest_panels for the requested HistorySpec.
        """
        bar_count = history_spec.bar_count
        if bar_count == 1:
            # Slicing with [1 - bar_count:] doesn't work when
            # bar_count == 1, so we special-case this.
            res = pd.DataFrame(index=[], columns=self.sids, dtype=float)
            return res.values, res.index

        field = history_spec.field

        # Panel axes are (field, dates, sids). We want just the entries for
        # the requested field, the last (bar_count - 1) data points, and all
        # sids.
        digest_panel = self.digest_panels[history_spec.frequency]
        frame = digest_panel.get_current(field, raw=True)
        if do_ffill:
            # Do forward-filling *before* truncating down to the requested
            # number of bars. This protects us from losing data if an
            # illiquid stock has a gap in its price history.
            filled = ffill_digest_frame_from_prior_values(
                history_spec.frequency,
                history_spec.field,
                frame,
                self.last_known_prior_values,
                raw=True
                # Truncate only after we've forward-filled.
            )
            indexer = slice(1 - bar_count, None)
            return filled[indexer], digest_panel.current_dates()[indexer]
        else:
            indexer = slice(1 - bar_count, None)
            return frame[indexer, :], digest_panel.current_dates()[indexer]
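
    # Slice semantics (illustrative): with bar_count == 4,
    # slice(1 - bar_count, None) == slice(-3, None), i.e. the newest three
    # digest rows; the fourth, newest bar of the history call is built from
    # the buffer panel in get_history.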

    def buffer_panel_minutes(self,
                             buffer_panel,
                             earliest_minute=None,
                             latest_minute=None,
                             raw=False):
        """
        Get the minutes in @buffer_panel between @earliest_minute and
        @latest_minute, inclusive.

        @buffer_panel can be a RollingPanel or a plain Panel. If a
        RollingPanel is supplied, we call `get_current` to extract a Panel
        object.

        If no value is specified for @earliest_minute, use all the minutes
        we have up until @latest_minute.

        If no value for @latest_minute is specified, use all values up until
        the latest minute.
        """
        if isinstance(buffer_panel, RollingPanel):
            buffer_panel = buffer_panel.get_current(start=earliest_minute,
                                                    end=latest_minute,
                                                    raw=raw)
            return buffer_panel
        # Using .ix here rather than .loc because .loc requires that the
        # keys are actually in the index, whereas .ix returns all the values
        # between earliest_minute and latest_minute, which is what we want.
        return buffer_panel.ix[:, earliest_minute:latest_minute, :]

    def frame_from_bardata(self, data, algo_dt):
        """
        Create a DataFrame from the given BarData and algo dt.
        """
        data = data._data
        frame_data = np.empty((len(self.fields), len(self.sids))) * np.nan

        for j, sid in enumerate(self.sids):
            sid_data = data.get(sid)
            if not sid_data:
                continue
            if algo_dt != sid_data['dt']:
                continue
            for i, field in enumerate(self.fields):
                frame_data[i, j] = sid_data.get(field, np.nan)

        return pd.DataFrame(
            frame_data,
            index=self.fields.copy(),
            columns=self.sids.copy(),
        )

    def update(self, data, algo_dt):
        """
        Takes the bar at @algo_dt's @data, checks to see if we need to roll
        any new digests, then adds new data to the buffer panel.
        """
        frame = self.frame_from_bardata(data, algo_dt)

        self.update_last_known_values()
        self.update_digest_panels(algo_dt, self.buffer_panel)
        self.buffer_panel.add_frame(algo_dt, frame)

    def update_digest_panels(self, algo_dt, buffer_panel, freq_filter=None):
        """
        Check whether @algo_dt is greater than cur_window_close for any of
        our frequencies. If so, roll a digest for that frequency using data
        drawn from @buffer_panel and insert it into the appropriate digest
        panels.

        If @freq_filter is specified, only use the given data to update
        frequencies on which the filter returns True.

        This takes `buffer_panel` as an argument rather than using
        self.buffer_panel so that this method can be used to add
        supplemental data from an external source.
        """
        for frequency in filter(freq_filter, self.unique_frequencies):

            # We don't keep a digest panel if we only have a length-1
            # history spec for a given frequency.
            digest_panel = self.digest_panels.get(frequency, None)

            while algo_dt > self.cur_window_closes[frequency]:

                earliest_minute = self.cur_window_starts[frequency]
                latest_minute = self.cur_window_closes[frequency]
                minutes_to_process = self.buffer_panel_minutes(
                    buffer_panel,
                    earliest_minute=earliest_minute,
                    latest_minute=latest_minute,
                    raw=True
                )

                if digest_panel is not None:
                    # Create a digest from minutes_to_process and add it to
                    # digest_panel.
                    digest_frame = self.create_new_digest_frame(
                        minutes_to_process,
                        self.fields,
                        self.sids
                    )
                    digest_panel.add_frame(
                        latest_minute,
                        digest_frame,
                        self.fields,
                        self.sids
                    )

                # Update panel start/close for this frequency.
                self.cur_window_starts[frequency] = \
                    frequency.next_window_start(latest_minute)
                self.cur_window_closes[frequency] = \
                    frequency.window_close(self.cur_window_starts[frequency])

    def frame_to_series(self, field, frame, columns=None):
        """
        Convert a frame with a DatetimeIndex and sid columns into a series
        with a sid index, using the aggregator defined by the given field.
        """
        if isinstance(frame, pd.DataFrame):
            columns = frame.columns
            frame = frame.values

        if not len(frame):
            return pd.Series(
                data=(0 if field == 'volume' else np.nan),
                index=columns,
            ).values

        if field in ['price', 'close_price']:
            # Shortcircuit for a full last row.
            vals = frame[-1]
            if np.all(~np.isnan(vals)):
                return vals
            return ffill(frame)[-1]
        elif field == 'open_price':
            return bfill(frame)[0]
        elif field == 'volume':
            return np.nansum(frame, axis=0)
        elif field == 'high':
            return np.nanmax(frame, axis=0)
        elif field == 'low':
            return np.nanmin(frame, axis=0)
        else:
            raise ValueError("Unknown field {}".format(field))
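
    # Per-field aggregation at a glance (illustrative numbers): for one
    # period, price/close take the last (forward-filled) row, open takes the
    # first (back-filled) row, volume sums, high/low take nanmax/nanmin:
    #
    #     frame = np.array([[10.0, np.nan],
    #                       [11.0,   5.0]])
    #     # 'price'  -> [11.0, 5.0]    'open_price' -> [10.0, 5.0]
    #     # 'high'   -> [11.0, 5.0]    'volume'     -> [21.0, 5.0]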

    def aggregate_ohlcv_panel(self,
                              fields,
                              ohlcv_panel,
                              items=None,
                              minor_axis=None):
        """
        Convert an OHLCV Panel into a DataFrame by aggregating each field's
        frame into a Series.
        """
        vals = ohlcv_panel
        if isinstance(ohlcv_panel, pd.Panel):
            vals = ohlcv_panel.values
            items = ohlcv_panel.items
            minor_axis = ohlcv_panel.minor_axis

        data = [
            self.frame_to_series(
                field,
                vals[items.get_loc(field)],
                minor_axis
            )
            for field in fields
        ]
        return np.array(data)

    def create_new_digest_frame(self, buffer_minutes, items=None,
                                minor_axis=None):
        """
        Package up minutes in @buffer_minutes into a single digest frame.
        """
        return self.aggregate_ohlcv_panel(
            self.fields,
            buffer_minutes,
            items=items,
            minor_axis=minor_axis
        )

    def update_last_known_values(self):
        """
        Store the non-NaN values from our oldest frame in each frequency.
        """
        ffillable = self.ffillable_fields
        if not len(ffillable):
            return

        for frequency in self.unique_frequencies:
            digest_panel = self.digest_panels.get(frequency, None)
            if digest_panel:
                oldest_known_values = digest_panel.oldest_frame(raw=True)
            else:
                oldest_known_values = self.buffer_panel.oldest_frame(raw=True)

            oldest_vals = oldest_known_values
            oldest_columns = self.fields
            for field in ffillable:
                f_idx = oldest_columns.get_loc(field)
                field_vals = oldest_vals[f_idx]
                # isnan would be fast, possible to use?
                non_nan_sids = np.where(pd.notnull(field_vals))
                key = (frequency.freq_str, field)
                key_loc = self.last_known_prior_values.index.get_loc(key)
                self.last_known_prior_values.values[
                    key_loc, non_nan_sids
                ] = field_vals[non_nan_sids]

    def get_history(self, history_spec, algo_dt):
        """
        Main API used by the algoscript is mapped to this function.

        Selects from the overarching history panel the values for the
        @history_spec at the given @algo_dt.
        """
        field = history_spec.field
        do_ffill = history_spec.ffill

        # Get our stored values from periods prior to the current period.
        digest_frame, index = self.digest_bars(history_spec, do_ffill)

        # Get minutes from our buffer panel to build the last row of the
        # returned frame.
        buffer_panel = self.buffer_panel_minutes(
            self.buffer_panel,
            earliest_minute=self.cur_window_starts[history_spec.frequency],
            raw=True
        )
        buffer_frame = buffer_panel[self.fields.get_loc(field)]

        if do_ffill:
            buffer_frame = ffill_buffer_from_prior_values(
                history_spec.frequency,
                field,
                buffer_frame,
                digest_frame,
                self.last_known_prior_values,
                raw=True
            )
        last_period = self.frame_to_series(field, buffer_frame, self.sids)
        return fast_build_history_output(digest_frame,
                                         last_period,
                                         algo_dt,
                                         index=index,
                                         columns=self.sids)


def fast_build_history_output(buffer_frame,
                              last_period,
                              algo_dt,
                              index=None,
                              columns=None):
    """
    Optimized concatenation of DataFrame and Series for use in
    HistoryContainer.get_history.

    Relies on the fact that the input arrays have compatible shapes.
    """
    buffer_values = buffer_frame
    if isinstance(buffer_frame, pd.DataFrame):
        buffer_values = buffer_frame.values
        index = buffer_frame.index
        columns = buffer_frame.columns

    return pd.DataFrame(
        data=np.vstack(
            [
                buffer_values,
                last_period,
            ]
        ),
        index=fast_append_date_to_index(
            index,
            pd.Timestamp(algo_dt)
        ),
        columns=columns,
    )


def fast_append_date_to_index(index, timestamp):
    """
    Append a timestamp to a DatetimeIndex. DatetimeIndex.append does not
    appear to work.
    """
    return pd.DatetimeIndex(
        np.hstack(
            [
                index.values,
                [timestamp.asm8],
            ]
        ),
        tz='UTC',
    )
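
# Quick check of the append helper (hypothetical dates):
#
#     idx = pd.DatetimeIndex(['2014-07-07'], tz='UTC')
#     fast_append_date_to_index(idx, pd.Timestamp('2014-07-08', tz='UTC'))
#     # -> DatetimeIndex(['2014-07-07', '2014-07-08'], tz='UTC')
#
# `timestamp.asm8` is the raw datetime64 value, so tz='UTC' on the rebuilt
# index re-localizes the stacked array in one shot.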