Completed
Pull Request — master (#940), created by Joe at 01:28

zipline.pipeline.loaders.blaze.ffill_query_in_range()   A

Complexity
    Conditions: 2

Size
    Total Lines: 52

Duplication
    Lines: 0
    Ratio: 0 %

Metric    Value
cc        2
dl        0
loc       52
rs        9.493

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. And if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include:
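Chief among these is Extract Method. As a hedged illustration only (the helper name _query_lower_bound is hypothetical and not part of the reviewed code), the flagged ffill_query_in_range could hand its per-sid lower-bound lookup to a small helper, reusing the module's existing bz, odo, and pd imports shown in the listing below:

def _query_lower_bound(expr, lower, ts_field, sid_field, odo_kwargs):
    # Hypothetical extracted helper: for each sid, take its latest timestamp
    # at or before ``lower``, then return the earliest of those per-sid values.
    filtered = expr[expr[ts_field] <= lower]
    return odo(
        bz.by(
            filtered[sid_field],
            timestamp=filtered[ts_field].max(),
        ).timestamp.min(),
        pd.Timestamp,
        **odo_kwargs
    )

ffill_query_in_range would then keep only the range query and the forward-fill fallback, shrinking the 52-line body measured above and giving the extracted step a name of its own.

The full source of the zipline.pipeline.loaders.blaze module under review follows.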

"""Blaze integration with the Pipeline API.

For an overview of the blaze project, see blaze.pydata.org

The blaze loader for the Pipeline API is designed to allow us to load
data from arbitrary sources as long as we can execute the needed expressions
against the data with blaze.

Data Format
-----------

The blaze Pipeline API loader expects that data is formatted in a tabular way.
The only required column in your table is ``asof_date`` where this column
represents the date this data is referencing. For example, one might have a CSV
like:

asof_date,value
2014-01-06,0
2014-01-07,1
2014-01-08,2

This says that the value on 2014-01-06 was 0 and so on.

Optionally, we may provide a ``timestamp`` column to be used to represent
point in time data. This column tells us when the data was known, or became
available for use. Using our same CSV, we could write this with a timestamp
like:

asof_date,timestamp,value
2014-01-06,2014-01-07,0
2014-01-07,2014-01-08,1
2014-01-08,2014-01-09,2

This says that the value was 0 on 2014-01-06; however, we did not learn this
until 2014-01-07. This is useful for avoiding look-ahead bias in your
pipelines. If this column does not exist, the ``asof_date`` column will be used
instead.

If your data references a particular asset, you can add a ``sid`` column to
your dataset to represent this. For example:

asof_date,value,sid
2014-01-06,0,10
2014-01-06,1,20
2014-01-07,1,10
2014-01-07,2,20
2014-01-08,2,10
2014-01-08,3,20

This says that on 2014-01-06, the asset with id 10 had a value of 0, and the
asset with id 20 had a value of 1.


One of the key features of the Pipeline API is the handling of adjustments and
restatements. Often our data will be amended after the fact and we would like
to trade on the newest information; however, we do not want to introduce this
knowledge to our model too early. The blaze loader handles this case by
accepting a second ``deltas`` expression that contains all of the restatements
to the original expression.

For example, let's use our table from above:

asof_date,value
2014-01-06,0
2014-01-07,1
2014-01-08,2

Imagine that on the ninth the vendor realized that the calculation was
incorrect and the value on the sixth was actually -1. Then, on the tenth, they
realized that the value for the eighth was actually 3. We can construct a
``deltas`` expression to pass to our blaze loader that has the same shape as
our baseline table but only contains these new values like:

asof_date,timestamp,value
2014-01-06,2014-01-09,-1
2014-01-08,2014-01-10,3

This shows that we learned on the ninth that the value on the sixth was
actually -1 and that we learned on the tenth that the value on the eighth was
actually 3. By pulling our data into these two tables and not silently updating
our original table we can run our pipelines using the information we would
have had on that day, and we can prevent lookahead bias in the pipelines.

Conversion from Blaze to the Pipeline API
-----------------------------------------

Now that our data is structured in the way that the blaze loader expects, we
are ready to convert our blaze expressions into Pipeline API objects.

This module (zipline.pipeline.loaders.blaze) exports a function called
``from_blaze`` which performs this mapping.

The expression that you are trying to convert must either be tabular or
array-like. This means the ``dshape`` must be like:

``Dim * {A: B}`` or ``Dim * A``.

This represents an expression of dimension 1 which may be fixed or variable,
whose measure is either some record or a scalar.

The record case defines the entire table with all of the columns; this maps the
blaze expression into a Pipeline DataSet. This dataset will have a column for
each field of the record. Some datashape types cannot be coerced into Pipeline
API compatible types and in that case, a column cannot be constructed.
Currently any numeric type that may be promoted to a float64 is compatible with
the Pipeline API.

The scalar case defines a single column pulled out of a table. For example, let
``expr = bz.symbol('s', 'var * {field: int32, asof_date: datetime}')``.
When we pass ``expr.field`` to ``from_blaze``, we will walk back up the
expression tree until we find the table that ``field`` is defined on. We will
then proceed with the record case to construct a dataset; however, before
returning the dataset we will pull out only the column that was passed in.

For full documentation, see ``help(from_blaze)`` or ``from_blaze?`` in IPython.

Using our Pipeline DataSets and Columns
---------------------------------------

Once we have mapped our blaze expressions into Pipeline API objects, we may
use them just like any other datasets or columns. For more information on how
to run a pipeline or use the Pipeline API, see:
www.quantopian.com/help#pipeline-api
"""
from __future__ import division, absolute_import

from abc import ABCMeta, abstractproperty
from collections import namedtuple, defaultdict
from copy import copy
from functools import partial
from itertools import count
import warnings
from weakref import WeakKeyDictionary

import blaze as bz
from datashape import (
    Date,
    DateTime,
    Option,
    float64,
    isrecord,
    isscalar,
    promote,
)
from odo import odo
import pandas as pd
from toolz import (
    complement,
    compose,
    concat,
    flip,
    groupby,
    identity,
    memoize,
)
import toolz.curried.operator as op
from six import with_metaclass, PY2, itervalues, iteritems


from zipline.pipeline.data.dataset import DataSet, Column
from zipline.lib.adjusted_array import AdjustedArray
from zipline.lib.adjustment import Float64Overwrite
from zipline.utils.enum import enum
from zipline.utils.input_validation import expect_element
from zipline.utils.numpy_utils import repeat_last_axis


AD_FIELD_NAME = 'asof_date'
TS_FIELD_NAME = 'timestamp'
SID_FIELD_NAME = 'sid'
valid_deltas_node_types = (
    bz.expr.Field,
    bz.expr.ReLabel,
    bz.expr.Symbol,
)
traversable_nodes = (
    bz.expr.Field,
    bz.expr.Label,
)
is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types))
getname = op.attrgetter('__name__')


class _ExprRepr(object):
    """Box for repring expressions with the str of the expression.

    Parameters
    ----------
    expr : Expr
        The expression to box for repring.
    """
    __slots__ = 'expr',

    def __init__(self, expr):
        self.expr = expr

    def __repr__(self):
        return str(self.expr)
    __str__ = __repr__


class ExprData(namedtuple('ExprData', 'expr deltas resources')):
    """A pair of expressions and data resources. The expressions will be
    computed using the resources as the starting scope.

    Parameters
    ----------
    expr : Expr
        The baseline values.
    deltas : Expr, optional
        The deltas for the data.
    resources : resource or dict of resources, optional
        The resources to compute the exprs against.
    """
    def __new__(cls, expr, deltas=None, resources=None):
        return super(ExprData, cls).__new__(cls, expr, deltas, resources)

    def __repr__(self):
        # If the expressions have _resources() then the repr will
        # drive computation so we box them.
        cls = type(self)
        return super(ExprData, cls).__repr__(cls(
            _ExprRepr(self.expr),
            _ExprRepr(self.deltas),
            self.resources,
        ))


class InvalidField(with_metaclass(ABCMeta)):
    """A field that raises an exception indicating that the
    field was invalid.

    Parameters
    ----------
    field : str
        The name of the field.
    type_ : dshape
        The shape of the field.
    """
    @abstractproperty
    def error_format(self):
        raise NotImplementedError('error_format')

    def __init__(self, field, type_):
        self._field = field
        self._type = type_

    def __get__(self, instance, owner):
        raise AttributeError(
            self.error_format.format(field=self._field, type_=self._type),
        )


class NonNumpyField(InvalidField):
    error_format = (
        "field '{field}' was a non numpy compatible type: '{type_}'"
    )


class NonPipelineField(InvalidField):
    error_format = (
        "field '{field}' was a non Pipeline API compatible type: '{type_}'"
    )


class NotPipelineCompatible(TypeError):
    """Exception used to indicate that a dshape is not Pipeline API
    compatible.
    """
    def __str__(self):
        return "'%s' is a non Pipeline API compatible type" % self.args


_new_names = ('BlazeDataSet_%d' % n for n in count())


@memoize
def new_dataset(expr, deltas):
    """Creates or returns a dataset from a pair of blaze expressions.

    Parameters
    ----------
    expr : Expr
        The blaze expression representing the first known values.
    deltas : Expr
        The blaze expression representing the deltas to the data.

    Returns
    -------
    ds : type
        A new dataset type.

    Notes
    -----
    This function is memoized. Repeated calls with the same inputs will return
    the same type.
    """
    columns = {}
    for name, type_ in expr.dshape.measure.fields:
        try:
            if promote(type_, float64, promote_option=False) != float64:
                raise NotPipelineCompatible()
            if isinstance(type_, Option):
                type_ = type_.ty
        except NotPipelineCompatible:
            col = NonPipelineField(name, type_)
        except TypeError:
            col = NonNumpyField(name, type_)
        else:
            col = Column(type_.to_numpy_dtype())

        columns[name] = col

    name = expr._name
    if name is None:
        name = next(_new_names)

    # unicode is a NameError in py3 but the branch is only hit
    # when we are in python 2.
    if PY2 and isinstance(name, unicode):  # noqa
        name = name.encode('utf-8')

    return type(name, (DataSet,), columns)


def _check_resources(name, expr, resources):
    """Validate that the expression and resources passed match up.

    Parameters
    ----------
    name : str
        The name of the argument we are checking.
    expr : Expr
        The potentially bound expr.
    resources
        The explicitly passed resources to compute expr.

    Raises
    ------
    ValueError
        If the resources do not match for an expression.
    """
    if expr is None:
        return
    bound = expr._resources()
    if not bound and resources is None:
        raise ValueError('no resources provided to compute %s' % name)
    if bound and resources:
        raise ValueError(
            'explicit and implicit resources provided to compute %s' % name,
        )


def _check_datetime_field(name, measure):
    """Check that a field is a datetime inside some measure.

    Parameters
    ----------
    name : str
        The name of the field to check.
    measure : Record
        The record to check the field of.

    Raises
    ------
    TypeError
        If the field is not a datetime inside ``measure``.
    """
    if not isinstance(measure[name], (Date, DateTime)):
        raise TypeError(
            "'{name}' field must be a '{dt}', not: '{dshape}'".format(
                name=name,
                dt=DateTime(),
                dshape=measure[name],
            ),
        )


class NoDeltasWarning(UserWarning):
    """Warning used to signal that no deltas could be found and none
    were provided.

    Parameters
    ----------
    expr : Expr
        The expression that was searched.
    """
    def __init__(self, expr):
        self._expr = expr

    def __str__(self):
        return 'No deltas could be inferred from expr: %s' % self._expr


no_deltas_rules = enum('warn', 'raise_', 'ignore')


def get_deltas(expr, deltas, no_deltas_rule):
    """Find the correct deltas for the expression.

    Parameters
    ----------
    expr : Expr
        The baseline expression.
    deltas : Expr, 'auto', or None
        The deltas argument. If this is 'auto', then the deltas table will
        be searched for by walking up the expression tree. If this cannot be
        reflected, then an action will be taken based on the
        ``no_deltas_rule``.
    no_deltas_rule : no_deltas_rule
        How to handle the case where deltas='auto' but no deltas could be
        found.

    Returns
    -------
    deltas : Expr or None
        The deltas table to use.
    """
    if isinstance(deltas, bz.Expr) or deltas != 'auto':
        return deltas

    try:
        return expr._child[(expr._name or '') + '_deltas']
    except (ValueError, AttributeError):
        if no_deltas_rule == no_deltas_rules.raise_:
            raise ValueError(
                "no deltas table could be reflected for %s" % expr
            )
        elif no_deltas_rule == no_deltas_rules.warn:
            warnings.warn(NoDeltasWarning(expr))
    return None


def _ensure_timestamp_field(dataset_expr, deltas):
    """Verify that the baseline and deltas expressions have a timestamp field.

    If there is not a ``TS_FIELD_NAME`` on either of the expressions, it will
    be copied from the ``AD_FIELD_NAME``. If one is provided, then we will
    verify that it is the correct dshape.

    Parameters
    ----------
    dataset_expr : Expr
        The baseline expression.
    deltas : Expr or None
        The deltas expression if any was provided.

    Returns
    -------
    dataset_expr, deltas : Expr
        The new baseline and deltas expressions to use.
    """
    measure = dataset_expr.dshape.measure
    if TS_FIELD_NAME not in measure.names:
        dataset_expr = bz.transform(
            dataset_expr,
            **{TS_FIELD_NAME: dataset_expr[AD_FIELD_NAME]}
        )
        if deltas is not None:
            deltas = bz.transform(
                deltas,
                **{TS_FIELD_NAME: deltas[AD_FIELD_NAME]}
            )
    else:
        _check_datetime_field(TS_FIELD_NAME, measure)

    return dataset_expr, deltas


@expect_element(no_deltas_rule=no_deltas_rules)
def from_blaze(expr,
               deltas='auto',
               loader=None,
               resources=None,
               no_deltas_rule=no_deltas_rules.warn):
    """Create a Pipeline API object from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The blaze expression to use.
    deltas : Expr or 'auto', optional
        The expression to use for the point in time adjustments.
        If the string 'auto' is passed, a deltas expr will be looked up
        by stepping up the expression tree and looking for another field
        with the name of ``expr`` + '_deltas'. If None is passed, no deltas
        will be used.
    loader : BlazeLoader, optional
        The blaze loader to attach this pipeline dataset to. If None is passed,
        the global blaze loader is used.
    resources : dict or any, optional
        The data to execute the blaze expressions against. This is used as the
        scope for ``bz.compute``.
    no_deltas_rule : no_deltas_rule
        What should happen if ``deltas='auto'`` but no deltas can be found.
        'warn' says to issue a warning but continue.
        'raise' says to raise an exception if no deltas can be found.
        'ignore' says to take no action and proceed with no deltas.

    Returns
    -------
    pipeline_api_obj : DataSet or BoundColumn
        Either a new dataset or a bound column based on the shape of the expr
        passed in. If a table shaped expression is passed, this will return
        a ``DataSet`` that represents the whole table. If an array-like shape
        is passed, a ``BoundColumn`` on the dataset constructed from the
        expression's parent table is returned.
    """
    deltas = get_deltas(expr, deltas, no_deltas_rule)
    if deltas is not None:
        invalid_nodes = tuple(filter(is_invalid_deltas_node, expr._subterms()))
        if invalid_nodes:
            raise TypeError(
                'expression with deltas may only contain (%s) nodes,'
                " found: %s" % (
                    ', '.join(map(getname, valid_deltas_node_types)),
                    ', '.join(set(map(compose(getname, type), invalid_nodes))),
                ),
            )

    # Check if this is a single column out of a dataset.
    if bz.ndim(expr) != 1:
        raise TypeError(
            'expression was not tabular or array-like,'
            ' %s dimensions: %d' % (
                'too many' if bz.ndim(expr) > 1 else 'not enough',
                bz.ndim(expr),
            ),
        )

    single_column = None
    if isscalar(expr.dshape.measure):
        # This is a single column. Record which column we are to return
        # but create the entire dataset.
        single_column = rename = expr._name
        field_hit = False
        if not isinstance(expr, traversable_nodes):
            raise TypeError(
                "expression '%s' was array-like but not a simple field of"
                " some larger table" % str(expr),
            )
        while isinstance(expr, traversable_nodes):
            if isinstance(expr, bz.expr.Field):
                if not field_hit:
                    field_hit = True
                else:
                    break
            rename = expr._name
            expr = expr._child
        dataset_expr = expr.relabel({rename: single_column})
    else:
        dataset_expr = expr

    measure = dataset_expr.dshape.measure
    if not isrecord(measure) or AD_FIELD_NAME not in measure.names:
        raise TypeError(
            "The dataset must be a collection of records with at least an"
            " '{ad}' field. Fields provided: '{fields}'\nhint: maybe you need"
            " to use `relabel` to change your field names".format(
                ad=AD_FIELD_NAME,
                fields=measure,
            ),
        )
    _check_datetime_field(AD_FIELD_NAME, measure)
    dataset_expr, deltas = _ensure_timestamp_field(dataset_expr, deltas)

    if deltas is not None and (sorted(deltas.dshape.measure.fields) !=
                               sorted(measure.fields)):
        raise TypeError(
            'baseline measure != deltas measure:\n%s != %s' % (
                measure,
                deltas.dshape.measure,
            ),
        )

    # Ensure that we have a data resource to execute the query against.
    _check_resources('dataset_expr', dataset_expr, resources)
    _check_resources('deltas', deltas, resources)

    # Create or retrieve the Pipeline API dataset.
    ds = new_dataset(dataset_expr, deltas)
    # Register our new dataset with the loader.
    (loader if loader is not None else global_loader)[ds] = ExprData(
        dataset_expr,
        deltas,
        resources,
    )
    if single_column is not None:
        # We were passed a single column, extract and return it.
        return getattr(ds, single_column)
    return ds


getdataset = op.attrgetter('dataset')
dataset_name = op.attrgetter('name')


def overwrite_novel_deltas(baseline, deltas, dates):
    """Overwrite any deltas into the baseline set that would have changed our
    most recently known value.

    Parameters
    ----------
    baseline : pd.DataFrame
        The first known values.
    deltas : pd.DataFrame
        Overwrites to the baseline data.
    dates : pd.DatetimeIndex
        The dates requested by the loader.

    Returns
    -------
    baseline : pd.DataFrame
        The baseline data with the novel deltas folded in, sorted by timestamp.
    non_novel_deltas : pd.DataFrame
        The deltas that do not represent a baseline value.
    """
    get_indexes = dates.searchsorted
    novel_idx = (
        get_indexes(deltas[TS_FIELD_NAME].values, 'right') -
        get_indexes(deltas[AD_FIELD_NAME].values, 'left')
    ) <= 1
    novel_deltas = deltas.loc[novel_idx]
    non_novel_deltas = deltas.loc[~novel_idx]
    return pd.concat(
        (baseline, novel_deltas),
        ignore_index=True,
    ).sort(TS_FIELD_NAME), non_novel_deltas


def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value):
    """Construct a `Float64Overwrite` with the correct
    start and end date based on the asof date of the delta,
    the dense_dates, and the sparse_dates.

    Parameters
    ----------
    asof : datetime
        The asof date of the delta.
    dense_dates : pd.DatetimeIndex
        The dates requested by the loader.
    sparse_dates : pd.DatetimeIndex
        The dates that appeared in the dataset.
    asset_idx : tuple of int
        The index of the asset in the block. If this is a tuple, then this
        is treated as the first and last index to use.
    value : np.float64
        The value to overwrite with.

    Returns
    -------
    overwrite : Float64Overwrite
        The overwrite that will apply the new value to the data.

    Notes
    -----
    This is forward-filling all dense dates that are between the asof date
    and the next sparse date after the asof date.

    For example:
    let ``asof = pd.Timestamp('2014-01-02')``,
        ``dense_dates = pd.date_range('2014-01-01', '2014-01-05')``
        ``sparse_dates = pd.to_datetime(['2014-01', '2014-02', '2014-04'])``

    Then the overwrite will apply to indexes: 1, 2, 3, 4
    """
    first_row = dense_dates.searchsorted(asof)
    next_idx = sparse_dates.searchsorted(asof, 'right')
    if next_idx == len(sparse_dates):
        # There is no next date in the sparse, this overwrite should apply
        # through the end of the dense dates.
        last_row = len(dense_dates) - 1
    else:
        # There is a next date in sparse dates. This means that the overwrite
        # should only apply until the index of this date in the dense dates.
        last_row = dense_dates.searchsorted(sparse_dates[next_idx]) - 1

    if first_row > last_row:
        return

    first, last = asset_idx
    yield Float64Overwrite(first_row, last_row, first, last, value)


def adjustments_from_deltas_no_sids(dates,
                                    dense_dates,
                                    column_idx,
                                    column_name,
                                    assets,
                                    deltas):
    """Collect all the adjustments that occur in a dataset that does not
    have a sid column.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The dates requested by the loader.
    dense_dates : pd.DatetimeIndex
        The dates that were in the dense data.
    column_idx : int
        The index of the column in the dataset.
    column_name : str
        The name of the column to compute deltas for.
    assets : iterable of int
        The assets requested by the loader.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    ad_series = deltas[AD_FIELD_NAME]
    asset_idx = 0, len(assets) - 1
    return {
        dates.get_loc(kd): overwrite_from_dates(
            ad_series.loc[kd],
            dates,
            dense_dates,
            asset_idx,
            v,
        ) for kd, v in deltas[column_name].iteritems()
    }


def adjustments_from_deltas_with_sids(dates,
                                      dense_dates,
                                      column_idx,
                                      column_name,
                                      assets,
                                      deltas):
    """Collect all the adjustments that occur in a dataset that has a sid
    column.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The dates requested by the loader.
    dense_dates : pd.DatetimeIndex
        The dates that were in the dense data.
    column_idx : int
        The index of the column in the dataset.
    column_name : str
        The name of the column to compute deltas for.
    assets : iterable of int
        The assets requested by the loader.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    ad_series = deltas[AD_FIELD_NAME]
    adjustments = defaultdict(list)
    for sid_idx, (sid, per_sid) in enumerate(deltas[column_name].iteritems()):
        for kd, v in per_sid.iteritems():
            adjustments[dates.searchsorted(kd)].extend(
                overwrite_from_dates(
                    ad_series.loc[kd, sid],
                    dates,
                    dense_dates,
                    (sid_idx, sid_idx),
                    v,
                ),
            )
    return dict(adjustments)  # no subclasses of dict


class BlazeLoader(dict):
    def __init__(self, colmap=None):
        self.update(colmap or {})

    @classmethod
    @memoize(cache=WeakKeyDictionary())
    def global_instance(cls):
        return cls()

    def __hash__(self):
        return id(self)

    def __call__(self, column):
        if column.dataset in self:
            return self
        raise KeyError(column)

    def load_adjusted_array(self, columns, dates, assets, mask):
        return dict(
            concat(map(
                partial(self._load_dataset, dates, assets, mask),
                itervalues(groupby(getdataset, columns))
            ))
        )

    def _load_dataset(self, dates, assets, mask, columns):
        try:
            (dataset,) = set(map(getdataset, columns))
        except ValueError:
            raise AssertionError('all columns must come from the same dataset')

        expr, deltas, resources = self[dataset]
        have_sids = SID_FIELD_NAME in expr.fields
        assets = list(map(int, assets))  # coerce from numpy.int64
        fields = list(map(dataset_name, columns))
        query_fields = fields + [AD_FIELD_NAME, TS_FIELD_NAME] + (
            [SID_FIELD_NAME] if have_sids else []
        )

        def where(e):
            """Create the query to run against the resources.

            Parameters
            ----------
            e : Expr
                The baseline or deltas expression.

            Returns
            -------
            q : Expr
                The query to run.
            """
            ts = e[TS_FIELD_NAME]
            # Hack to get the lower bound to query:
            # This must be strictly executed because the data for `ts` will
            # be removed from scope too early otherwise.
            lower = odo(ts[ts <= dates[0]].max(), pd.Timestamp)
            selection = ts <= dates[-1]
            if have_sids:
                selection &= e[SID_FIELD_NAME].isin(assets)
            if lower is not pd.NaT:
                selection &= ts >= lower

            return e[selection][query_fields]

        extra_kwargs = {'d': resources} if resources else {}
        materialized_expr = odo(where(expr), pd.DataFrame, **extra_kwargs)
        materialized_deltas = (
            odo(where(deltas), pd.DataFrame, **extra_kwargs)
            if deltas is not None else
            pd.DataFrame(columns=query_fields)
        )

        # Inline the deltas that changed our most recently known value.
        # Also, we reindex by the dates to create a dense representation of
        # the data.
        sparse_output, non_novel_deltas = overwrite_novel_deltas(
            materialized_expr,
            materialized_deltas,
            dates,
        )
        sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)

        if have_sids:
            # Unstack by the sid so that we get a multi-index on the columns
            # of datacolumn, sid.
            sparse_output = sparse_output.set_index(
                [TS_FIELD_NAME, SID_FIELD_NAME],
            ).unstack()
            sparse_deltas = non_novel_deltas.set_index(
                [TS_FIELD_NAME, SID_FIELD_NAME],
            ).unstack()

            dense_output = sparse_output.reindex(dates, method='ffill')
            cols = dense_output.columns
            dense_output = dense_output.reindex(
                columns=pd.MultiIndex.from_product(
                    (cols.levels[0], assets),
                    names=cols.names,
                ),
            )

            adjustments_from_deltas = adjustments_from_deltas_with_sids
            column_view = identity
        else:
            # We use the column view to make an array per asset.
            column_view = compose(
                # We need to copy this because we need a concrete ndarray.
                # The `repeat_last_axis` call will give us a fancy strided
                # array which uses a buffer to represent `len(assets)` columns.
                # The engine puts nans at the indices for which we do not have
                # sid information so that the nan-aware reductions still work.
                # A future change to the engine would be to add first class
                # support for macroeconomic datasets.
                copy,
                partial(repeat_last_axis, count=len(assets)),
            )
            sparse_output = sparse_output.set_index(TS_FIELD_NAME)
            dense_output = sparse_output.reindex(dates, method='ffill')
            sparse_deltas = non_novel_deltas.set_index(TS_FIELD_NAME)
            adjustments_from_deltas = adjustments_from_deltas_no_sids

        for column_idx, column in enumerate(columns):
            column_name = column.name
            yield column, AdjustedArray(
                column_view(
                    dense_output[column_name].values.astype(column.dtype),
                ),
                mask,
                adjustments_from_deltas(
                    dates,
                    sparse_output.index,
                    column_idx,
                    column_name,
                    assets,
                    sparse_deltas,
                )
            )

global_loader = BlazeLoader.global_instance()


def bind_expression_to_resources(expr, resources):
    """
    Bind a Blaze expression to resources.

    Parameters
    ----------
    expr : bz.Expr
        The expression to which we want to bind resources.
    resources : dict[bz.Symbol -> any]
        Mapping from the atomic terms of ``expr`` to actual data resources.

    Returns
    -------
    bound_expr : bz.Expr
        ``expr`` with bound resources.
    """
    # bind the resources into the expression
    if resources is None:
        resources = {}

    # _subs stands for substitute.  It's not actually private, blaze just
    # prefixes symbol-manipulation methods with underscores to prevent
    # collisions with data column names.
    return expr._subs({
        k: bz.Data(v, dshape=k.dshape) for k, v in iteritems(resources)
    })


def ffill_query_in_range(expr,
                         lower,
                         upper,
                         odo_kwargs=None,
                         ts_field=TS_FIELD_NAME,
                         sid_field=SID_FIELD_NAME):
    """Query a blaze expression in a given time range, properly forward filling
    from values that fall before the lower date.

    Parameters
    ----------
    expr : Expr
        Bound blaze expression.
    lower : datetime
        The lower date to query for.
    upper : datetime
        The upper date to query for.
    odo_kwargs : dict, optional
        The extra keyword arguments to pass to ``odo``.
    ts_field : str, optional
        The name of the timestamp field in the given blaze expression.
    sid_field : str, optional
        The name of the sid field in the given blaze expression.

    Returns
    -------
    raw : pd.DataFrame
        A strict dataframe for the data in the given date range. This may
        start before the requested start date if a value is needed to ffill.
    """
    odo_kwargs = odo_kwargs or {}
    filtered = expr[expr[ts_field] <= lower]
    computed_lower = odo(
        bz.by(
            filtered[sid_field],
            timestamp=filtered[ts_field].max(),
        ).timestamp.min(),
        pd.Timestamp,
        **odo_kwargs
    )
    if pd.isnull(computed_lower):
        # If there is no lower date, just query for data in the date
        # range. It must all be null anyway.
        computed_lower = lower

    return odo(
        expr[
            (expr[ts_field] >= computed_lower) &
            (expr[ts_field] <= upper)
        ],
        pd.DataFrame,
        **odo_kwargs
    )
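For context, a minimal usage sketch of the module's main entry point, assuming the blaze/odo stack this module targets is installed; the events frame and the 'events' name are illustrative assumptions, not part of the reviewed change:

import blaze as bz
import pandas as pd

from zipline.pipeline.loaders.blaze import from_blaze

# Illustrative in-memory table with the columns the module docstring describes.
events = pd.DataFrame({
    'sid': [10, 20, 10, 20],
    'asof_date': pd.to_datetime(['2014-01-06', '2014-01-06',
                                 '2014-01-07', '2014-01-07']),
    'value': [0.0, 1.0, 1.0, 2.0],
})
expr = bz.Data(events, name='events')

# Registers a new DataSet with the global blaze loader; the deltas lookup is
# skipped by passing deltas=None.
Events = from_blaze(expr, deltas=None)
value_column = Events.value  # a column usable like any other Pipeline input

Because the frame has no timestamp column, the loader copies asof_date into timestamp, exactly as the module docstring describes.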