| Metric | Value |
| --- | --- |
| Total Complexity | 80 |
| Total Lines | 721 |
| Duplicated Lines | 0% |
Complex classes like `zipline.history.HistoryContainer` often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
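For example, in the listing that follows, `cur_window_starts`, `cur_window_closes`, and `digest_panels` share prefixes and are always read and updated together, which marks them as a cohesive component. The sketch below shows one way Extract Class could group that window-tracking state. `DigestWindowTracker` and its method names are hypothetical, introduced only to illustrate the refactoring; they are not part of zipline. The `freq.normalize`, `freq.window_close`, and `freq.next_window_start` calls are the same `Frequency` methods used in the original class.

```python
class DigestWindowTracker(object):
    """Tracks, per frequency, the open/close of the window being digested.

    Hypothetical extract-class sketch; not part of zipline.
    """

    def __init__(self):
        self.window_starts = {}   # Frequency -> first minute of next digest
        self.window_closes = {}   # Frequency -> last minute of next digest

    def open_window(self, freq, dt):
        # Align dt to the frequency and compute when the window will close.
        self.window_starts[freq] = freq.normalize(dt)
        self.window_closes[freq] = freq.window_close(self.window_starts[freq])

    def should_roll(self, freq, algo_dt):
        # A digest must be rolled once the algo clock passes the close.
        return algo_dt > self.window_closes[freq]

    def advance(self, freq):
        # Move both window markers to the next period for this frequency.
        start = freq.next_window_start(self.window_closes[freq])
        self.window_starts[freq] = start
        self.window_closes[freq] = freq.window_close(start)
```

`HistoryContainer` would then hold a single tracker and delegate to it, shrinking its own field count and concentrating the window-rolling rules in one place.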
```python
class HistoryContainer(object):
    """
    Container for all history panels and frames used by an algoscript.

    To be used internally by TradingAlgorithm, but *not* passed directly to
    the algorithm.

    Entry point for the algoscript is the result of `get_history`.
    """
    VALID_FIELDS = {
        'price', 'open_price', 'volume', 'high', 'low', 'close_price',
    }

    def __init__(self,
                 history_specs,
                 initial_sids,
                 initial_dt,
                 data_frequency,
                 env,
                 bar_data=None):
        """
        A container to hold a rolling window of historical data within a
        user's algorithm.

        Args:
            history_specs (dict[Frequency:HistorySpec]): The starting history
                specs that this container should be able to service.

            initial_sids (set[Asset or Int]): The starting sids to watch.

            initial_dt (datetime): The datetime to start collecting history
                from.

            bar_data (BarData): If this container is being constructed during
                handle_data, this is the BarData for the current bar to fill
                the buffer with. If this is constructed elsewhere, it is None.

        Returns:
            An instance of a new HistoryContainer
        """

        # Store a reference to the env
        self.env = env

        # History specs to be served by this container.
        self.history_specs = history_specs
        self.largest_specs = compute_largest_specs(
            itervalues(self.history_specs)
        )

        # The set of fields specified by all history specs
        self.fields = pd.Index(
            sorted(set(spec.field for spec in itervalues(history_specs)))
        )
        self.sids = pd.Index(
            sorted(set(initial_sids or []))
        )

        self.data_frequency = data_frequency

        initial_dt = normalize_to_data_freq(self.data_frequency, initial_dt)

        # This panel contains raw minutes for periods that haven't been fully
        # completed. When a frequency period rolls over, these minutes are
        # digested using some sort of aggregation call on the panel (e.g.
        # `sum` for volume, `max` for high, `min` for low, etc.).
        self.buffer_panel = self.create_buffer_panel(initial_dt, bar_data)

        # Dictionaries with Frequency objects as keys.
        self.digest_panels, self.cur_window_starts, self.cur_window_closes = \
            self.create_digest_panels(initial_sids, initial_dt)

        # Helps prop up the prior day panel against having a nan, when the
        # data has been seen.
        self.last_known_prior_values = pd.DataFrame(
            data=None,
            index=self.prior_values_index,
            columns=self.prior_values_columns,
            # Note: For bizarre "intricacies of the spaghetti that is pandas
            # indexing logic" reasons, setting this dtype prevents indexing
            # errors in update_last_known_values. This is safe for the time
            # being because our only forward-fillable fields are floats. If
            # we need to add a non-float-typed forward-fillable field, then
            # we may find ourselves having to track down and fix a pandas
            # bug.
            dtype=np.float64,
        )
| 276 | |||
| 277 | _ffillable_fields = None |
||
| 278 | |||
| 279 | @property |
||
| 280 | def ffillable_fields(self): |
||
| 281 | if self._ffillable_fields is None: |
||
| 282 | fillables = self.fields.intersection(HistorySpec.FORWARD_FILLABLE) |
||
| 283 | self._ffillable_fields = fillables |
||
| 284 | return self._ffillable_fields |
||
| 285 | |||
| 286 | @property |
||
| 287 | def prior_values_index(self): |
||
| 288 | index_values = list( |
||
| 289 | product( |
||
| 290 | (freq.freq_str for freq in self.unique_frequencies), |
||
| 291 | # Only store prior values for forward-fillable fields. |
||
| 292 | self.ffillable_fields, |
||
| 293 | ) |
||
| 294 | ) |
||
| 295 | if index_values: |
||
| 296 | return pd.MultiIndex.from_tuples(index_values) |
||
| 297 | else: |
||
| 298 | # MultiIndex doesn't gracefully support empty input, so we return |
||
| 299 | # an empty regular Index if we have values. |
||
| 300 | return pd.Index(index_values) |
||
| 301 | |||
| 302 | @property |
||
| 303 | def prior_values_columns(self): |
||
| 304 | return self.sids |
||
| 305 | |||
| 306 | @property |
||
| 307 | def all_panels(self): |
||
| 308 | yield self.buffer_panel |
||
| 309 | for panel in self.digest_panels.values(): |
||
| 310 | yield panel |
||
| 311 | |||
| 312 | @property |
||
| 313 | def unique_frequencies(self): |
||
| 314 | """ |
||
| 315 | Return an iterator over all the unique frequencies serviced by this |
||
| 316 | container. |
||
| 317 | """ |
||
| 318 | return iterkeys(self.largest_specs) |
||
| 319 | |||
    def _add_frequency(self, spec, dt, data):
        """
        Adds a new frequency to the container. This reshapes the buffer_panel
        if needed.
        """
        freq = spec.frequency
        self.largest_specs[freq] = spec
        new_buffer_len = 0

        if freq.max_bars > self.buffer_panel.window_length:
            # More bars need to be held in the buffer_panel to support this
            # freq.
            if freq.data_frequency \
                    != self.buffer_spec.frequency.data_frequency:
                # If the data_frequencies are not the same, then we need to
                # create a fresh buffer.
                self.buffer_panel = self.create_buffer_panel(
                    dt, bar_data=data,
                )
                new_buffer_len = None
            else:
                # The frequencies are the same, we just need to add more
                # bars.
                self._resize_panel(
                    self.buffer_panel,
                    freq.max_bars,
                    dt,
                    self.buffer_spec.frequency,
                )
                new_buffer_len = freq.max_minutes
                # Update the current buffer_spec to reflect the new length.
                self.buffer_spec.bar_count = new_buffer_len + 1

        if spec.bar_count > 1:
            # This spec has more than one bar, so construct a digest panel
            # for it.
            self.digest_panels[freq] = self._create_digest_panel(dt, spec=spec)
        else:
            self.cur_window_starts[freq] = dt
            self.cur_window_closes[freq] = freq.window_close(
                self.cur_window_starts[freq]
            )

        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )

        return FrequencyDelta(freq, new_buffer_len)
    def _add_field(self, field):
        """
        Adds a new field to the container.
        """
        # self.fields is already sorted, so we just need to insert the new
        # field at the correct index.
        ls = list(self.fields)
        insort_left(ls, field)
        self.fields = pd.Index(ls)
        # Unset the cache of forward-fillable fields.
        self._ffillable_fields = None

        self._realign_fields()
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        return field

    def _add_length(self, spec, dt):
        """
        Increases the length of the digest panel for spec.frequency. If that
        frequency does not yet have a panel, and one is needed, a digest
        panel will be constructed.
        """
        old_count = self.largest_specs[spec.frequency].bar_count
        self.largest_specs[spec.frequency] = spec
        delta = spec.bar_count - old_count

        panel = self.digest_panels.get(spec.frequency)

        if panel is None:
            # The old length for this frequency was 1 bar, meaning no digest
            # panel was held. We must construct a new one here.
            panel = self._create_digest_panel(dt, spec=spec)

        else:
            self._resize_panel(panel, spec.bar_count - 1, dt,
                               freq=spec.frequency)

        self.digest_panels[spec.frequency] = panel

        return LengthDelta(spec.frequency, delta)

    def _resize_panel(self, panel, size, dt, freq):
        """
        Resizes a panel and fills the date_buf with the correct values.
        """
        # This is the oldest datetime that will be shown in the current
        # window of the panel.
        oldest_dt = pd.Timestamp(panel.start_date, tz='utc')
        delta = size - panel.window_length

        # Construct the missing dates.
        missing_dts = self._create_window_date_buf(
            delta, freq.unit_str, freq.data_frequency, oldest_dt,
        )

        panel.extend_back(missing_dts)

    def _create_window_date_buf(self,
                                window,
                                unit_str,
                                data_frequency,
                                dt):
        """
        Creates a window-length date_buf looking backwards from dt.
        """
        if unit_str == 'd':
            # Get the properly keyed datetime64 out of the pandas Timestamp.
            if data_frequency != 'daily':
                arr = self.env.open_close_window(
                    dt,
                    window,
                    offset=-window,
                ).market_close.astype('datetime64[ns]').values
            else:
                arr = self.env.open_close_window(
                    dt,
                    window,
                    offset=-window,
                ).index.values

            return arr
        else:
            return self.env.market_minute_window(
                self.env.previous_market_minute(dt),
                window,
                step=-1,
            )[::-1].values
    def _create_panel(self, dt, spec):
        """
        Constructs a rolling panel with a properly aligned date_buf.
        """
        dt = normalize_to_data_freq(spec.frequency.data_frequency, dt)

        window = spec.bar_count - 1

        date_buf = self._create_window_date_buf(
            window,
            spec.frequency.unit_str,
            spec.frequency.data_frequency,
            dt,
        )

        panel = RollingPanel(
            window=window,
            items=self.fields,
            sids=self.sids,
            initial_dates=date_buf,
        )

        return panel

    def _create_digest_panel(self,
                             dt,
                             spec,
                             window_starts=None,
                             window_closes=None):
        """
        Creates a digest panel, setting the window_starts and window_closes.
        If window_starts or window_closes are None, then
        self.cur_window_starts or self.cur_window_closes will be used.
        """
        freq = spec.frequency

        window_starts = window_starts if window_starts is not None \
            else self.cur_window_starts
        window_closes = window_closes if window_closes is not None \
            else self.cur_window_closes

        window_starts[freq] = freq.normalize(dt)
        window_closes[freq] = freq.window_close(window_starts[freq])

        return self._create_panel(dt, spec)
    def ensure_spec(self, spec, dt, bar_data):
        """
        Ensure that this container has enough space to hold the data for the
        given spec. This returns a HistoryContainerDelta to represent the
        changes in shape that the container made to support the new
        HistorySpec.
        """
        updated = {}
        if spec.field not in self.fields:
            updated['field'] = self._add_field(spec.field)
        if spec.frequency not in self.largest_specs:
            updated['frequency_delta'] = self._add_frequency(
                spec, dt, bar_data,
            )
        if spec.bar_count > self.largest_specs[spec.frequency].bar_count:
            updated['length_delta'] = self._add_length(spec, dt)
        return HistoryContainerDelta(**updated)

    def add_sids(self, to_add):
        """
        Add new sids to the container.
        """
        self.sids = pd.Index(
            sorted(self.sids.union(_ensure_index(to_add))),
        )
        self._realign_sids()

    def drop_sids(self, to_drop):
        """
        Remove sids from the container.
        """
        self.sids = pd.Index(
            sorted(self.sids.difference(_ensure_index(to_drop))),
        )
        self._realign_sids()

    def _realign_sids(self):
        """
        Realign our constituent panels after adding or removing sids.
        """
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            columns=self.sids,
        )
        for panel in self.all_panels:
            panel.set_minor_axis(self.sids)

    def _realign_fields(self):
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        for panel in self.all_panels:
            panel.set_items(self.fields)
    def create_digest_panels(self,
                             initial_sids,
                             initial_dt):
        """
        Initialize a RollingPanel for each unique panel frequency being
        stored by this container. Each RollingPanel pre-allocates enough
        storage space to service the highest bar-count of any history call
        that it serves.
        """
        # Map from frequency -> first/last minute of the next digest to be
        # rolled for that frequency.
        first_window_starts = {}
        first_window_closes = {}

        # Map from frequency -> digest_panels.
        panels = {}
        for freq, largest_spec in iteritems(self.largest_specs):
            if largest_spec.bar_count == 1:
                # No need to allocate a digest panel; this frequency will
                # only ever use data drawn from self.buffer_panel.
                first_window_starts[freq] = freq.normalize(initial_dt)
                first_window_closes[freq] = freq.window_close(
                    first_window_starts[freq]
                )

                continue

            dt = initial_dt

            rp = self._create_digest_panel(
                dt,
                spec=largest_spec,
                window_starts=first_window_starts,
                window_closes=first_window_closes,
            )

            panels[freq] = rp

        return panels, first_window_starts, first_window_closes

    def create_buffer_panel(self, initial_dt, bar_data):
        """
        Initialize a RollingPanel containing enough minutes to service all
        our frequencies.
        """
        max_bars_needed = max(
            freq.max_bars for freq in self.unique_frequencies
        )
        freq = '1m' if self.data_frequency == 'minute' else '1d'
        spec = HistorySpec(
            max_bars_needed + 1, freq, None, None, self.env,
            self.data_frequency,
        )

        rp = self._create_panel(
            initial_dt, spec,
        )
        self.buffer_spec = spec

        if bar_data is not None:
            frame = self.frame_from_bardata(bar_data, initial_dt)
            rp.add_frame(initial_dt, frame)

        return rp
    def convert_columns(self, values):
        """
        If columns have a specific type you want to enforce, overwrite this
        method and return the transformed values.
        """
        return values

    def digest_bars(self, history_spec, do_ffill):
        """
        Get the last (history_spec.bar_count - 1) bars from the digest panel
        for the requested HistorySpec.
        """
        bar_count = history_spec.bar_count
        if bar_count == 1:
            # Slicing with [1 - bar_count:] doesn't work when bar_count == 1,
            # so special-case this.
            res = pd.DataFrame(index=[], columns=self.sids, dtype=float)
            return res.values, res.index

        field = history_spec.field

        # Panel axes are (field, dates, sids). We want just the entries for
        # the requested field, the last (bar_count - 1) data points, and all
        # sids.
        digest_panel = self.digest_panels[history_spec.frequency]
        frame = digest_panel.get_current(field, raw=True)
        if do_ffill:
            # Do forward-filling *before* truncating down to the requested
            # number of bars. This protects us from losing data if an
            # illiquid stock has a gap in its price history.
            filled = ffill_digest_frame_from_prior_values(
                history_spec.frequency,
                history_spec.field,
                frame,
                self.last_known_prior_values,
                raw=True
                # Truncate only after we've forward-filled.
            )
            indexer = slice(1 - bar_count, None)
            return filled[indexer], digest_panel.current_dates()[indexer]
        else:
            indexer = slice(1 - bar_count, None)
            return frame[indexer, :], digest_panel.current_dates()[indexer]
    def buffer_panel_minutes(self,
                             buffer_panel,
                             earliest_minute=None,
                             latest_minute=None,
                             raw=False):
        """
        Get the minutes in @buffer_panel between @earliest_minute and
        @latest_minute, inclusive.

        @buffer_panel can be a RollingPanel or a plain Panel. If a
        RollingPanel is supplied, we call `get_current` to extract a Panel
        object.

        If no value is specified for @earliest_minute, use all the minutes we
        have up until @latest_minute.

        If no value is specified for @latest_minute, use all the minutes we
        have from @earliest_minute onward.
        """
        if isinstance(buffer_panel, RollingPanel):
            buffer_panel = buffer_panel.get_current(start=earliest_minute,
                                                    end=latest_minute,
                                                    raw=raw)
            return buffer_panel
        # Using .ix here rather than .loc because .loc requires that the keys
        # are actually in the index, whereas .ix returns all the values
        # between earliest_minute and latest_minute, which is what we want.
        return buffer_panel.ix[:, earliest_minute:latest_minute, :]

    def frame_from_bardata(self, data, algo_dt):
        """
        Create a DataFrame from the given BarData and algo dt.
        """
        data = data._data
        frame_data = np.empty((len(self.fields), len(self.sids))) * np.nan

        for j, sid in enumerate(self.sids):
            sid_data = data.get(sid)
            if not sid_data:
                continue
            if algo_dt != sid_data['dt']:
                continue
            for i, field in enumerate(self.fields):
                frame_data[i, j] = sid_data.get(field, np.nan)

        return pd.DataFrame(
            frame_data,
            index=self.fields.copy(),
            columns=self.sids.copy(),
        )

    def update(self, data, algo_dt):
        """
        Takes the bar at @algo_dt's @data, checks to see if we need to roll
        any new digests, then adds the new data to the buffer panel.
        """
        frame = self.frame_from_bardata(data, algo_dt)

        self.update_last_known_values()
        self.update_digest_panels(algo_dt, self.buffer_panel)
        self.buffer_panel.add_frame(algo_dt, frame)
    def update_digest_panels(self, algo_dt, buffer_panel, freq_filter=None):
        """
        Check whether @algo_dt is greater than cur_window_close for any of
        our frequencies. If so, roll a digest for that frequency using data
        drawn from @buffer_panel and insert it into the appropriate digest
        panels.

        If @freq_filter is specified, only use the given data to update
        frequencies on which the filter returns True.

        This takes `buffer_panel` as an argument rather than using
        self.buffer_panel so that this method can be used to add supplemental
        data from an external source.
        """
        for frequency in filter(freq_filter, self.unique_frequencies):

            # We don't keep a digest panel if we only have a length-1 history
            # spec for a given frequency.
            digest_panel = self.digest_panels.get(frequency, None)

            while algo_dt > self.cur_window_closes[frequency]:

                earliest_minute = self.cur_window_starts[frequency]
                latest_minute = self.cur_window_closes[frequency]
                minutes_to_process = self.buffer_panel_minutes(
                    buffer_panel,
                    earliest_minute=earliest_minute,
                    latest_minute=latest_minute,
                    raw=True
                )

                if digest_panel is not None:
                    # Create a digest from minutes_to_process and add it to
                    # digest_panel.
                    digest_frame = self.create_new_digest_frame(
                        minutes_to_process,
                        self.fields,
                        self.sids
                    )
                    digest_panel.add_frame(
                        latest_minute,
                        digest_frame,
                        self.fields,
                        self.sids
                    )

                # Update panel start/close for this frequency.
                self.cur_window_starts[frequency] = \
                    frequency.next_window_start(latest_minute)
                self.cur_window_closes[frequency] = \
                    frequency.window_close(self.cur_window_starts[frequency])

    def frame_to_series(self, field, frame, columns=None):
        """
        Convert a frame with a DatetimeIndex and sid columns into a series
        with a sid index, using the aggregator defined by the given field.
        """
        if isinstance(frame, pd.DataFrame):
            columns = frame.columns
            frame = frame.values

        if not len(frame):
            return pd.Series(
                data=(0 if field == 'volume' else np.nan),
                index=columns,
            ).values

        if field in ['price', 'close_price']:
            # Short-circuit if the last row is fully populated.
            vals = frame[-1]
            if np.all(~np.isnan(vals)):
                return vals
            return ffill(frame)[-1]
        elif field == 'open_price':
            return bfill(frame)[0]
        elif field == 'volume':
            return np.nansum(frame, axis=0)
        elif field == 'high':
            return np.nanmax(frame, axis=0)
        elif field == 'low':
            return np.nanmin(frame, axis=0)
        else:
            raise ValueError("Unknown field {}".format(field))

    def aggregate_ohlcv_panel(self,
                              fields,
                              ohlcv_panel,
                              items=None,
                              minor_axis=None):
        """
        Convert an OHLCV Panel into a DataFrame by aggregating each field's
        frame into a Series.
        """
        vals = ohlcv_panel
        if isinstance(ohlcv_panel, pd.Panel):
            vals = ohlcv_panel.values
            items = ohlcv_panel.items
            minor_axis = ohlcv_panel.minor_axis

        data = [
            self.frame_to_series(
                field,
                vals[items.get_loc(field)],
                minor_axis
            )
            for field in fields
        ]
        return np.array(data)

    def create_new_digest_frame(self, buffer_minutes, items=None,
                                minor_axis=None):
        """
        Package up the minutes in @buffer_minutes into a single digest frame.
        """
        return self.aggregate_ohlcv_panel(
            self.fields,
            buffer_minutes,
            items=items,
            minor_axis=minor_axis
        )

    def update_last_known_values(self):
        """
        Store the non-NaN values from our oldest frame in each frequency.
        """
        ffillable = self.ffillable_fields
        if not len(ffillable):
            return

        for frequency in self.unique_frequencies:
            digest_panel = self.digest_panels.get(frequency, None)
            if digest_panel:
                oldest_known_values = digest_panel.oldest_frame(raw=True)
            else:
                oldest_known_values = self.buffer_panel.oldest_frame(raw=True)

            oldest_vals = oldest_known_values
            oldest_columns = self.fields
            for field in ffillable:
                f_idx = oldest_columns.get_loc(field)
                field_vals = oldest_vals[f_idx]
                # isnan would be fast, possible to use?
                non_nan_sids = np.where(pd.notnull(field_vals))
                key = (frequency.freq_str, field)
                key_loc = self.last_known_prior_values.index.get_loc(key)
                self.last_known_prior_values.values[
                    key_loc, non_nan_sids
                ] = field_vals[non_nan_sids]

    def get_history(self, history_spec, algo_dt):
        """
        Main entry point used by the algoscript; history calls are mapped to
        this function.

        Selects from the overarching history panel the values for the given
        @history_spec at the given @algo_dt.
        """
        field = history_spec.field
        do_ffill = history_spec.ffill

        # Get our stored values from periods prior to the current period.
        digest_frame, index = self.digest_bars(history_spec, do_ffill)

        # Get minutes from our buffer panel to build the last row of the
        # returned frame.
        buffer_panel = self.buffer_panel_minutes(
            self.buffer_panel,
            earliest_minute=self.cur_window_starts[history_spec.frequency],
            raw=True
        )
        buffer_frame = buffer_panel[self.fields.get_loc(field)]

        if do_ffill:
            buffer_frame = ffill_buffer_from_prior_values(
                history_spec.frequency,
                field,
                buffer_frame,
                digest_frame,
                self.last_known_prior_values,
                raw=True
            )
        last_period = self.frame_to_series(field, buffer_frame, self.sids)
        return fast_build_history_output(digest_frame,
                                         last_period,
                                         algo_dt,
                                         index=index,
                                         columns=self.sids)
```
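The aggregation rules in `frame_to_series` are the crux of digest rolling. The standalone snippet below mirrors those rules outside of zipline, using plain NumPy and made-up numbers: a window of three minute bars for two sids, with NaN marking a minute in which a sid did not trade.

```python
import numpy as np

# Made-up minute bars: rows are minutes, columns are two sids.
close = np.array([[10.0, 20.0],
                  [10.5, np.nan],
                  [np.nan, 21.0]])
volume = np.array([[100.0, 300.0],
                   [50.0, np.nan],
                   [np.nan, 120.0]])

# 'volume' digests to the NaN-aware sum over the window.
print(np.nansum(volume, axis=0))   # [150. 420.]

# 'high' digests to the NaN-aware max (shown here on the close prices).
print(np.nanmax(close, axis=0))    # [10.5 21. ]

# 'price'/'close_price' digest to the last row, forward-filling gaps so an
# illiquid sid keeps its most recent trade price. A one-step fill suffices
# for this demo; zipline's ffill helper fills from arbitrarily far back.
last = close[-1].copy()
mask = np.isnan(last)
last[mask] = close[-2][mask]
print(last)                        # [10.5 21. ]
```

Note how each field carries its own notion of "the bar's value": the per-field `if`/`elif` chain in `frame_to_series` is exactly this dispatch, which is also why `aggregate_ohlcv_panel` has to process the panel one field at a time.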