| Metric | Value |
| --- | --- |
| Total Complexity | 80 |
| Total Lines | 721 |
| Duplicated Lines | 0% |
Complex classes like zipline.history.HistoryContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefix or suffix.

Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
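In this class, the `cur_window_starts` / `cur_window_closes` pair (shared `cur_window_` prefix) travels together through `_add_frequency`, `_create_digest_panel`, and `update_digest_panels`, which makes it a natural candidate. A minimal, hypothetical sketch of the Extract Class step is below; the `WindowBounds` name and its methods are invented for illustration and are not part of zipline:

```python
# Hypothetical sketch: extracting the cur_window_* fields into one object.
class WindowBounds(object):
    """Tracks the open/close of the currently-forming window per frequency."""

    def __init__(self):
        self.starts = {}   # was: HistoryContainer.cur_window_starts
        self.closes = {}   # was: HistoryContainer.cur_window_closes

    def reset(self, freq, dt):
        # Replaces the paired start/close assignments scattered through
        # _add_frequency and _create_digest_panel.
        self.starts[freq] = freq.normalize(dt)
        self.closes[freq] = freq.window_close(self.starts[freq])

    def roll(self, freq, latest_minute):
        # Replaces the start/close bookkeeping at the end of
        # update_digest_panels.
        self.starts[freq] = freq.next_window_start(latest_minute)
        self.closes[freq] = freq.window_close(self.starts[freq])
```

HistoryContainer would then hold a single `self.window_bounds`, and the digest-rolling loop in `update_digest_panels` would end with one call to `roll`.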
```python
# zipline.history.HistoryContainer (lines 191-911 of the source file)
class HistoryContainer(object):
    """
    Container for all history panels and frames used by an algoscript.

    To be used internally by TradingAlgorithm, but *not* passed directly to the
    algorithm.

    Entry point for the algoscript is the result of `get_history`.
    """
    VALID_FIELDS = {
        'price', 'open_price', 'volume', 'high', 'low', 'close_price',
    }

    def __init__(self,
                 history_specs,
                 initial_sids,
                 initial_dt,
                 data_frequency,
                 env,
                 bar_data=None):
        """
        A container to hold a rolling window of historical data within a user's
        algorithm.

        Args:
            history_specs (dict[Frequency:HistorySpec]): The starting history
                specs that this container should be able to service.

            initial_sids (set[Asset or Int]): The starting sids to watch.

            initial_dt (datetime): The datetime to start collecting history
                from.

            bar_data (BarData): If this container is being constructed during
                handle_data, this is the BarData for the current bar to fill
                the buffer with. If this is constructed elsewhere, it is None.

        Returns:
            An instance of a new HistoryContainer
        """

        # Store a reference to the env
        self.env = env

        # History specs to be served by this container.
        self.history_specs = history_specs
        self.largest_specs = compute_largest_specs(
            itervalues(self.history_specs)
        )

        # The set of fields specified by all history specs
        self.fields = pd.Index(
            sorted(set(spec.field for spec in itervalues(history_specs)))
        )
        self.sids = pd.Index(
            sorted(set(initial_sids or []))
        )

        self.data_frequency = data_frequency

        initial_dt = normalize_to_data_freq(self.data_frequency, initial_dt)

        # This panel contains raw minutes for periods that haven't been fully
        # completed. When a frequency period rolls over, these minutes are
        # digested using some sort of aggregation call on the panel (e.g. `sum`
        # for volume, `max` for high, `min` for low, etc.).
        self.buffer_panel = self.create_buffer_panel(initial_dt, bar_data)

        # Dictionaries with Frequency objects as keys.
        self.digest_panels, self.cur_window_starts, self.cur_window_closes = \
            self.create_digest_panels(initial_sids, initial_dt)

        # Helps prop up the prior day panel against having a NaN when the data
        # has been seen.
        self.last_known_prior_values = pd.DataFrame(
            data=None,
            index=self.prior_values_index,
            columns=self.prior_values_columns,
            # Note: For bizarre "intricacies of the spaghetti that is pandas
            # indexing logic" reasons, setting this dtype prevents indexing
            # errors in update_last_known_values. This is safe for the time
            # being because our only forward-fillable fields are floats. If we
            # need to add a non-float-typed forward-fillable field, then we may
            # find ourselves having to track down and fix a pandas bug.
            dtype=np.float64,
        )

    _ffillable_fields = None

    @property
    def ffillable_fields(self):
        if self._ffillable_fields is None:
            fillables = self.fields.intersection(HistorySpec.FORWARD_FILLABLE)
            self._ffillable_fields = fillables
        return self._ffillable_fields

    @property
    def prior_values_index(self):
        index_values = list(
            product(
                (freq.freq_str for freq in self.unique_frequencies),
                # Only store prior values for forward-fillable fields.
                self.ffillable_fields,
            )
        )
        if index_values:
            return pd.MultiIndex.from_tuples(index_values)
        else:
            # MultiIndex doesn't gracefully support empty input, so we return
            # an empty regular Index if we have no values.
            return pd.Index(index_values)

    @property
    def prior_values_columns(self):
        return self.sids

    @property
    def all_panels(self):
        yield self.buffer_panel
        for panel in self.digest_panels.values():
            yield panel

    @property
    def unique_frequencies(self):
        """
        Return an iterator over all the unique frequencies serviced by this
        container.
        """
        return iterkeys(self.largest_specs)

    def _add_frequency(self, spec, dt, data):
        """
        Adds a new frequency to the container. This reshapes the buffer_panel
        if needed.
        """
        freq = spec.frequency
        self.largest_specs[freq] = spec
        new_buffer_len = 0

        if freq.max_bars > self.buffer_panel.window_length:
            # More bars need to be held in the buffer_panel to support this
            # freq.
            if freq.data_frequency \
                    != self.buffer_spec.frequency.data_frequency:
                # If the data_frequencies are not the same, then we need to
                # create a fresh buffer.
                self.buffer_panel = self.create_buffer_panel(
                    dt, bar_data=data,
                )
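                # (A value of None here, rather than a bar count, appears to
                # signal to the caller through the returned FrequencyDelta
                # that the buffer was rebuilt from scratch instead of being
                # extended. This reading is an inference from the code, not
                # stated in the report.)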
                new_buffer_len = None
            else:
                # The frequencies are the same, we just need to add more bars.
                self._resize_panel(
                    self.buffer_panel,
                    freq.max_bars,
                    dt,
                    self.buffer_spec.frequency,
                )
                new_buffer_len = freq.max_minutes
                # Update the current buffer_spec to reflect the new length.
                self.buffer_spec.bar_count = new_buffer_len + 1

        if spec.bar_count > 1:
            # This spec has more than one bar, construct a digest panel for it.
            self.digest_panels[freq] = self._create_digest_panel(dt, spec=spec)
        else:
            self.cur_window_starts[freq] = dt
            self.cur_window_closes[freq] = freq.window_close(
                self.cur_window_starts[freq]
            )

        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )

        return FrequencyDelta(freq, new_buffer_len)

    def _add_field(self, field):
        """
        Adds a new field to the container.
        """
        # self.fields is already sorted, so we just need to insert the new
        # field at the correct index.
        ls = list(self.fields)
        insort_left(ls, field)
        self.fields = pd.Index(ls)
        # Unset the fillable fields cache.
        self._ffillable_fields = None

        self._realign_fields()
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        return field

    def _add_length(self, spec, dt):
        """
        Increases the length of the digest panel for spec.frequency. If that
        frequency does not yet have a digest panel and one is needed, it will
        be constructed.
        """
        old_count = self.largest_specs[spec.frequency].bar_count
        self.largest_specs[spec.frequency] = spec
        delta = spec.bar_count - old_count

        panel = self.digest_panels.get(spec.frequency)

        if panel is None:
            # The old length for this frequency was 1 bar, meaning no digest
            # panel was held. We must construct a new one here.
            panel = self._create_digest_panel(dt, spec=spec)
        else:
            self._resize_panel(panel, spec.bar_count - 1, dt,
                               freq=spec.frequency)

        self.digest_panels[spec.frequency] = panel

        return LengthDelta(spec.frequency, delta)

    def _resize_panel(self, panel, size, dt, freq):
        """
        Resizes a panel, filling the date_buf with the correct values.
        """
        # This is the oldest datetime that will be shown in the current window
        # of the panel.
        oldest_dt = pd.Timestamp(panel.start_date, tz='utc')
        delta = size - panel.window_length

        # Construct the missing dates.
        missing_dts = self._create_window_date_buf(
            delta, freq.unit_str, freq.data_frequency, oldest_dt,
        )

        panel.extend_back(missing_dts)

    def _create_window_date_buf(self,
                                window,
                                unit_str,
                                data_frequency,
                                dt):
        """
        Creates a window-length date_buf looking backwards from dt.
        """
        if unit_str == 'd':
            # Get the properly keyed datetime64 out of the pandas Timestamp.
            if data_frequency != 'daily':
                arr = self.env.open_close_window(
                    dt,
                    window,
                    offset=-window,
                ).market_close.astype('datetime64[ns]').values
            else:
                arr = self.env.open_close_window(
                    dt,
                    window,
                    offset=-window,
                ).index.values

            return arr
        else:
            return self.env.market_minute_window(
                self.env.previous_market_minute(dt),
                window,
                step=-1,
            )[::-1].values

    def _create_panel(self, dt, spec):
        """
        Constructs a rolling panel with a properly aligned date_buf.
        """
        dt = normalize_to_data_freq(spec.frequency.data_frequency, dt)

        window = spec.bar_count - 1

        date_buf = self._create_window_date_buf(
            window,
            spec.frequency.unit_str,
            spec.frequency.data_frequency,
            dt,
        )

        panel = RollingPanel(
            window=window,
            items=self.fields,
            sids=self.sids,
            initial_dates=date_buf,
        )

        return panel

    def _create_digest_panel(self,
                             dt,
                             spec,
                             window_starts=None,
                             window_closes=None):
        """
        Creates a digest panel, setting the window_starts and window_closes.
        If window_starts or window_closes are None, then self.cur_window_starts
        or self.cur_window_closes will be used.
        """
        freq = spec.frequency

        window_starts = window_starts if window_starts is not None \
            else self.cur_window_starts
        window_closes = window_closes if window_closes is not None \
            else self.cur_window_closes

        window_starts[freq] = freq.normalize(dt)
        window_closes[freq] = freq.window_close(window_starts[freq])

        return self._create_panel(dt, spec)

    def ensure_spec(self, spec, dt, bar_data):
        """
        Ensure that this container has enough space to hold the data for the
        given spec. This returns a HistoryContainerDelta to represent the
        changes in shape that the container made to support the new
        HistorySpec.
        """
        updated = {}
        if spec.field not in self.fields:
            updated['field'] = self._add_field(spec.field)
        if spec.frequency not in self.largest_specs:
            updated['frequency_delta'] = self._add_frequency(
                spec, dt, bar_data,
            )
        if spec.bar_count > self.largest_specs[spec.frequency].bar_count:
            updated['length_delta'] = self._add_length(spec, dt)
        return HistoryContainerDelta(**updated)

    def add_sids(self, to_add):
        """
        Add new sids to the container.
        """
        self.sids = pd.Index(
            sorted(self.sids.union(_ensure_index(to_add))),
        )
        self._realign_sids()

    def drop_sids(self, to_drop):
        """
        Remove sids from the container.
        """
        self.sids = pd.Index(
            sorted(self.sids.difference(_ensure_index(to_drop))),
        )
        self._realign_sids()

    def _realign_sids(self):
        """
        Realign our constituent panels after adding or removing sids.
        """
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            columns=self.sids,
        )
        for panel in self.all_panels:
            panel.set_minor_axis(self.sids)

    def _realign_fields(self):
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        for panel in self.all_panels:
            panel.set_items(self.fields)

    def create_digest_panels(self,
                             initial_sids,
                             initial_dt):
        """
        Initialize a RollingPanel for each unique panel frequency being stored
        by this container. Each RollingPanel pre-allocates enough storage
        space to service the highest bar-count of any history call that it
        serves.
        """
        # Map from frequency -> first/last minute of the next digest to be
        # rolled for that frequency.
        first_window_starts = {}
        first_window_closes = {}

        # Map from frequency -> digest_panels.
        panels = {}
        for freq, largest_spec in iteritems(self.largest_specs):
            if largest_spec.bar_count == 1:
                # No need to allocate a digest panel; this frequency will only
                # ever use data drawn from self.buffer_panel.
                first_window_starts[freq] = freq.normalize(initial_dt)
                first_window_closes[freq] = freq.window_close(
                    first_window_starts[freq]
                )

                continue

            dt = initial_dt

            rp = self._create_digest_panel(
                dt,
                spec=largest_spec,
                window_starts=first_window_starts,
                window_closes=first_window_closes,
            )

            panels[freq] = rp

        return panels, first_window_starts, first_window_closes

    def create_buffer_panel(self, initial_dt, bar_data):
        """
        Initialize a RollingPanel containing enough minutes to service all our
        frequencies.
        """
        max_bars_needed = max(
            freq.max_bars for freq in self.unique_frequencies
        )
        freq = '1m' if self.data_frequency == 'minute' else '1d'
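        # The +1 appears to reserve room for the current, still-forming bar
        # on top of the max_bars completed bars any frequency can request.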
        spec = HistorySpec(
            max_bars_needed + 1, freq, None, None, self.env,
            self.data_frequency,
        )

        rp = self._create_panel(
            initial_dt, spec,
        )
        self.buffer_spec = spec

        if bar_data is not None:
            frame = self.frame_from_bardata(bar_data, initial_dt)
            rp.add_frame(initial_dt, frame)

        return rp

    def convert_columns(self, values):
        """
        If columns have a specific type you want to enforce, overwrite this
        method and return the transformed values.
        """
        return values

    def digest_bars(self, history_spec, do_ffill):
        """
        Get the last (history_spec.bar_count - 1) bars from self.digest_panel
        for the requested HistorySpec.
        """
        bar_count = history_spec.bar_count
        if bar_count == 1:
            # Slicing with [1 - bar_count:] doesn't work when bar_count == 1,
            # so special-case this.
            res = pd.DataFrame(index=[], columns=self.sids, dtype=float)
            return res.values, res.index

        field = history_spec.field

        # Panel axes are (field, dates, sids). We want just the entries for
        # the requested field, the last (bar_count - 1) data points, and all
        # sids.
        digest_panel = self.digest_panels[history_spec.frequency]
        frame = digest_panel.get_current(field, raw=True)
        if do_ffill:
            # Do forward-filling *before* truncating down to the requested
            # number of bars. This protects us from losing data if an illiquid
            # stock has a gap in its price history.
            filled = ffill_digest_frame_from_prior_values(
                history_spec.frequency,
                history_spec.field,
                frame,
                self.last_known_prior_values,
                raw=True,
            )
            # Truncate only after we've forward-filled.
            indexer = slice(1 - bar_count, None)
            return filled[indexer], digest_panel.current_dates()[indexer]
        else:
            indexer = slice(1 - bar_count, None)
            return frame[indexer, :], digest_panel.current_dates()[indexer]

    def buffer_panel_minutes(self,
                             buffer_panel,
                             earliest_minute=None,
                             latest_minute=None,
                             raw=False):
        """
        Get the minutes in @buffer_panel between @earliest_minute and
        @latest_minute, inclusive.

        @buffer_panel can be a RollingPanel or a plain Panel. If a
        RollingPanel is supplied, we call `get_current` to extract a Panel
        object.

        If no value is specified for @earliest_minute, use all the minutes we
        have up until @latest_minute.

        If no value for @latest_minute is specified, use all values up until
        the latest minute.
        """
        if isinstance(buffer_panel, RollingPanel):
            buffer_panel = buffer_panel.get_current(start=earliest_minute,
                                                    end=latest_minute,
                                                    raw=raw)
            return buffer_panel
        # Using .ix here rather than .loc because loc requires that the keys
        # are actually in the index, whereas .ix returns all the values between
        # earliest_minute and latest_minute, which is what we want.
        return buffer_panel.ix[:, earliest_minute:latest_minute, :]

    def frame_from_bardata(self, data, algo_dt):
        """
        Create a DataFrame from the given BarData and algo dt.
        """
        data = data._data
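        # Multiplying the uninitialized np.empty buffer by NaN yields an
        # all-NaN (fields x sids) array; the loop below fills in only the
        # cells for which this bar actually carries data.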
        frame_data = np.empty((len(self.fields), len(self.sids))) * np.nan

        for j, sid in enumerate(self.sids):
            sid_data = data.get(sid)
            if not sid_data:
                continue
            if algo_dt != sid_data['dt']:
                continue
            for i, field in enumerate(self.fields):
                frame_data[i, j] = sid_data.get(field, np.nan)

        return pd.DataFrame(
            frame_data,
            index=self.fields.copy(),
            columns=self.sids.copy(),
        )

    def update(self, data, algo_dt):
        """
        Takes the bar @data at @algo_dt, checks to see if we need to roll any
        new digests, then adds the new data to the buffer panel.
        """
        frame = self.frame_from_bardata(data, algo_dt)

        self.update_last_known_values()
        self.update_digest_panels(algo_dt, self.buffer_panel)
        self.buffer_panel.add_frame(algo_dt, frame)

    def update_digest_panels(self, algo_dt, buffer_panel, freq_filter=None):
        """
        Check whether @algo_dt is greater than cur_window_close for any of our
        frequencies. If so, roll a digest for that frequency using data drawn
        from @buffer_panel and insert it into the appropriate digest panels.

        If @freq_filter is specified, only use the given data to update
        frequencies on which the filter returns True.

        This takes `buffer_panel` as an argument rather than using
        self.buffer_panel so that this method can be used to add supplemental
        data from an external source.
        """
        for frequency in filter(freq_filter, self.unique_frequencies):

            # We don't keep a digest panel if we only have a length-1 history
            # spec for a given frequency.
            digest_panel = self.digest_panels.get(frequency, None)

            while algo_dt > self.cur_window_closes[frequency]:

                earliest_minute = self.cur_window_starts[frequency]
                latest_minute = self.cur_window_closes[frequency]
                minutes_to_process = self.buffer_panel_minutes(
                    buffer_panel,
                    earliest_minute=earliest_minute,
                    latest_minute=latest_minute,
                    raw=True,
                )

                if digest_panel is not None:
                    # Create a digest from minutes_to_process and add it to
                    # digest_panel.
                    digest_frame = self.create_new_digest_frame(
                        minutes_to_process,
                        self.fields,
                        self.sids,
                    )
                    digest_panel.add_frame(
                        latest_minute,
                        digest_frame,
                        self.fields,
                        self.sids,
                    )

                # Update panel start/close for this frequency.
                self.cur_window_starts[frequency] = \
                    frequency.next_window_start(latest_minute)
                self.cur_window_closes[frequency] = \
                    frequency.window_close(self.cur_window_starts[frequency])

    def frame_to_series(self, field, frame, columns=None):
        """
        Convert a frame with a DatetimeIndex and sid columns into a series with
        a sid index, using the aggregator defined by the given field.
        """
        if isinstance(frame, pd.DataFrame):
            columns = frame.columns
            frame = frame.values

        if not len(frame):
            return pd.Series(
                data=(0 if field == 'volume' else np.nan),
                index=columns,
            ).values

        if field in ['price', 'close_price']:
            # Short-circuit for a fully populated last row.
            vals = frame[-1]
            if np.all(~np.isnan(vals)):
                return vals
            return ffill(frame)[-1]
        elif field == 'open_price':
            return bfill(frame)[0]
        elif field == 'volume':
            return np.nansum(frame, axis=0)
        elif field == 'high':
            return np.nanmax(frame, axis=0)
        elif field == 'low':
            return np.nanmin(frame, axis=0)
        else:
            raise ValueError("Unknown field {}".format(field))

    def aggregate_ohlcv_panel(self,
                              fields,
                              ohlcv_panel,
                              items=None,
                              minor_axis=None):
        """
        Convert an OHLCV Panel into a DataFrame by aggregating each field's
        frame into a Series.
        """
        vals = ohlcv_panel
        if isinstance(ohlcv_panel, pd.Panel):
            vals = ohlcv_panel.values
            items = ohlcv_panel.items
            minor_axis = ohlcv_panel.minor_axis

        data = [
            self.frame_to_series(
                field,
                vals[items.get_loc(field)],
                minor_axis,
            )
            for field in fields
        ]
        return np.array(data)

    def create_new_digest_frame(self, buffer_minutes, items=None,
                                minor_axis=None):
        """
        Package up the minutes in @buffer_minutes into a single digest frame.
        """
        return self.aggregate_ohlcv_panel(
            self.fields,
            buffer_minutes,
            items=items,
            minor_axis=minor_axis,
        )

    def update_last_known_values(self):
        """
        Store the non-NaN values from our oldest frame in each frequency.
        """
        ffillable = self.ffillable_fields
        if not len(ffillable):
            return

        for frequency in self.unique_frequencies:
            digest_panel = self.digest_panels.get(frequency, None)
            if digest_panel:
                oldest_known_values = digest_panel.oldest_frame(raw=True)
            else:
                oldest_known_values = self.buffer_panel.oldest_frame(raw=True)

            oldest_vals = oldest_known_values
            oldest_columns = self.fields
            for field in ffillable:
                f_idx = oldest_columns.get_loc(field)
                field_vals = oldest_vals[f_idx]
                # isnan would be fast, possible to use?
                non_nan_sids = np.where(pd.notnull(field_vals))
                key = (frequency.freq_str, field)
                key_loc = self.last_known_prior_values.index.get_loc(key)
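                # Assigning through .values writes directly into the
                # underlying ndarray, bypassing pandas label alignment; this
                # is the indexing that the float64 dtype note in __init__
                # guards against breaking.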
                self.last_known_prior_values.values[
                    key_loc, non_nan_sids
                ] = field_vals[non_nan_sids]

    def get_history(self, history_spec, algo_dt):
        """
        Main API used by the algoscript; history calls map to this function.

        Selects from the overarching history panel the values for the
        @history_spec at the given @algo_dt.
        """
        field = history_spec.field
        do_ffill = history_spec.ffill

        # Get our stored values from periods prior to the current period.
        digest_frame, index = self.digest_bars(history_spec, do_ffill)

        # Get minutes from our buffer panel to build the last row of the
        # returned frame.
        buffer_panel = self.buffer_panel_minutes(
            self.buffer_panel,
            earliest_minute=self.cur_window_starts[history_spec.frequency],
            raw=True,
        )
        buffer_frame = buffer_panel[self.fields.get_loc(field)]

        if do_ffill:
            buffer_frame = ffill_buffer_from_prior_values(
                history_spec.frequency,
                field,
                buffer_frame,
                digest_frame,
                self.last_known_prior_values,
                raw=True,
            )
        last_period = self.frame_to_series(field, buffer_frame, self.sids)
        return fast_build_history_output(digest_frame,
                                         last_period,
                                         algo_dt,
                                         index=index,
                                         columns=self.sids)
```
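For orientation, the lifecycle the docstrings describe (grow with `ensure_spec`, feed bars through `update`, read with `get_history`) looks roughly like the sketch below. It is illustrative only; the surrounding setup (`env`, `spec`, `first_bar_dt`, and the `bars` stream) is assumed, not taken from this report:

```python
# Hypothetical driver, assuming a configured TradingEnvironment `env`,
# a HistorySpec `spec`, and an iterable of (algo_dt, bar_data) pairs.
container = HistoryContainer(
    history_specs={spec.frequency: spec},
    initial_sids={24},          # one example sid to watch
    initial_dt=first_bar_dt,
    data_frequency='minute',
    env=env,
)

for algo_dt, bar_data in bars:
    # Reshape the container first if a new or larger spec appeared mid-run.
    container.ensure_spec(spec, algo_dt, bar_data)
    # Roll any completed digests, then absorb the new bar.
    container.update(bar_data, algo_dt)
    # Serve the algoscript's history call.
    frame = container.get_history(spec, algo_dt)
```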