"""Blaze integration with the Pipeline API.

For an overview of the blaze project, see blaze.pydata.org

The blaze loader for the Pipeline API is designed to allow us to load
data from arbitrary sources as long as we can execute the needed expressions
against the data with blaze.

Data Format
-----------

The blaze Pipeline API loader expects that data is formatted in a tabular way.
The only required column in your table is ``asof_date`` where this column
represents the date this data is referencing. For example, one might have a CSV
like:

   asof_date,value
   2014-01-06,0
   2014-01-07,1
   2014-01-08,2

This says that the value on 2014-01-06 was 0 and so on.

Optionally, we may provide a ``timestamp`` column to be used to represent
point in time data. This column tells us when the data was known, or became
available for use. Using our same CSV, we could write this with a timestamp
like:

   asof_date,timestamp,value
   2014-01-06,2014-01-07,0
   2014-01-07,2014-01-08,1
   2014-01-08,2014-01-09,2

This says that the value was 0 on 2014-01-06; however, we did not learn this
until 2014-01-07. This is useful for avoiding look-ahead bias in your
pipelines. If this column does not exist, the ``asof_date`` column will be used
instead.

If your data references a particular asset, you can add a ``sid`` column to
your dataset to represent this. For example:

   asof_date,value,sid
   2014-01-06,0,10
   2014-01-06,1,20
   2014-01-07,1,10
   2014-01-07,2,20
   2014-01-08,2,10
   2014-01-08,3,20

This says that on 2014-01-06, the asset with id 10 had a value of 0, and the
asset with id 20 had a value of 1.

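As a concrete sketch (the file name and dshape here are illustrative, not
something this module requires), a table carrying all of the columns above
could be wrapped in a blaze expression with ``bz.Data``:

   import blaze as bz

   expr = bz.Data(
       'my_data.csv',
       dshape='var * {asof_date: datetime, timestamp: datetime,'
              ' value: float64, sid: int64}',
   )
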
One of the key features of the Pipeline API is the handling of adjustments and
restatements. Often our data will be amended after the fact and we would like
to trade on the newest information; however, we do not want to introduce this
knowledge to our model too early. The blaze loader handles this case by
accepting a second ``deltas`` expression that contains all of the restatements
in the original expression.

For example, let's use our table from above:

   asof_date,value
   2014-01-06,0
   2014-01-07,1
   2014-01-08,2

Imagine that on the ninth the vendor realized that the calculation was
incorrect and the value on the sixth was actually -1. Then, on the tenth, they
realized that the value for the eighth was actually 3. We can construct a
``deltas`` expression to pass to our blaze loader that has the same shape as
our baseline table but only contains these new values like:

   asof_date,timestamp,value
   2014-01-06,2014-01-09,-1
   2014-01-08,2014-01-10,3

This shows that we learned on the ninth that the value on the sixth was
actually -1 and that we learned on the tenth that the value on the eighth was
actually 3. By pulling our data into these two tables and not silently updating
our original table we can run our pipelines using the information we would
have had on that day, and we can prevent lookahead bias in the pipelines.

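A deltas table like this can itself be wrapped in a blaze expression. Note
that ``from_blaze`` (described in the next section) requires the deltas
expression to have the same columns as the baseline expression, so here we
mirror the illustrative ``expr`` sketched above; the file name is again
hypothetical:

   expr_deltas = bz.Data(
       'my_data_deltas.csv',
       dshape='var * {asof_date: datetime, timestamp: datetime,'
              ' value: float64, sid: int64}',
   )
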
Conversion from Blaze to the Pipeline API
-----------------------------------------

Now that our data is structured in the way that the blaze loader expects, we
are ready to convert our blaze expressions into Pipeline API objects.

This module (zipline.pipeline.loaders.blaze) exports a function called
``from_blaze`` which performs this mapping.

The expression that you are trying to convert must either be tabular or
array-like. This means the ``dshape`` must be like:

``Dim * {A: B}`` or ``Dim * A``.

This represents an expression of dimension 1 which may be fixed or variable,
whose measure is either some record or a scalar.

The record case defines the entire table with all of the columns; this maps the
blaze expression into a pipeline DataSet. This dataset will have a column for
each field of the record. Some datashape types cannot be coerced into Pipeline
API compatible types and in that case, a column cannot be constructed.
Currently any numeric type that may be promoted to a float64 is compatible with
the Pipeline API.

The scalar case defines a single column pulled out of a table. For example, let
``expr = bz.symbol('s', 'var * {field: int32, asof_date: datetime}')``.
When we pass ``expr.field`` to ``from_blaze``, we will walk back up the
expression tree until we find the table that ``field`` is defined on. We will
then proceed with the record case to construct a dataset; however, before
returning the dataset we will pull out only the column that was passed in.

For full documentation, see ``help(from_blaze)`` or ``from_blaze?`` in IPython.

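As a sketch of typical usage, building on the illustrative ``expr`` and
``expr_deltas`` defined in the examples above:

   from zipline.pipeline.loaders.blaze import from_blaze

   # A table shaped expression gives back a new DataSet with a column per
   # field of the record.
   MyDataSet = from_blaze(expr, deltas=expr_deltas)

   # An array-like expression gives back a BoundColumn on the dataset that
   # would be constructed from its parent table.
   value_column = from_blaze(expr.value, deltas=expr_deltas)

Passing ``deltas=None`` instead would skip the restatement handling entirely.
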
Using our Pipeline DataSets and Columns
---------------------------------------

Once we have mapped our blaze expressions into Pipeline API objects, we may
use them just like any other datasets or columns. For more information on how
to run a pipeline or use the Pipeline API, see:
www.quantopian.com/help#pipeline-api
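
For example, a minimal sketch (``MyDataSet`` stands for a dataset returned by
``from_blaze``; the pipeline must be executed by an engine that dispatches
these columns to the blaze loader they were registered with):

   from zipline.pipeline import Pipeline

   pipe = Pipeline()
   pipe.add(MyDataSet.value.latest, 'value')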
""" |
125
|
|
|
from __future__ import division, absolute_import |
126
|
|
|
|
127
|
|
|
from abc import ABCMeta, abstractproperty |
128
|
|
|
from collections import namedtuple, defaultdict |
129
|
|
|
from copy import copy |
130
|
|
|
from functools import partial |
131
|
|
|
from itertools import count |
132
|
|
|
import warnings |
133
|
|
|
from weakref import WeakKeyDictionary |
134
|
|
|
|
135
|
|
|
import blaze as bz |
136
|
|
|
from datashape import ( |
137
|
|
|
Date, |
138
|
|
|
DateTime, |
139
|
|
|
Option, |
140
|
|
|
float64, |
141
|
|
|
isrecord, |
142
|
|
|
isscalar, |
143
|
|
|
promote, |
144
|
|
|
) |
145
|
|
|
from odo import odo |
146
|
|
|
import pandas as pd |
147
|
|
|
from toolz import ( |
148
|
|
|
complement, |
149
|
|
|
compose, |
150
|
|
|
concat, |
151
|
|
|
flip, |
152
|
|
|
groupby, |
153
|
|
|
identity, |
154
|
|
|
memoize, |
155
|
|
|
) |
156
|
|
|
import toolz.curried.operator as op |
157
|
|
|
from six import with_metaclass, PY2, itervalues, iteritems |
158
|
|
|
|
159
|
|
|
|
160
|
|
|
from zipline.pipeline.data.dataset import DataSet, Column |
161
|
|
|
from zipline.lib.adjusted_array import AdjustedArray |
162
|
|
|
from zipline.lib.adjustment import Float64Overwrite |
163
|
|
|
from zipline.utils.enum import enum |
164
|
|
|
from zipline.utils.input_validation import expect_element |
165
|
|
|
from zipline.utils.numpy_utils import repeat_last_axis |
166
|
|
|
|
167
|
|
|
|
AD_FIELD_NAME = 'asof_date'
TS_FIELD_NAME = 'timestamp'
SID_FIELD_NAME = 'sid'
valid_deltas_node_types = (
    bz.expr.Field,
    bz.expr.ReLabel,
    bz.expr.Symbol,
)
traversable_nodes = (
    bz.expr.Field,
    bz.expr.Label,
)
is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types))
getname = op.attrgetter('__name__')


class _ExprRepr(object):
    """Box for repring expressions with the str of the expression.

    Parameters
    ----------
    expr : Expr
        The expression to box for repring.
    """
    __slots__ = 'expr',

    def __init__(self, expr):
        self.expr = expr

    def __repr__(self):
        return str(self.expr)
    __str__ = __repr__


class ExprData(namedtuple('ExprData', 'expr deltas resources')):
    """A pair of expressions and data resources. The expressions will be
    computed using the resources as the starting scope.

    Parameters
    ----------
    expr : Expr
        The baseline values.
    deltas : Expr, optional
        The deltas for the data.
    resources : resource or dict of resources, optional
        The resources to compute the exprs against.
    """
    def __new__(cls, expr, deltas=None, resources=None):
        return super(ExprData, cls).__new__(cls, expr, deltas, resources)

    def __repr__(self):
        # If the expressions have _resources() then the repr will
        # drive computation so we box them.
        cls = type(self)
        return super(ExprData, cls).__repr__(cls(
            _ExprRepr(self.expr),
            _ExprRepr(self.deltas),
            self.resources,
        ))


class InvalidField(with_metaclass(ABCMeta)):
    """A field that raises an exception indicating that the
    field was invalid.

    Parameters
    ----------
    field : str
        The name of the field.
    type_ : dshape
        The shape of the field.
    """
    @abstractproperty
    def error_format(self):
        raise NotImplementedError('error_format')

    def __init__(self, field, type_):
        self._field = field
        self._type = type_

    def __get__(self, instance, owner):
        raise AttributeError(
            self.error_format.format(field=self._field, type_=self._type),
        )


class NonNumpyField(InvalidField):
    error_format = (
        "field '{field}' was a non numpy compatible type: '{type_}'"
    )


class NonPipelineField(InvalidField):
    error_format = (
        "field '{field}' was a non Pipeline API compatible type: '{type_}'"
    )


class NotPipelineCompatible(TypeError):
    """Exception used to indicate that a dshape is not Pipeline API
    compatible.
    """
    def __str__(self):
        return "'%s' is a non Pipeline API compatible type" % self.args


_new_names = ('BlazeDataSet_%d' % n for n in count())


@memoize
def new_dataset(expr, deltas):
    """Creates or returns a dataset from a pair of blaze expressions.

    Parameters
    ----------
    expr : Expr
        The blaze expression representing the first known values.
    deltas : Expr
        The blaze expression representing the deltas to the data.

    Returns
    -------
    ds : type
        A new dataset type.

    Notes
    -----
    This function is memoized. Repeated calls with the same inputs will return
    the same type.
    """
    columns = {}
    for name, type_ in expr.dshape.measure.fields:
        try:
            if promote(type_, float64, promote_option=False) != float64:
                raise NotPipelineCompatible()
            if isinstance(type_, Option):
                type_ = type_.ty
        except NotPipelineCompatible:
            col = NonPipelineField(name, type_)
        except TypeError:
            col = NonNumpyField(name, type_)
        else:
            col = Column(type_.to_numpy_dtype())

        columns[name] = col

    name = expr._name
    if name is None:
        name = next(_new_names)

    # unicode is a name error in py3 but the branch is only hit
    # when we are in python 2.
    if PY2 and isinstance(name, unicode):  # noqa
        name = name.encode('utf-8')

    return type(name, (DataSet,), columns)


def _check_resources(name, expr, resources):
    """Validate that the expression and resources passed match up.

    Parameters
    ----------
    name : str
        The name of the argument we are checking.
    expr : Expr
        The potentially bound expr.
    resources
        The explicitly passed resources to compute expr.

    Raises
    ------
    ValueError
        If the resources do not match for an expression.
    """
    if expr is None:
        return
    bound = expr._resources()
    if not bound and resources is None:
        raise ValueError('no resources provided to compute %s' % name)
    if bound and resources:
        raise ValueError(
            'explicit and implicit resources provided to compute %s' % name,
        )


def _check_datetime_field(name, measure):
    """Check that a field is a datetime inside some measure.

    Parameters
    ----------
    name : str
        The name of the field to check.
    measure : Record
        The record to check the field of.

    Raises
    ------
    TypeError
        If the field is not a datetime inside ``measure``.
    """
    if not isinstance(measure[name], (Date, DateTime)):
        raise TypeError(
            "'{name}' field must be a '{dt}', not: '{dshape}'".format(
                name=name,
                dt=DateTime(),
                dshape=measure[name],
            ),
        )


class NoDeltasWarning(UserWarning):
    """Warning used to signal that no deltas could be found and none
    were provided.

    Parameters
    ----------
    expr : Expr
        The expression that was searched.
    """
    def __init__(self, expr):
        self._expr = expr

    def __str__(self):
        return 'No deltas could be inferred from expr: %s' % self._expr


no_deltas_rules = enum('warn', 'raise_', 'ignore')


def get_deltas(expr, deltas, no_deltas_rule):
    """Find the correct deltas for the expression.

    Parameters
    ----------
    expr : Expr
        The baseline expression.
    deltas : Expr, 'auto', or None
        The deltas argument. If this is 'auto', then the deltas table will
        be searched for by walking up the expression tree. If this cannot be
        reflected, then an action will be taken based on the
        ``no_deltas_rule``.
    no_deltas_rule : no_deltas_rule
        How to handle the case where deltas='auto' but no deltas could be
        found.

    Returns
    -------
    deltas : Expr or None
        The deltas table to use.
    """
    if isinstance(deltas, bz.Expr) or deltas != 'auto':
        return deltas

    try:
        return expr._child[(expr._name or '') + '_deltas']
    except (ValueError, AttributeError):
        if no_deltas_rule == no_deltas_rules.raise_:
            raise ValueError(
                "no deltas table could be reflected for %s" % expr
            )
        elif no_deltas_rule == no_deltas_rules.warn:
            warnings.warn(NoDeltasWarning(expr))
    return None


def _ensure_timestamp_field(dataset_expr, deltas):
    """Verify that the baseline and deltas expressions have a timestamp field.

    If there is not a ``TS_FIELD_NAME`` on either of the expressions, it will
    be copied from the ``AD_FIELD_NAME``. If one is provided, then we will
    verify that it is the correct dshape.

    Parameters
    ----------
    dataset_expr : Expr
        The baseline expression.
    deltas : Expr or None
        The deltas expression if any was provided.

    Returns
    -------
    dataset_expr, deltas : Expr
        The new baseline and deltas expressions to use.
    """
    measure = dataset_expr.dshape.measure
    if TS_FIELD_NAME not in measure.names:
        dataset_expr = bz.transform(
            dataset_expr,
            **{TS_FIELD_NAME: dataset_expr[AD_FIELD_NAME]}
        )
        if deltas is not None:
            deltas = bz.transform(
                deltas,
                **{TS_FIELD_NAME: deltas[AD_FIELD_NAME]}
            )
    else:
        _check_datetime_field(TS_FIELD_NAME, measure)

    return dataset_expr, deltas


@expect_element(no_deltas_rule=no_deltas_rules)
def from_blaze(expr,
               deltas='auto',
               loader=None,
               resources=None,
               no_deltas_rule=no_deltas_rules.warn):
    """Create a Pipeline API object from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The blaze expression to use.
    deltas : Expr or 'auto', optional
        The expression to use for the point in time adjustments.
        If the string 'auto' is passed, a deltas expr will be looked up
        by stepping up the expression tree and looking for another field
        with the name of ``expr`` + '_deltas'. If None is passed, no deltas
        will be used.
    loader : BlazeLoader, optional
        The blaze loader to attach this pipeline dataset to. If None is passed,
        the global blaze loader is used.
    resources : dict or any, optional
        The data to execute the blaze expressions against. This is used as the
        scope for ``bz.compute``.
    no_deltas_rule : no_deltas_rule
        What should happen if ``deltas='auto'`` but no deltas can be found.
        'warn' says to raise a warning but continue.
        'raise' says to raise an exception if no deltas can be found.
        'ignore' says take no action and proceed with no deltas.

    Returns
    -------
    pipeline_api_obj : DataSet or BoundColumn
        Either a new dataset or bound column based on the shape of the expr
        passed in. If a table shaped expression is passed, this will return
        a ``DataSet`` that represents the whole table. If an array-like shape
        is passed, a ``BoundColumn`` on the dataset that would be constructed
        from passing the parent is returned.
    """
    deltas = get_deltas(expr, deltas, no_deltas_rule)
    if deltas is not None:
        invalid_nodes = tuple(filter(is_invalid_deltas_node, expr._subterms()))
        if invalid_nodes:
            raise TypeError(
                'expression with deltas may only contain (%s) nodes,'
                " found: %s" % (
                    ', '.join(map(getname, valid_deltas_node_types)),
                    ', '.join(set(map(compose(getname, type), invalid_nodes))),
                ),
            )

    # Check if this is a single column out of a dataset.
    if bz.ndim(expr) != 1:
        raise TypeError(
            'expression was not tabular or array-like,'
            ' %s dimensions: %d' % (
                'too many' if bz.ndim(expr) > 1 else 'not enough',
                bz.ndim(expr),
            ),
        )

    single_column = None
    if isscalar(expr.dshape.measure):
        # This is a single column. Record which column we are to return
        # but create the entire dataset.
        single_column = rename = expr._name
        field_hit = False
        if not isinstance(expr, traversable_nodes):
            raise TypeError(
                "expression '%s' was array-like but not a simple field of"
                " some larger table" % str(expr),
            )
        while isinstance(expr, traversable_nodes):
            if isinstance(expr, bz.expr.Field):
                if not field_hit:
                    field_hit = True
                else:
                    break
            rename = expr._name
            expr = expr._child
        dataset_expr = expr.relabel({rename: single_column})
    else:
        dataset_expr = expr

    measure = dataset_expr.dshape.measure
    if not isrecord(measure) or AD_FIELD_NAME not in measure.names:
        raise TypeError(
            "The dataset must be a collection of records with at least an"
            " '{ad}' field. Fields provided: '{fields}'\nhint: maybe you need"
            " to use `relabel` to change your field names".format(
                ad=AD_FIELD_NAME,
                fields=measure,
            ),
        )
    _check_datetime_field(AD_FIELD_NAME, measure)
    dataset_expr, deltas = _ensure_timestamp_field(dataset_expr, deltas)

    if deltas is not None and (sorted(deltas.dshape.measure.fields) !=
                               sorted(measure.fields)):
        raise TypeError(
            'baseline measure != deltas measure:\n%s != %s' % (
                measure,
                deltas.dshape.measure,
            ),
        )

    # Ensure that we have a data resource to execute the query against.
    _check_resources('dataset_expr', dataset_expr, resources)
    _check_resources('deltas', deltas, resources)

    # Create or retrieve the Pipeline API dataset.
    ds = new_dataset(dataset_expr, deltas)
    # Register our new dataset with the loader.
    (loader if loader is not None else global_loader)[ds] = ExprData(
        dataset_expr,
        deltas,
        resources,
    )
    if single_column is not None:
        # We were passed a single column, extract and return it.
        return getattr(ds, single_column)
    return ds


getdataset = op.attrgetter('dataset')
dataset_name = op.attrgetter('name')


def overwrite_novel_deltas(baseline, deltas, dates):
    """Overwrite any deltas into the baseline set that would have changed our
    most recently known value.

    Parameters
    ----------
    baseline : pd.DataFrame
        The first known values.
    deltas : pd.DataFrame
        Overwrites to the baseline data.
    dates : pd.DatetimeIndex
        The dates requested by the loader.

    Returns
    -------
    baseline : pd.DataFrame
        The baseline data with the novel deltas folded in, sorted by the
        timestamp field.
    non_novel_deltas : pd.DataFrame
        The deltas that do not represent a baseline value.
    """
    get_indexes = dates.searchsorted
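    # Split the deltas: those that were learned early enough that they would
    # have changed our most recently known value are "novel" and get folded
    # directly into the baseline; the rest are returned so they can be
    # applied later as adjustments.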
    novel_idx = (
        get_indexes(deltas[TS_FIELD_NAME].values, 'right') -
        get_indexes(deltas[AD_FIELD_NAME].values, 'left')
    ) <= 1
    novel_deltas = deltas.loc[novel_idx]
    non_novel_deltas = deltas.loc[~novel_idx]
    return pd.concat(
        (baseline, novel_deltas),
        ignore_index=True,
    ).sort(TS_FIELD_NAME), non_novel_deltas


def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value):
    """Construct a `Float64Overwrite` with the correct
    start and end date based on the asof date of the delta,
    the dense_dates, and the sparse_dates.

    Parameters
    ----------
    asof : datetime
        The asof date of the delta.
    dense_dates : pd.DatetimeIndex
        The dates requested by the loader.
    sparse_dates : pd.DatetimeIndex
        The dates that appeared in the dataset.
    asset_idx : tuple of int
        The index of the asset in the block. If this is a tuple, then this
        is treated as the first and last index to use.
    value : np.float64
        The value to overwrite with.

    Yields
    ------
    overwrite : Float64Overwrite
        The overwrite that will apply the new value to the data.

    Notes
    -----
    This is forward-filling all dense dates that are between the asof_date
    and the next sparse date after the asof_date.

    For example:
    let ``asof = pd.Timestamp('2014-01-02')``,
    ``dense_dates = pd.date_range('2014-01-01', '2014-01-05')``
    ``sparse_dates = pd.to_datetime(['2014-01', '2014-02', '2014-04'])``

    Then the overwrite will apply to indexes: 1, 2, 3, 4
    """
    first_row = dense_dates.searchsorted(asof)
    next_idx = sparse_dates.searchsorted(asof, 'right')
    if next_idx == len(sparse_dates):
        # There is no next date in the sparse, this overwrite should apply
        # through the end of the dense dates.
        last_row = len(dense_dates) - 1
    else:
        # There is a next date in sparse dates. This means that the overwrite
        # should only apply until the index of this date in the dense dates.
        last_row = dense_dates.searchsorted(sparse_dates[next_idx]) - 1

    if first_row > last_row:
        return

    first, last = asset_idx
    yield Float64Overwrite(first_row, last_row, first, last, value)


def adjustments_from_deltas_no_sids(dates,
                                    dense_dates,
                                    column_idx,
                                    column_name,
                                    assets,
                                    deltas):
    """Collect all the adjustments that occur in a dataset that does not
    have a sid column.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The dates requested by the loader.
    dense_dates : pd.DatetimeIndex
        The dates that were in the dense data.
    column_idx : int
        The index of the column in the dataset.
    column_name : str
        The name of the column to compute deltas for.
    assets : iterable of int
        The assets requested by the loader.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    ad_series = deltas[AD_FIELD_NAME]
    asset_idx = 0, len(assets) - 1
    return {
        dates.get_loc(kd): overwrite_from_dates(
            ad_series.loc[kd],
            dates,
            dense_dates,
            asset_idx,
            v,
        ) for kd, v in deltas[column_name].iteritems()
    }


def adjustments_from_deltas_with_sids(dates,
                                      dense_dates,
                                      column_idx,
                                      column_name,
                                      assets,
                                      deltas):
    """Collect all the adjustments that occur in a dataset that has a sid
    column.

    Parameters
    ----------
    dates : pd.DatetimeIndex
        The dates requested by the loader.
    dense_dates : pd.DatetimeIndex
        The dates that were in the dense data.
    column_idx : int
        The index of the column in the dataset.
    column_name : str
        The name of the column to compute deltas for.
    assets : iterable of int
        The assets requested by the loader.
    deltas : pd.DataFrame
        The overwrites that should be applied to the dataset.

    Returns
    -------
    adjustments : dict[idx -> Float64Overwrite]
        The adjustments dictionary to feed to the adjusted array.
    """
    ad_series = deltas[AD_FIELD_NAME]
    adjustments = defaultdict(list)
    for sid_idx, (sid, per_sid) in enumerate(deltas[column_name].iteritems()):
        for kd, v in per_sid.iteritems():
            adjustments[dates.searchsorted(kd)].extend(
                overwrite_from_dates(
                    ad_series.loc[kd, sid],
                    dates,
                    dense_dates,
                    (sid_idx, sid_idx),
                    v,
                ),
            )
    return dict(adjustments)  # no subclasses of dict


class BlazeLoader(dict):
    def __init__(self, colmap=None):
        self.update(colmap or {})

    @classmethod
    @memoize(cache=WeakKeyDictionary())
    def global_instance(cls):
        return cls()

    def __hash__(self):
        return id(self)

    def __call__(self, column):
        if column.dataset in self:
            return self
        raise KeyError(column)

    def load_adjusted_array(self, columns, dates, assets, mask):
        return dict(
            concat(map(
                partial(self._load_dataset, dates, assets, mask),
                itervalues(groupby(getdataset, columns))
            ))
        )

    def _load_dataset(self, dates, assets, mask, columns):
        try:
            (dataset,) = set(map(getdataset, columns))
        except ValueError:
            raise AssertionError('all columns must come from the same dataset')

        expr, deltas, resources = self[dataset]
        have_sids = SID_FIELD_NAME in expr.fields
        assets = list(map(int, assets))  # coerce from numpy.int64
        fields = list(map(dataset_name, columns))
        query_fields = fields + [AD_FIELD_NAME, TS_FIELD_NAME] + (
            [SID_FIELD_NAME] if have_sids else []
        )

        def where(e):
            """Create the query to run against the resources.

            Parameters
            ----------
            e : Expr
                The baseline or deltas expression.

            Returns
            -------
            q : Expr
                The query to run.
            """
            ts = e[TS_FIELD_NAME]
            # Hack to get the lower bound to query:
            # This must be strictly executed because the data for `ts` will
            # be removed from scope too early otherwise.
            lower = odo(ts[ts <= dates[0]].max(), pd.Timestamp)
            selection = ts <= dates[-1]
            if have_sids:
                selection &= e[SID_FIELD_NAME].isin(assets)
            if lower is not pd.NaT:
                selection &= ts >= lower

            return e[selection][query_fields]

        extra_kwargs = {'d': resources} if resources else {}
        materialized_expr = odo(where(expr), pd.DataFrame, **extra_kwargs)
        materialized_deltas = (
            odo(where(deltas), pd.DataFrame, **extra_kwargs)
            if deltas is not None else
            pd.DataFrame(columns=query_fields)
        )

        # Inline the deltas that changed our most recently known value.
        # Also, we reindex by the dates to create a dense representation of
        # the data.
        sparse_output, non_novel_deltas = overwrite_novel_deltas(
            materialized_expr,
            materialized_deltas,
            dates,
        )
        sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)

        if have_sids:
            # Unstack by the sid so that we get a multi-index on the columns
            # of datacolumn, sid.
            sparse_output = sparse_output.set_index(
                [TS_FIELD_NAME, SID_FIELD_NAME],
            ).unstack()
            sparse_deltas = non_novel_deltas.set_index(
                [TS_FIELD_NAME, SID_FIELD_NAME],
            ).unstack()

            dense_output = sparse_output.reindex(dates, method='ffill')
            cols = dense_output.columns
            dense_output = dense_output.reindex(
                columns=pd.MultiIndex.from_product(
                    (cols.levels[0], assets),
                    names=cols.names,
                ),
            )

            adjustments_from_deltas = adjustments_from_deltas_with_sids
            column_view = identity
        else:
            # We use the column view to make an array per asset.
            column_view = compose(
                # We need to copy this because we need a concrete ndarray.
                # The `repeat_last_axis` call will give us a fancy strided
                # array which uses a buffer to represent `len(assets)` columns.
                # The engine puts nans at the indices for which we do not have
                # sid information so that the nan-aware reductions still work.
                # A future change to the engine would be to add first class
                # support for macroeconomic datasets.
                copy,
                partial(repeat_last_axis, count=len(assets)),
            )
            sparse_output = sparse_output.set_index(TS_FIELD_NAME)
            dense_output = sparse_output.reindex(dates, method='ffill')
            sparse_deltas = non_novel_deltas.set_index(TS_FIELD_NAME)
            adjustments_from_deltas = adjustments_from_deltas_no_sids

        for column_idx, column in enumerate(columns):
            column_name = column.name
            yield column, AdjustedArray(
                column_view(
                    dense_output[column_name].values.astype(column.dtype),
                ),
                mask,
                adjustments_from_deltas(
                    dates,
                    sparse_output.index,
                    column_idx,
                    column_name,
                    assets,
                    sparse_deltas,
                )
            )

global_loader = BlazeLoader.global_instance()


def bind_expression_to_resources(expr, resources):
    """
    Bind a Blaze expression to resources.

    Parameters
    ----------
    expr : bz.Expr
        The expression to which we want to bind resources.
    resources : dict[bz.Symbol -> any]
        Mapping from the atomic terms of ``expr`` to actual data resources.

    Returns
    -------
    bound_expr : bz.Expr
        ``expr`` with bound resources.
    """
    # bind the resources into the expression
    if resources is None:
        resources = {}

    # _subs stands for substitute. It's not actually private, blaze just
    # prefixes symbol-manipulation methods with underscores to prevent
    # collisions with data column names.
    return expr._subs({
        k: bz.Data(v, dshape=k.dshape) for k, v in iteritems(resources)
    })


def ffill_query_in_range(expr,
                         lower,
                         upper,
                         odo_kwargs=None,
                         ts_field=TS_FIELD_NAME,
                         sid_field=SID_FIELD_NAME):
    """Query a blaze expression in a given time range properly forward filling
    from values that fall before the lower date.

    Parameters
    ----------
    expr : Expr
        Bound blaze expression.
    lower : datetime
        The lower date to query for.
    upper : datetime
        The upper date to query for.
    odo_kwargs : dict, optional
        The extra keyword arguments to pass to ``odo``.
    ts_field : str, optional
        The name of the timestamp field in the given blaze expression.
    sid_field : str, optional
        The name of the sid field in the given blaze expression.

    Returns
    -------
    raw : pd.DataFrame
        A strict dataframe for the data in the given date range. This may
        start before the requested start date if a value is needed to ffill.
    """
    odo_kwargs = odo_kwargs or {}
    filtered = expr[expr[ts_field] <= lower]
    computed_lower = odo(
        bz.by(
            filtered[sid_field],
            timestamp=filtered[ts_field].max(),
        ).timestamp.min(),
        pd.Timestamp,
        **odo_kwargs
    )
    if pd.isnull(computed_lower):
        # If there is no lower date, just query for data in the date
        # range. It must all be null anyways.
        computed_lower = lower

    return odo(
        expr[
            (expr[ts_field] >= computed_lower) &
            (expr[ts_field] <= upper)
        ],
        pd.DataFrame,
        **odo_kwargs
    )