"""Blaze integration with the Pipeline API.

For an overview of the blaze project, see blaze.pydata.org

The blaze loader for the Pipeline API is designed to allow us to load
data from arbitrary sources as long as we can execute the needed expressions
against the data with blaze.

Data Format
-----------
The blaze Pipeline API loader expects that data is formatted in a tabular way.
The only required column in your table is ``asof_date``; this column
represents the date to which the data refers. For example, one might have a
CSV like:

asof_date,value
2014-01-06,0
2014-01-07,1
2014-01-08,2

This says that the value on 2014-01-06 was 0 and so on.

Optionally, we may provide a ``timestamp`` column to be used to represent
point-in-time data. This column tells us when the data was known, or became
available for use. Using our same CSV, we could write this with a timestamp
like:

asof_date,timestamp,value
2014-01-06,2014-01-07,0
2014-01-07,2014-01-08,1
2014-01-08,2014-01-09,2

This says that the value was 0 on 2014-01-06; however, we did not learn this
until 2014-01-07. This is useful for avoiding look-ahead bias in your
pipelines. If this column does not exist, the ``asof_date`` column will be
used instead.

If your data references a particular asset, you can add a ``sid`` column to
your dataset to represent this. For example:

asof_date,value,sid
2014-01-06,0,10
2014-01-06,1,20
2014-01-07,1,10
2014-01-07,2,20
2014-01-08,2,10
2014-01-08,3,20

This says that on 2014-01-06, the asset with id 10 had a value of 0, and the
asset with id 20 had a value of 1.
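
For instance (the file name here is hypothetical), such a table can be
wrapped as a blaze expression before being handed to the machinery described
below:

    import blaze as bz
    expr = bz.Data('my_data.csv', name='my_data')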


One of the key features of the Pipeline API is the handling of adjustments and
restatements. Often our data will be amended after the fact and we would like
to trade on the newest information; however, we do not want to introduce this
knowledge to our model too early. The blaze loader handles this case by
accepting a second ``deltas`` expression that contains all of the restatements
in the original expression.

For example, let's use our table from above:

asof_date,value
2014-01-06,0
2014-01-07,1
2014-01-08,2

Imagine that on the ninth the vendor realized that the calculation was
incorrect and the value on the sixth was actually -1. Then, on the tenth, they
realized that the value for the eighth was actually 3. We can construct a
``deltas`` expression to pass to our blaze loader that has the same shape as
our baseline table but only contains these new values like:

asof_date,timestamp,value
2014-01-06,2014-01-09,-1
2014-01-08,2014-01-10,3

This shows that we learned on the ninth that the value on the sixth was
actually -1 and that we learned on the tenth that the value on the eighth was
actually 3. By pulling our data into these two tables and not silently
updating our original table we can run our pipelines using the information we
would have had on that day, and we can prevent look-ahead bias in the
pipelines.
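
As a concrete sketch (the names and frames here are hypothetical), both
tables can be bound into a single scope so that the ``<name>_deltas`` naming
convention lets ``from_blaze``, described below, discover the deltas
automatically:

    import blaze as bz
    db = bz.Data({'values': baseline_df, 'values_deltas': deltas_df})
    ds = from_blaze(db.values)  # deltas='auto' finds db.values_deltas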

Conversion from Blaze to the Pipeline API
-----------------------------------------

Now that our data is structured in the way that the blaze loader expects, we
are ready to convert our blaze expressions into Pipeline API objects.

This module (zipline.pipeline.loaders.blaze) exports a function called
``from_blaze`` which performs this mapping.

The expression that you are trying to convert must either be tabular or
array-like. This means the ``dshape`` must be like:

``Dim * {A: B}`` or ``Dim * A``.

This represents an expression of dimension 1 which may be fixed or variable,
whose measure is either some record or a scalar.

The record case defines the entire table with all of the columns; this maps
the blaze expression into a pipeline DataSet. This dataset will have a column
for each field of the record. Some datashape types cannot be coerced into
Pipeline API compatible types and in that case, a column cannot be
constructed. Currently, any numeric type that may be promoted to a float64 is
compatible with the Pipeline API.

The scalar case defines a single column pulled out of a table. For example,
let ``expr = bz.symbol('s', 'var * {field: int32, asof_date: datetime}')``.
When we pass ``expr.field`` to ``from_blaze``, we will walk back up the
expression tree until we find the table that ``field`` is defined on. We will
then proceed with the record case to construct a dataset; however, before
returning the dataset we will pull out only the column that was passed in.
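
As a sketch of that scalar case (the resource mapping is illustrative):

    import blaze as bz
    expr = bz.symbol('s', 'var * {field: int32, asof_date: datetime}')
    value_column = from_blaze(
        expr.field,
        resources={expr: df},
        no_deltas_rule=no_deltas_rules.ignore,
    )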

For full documentation, see ``help(from_blaze)`` or ``from_blaze?`` in IPython.

Using our Pipeline DataSets and Columns
---------------------------------------

Once we have mapped our blaze expressions into Pipeline API objects, we may
use them just like any other datasets or columns. For more information on how
to run a pipeline or using the Pipeline API, see:
www.quantopian.com/help#pipeline-api
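
For example, a column created by ``from_blaze`` can be requested from a
pipeline like any other (a minimal sketch; ``ds`` is a dataset produced as
above):

    from zipline.pipeline import Pipeline
    pipe = Pipeline(columns={'value': ds.value.latest})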
"""
from __future__ import division, absolute_import

from abc import ABCMeta, abstractproperty
from collections import namedtuple, defaultdict
from copy import copy
from functools import partial
from itertools import count
import warnings
from weakref import WeakKeyDictionary

import blaze as bz
from datashape import (
    Date,
    DateTime,
    Option,
    float64,
    isrecord,
    isscalar,
    promote,
)
from odo import odo
import pandas as pd
from toolz import (
    complement,
    compose,
    concat,
    flip,
    groupby,
    identity,
    memoize,
)
import toolz.curried.operator as op
from six import with_metaclass, PY2, itervalues, iteritems


from zipline.pipeline.data.dataset import DataSet, Column
from zipline.lib.adjusted_array import AdjustedArray
from zipline.lib.adjustment import Float64Overwrite
from zipline.utils.enum import enum
from zipline.utils.input_validation import expect_element
from zipline.utils.numpy_utils import repeat_last_axis


AD_FIELD_NAME = 'asof_date'
TS_FIELD_NAME = 'timestamp'
SID_FIELD_NAME = 'sid'
valid_deltas_node_types = (
    bz.expr.Field,
    bz.expr.ReLabel,
    bz.expr.Symbol,
)
traversable_nodes = (
    bz.expr.Field,
    bz.expr.Label,
)
is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types))
getname = op.attrgetter('__name__')


class _ExprRepr(object):
    """Box for repring expressions with the str of the expression.

    Parameters
    ----------
    expr : Expr
        The expression to box for repring.
    """
    __slots__ = 'expr',

    def __init__(self, expr):
        self.expr = expr

    def __repr__(self):
        return str(self.expr)
    __str__ = __repr__

class ExprData(namedtuple('ExprData', 'expr deltas resources')):
    """A pair of expressions and data resources. The expressions will be
    computed using the resources as the starting scope.

    Parameters
    ----------
    expr : Expr
        The baseline values.
    deltas : Expr, optional
        The deltas for the data.
    resources : resource or dict of resources, optional
        The resources to compute the exprs against.
    """
    def __new__(cls, expr, deltas=None, resources=None):
        return super(ExprData, cls).__new__(cls, expr, deltas, resources)

    def __repr__(self):
        # If the expressions have _resources() then the repr will
        # drive computation so we box them.
        cls = type(self)
        return super(ExprData, cls).__repr__(cls(
            _ExprRepr(self.expr),
            _ExprRepr(self.deltas),
            self.resources,
        ))


class InvalidField(with_metaclass(ABCMeta)):
    """A field that raises an exception indicating that the
    field was invalid.

    Parameters
    ----------
    field : str
        The name of the field.
    type_ : dshape
        The shape of the field.
    """
    @abstractproperty
    def error_format(self):
        raise NotImplementedError('error_format')

    def __init__(self, field, type_):
        self._field = field
        self._type = type_

    def __get__(self, instance, owner):
        raise AttributeError(
            self.error_format.format(field=self._field, type_=self._type),
        )


class NonNumpyField(InvalidField):
    error_format = (
        "field '{field}' was a non numpy compatible type: '{type_}'"
    )


class NonPipelineField(InvalidField):
    error_format = (
        "field '{field}' was a non Pipeline API compatible type: '{type_}'"
    )

class NotPipelineCompatible(TypeError):
    """Exception used to indicate that a dshape is not Pipeline API
    compatible.
    """
    def __str__(self):
        return "'%s' is a non Pipeline API compatible type" % self.args

_new_names = ('BlazeDataSet_%d' % n for n in count())


@memoize
def new_dataset(expr, deltas):
    """Creates or returns a dataset from a pair of blaze expressions.

    Parameters
    ----------
    expr : Expr
        The blaze expression representing the first known values.
    deltas : Expr
        The blaze expression representing the deltas to the data.

    Returns
    -------
    ds : type
        A new dataset type.

    Notes
    -----
    This function is memoized; repeated calls with the same inputs will
    return the same type.
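
    Examples
    --------
    A minimal sketch (the symbol name and dshape are hypothetical)::

        expr = bz.symbol(
            'my_data',
            'var * {value: float64, asof_date: datetime}',
        )
        ds = new_dataset(expr, None)
        ds.value  # a float64 Column on the generated DataSet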
    """
    columns = {}
    for name, type_ in expr.dshape.measure.fields:
        try:
            if promote(type_, float64, promote_option=False) != float64:
                raise NotPipelineCompatible()
            if isinstance(type_, Option):
                type_ = type_.ty
        except NotPipelineCompatible:
            col = NonPipelineField(name, type_)
        except TypeError:
            col = NonNumpyField(name, type_)
        else:
            col = Column(type_.to_numpy_dtype())

        columns[name] = col

    name = expr._name
    if name is None:
        name = next(_new_names)

    # unicode is a name error in py3 but the branch is only hit
    # when we are in python 2.
    if PY2 and isinstance(name, unicode):  # noqa
        name = name.encode('utf-8')

    return type(name, (DataSet,), columns)


def _check_resources(name, expr, resources):
    """Validate that the expression and resources passed match up.

    Parameters
    ----------
    name : str
        The name of the argument we are checking.
    expr : Expr
        The potentially bound expr.
    resources
        The explicitly passed resources to compute expr.

    Raises
    ------
    ValueError
        If the resources do not match for an expression.
    """
    if expr is None:
        return
    bound = expr._resources()
    if not bound and resources is None:
        raise ValueError('no resources provided to compute %s' % name)
    if bound and resources:
        raise ValueError(
            'explicit and implicit resources provided to compute %s' % name,
        )

def _check_datetime_field(name, measure):
    """Check that a field is a datetime inside some measure.

    Parameters
    ----------
    name : str
        The name of the field to check.
    measure : Record
        The record to check the field of.

    Raises
    ------
    TypeError
        If the field is not a datetime inside ``measure``.
    """
    if not isinstance(measure[name], (Date, DateTime)):
        raise TypeError(
            "'{name}' field must be a '{dt}', not: '{dshape}'".format(
                name=name,
                dt=DateTime(),
                dshape=measure[name],
            ),
        )


class NoDeltasWarning(UserWarning):
    """Warning used to signal that no deltas could be found and none
    were provided.

    Parameters
    ----------
    expr : Expr
        The expression that was searched.
    """
    def __init__(self, expr):
        self._expr = expr

    def __str__(self):
        return 'No deltas could be inferred from expr: %s' % self._expr


no_deltas_rules = enum('warn', 'raise_', 'ignore')

def get_deltas(expr, deltas, no_deltas_rule):
    """Find the correct deltas for the expression.

    Parameters
    ----------
    expr : Expr
        The baseline expression.
    deltas : Expr, 'auto', or None
        The deltas argument. If this is 'auto', then the deltas table will
        be searched for by walking up the expression tree. If this cannot be
        reflected, then an action will be taken based on the
        ``no_deltas_rule``.
    no_deltas_rule : no_deltas_rule
        How to handle the case where deltas='auto' but no deltas could be
        found.

    Returns
    -------
    deltas : Expr or None
        The deltas table to use.
    """
    if isinstance(deltas, bz.Expr) or deltas != 'auto':
        return deltas

    try:
        return expr._child[(expr._name or '') + '_deltas']
    except (ValueError, AttributeError):
        if no_deltas_rule == no_deltas_rules.raise_:
            raise ValueError(
                "no deltas table could be reflected for %s" % expr
            )
        elif no_deltas_rule == no_deltas_rules.warn:
            warnings.warn(NoDeltasWarning(expr))
    return None

def _ensure_timestamp_field(dataset_expr, deltas):
    """Verify that the baseline and deltas expressions have a timestamp field.

    If there is not a ``TS_FIELD_NAME`` on either of the expressions, it will
    be copied from the ``AD_FIELD_NAME``. If one is provided, then we will
    verify that it is the correct dshape.

    Parameters
    ----------
    dataset_expr : Expr
        The baseline expression.
    deltas : Expr or None
        The deltas expression if any was provided.

    Returns
    -------
    dataset_expr, deltas : Expr
        The new baseline and deltas expressions to use.
    """
    measure = dataset_expr.dshape.measure
    if TS_FIELD_NAME not in measure.names:
        dataset_expr = bz.transform(
            dataset_expr,
            **{TS_FIELD_NAME: dataset_expr[AD_FIELD_NAME]}
        )
        if deltas is not None:
            deltas = bz.transform(
                deltas,
                **{TS_FIELD_NAME: deltas[AD_FIELD_NAME]}
            )
    else:
        _check_datetime_field(TS_FIELD_NAME, measure)

    return dataset_expr, deltas

@expect_element(no_deltas_rule=no_deltas_rules)
def from_blaze(expr,
               deltas='auto',
               loader=None,
               resources=None,
               no_deltas_rule=no_deltas_rules.warn):
    """Create a Pipeline API object from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The blaze expression to use.
    deltas : Expr or 'auto', optional
        The expression to use for the point in time adjustments.
        If the string 'auto' is passed, a deltas expr will be looked up
        by stepping up the expression tree and looking for another field
        with the name of ``expr`` + '_deltas'. If None is passed, no deltas
        will be used.
    loader : BlazeLoader, optional
        The blaze loader to attach this pipeline dataset to. If None is
        passed, the global blaze loader is used.
    resources : dict or any, optional
        The data to execute the blaze expressions against. This is used as
        the scope for ``bz.compute``.
    no_deltas_rule : no_deltas_rule
        What should happen if ``deltas='auto'`` but no deltas can be found.
        'warn' says to issue a warning but continue.
        'raise' says to raise an exception if no deltas can be found.
        'ignore' says take no action and proceed with no deltas.

    Returns
    -------
    pipeline_api_obj : DataSet or BoundColumn
        Either a new dataset or bound column based on the shape of the expr
        passed in. If a table shaped expression is passed, this will return
        a ``DataSet`` that represents the whole table. If an array-like shape
        is passed, a ``BoundColumn`` on the dataset that would be constructed
        from passing the parent is returned.
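
    Examples
    --------
    A sketch of both return shapes (the scope and frames are hypothetical)::

        db = bz.Data({'prices': prices_df, 'prices_deltas': deltas_df})
        ds = from_blaze(db.prices)  # table shape -> DataSet
        col = from_blaze(
            db.prices.value,  # array shape -> BoundColumn
            no_deltas_rule=no_deltas_rules.ignore,
        )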
    """
    deltas = get_deltas(expr, deltas, no_deltas_rule)
    if deltas is not None:
        invalid_nodes = tuple(filter(is_invalid_deltas_node, expr._subterms()))
        if invalid_nodes:
            raise TypeError(
                'expression with deltas may only contain (%s) nodes,'
                " found: %s" % (
                    ', '.join(map(getname, valid_deltas_node_types)),
                    ', '.join(set(map(compose(getname, type), invalid_nodes))),
                ),
            )

    # Check if this is a single column out of a dataset.
    if bz.ndim(expr) != 1:
        raise TypeError(
            'expression was not tabular or array-like,'
            ' %s dimensions: %d' % (
                'too many' if bz.ndim(expr) > 1 else 'not enough',
                bz.ndim(expr),
            ),
        )

    single_column = None
    if isscalar(expr.dshape.measure):
        # This is a single column. Record which column we are to return
        # but create the entire dataset.
        single_column = rename = expr._name
        field_hit = False
        if not isinstance(expr, traversable_nodes):
            raise TypeError(
                "expression '%s' was array-like but not a simple field of"
                " some larger table" % str(expr),
            )
        while isinstance(expr, traversable_nodes):
            if isinstance(expr, bz.expr.Field):
                if not field_hit:
                    field_hit = True
                else:
                    break
            rename = expr._name
            expr = expr._child
        dataset_expr = expr.relabel({rename: single_column})
    else:
        dataset_expr = expr

    measure = dataset_expr.dshape.measure
    if not isrecord(measure) or AD_FIELD_NAME not in measure.names:
        raise TypeError(
            "The dataset must be a collection of records with at least an"
            " '{ad}' field. Fields provided: '{fields}'\nhint: maybe you need"
            " to use `relabel` to change your field names".format(
                ad=AD_FIELD_NAME,
                fields=measure,
            ),
        )
    _check_datetime_field(AD_FIELD_NAME, measure)
    dataset_expr, deltas = _ensure_timestamp_field(dataset_expr, deltas)

    if deltas is not None and (sorted(deltas.dshape.measure.fields) !=
                               sorted(measure.fields)):
        raise TypeError(
            'baseline measure != deltas measure:\n%s != %s' % (
                measure,
                deltas.dshape.measure,
            ),
        )

    # Ensure that we have a data resource to execute the query against.
    _check_resources('dataset_expr', dataset_expr, resources)
    _check_resources('deltas', deltas, resources)

    # Create or retrieve the Pipeline API dataset.
    ds = new_dataset(dataset_expr, deltas)
    # Register our new dataset with the loader.
    (loader if loader is not None else global_loader)[ds] = ExprData(
        dataset_expr,
        deltas,
        resources,
    )
    if single_column is not None:
        # We were passed a single column, extract and return it.
        return getattr(ds, single_column)
    return ds

getdataset = op.attrgetter('dataset')
dataset_name = op.attrgetter('name')


def overwrite_novel_deltas(baseline, deltas, dates):
    """Overwrite any deltas into the baseline set that would have changed our
    most recently known value.

    Parameters
    ----------
    baseline : pd.DataFrame
        The first known values.
    deltas : pd.DataFrame
        Overwrites to the baseline data.
    dates : pd.DatetimeIndex
        The dates requested by the loader.

    Returns
    -------
    baseline : pd.DataFrame
        The baseline data with the novel deltas folded in.
    non_novel_deltas : pd.DataFrame
        The deltas that do not represent a baseline value.
    """
    get_indexes = dates.searchsorted
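    # A delta is "novel" when its timestamp falls on or immediately after the
    # query date for its asof_date; such a row would have changed the most
    # recently known value, so it belongs in the baseline rather than in the
    # adjustments.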
    novel_idx = (
        get_indexes(deltas[TS_FIELD_NAME].values, 'right') -
        get_indexes(deltas[AD_FIELD_NAME].values, 'left')
    ) <= 1
    novel_deltas = deltas.loc[novel_idx]
    non_novel_deltas = deltas.loc[~novel_idx]
    return pd.concat(
        (baseline, novel_deltas),
        ignore_index=True,
    ).sort(TS_FIELD_NAME), non_novel_deltas

def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value):
    """Construct a `Float64Overwrite` with the correct
    start and end date based on the asof date of the delta,
    the dense_dates, and the sparse_dates.

    Parameters
    ----------
    asof : datetime
        The asof date of the delta.
    dense_dates : pd.DatetimeIndex
        The dates requested by the loader.
    sparse_dates : pd.DatetimeIndex
        The dates that appeared in the dataset.
    asset_idx : tuple of int
        The first and last index of the assets to overwrite in the block.
    value : np.float64
        The value to overwrite with.

    Yields
    ------
    overwrite : Float64Overwrite
        The overwrite that will apply the new value to the data.

    Notes
    -----
    This is forward-filling all dense dates that are between the asof_date
    and the next sparse date after the asof_date.

    For example:
    let ``asof = pd.Timestamp('2014-01-02')``,
    ``dense_dates = pd.date_range('2014-01-01', '2014-01-05')``
    ``sparse_dates = pd.to_datetime(['2014-01', '2014-02', '2014-04'])``

    Then the overwrite will apply to indexes: 1, 2, 3, 4
    """
    first_row = dense_dates.searchsorted(asof)
    next_idx = sparse_dates.searchsorted(asof, 'right')
    if next_idx == len(sparse_dates):
        # There is no next date in the sparse data, so this overwrite should
        # apply through the end of the dense dates.
        last_row = len(dense_dates) - 1
    else:
        # There is a next date in sparse dates. This means that the overwrite
        # should only apply until the index of this date in the dense dates.
        last_row = dense_dates.searchsorted(sparse_dates[next_idx]) - 1

    if first_row > last_row:
        return

    first, last = asset_idx
    yield Float64Overwrite(first_row, last_row, first, last, value)


def adjustments_from_deltas_no_sids(dates,
                                    dense_dates,
                                    column_idx,
                                    column_name,
                                    assets,
                                    deltas):
    """Collect all the adjustments that occur in a dataset that does not
    have a sid column.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The dates requested by the loader.
    dense_dates : pd.DatetimeIndex
        The dates that were in the dense data.
    column_idx : int
        The index of the column in the dataset.
    column_name : str
        The name of the column to compute deltas for.
    assets : iterable of int
        The assets in the block.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    ad_series = deltas[AD_FIELD_NAME]
    asset_idx = 0, len(assets) - 1
    return {
        dates.get_loc(kd): overwrite_from_dates(
            ad_series.loc[kd],
            dates,
            dense_dates,
            asset_idx,
            v,
        ) for kd, v in deltas[column_name].iteritems()
    }

def adjustments_from_deltas_with_sids(dates,
                                      dense_dates,
                                      column_idx,
                                      column_name,
                                      assets,
                                      deltas):
    """Collect all the adjustments that occur in a dataset that has a sid
    column.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The dates requested by the loader.
    dense_dates : pd.DatetimeIndex
        The dates that were in the dense data.
    column_idx : int
        The index of the column in the dataset.
    column_name : str
        The name of the column to compute deltas for.
    assets : iterable of int
        The assets in the block.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    ad_series = deltas[AD_FIELD_NAME]
    adjustments = defaultdict(list)
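    # After the loader unstacks by sid, ``deltas[column_name]`` is a frame
    # whose columns are the sids; walk it sid by sid and collect the
    # overwrites, keyed by the row index of each knowledge date.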
    for sid_idx, (sid, per_sid) in enumerate(deltas[column_name].iteritems()):
        for kd, v in per_sid.iteritems():
            adjustments[dates.searchsorted(kd)].extend(
                overwrite_from_dates(
                    ad_series.loc[kd, sid],
                    dates,
                    dense_dates,
                    (sid_idx, sid_idx),
                    v,
                ),
            )
    return dict(adjustments)  # no subclasses of dict

class BlazeLoader(dict):
    def __init__(self, colmap=None):
        self.update(colmap or {})

    @classmethod
    @memoize(cache=WeakKeyDictionary())
    def global_instance(cls):
        return cls()

    def __hash__(self):
        return id(self)

    def __call__(self, column):
        if column.dataset in self:
            return self
        raise KeyError(column)

    def load_adjusted_array(self, columns, dates, assets, mask):
        return dict(
            concat(map(
                partial(self._load_dataset, dates, assets, mask),
                itervalues(groupby(getdataset, columns))
            ))
        )

    def _load_dataset(self, dates, assets, mask, columns):
        try:
            (dataset,) = set(map(getdataset, columns))
        except ValueError:
            raise AssertionError('all columns must come from the same dataset')

        expr, deltas, resources = self[dataset]
        have_sids = SID_FIELD_NAME in expr.fields
        assets = list(map(int, assets))  # coerce from numpy.int64
        fields = list(map(dataset_name, columns))
        query_fields = fields + [AD_FIELD_NAME, TS_FIELD_NAME] + (
            [SID_FIELD_NAME] if have_sids else []
        )

        def where(e):
            """Create the query to run against the resources.

            Parameters
            ----------
            e : Expr
                The baseline or deltas expression.

            Returns
            -------
            q : Expr
                The query to run.
            """
            ts = e[TS_FIELD_NAME]
            # Hack to get the lower bound to query:
            # This must be strictly executed because the data for `ts` will
            # be removed from scope too early otherwise.
            lower = odo(ts[ts <= dates[0]].max(), pd.Timestamp)
            selection = ts <= dates[-1]
            if have_sids:
                selection &= e[SID_FIELD_NAME].isin(assets)
            if lower is not pd.NaT:
                selection &= ts >= lower

            return e[selection][query_fields]

        extra_kwargs = {'d': resources} if resources else {}
        materialized_expr = odo(where(expr), pd.DataFrame, **extra_kwargs)
        materialized_deltas = (
            odo(where(deltas), pd.DataFrame, **extra_kwargs)
            if deltas is not None else
            pd.DataFrame(columns=query_fields)
        )

        # Inline the deltas that changed our most recently known value.
        # Also, we reindex by the dates to create a dense representation of
        # the data.
        sparse_output, non_novel_deltas = overwrite_novel_deltas(
            materialized_expr,
            materialized_deltas,
            dates,
        )
        sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)

        if have_sids:
            # Unstack by the sid so that we get a multi-index on the columns
            # of datacolumn, sid.
            sparse_output = sparse_output.set_index(
                [TS_FIELD_NAME, SID_FIELD_NAME],
            ).unstack()
            sparse_deltas = non_novel_deltas.set_index(
                [TS_FIELD_NAME, SID_FIELD_NAME],
            ).unstack()

            dense_output = sparse_output.reindex(dates, method='ffill')
            cols = dense_output.columns
            dense_output = dense_output.reindex(
                columns=pd.MultiIndex.from_product(
                    (cols.levels[0], assets),
                    names=cols.names,
                ),
            )

            adjustments_from_deltas = adjustments_from_deltas_with_sids
            column_view = identity
        else:
            # We use the column view to make an array per asset.
            column_view = compose(
                # We need to copy this because we need a concrete ndarray.
                # The `repeat_last_axis` call will give us a fancy strided
                # array which uses a buffer to represent `len(assets)`
                # columns. The engine puts nans at the indices for which we
                # do not have sid information so that the nan-aware
                # reductions still work. A future change to the engine would
                # be to add first class support for macroeconomic datasets.
                copy,
                partial(repeat_last_axis, count=len(assets)),
            )
            sparse_output = sparse_output.set_index(TS_FIELD_NAME)
            dense_output = sparse_output.reindex(dates, method='ffill')
            sparse_deltas = non_novel_deltas.set_index(TS_FIELD_NAME)
            adjustments_from_deltas = adjustments_from_deltas_no_sids

        for column_idx, column in enumerate(columns):
            column_name = column.name
            yield column, AdjustedArray(
                column_view(
                    dense_output[column_name].values.astype(column.dtype),
                ),
                mask,
                adjustments_from_deltas(
                    dates,
                    sparse_output.index,
                    column_idx,
                    column_name,
                    assets,
                    sparse_deltas,
                ),
            )


global_loader = BlazeLoader.global_instance()


def bind_expression_to_resources(expr, resources):
    """
    Bind a Blaze expression to resources.

    Parameters
    ----------
    expr : bz.Expr
        The expression to which we want to bind resources.
    resources : dict[bz.Symbol -> any]
        Mapping from the atomic terms of ``expr`` to actual data resources.

    Returns
    -------
    bound_expr : bz.Expr
        ``expr`` with bound resources.
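
    Examples
    --------
    A sketch (the symbol name and frame are hypothetical)::

        s = bz.symbol('s', 'var * {value: float64, asof_date: datetime}')
        bound = bind_expression_to_resources(s.value, {s: df})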
    """
    # bind the resources into the expression
    if resources is None:
        resources = {}

    # _subs stands for substitute. It's not actually private, blaze just
    # prefixes symbol-manipulation methods with underscores to prevent
    # collisions with data column names.
    return expr._subs({
        k: bz.Data(v, dshape=k.dshape) for k, v in iteritems(resources)
    })

def ffill_query_in_range(expr,
                         lower,
                         upper,
                         odo_kwargs=None,
                         ts_field=TS_FIELD_NAME,
                         sid_field=SID_FIELD_NAME):
    """Query a blaze expression in a given time range properly forward filling
    from values that fall before the lower date.

    Parameters
    ----------
    expr : Expr
        Bound blaze expression.
    lower : datetime
        The lower date to query for.
    upper : datetime
        The upper date to query for.
    odo_kwargs : dict, optional
        The extra keyword arguments to pass to ``odo``.
    ts_field : str, optional
        The name of the timestamp field in the given blaze expression.
    sid_field : str, optional
        The name of the sid field in the given blaze expression.

    Returns
    -------
    raw : pd.DataFrame
        A strict dataframe for the data in the given date range. This may
        start before the requested start date if a value is needed to ffill.
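
    Examples
    --------
    A sketch (assumes ``expr`` is already bound to a resource)::

        raw = ffill_query_in_range(
            expr,
            pd.Timestamp('2014-01-06'),
            pd.Timestamp('2014-01-10'),
        )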
    """
    odo_kwargs = odo_kwargs or {}
    filtered = expr[expr[ts_field] <= lower]
    computed_lower = odo(
        bz.by(
            filtered[sid_field],
            timestamp=filtered[ts_field].max(),
        ).timestamp.min(),
        pd.Timestamp,
        **odo_kwargs
    )
    if pd.isnull(computed_lower):
        # If there is no lower date, just query for data in the date
        # range. It must all be null anyways.
        computed_lower = lower

    return odo(
        expr[
            (expr[ts_field] >= computed_lower) &
            (expr[ts_field] <= upper)
        ],
        pd.DataFrame,
        **odo_kwargs
    )