1
|
|
|
from zipline.utils.input_validation import expect_types, optional |
2
|
|
|
|
3
|
|
|
from .term import Term, AssetExists |
4
|
|
|
from .filters import Filter |
5
|
|
|
from .graph import TermGraph |
6
|
|
|
|
7
|
|
|
|
8
|
|
|
class Pipeline(object): |
9
|
|
|
""" |
10
|
|
|
A Pipeline object represents a collection of named expressions to be |
11
|
|
|
compiled and executed by a PipelineEngine. |
12
|
|
|
|
13
|
|
|
A Pipeline has two important attributes: 'columns', a dictionary of named |
14
|
|
|
`Term` instances, and 'screen', a Filter representing criteria for |
15
|
|
|
including an asset in the results of a Pipeline. |
16
|
|
|
|
17
|
|
|
To compute a pipeline in the context of a TradingAlgorithm, users must call |
18
|
|
|
``attach_pipeline`` in their ``initialize`` function to register that the |
19
|
|
|
pipeline should be computed each trading day. The outputs of a pipeline on |
20
|
|
|
a given day can be accessed by calling ``pipeline_output`` in |
21
|
|
|
``handle_data`` or ``before_trading_start``. |
22
|
|
|
|
23
|
|
|
Parameters |
24
|
|
|
---------- |
25
|
|
|
columns : dict, optional |
26
|
|
|
Initial columns. |
27
|
|
|
screen : zipline.pipeline.term.Filter, optional |
28
|
|
|
Initial screen. |
29
|
|
|
""" |
30
|
|
|
__slots__ = ('_columns', '_screen', '__weakref__') |
31
|
|
|
|
32
|
|
|
@expect_types( |
33
|
|
|
columns=optional(dict), |
34
|
|
|
screen=optional(Filter), |
35
|
|
|
) |
36
|
|
|
def __init__(self, columns=None, screen=None): |
37
|
|
|
|
38
|
|
|
if columns is None: |
39
|
|
|
columns = {} |
40
|
|
|
self._columns = columns |
41
|
|
|
self._screen = screen |
42
|
|
|
|
43
|
|
|
@property |
44
|
|
|
def columns(self): |
45
|
|
|
""" |
46
|
|
|
The columns registered with this pipeline. |
47
|
|
|
""" |
48
|
|
|
return self._columns |
49
|
|
|
|
50
|
|
|
@property |
51
|
|
|
def screen(self): |
52
|
|
|
""" |
53
|
|
|
The screen applied to the rows of this pipeline. |
54
|
|
|
""" |
55
|
|
|
return self._screen |
56
|
|
|
|
57
|
|
|
@expect_types(term=Term, name=str) |
58
|
|
|
def add(self, term, name, overwrite=False): |
59
|
|
|
""" |
60
|
|
|
Add a column. |
61
|
|
|
|
62
|
|
|
The results of computing `term` will show up as a column in the |
63
|
|
|
DataFrame produced by running this pipeline. |
64
|
|
|
|
65
|
|
|
Parameters |
66
|
|
|
---------- |
67
|
|
|
column : zipline.pipeline.Term |
68
|
|
|
A Filter, Factor, or Classifier to add to the pipeline. |
69
|
|
|
name : str |
70
|
|
|
Name of the column to add. |
71
|
|
|
overwrite : bool |
72
|
|
|
Whether to overwrite the existing entry if we already have a column |
73
|
|
|
named `name`. |
74
|
|
|
""" |
75
|
|
|
columns = self.columns |
76
|
|
|
if name in columns: |
77
|
|
|
if overwrite: |
78
|
|
|
self.remove(name) |
79
|
|
|
else: |
80
|
|
|
raise KeyError("Column '{}' already exists.".format(name)) |
81
|
|
|
|
82
|
|
|
self._columns[name] = term |
83
|
|
|
|
84
|
|
|
@expect_types(name=str) |
85
|
|
|
def remove(self, name): |
86
|
|
|
""" |
87
|
|
|
Remove a column. |
88
|
|
|
|
89
|
|
|
Parameters |
90
|
|
|
---------- |
91
|
|
|
name : str |
92
|
|
|
The name of the column to remove. |
93
|
|
|
|
94
|
|
|
Raises |
95
|
|
|
------ |
96
|
|
|
KeyError |
97
|
|
|
If `name` is not in self.columns. |
98
|
|
|
|
99
|
|
|
Returns |
100
|
|
|
------- |
101
|
|
|
removed : zipline.pipeline.term.Term |
102
|
|
|
The removed term. |
103
|
|
|
""" |
104
|
|
|
return self.columns.pop(name) |
105
|
|
|
|
106
|
|
|
@expect_types(screen=Filter, overwrite=(bool, int)) |
107
|
|
|
def set_screen(self, screen, overwrite=False): |
108
|
|
|
""" |
109
|
|
|
Set a screen on this Pipeline. |
110
|
|
|
|
111
|
|
|
Parameters |
112
|
|
|
---------- |
113
|
|
|
filter : zipline.pipeline.Filter |
114
|
|
|
The filter to apply as a screen. |
115
|
|
|
overwrite : bool |
116
|
|
|
Whether to overwrite any existing screen. If overwrite is False |
117
|
|
|
and self.screen is not None, we raise an error. |
118
|
|
|
""" |
119
|
|
|
if self._screen is not None and not overwrite: |
120
|
|
|
raise ValueError( |
121
|
|
|
"set_screen() called with overwrite=False and screen already " |
122
|
|
|
"set.\n" |
123
|
|
|
"If you want to apply multiple filters as a screen use " |
124
|
|
|
"set_screen(filter1 & filter2 & ...).\n" |
125
|
|
|
"If you want to replace the previous screen with a new one, " |
126
|
|
|
"use set_screen(new_filter, overwrite=True)." |
127
|
|
|
) |
128
|
|
|
self._screen = screen |
129
|
|
|
|
130
|
|
|
def to_graph(self, screen_name, default_screen): |
131
|
|
|
""" |
132
|
|
|
Compile into a TermGraph. |
133
|
|
|
|
134
|
|
|
Parameters |
135
|
|
|
---------- |
136
|
|
|
screen_name : str |
137
|
|
|
Name to supply for self.screen. |
138
|
|
|
default_screen : zipline.pipeline.term.Term |
139
|
|
|
Term to use as a screen if self.screen is None. |
140
|
|
|
""" |
141
|
|
|
columns = self.columns.copy() |
142
|
|
|
screen = self.screen |
143
|
|
|
if screen is None: |
144
|
|
|
screen = default_screen |
145
|
|
|
columns[screen_name] = screen |
146
|
|
|
|
147
|
|
|
return TermGraph(columns) |
148
|
|
|
|
149
|
|
|
def show_graph(self, format='svg'): |
150
|
|
|
""" |
151
|
|
|
Render this Pipeline as a DAG. |
152
|
|
|
|
153
|
|
|
Parameters |
154
|
|
|
---------- |
155
|
|
|
format : str, optional |
156
|
|
|
Image format to render with. Default is 'svg'. |
157
|
|
|
""" |
158
|
|
|
g = self.to_graph('', AssetExists()) |
159
|
|
|
if format == 'svg': |
160
|
|
|
return g.svg |
161
|
|
|
elif format == 'png': |
162
|
|
|
return g.png |
163
|
|
|
else: |
164
|
|
|
return g.jpeg |
165
|
|
|
|