zipline.history.HistoryContainer.update()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 10
rs 9.4286
1
#
2
# Copyright 2014 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from bisect import insort_left
16
from collections import namedtuple
17
from itertools import groupby, product
18
19
import logbook
20
import numpy as np
21
import pandas as pd
22
from six import itervalues, iteritems, iterkeys
23
24
from . history import HistorySpec
25
26
from zipline.utils.data import RollingPanel, _ensure_index
27
from zipline.utils.munge import ffill, bfill
28
29
logger = logbook.Logger('History Container')
30
31
32
# The closing price goes by more than one field name; accept either one in
# price rollover/aggregation logic.
CLOSING_PRICE_FIELDS = frozenset({'price', 'close_price'})
35
36
37
def ffill_buffer_from_prior_values(freq,
                                   field,
                                   buffer_frame,
                                   digest_frame,
                                   pv_frame,
                                   raw=False):
    """
    Forward-fill ``buffer_frame``.

    Leading NaNs are seeded first from the newest row of ``digest_frame``
    (the end of the previous period) and then, if still missing, from the
    last known prior values stored in ``pv_frame``.
    """
    # Work on raw ndarrays when requested; the in-place writes below then
    # update the underlying frame data directly.
    digest_vals = (digest_frame.values
                   if raw and isinstance(digest_frame, pd.DataFrame)
                   else digest_frame)
    buffer_vals = (buffer_frame.values
                   if raw and isinstance(buffer_frame, pd.DataFrame)
                   else buffer_frame)

    leading_nans = pd.isnull(buffer_vals[0])
    if np.any(leading_nans) and len(digest_vals):
        # Seed leading NaNs from the oldest digest values we still hold.
        buffer_vals[0, leading_nans] = digest_vals[-1, leading_nans]

    leading_nans = pd.isnull(buffer_vals[0])
    if np.any(leading_nans):
        # Still missing: fall back to the last known values from before the
        # digest.
        row = pv_frame.index.get_loc((freq.freq_str, field))
        buffer_vals[0, leading_nans] = pv_frame.values[row, leading_nans]

    if raw:
        return ffill(buffer_vals)
    return buffer_frame.ffill()
76
77
78
def ffill_digest_frame_from_prior_values(freq,
                                         field,
                                         digest_frame,
                                         pv_frame,
                                         raw=False):
    """
    Forward-fill ``digest_frame``, seeding leading NaNs from the last known
    prior values stored in ``pv_frame``.
    """
    # Work on a raw ndarray when requested; in-place writes below then hit
    # the underlying frame data directly.
    vals = (digest_frame.values
            if raw and isinstance(digest_frame, pd.DataFrame)
            else digest_frame)

    leading_nans = pd.isnull(vals[0])
    if np.any(leading_nans):
        # Seed missing sids from the stored prior values for (freq, field).
        row = pv_frame.index.get_loc((freq.freq_str, field))
        vals[0, leading_nans] = pv_frame.values[row, leading_nans]

    if raw:
        return ffill(vals)
    return digest_frame.ffill()
105
106
107
def freq_str_and_bar_count(history_spec):
    """
    Sort/group key for history specs: (frequency string, bar count).
    """
    return history_spec.frequency.freq_str, history_spec.bar_count
112
113
114
def next_bar(spec, env):
    """
    Return a callable mapping a datetime to the next bar for ``spec``.
    """
    if spec.frequency.unit_str != 'd':
        # Intraday frequencies advance one market minute at a time.
        return env.next_market_minute
    if spec.frequency.data_frequency == 'minute':
        # Daily bars built from minute data land on the next day's market
        # close.
        return lambda dt: env.get_open_and_close(env.next_trading_day(dt))[1]
    return env.next_trading_day
127
128
129
def compute_largest_specs(history_specs):
    """
    Map each Frequency to the HistorySpec with the largest bar_count at
    that frequency.
    """
    sorted_specs = sorted(history_specs, key=freq_str_and_bar_count)
    largest = {}
    for freq, group in groupby(sorted_specs, key=lambda spec: spec.frequency):
        largest[freq] = max(group, key=lambda spec: spec.bar_count)
    return largest
138
139
140
# Tuples recording a change to the shape of a HistoryContainer.

FrequencyDelta = namedtuple(
    'FrequencyDelta',
    ['freq', 'buffer_delta'],
)


LengthDelta = namedtuple(
    'LengthDelta',
    ['freq', 'delta'],
)


HistoryContainerDeltaSuper = namedtuple(
    'HistoryContainerDelta',
    ['field', 'frequency_delta', 'length_delta'],
)


class HistoryContainerDelta(HistoryContainerDeltaSuper):
    """
    Record of a resize of the history container.
    """
    def __new__(cls, field=None, frequency_delta=None, length_delta=None):
        """
        field is a new field that was added.
        frequency_delta is a FrequencyDelta representing a new frequency.
        length_delta is a LengthDelta (a frequency and a bar_count).
        A None member means no change of that type occurred.
        """
        return super(HistoryContainerDelta, cls).__new__(
            cls, field, frequency_delta, length_delta,
        )

    @property
    def empty(self):
        """
        True when no change of any type is recorded.
        """
        return (self.field is None and
                self.frequency_delta is None and
                self.length_delta is None)
183
184
185
def normalize_to_data_freq(data_frequency, dt):
    """
    Snap ``dt`` to the container's data frequency: minute data keeps the
    exact timestamp, anything else is normalized to midnight.
    """
    if data_frequency == 'minute':
        return dt
    # NOTE(review): pd.tslib is a private pandas module removed in newer
    # pandas releases; this matches the pandas version this codebase pins.
    return pd.tslib.normalize_date(dt)
189
190
191
class HistoryContainer(object):
    """
    Container for all history panels and frames used by an algoscript.

    To be used internally by TradingAlgorithm, but *not* passed directly to
    the algorithm.

    Entry point for the algoscript is the result of `get_history`.
    """
    # The complete set of fields a history call may request.
    VALID_FIELDS = {
        'price', 'open_price', 'volume', 'high', 'low', 'close_price',
    }
203
204
    def __init__(self,
                 history_specs,
                 initial_sids,
                 initial_dt,
                 data_frequency,
                 env,
                 bar_data=None):
        """
        A container holding a rolling window of historical data within a
        user's algorithm.

        Args:
          history_specs (dict[Frequency:HistorySpec]): The starting history
            specs that this container should be able to service.

          initial_sids (set[Asset or Int]): The starting sids to watch.

          initial_dt (datetime): The datetime to start collecting history
            from.

          bar_data (BarData): If this container is being constructed during
            handle_data, the BarData for the current bar, used to seed the
            buffer.  Otherwise None.

        Returns:
          An instance of a new HistoryContainer
        """
        # Store a reference to the env.
        self.env = env

        # History specs to be served by this container.
        self.history_specs = history_specs
        self.largest_specs = compute_largest_specs(
            itervalues(self.history_specs)
        )

        # The set of fields specified across all history specs.
        self.fields = pd.Index(
            sorted({spec.field for spec in itervalues(history_specs)})
        )
        self.sids = pd.Index(
            sorted(set(initial_sids or []))
        )

        self.data_frequency = data_frequency

        initial_dt = normalize_to_data_freq(self.data_frequency, initial_dt)

        # This panel holds raw minutes for periods that haven't been fully
        # completed.  When a frequency period rolls over, these minutes are
        # digested via an aggregation on the panel (e.g. `sum` for volume,
        # `max` for high, `min` for low, etc.).
        self.buffer_panel = self.create_buffer_panel(initial_dt, bar_data)

        # Dictionaries keyed by Frequency objects.
        self.digest_panels, self.cur_window_starts, self.cur_window_closes = \
            self.create_digest_panels(initial_sids, initial_dt)

        # Props up the prior-day panel against NaNs once data has been seen.
        self.last_known_prior_values = pd.DataFrame(
            data=None,
            index=self.prior_values_index,
            columns=self.prior_values_columns,
            # Note: For bizarre "intricacies of the spaghetti that is pandas
            # indexing logic" reasons, setting this dtype prevents indexing
            # errors in update_last_known_values.  This is safe for the time
            # being because our only forward-fillable fields are floats.  If
            # we need a non-float-typed forward-fillable field, we may find
            # ourselves having to track down and fix a pandas bug.
            dtype=np.float64,
        )
276
277
    _ffillable_fields = None
278
279
    @property
280
    def ffillable_fields(self):
281
        if self._ffillable_fields is None:
282
            fillables = self.fields.intersection(HistorySpec.FORWARD_FILLABLE)
283
            self._ffillable_fields = fillables
284
        return self._ffillable_fields
285
286
    @property
    def prior_values_index(self):
        """
        Index of (freq_str, field) pairs for which prior values are tracked.
        Only forward-fillable fields are included.
        """
        index_values = list(
            product(
                (freq.freq_str for freq in self.unique_frequencies),
                self.ffillable_fields,
            )
        )
        if index_values:
            return pd.MultiIndex.from_tuples(index_values)
        # MultiIndex doesn't gracefully support empty input, so fall back
        # to an empty regular Index when there is nothing to track.
        return pd.Index(index_values)
301
302
    @property
303
    def prior_values_columns(self):
304
        return self.sids
305
306
    @property
307
    def all_panels(self):
308
        yield self.buffer_panel
309
        for panel in self.digest_panels.values():
310
            yield panel
311
312
    @property
    def unique_frequencies(self):
        """
        Iterator over all unique frequencies serviced by this container.
        """
        return iterkeys(self.largest_specs)
319
320
    def _add_frequency(self, spec, dt, data):
        """
        Add a new frequency to the container, reshaping the buffer_panel if
        more bars are needed to support it.
        """
        freq = spec.frequency
        self.largest_specs[freq] = spec
        new_buffer_len = 0

        if freq.max_bars > self.buffer_panel.window_length:
            # The buffer_panel must hold more bars to support this freq.
            if freq.data_frequency \
               != self.buffer_spec.frequency.data_frequency:
                # Different data_frequency: start over with a fresh buffer.
                self.buffer_panel = self.create_buffer_panel(
                    dt, bar_data=data,
                )
                new_buffer_len = None
            else:
                # Same data_frequency: just extend the existing buffer.
                self._resize_panel(
                    self.buffer_panel,
                    freq.max_bars,
                    dt,
                    self.buffer_spec.frequency,
                )
                new_buffer_len = freq.max_minutes
                # Update the current buffer_spec to reflect the new length.
                self.buffer_spec.bar_count = new_buffer_len + 1

        if spec.bar_count > 1:
            # Multi-bar specs need a digest panel of their own.
            self.digest_panels[freq] = self._create_digest_panel(dt, spec=spec)
        else:
            self.cur_window_starts[freq] = dt
            self.cur_window_closes[freq] = freq.window_close(
                self.cur_window_starts[freq]
            )

        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )

        return FrequencyDelta(freq, new_buffer_len)
366
367
    def _add_field(self, field):
        """
        Add a new field to the container, keeping self.fields sorted.
        """
        # self.fields is already sorted, so just insert the new field at
        # its correct position.
        fields = list(self.fields)
        insort_left(fields, field)
        self.fields = pd.Index(fields)
        # Invalidate the ffillable-fields cache.
        self._ffillable_fields = None

        self._realign_fields()
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        return field
384
385
    def _add_length(self, spec, dt):
        """
        Grow the digest panel for spec.frequency.  If the previous largest
        spec at this frequency kept no digest panel (bar_count == 1), a new
        one is constructed.
        """
        old_count = self.largest_specs[spec.frequency].bar_count
        self.largest_specs[spec.frequency] = spec
        delta = spec.bar_count - old_count

        panel = self.digest_panels.get(spec.frequency)
        if panel is None:
            # No digest panel was held for a length-1 history; build one.
            panel = self._create_digest_panel(dt, spec=spec)
        else:
            self._resize_panel(panel, spec.bar_count - 1, dt,
                               freq=spec.frequency)

        self.digest_panels[spec.frequency] = panel
        return LengthDelta(spec.frequency, delta)
409
410
    def _resize_panel(self, panel, size, dt, freq):
        """
        Grow ``panel`` to ``size`` bars, back-filling its date_buf with the
        dates preceding its current window.
        """
        # Oldest datetime currently visible in the panel's window.
        oldest_dt = pd.Timestamp(panel.start_date, tz='utc',)
        delta = size - panel.window_length

        # Construct the missing dates and prepend them.
        missing_dts = self._create_window_date_buf(
            delta, freq.unit_str, freq.data_frequency, oldest_dt,
        )
        panel.extend_back(missing_dts)
425
426
    def _create_window_date_buf(self,
                                window,
                                unit_str,
                                data_frequency,
                                dt):
        """
        Build a ``window``-length date_buf looking backwards from ``dt``.
        """
        if unit_str != 'd':
            # Minute-level dates: step backwards one market minute at a
            # time, then reverse into chronological order.
            return self.env.market_minute_window(
                self.env.previous_market_minute(dt),
                window,
                step=-1,
            )[::-1].values

        # Daily dates: pull the properly keyed datetime64 values out of the
        # env's open/close window.
        oc_window = self.env.open_close_window(
            dt,
            window,
            offset=-window,
        )
        if data_frequency != 'daily':
            return oc_window.market_close.astype('datetime64[ns]').values
        return oc_window.index.values
456
457
    def _create_panel(self, dt, spec):
        """
        Construct a RollingPanel with a date_buf aligned to ``spec``.
        """
        dt = normalize_to_data_freq(spec.frequency.data_frequency, dt)
        window = spec.bar_count - 1

        date_buf = self._create_window_date_buf(
            window,
            spec.frequency.unit_str,
            spec.frequency.data_frequency,
            dt,
        )
        return RollingPanel(
            window=window,
            items=self.fields,
            sids=self.sids,
            initial_dates=date_buf,
        )
480
481
    def _create_digest_panel(self,
                             dt,
                             spec,
                             window_starts=None,
                             window_closes=None):
        """
        Create a digest panel, recording its window start/close.

        If window_starts or window_closes are None, self.cur_window_starts
        or self.cur_window_closes are updated instead.
        """
        freq = spec.frequency

        if window_starts is None:
            window_starts = self.cur_window_starts
        if window_closes is None:
            window_closes = self.cur_window_closes

        window_starts[freq] = freq.normalize(dt)
        window_closes[freq] = freq.window_close(window_starts[freq])

        return self._create_panel(dt, spec)
502
503
    def ensure_spec(self, spec, dt, bar_data):
        """
        Ensure this container can hold the data for ``spec``, returning a
        HistoryContainerDelta describing any resize that was required.
        """
        updated = {}
        if spec.field not in self.fields:
            updated['field'] = self._add_field(spec.field)
        if spec.frequency not in self.largest_specs:
            updated['frequency_delta'] = self._add_frequency(
                spec, dt, bar_data,
            )
        if spec.bar_count > self.largest_specs[spec.frequency].bar_count:
            updated['length_delta'] = self._add_length(spec, dt)
        return HistoryContainerDelta(**updated)
520
521
    def add_sids(self, to_add):
        """
        Start tracking new sids and realign all panels.
        """
        self.sids = pd.Index(
            sorted(self.sids.union(_ensure_index(to_add))),
        )
        self._realign_sids()
529
530
    def drop_sids(self, to_drop):
        """
        Stop tracking sids and realign all panels.
        """
        self.sids = pd.Index(
            sorted(self.sids.difference(_ensure_index(to_drop))),
        )
        self._realign_sids()
538
539
    def _realign_sids(self):
540
        """
541
        Realign our constituent panels after adding or removing sids.
542
        """
543
        self.last_known_prior_values = self.last_known_prior_values.reindex(
544
            columns=self.sids,
545
        )
546
        for panel in self.all_panels:
547
            panel.set_minor_axis(self.sids)
548
549
    def _realign_fields(self):
        """
        Realign our constituent panels after the field set changed.
        """
        self.last_known_prior_values = self.last_known_prior_values.reindex(
            index=self.prior_values_index,
        )
        for panel in self.all_panels:
            panel.set_items(self.fields)
555
556
    def create_digest_panels(self,
                             initial_sids,
                             initial_dt):
        """
        Initialize a RollingPanel for each unique panel frequency being
        stored by this container.  Each RollingPanel pre-allocates enough
        storage to service the highest bar-count of any history call it
        serves.

        Returns (panels, first_window_starts, first_window_closes), each
        keyed by frequency.
        """
        # First/last minute of the next digest to be rolled per frequency.
        first_window_starts = {}
        first_window_closes = {}

        # Map from frequency -> digest panel.
        panels = {}
        for freq, largest_spec in iteritems(self.largest_specs):
            if largest_spec.bar_count == 1:
                # A single-bar spec is served straight from the buffer
                # panel; just record the window boundaries.
                first_window_starts[freq] = freq.normalize(initial_dt)
                first_window_closes[freq] = freq.window_close(
                    first_window_starts[freq]
                )
                continue

            panels[freq] = self._create_digest_panel(
                initial_dt,
                spec=largest_spec,
                window_starts=first_window_starts,
                window_closes=first_window_closes,
            )

        return panels, first_window_starts, first_window_closes
595
596
    def create_buffer_panel(self, initial_dt, bar_data):
        """
        Initialize a RollingPanel containing enough minutes to service all
        our frequencies, seeding it from ``bar_data`` when provided.
        """
        max_bars_needed = max(
            freq.max_bars for freq in self.unique_frequencies
        )
        freq = '1m' if self.data_frequency == 'minute' else '1d'
        spec = HistorySpec(
            max_bars_needed + 1, freq, None, None, self.env,
            self.data_frequency,
        )

        rp = self._create_panel(
            initial_dt, spec,
        )
        self.buffer_spec = spec

        if bar_data is not None:
            # Seed the buffer with the current bar's values.
            frame = self.frame_from_bardata(bar_data, initial_dt)
            rp.add_frame(initial_dt, frame)

        return rp
620
621
    def convert_columns(self, values):
622
        """
623
        If columns have a specific type you want to enforce, overwrite this
624
        method and return the transformed values.
625
        """
626
        return values
627
628
    def digest_bars(self, history_spec, do_ffill):
        """
        Return (values, dates) for the last (bar_count - 1) digest bars for
        the requested HistorySpec, optionally forward-filled.
        """
        bar_count = history_spec.bar_count
        if bar_count == 1:
            # Slicing with [1 - bar_count:] doesn't work when bar_count is
            # 1, so special-case an empty result.
            res = pd.DataFrame(index=[], columns=self.sids, dtype=float)
            return res.values, res.index

        field = history_spec.field

        # Panel axes are (field, dates, sids): take the requested field's
        # raw frame and trim it to the last (bar_count - 1) rows.
        digest_panel = self.digest_panels[history_spec.frequency]
        frame = digest_panel.get_current(field, raw=True)
        indexer = slice(1 - bar_count, None)

        if do_ffill:
            # Forward-fill *before* truncating down to the requested number
            # of bars so a gap in an illiquid stock's history can still be
            # filled from the older data.
            filled = ffill_digest_frame_from_prior_values(
                history_spec.frequency,
                history_spec.field,
                frame,
                self.last_known_prior_values,
                raw=True
            )
            return filled[indexer], digest_panel.current_dates()[indexer]

        return frame[indexer, :], digest_panel.current_dates()[indexer]
664
665
    def buffer_panel_minutes(self,
                             buffer_panel,
                             earliest_minute=None,
                             latest_minute=None,
                             raw=False):
        """
        Get the minutes in ``buffer_panel`` between earliest_minute and
        latest_minute, inclusive.

        ``buffer_panel`` can be a RollingPanel or a plain Panel; a
        RollingPanel is unwrapped via `get_current`.  A None bound means
        "all minutes on that side".
        """
        if isinstance(buffer_panel, RollingPanel):
            return buffer_panel.get_current(start=earliest_minute,
                                            end=latest_minute,
                                            raw=raw)
        # .ix rather than .loc: .loc requires the keys to actually be in
        # the index, while .ix slices all the values between the two
        # minutes, which is what we want.
        return buffer_panel.ix[:, earliest_minute:latest_minute, :]
693
694
    def frame_from_bardata(self, data, algo_dt):
695
        """
696
        Create a DataFrame from the given BarData and algo dt.
697
        """
698
        data = data._data
699
        frame_data = np.empty((len(self.fields), len(self.sids))) * np.nan
700
701
        for j, sid in enumerate(self.sids):
702
            sid_data = data.get(sid)
703
            if not sid_data:
704
                continue
705
            if algo_dt != sid_data['dt']:
706
                continue
707
            for i, field in enumerate(self.fields):
708
                frame_data[i, j] = sid_data.get(field, np.nan)
709
710
        return pd.DataFrame(
711
            frame_data,
712
            index=self.fields.copy(),
713
            columns=self.sids.copy(),
714
        )
715
716
    def update(self, data, algo_dt):
        """
        Ingest the bar at ``algo_dt``: record last known values, roll any
        digests that have closed, then add the new bar to the buffer panel.
        """
        frame = self.frame_from_bardata(data, algo_dt)

        self.update_last_known_values()
        self.update_digest_panels(algo_dt, self.buffer_panel)
        self.buffer_panel.add_frame(algo_dt, frame)
726
727
    def update_digest_panels(self, algo_dt, buffer_panel, freq_filter=None):
        """
        Roll a digest for any frequency whose current window closed before
        ``algo_dt``, drawing data from ``buffer_panel`` and inserting it
        into the appropriate digest panel.

        If freq_filter is given, only frequencies for which it returns True
        are updated.

        buffer_panel is a parameter (rather than always self.buffer_panel)
        so this method can also digest supplemental data from an external
        source.
        """
        for frequency in filter(freq_filter, self.unique_frequencies):
            # No digest panel is kept for frequencies that only have a
            # length-1 history spec.
            digest_panel = self.digest_panels.get(frequency, None)

            while algo_dt > self.cur_window_closes[frequency]:
                earliest_minute = self.cur_window_starts[frequency]
                latest_minute = self.cur_window_closes[frequency]
                minutes_to_process = self.buffer_panel_minutes(
                    buffer_panel,
                    earliest_minute=earliest_minute,
                    latest_minute=latest_minute,
                    raw=True
                )

                if digest_panel is not None:
                    # Aggregate the window's minutes and append the digest.
                    digest_frame = self.create_new_digest_frame(
                        minutes_to_process,
                        self.fields,
                        self.sids
                    )
                    digest_panel.add_frame(
                        latest_minute,
                        digest_frame,
                        self.fields,
                        self.sids
                    )

                # Advance this frequency's window boundaries.
                self.cur_window_starts[frequency] = \
                    frequency.next_window_start(latest_minute)
                self.cur_window_closes[frequency] = \
                    frequency.window_close(self.cur_window_starts[frequency])
777
778
    def frame_to_series(self, field, frame, columns=None):
779
        """
780
        Convert a frame with a DatetimeIndex and sid columns into a series with
781
        a sid index, using the aggregator defined by the given field.
782
        """
783
        if isinstance(frame, pd.DataFrame):
784
            columns = frame.columns
785
            frame = frame.values
786
787
        if not len(frame):
788
            return pd.Series(
789
                data=(0 if field == 'volume' else np.nan),
790
                index=columns,
791
            ).values
792
793
        if field in ['price', 'close_price']:
794
            # shortcircuit for full last row
795
            vals = frame[-1]
796
            if np.all(~np.isnan(vals)):
797
                return vals
798
            return ffill(frame)[-1]
799
        elif field == 'open_price':
800
            return bfill(frame)[0]
801
        elif field == 'volume':
802
            return np.nansum(frame, axis=0)
803
        elif field == 'high':
804
            return np.nanmax(frame, axis=0)
805
        elif field == 'low':
806
            return np.nanmin(frame, axis=0)
807
        else:
808
            raise ValueError("Unknown field {}".format(field))
809
810
    def aggregate_ohlcv_panel(self,
                              fields,
                              ohlcv_panel,
                              items=None,
                              minor_axis=None):
        """
        Convert an OHLCV Panel into a 2D array by aggregating each field's
        frame into one row via frame_to_series.
        """
        vals = ohlcv_panel
        if isinstance(ohlcv_panel, pd.Panel):
            # Unwrap a pandas Panel into raw values plus its labels.
            vals = ohlcv_panel.values
            items = ohlcv_panel.items
            minor_axis = ohlcv_panel.minor_axis

        data = [
            self.frame_to_series(
                field,
                vals[items.get_loc(field)],
                minor_axis
            )
            for field in fields
        ]
        return np.array(data)
834
835
    def create_new_digest_frame(self, buffer_minutes, items=None,
                                minor_axis=None):
        """
        Package the minutes in ``buffer_minutes`` into a single digest
        frame covering all of our fields.
        """
        return self.aggregate_ohlcv_panel(
            self.fields,
            buffer_minutes,
            items=items,
            minor_axis=minor_axis
        )
846
847
    def update_last_known_values(self):
        """
        Store the non-NaN values from our oldest frame in each frequency
        into last_known_prior_values.
        """
        ffillable = self.ffillable_fields
        if not len(ffillable):
            return

        for frequency in self.unique_frequencies:
            digest_panel = self.digest_panels.get(frequency, None)
            if digest_panel:
                oldest_known_values = digest_panel.oldest_frame(raw=True)
            else:
                oldest_known_values = self.buffer_panel.oldest_frame(raw=True)

            for field in ffillable:
                field_vals = oldest_known_values[self.fields.get_loc(field)]
                # pd.notnull for NaN detection (np.isnan might be faster —
                # left as-is to match the stored dtype handling).
                non_nan_sids = np.where(pd.notnull(field_vals))
                key_loc = self.last_known_prior_values.index.get_loc(
                    (frequency.freq_str, field)
                )
                self.last_known_prior_values.values[
                    key_loc, non_nan_sids
                ] = field_vals[non_nan_sids]
874
875
    def get_history(self, history_spec, algo_dt):
        """
        Main API used by the algoscript.

        Returns a frame of values for ``history_spec`` as of ``algo_dt``:
        digest bars from prior periods plus a final row aggregated from the
        current (incomplete) period's buffer minutes.
        """
        field = history_spec.field
        do_ffill = history_spec.ffill

        # Stored values from periods prior to the current one.
        digest_frame, index = self.digest_bars(history_spec, do_ffill)

        # Minutes from the buffer panel build the returned frame's last row.
        buffer_panel = self.buffer_panel_minutes(
            self.buffer_panel,
            earliest_minute=self.cur_window_starts[history_spec.frequency],
            raw=True
        )
        buffer_frame = buffer_panel[self.fields.get_loc(field)]

        if do_ffill:
            buffer_frame = ffill_buffer_from_prior_values(
                history_spec.frequency,
                field,
                buffer_frame,
                digest_frame,
                self.last_known_prior_values,
                raw=True
            )
        last_period = self.frame_to_series(field, buffer_frame, self.sids)
        return fast_build_history_output(digest_frame,
                                         last_period,
                                         algo_dt,
                                         index=index,
                                         columns=self.sids)
912
913
914
def fast_build_history_output(buffer_frame,
                              last_period,
                              algo_dt,
                              index=None,
                              columns=None):
    """
    Optimized concatenation of a DataFrame (or raw values) and a final row
    for use in HistoryContainer.get_history.

    Relies on the input arrays having compatible shapes.
    """
    buffer_values = buffer_frame
    if isinstance(buffer_frame, pd.DataFrame):
        # Take the labels from the frame itself when one is provided.
        buffer_values = buffer_frame.values
        index = buffer_frame.index
        columns = buffer_frame.columns

    return pd.DataFrame(
        data=np.vstack([buffer_values, last_period]),
        index=fast_append_date_to_index(index, pd.Timestamp(algo_dt)),
        columns=columns,
    )


def fast_append_date_to_index(index, timestamp):
    """
    Append a timestamp to a DatetimeIndex.  DatetimeIndex.append does not
    appear to work.
    """
    return pd.DatetimeIndex(
        np.hstack([index.values, [timestamp.asm8]]),
        tz='UTC',
    )
960