Completed
Pull Request — master (#836)
by
unknown
01:28
created

zipline.pipeline.Pipeline.screen()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 6
rs 9.4286
1
from zipline.utils.input_validation import expect_types, optional
2
3
from .term import Term, AssetExists
4
from .filters import Filter
5
from .graph import TermGraph
6
7
8
class Pipeline(object):
    """
    A Pipeline object represents a collection of named expressions to be
    compiled and executed by a PipelineEngine.

    A Pipeline has two important attributes: 'columns', a dictionary of named
    `Term` instances, and 'screen', a Filter representing criteria for
    including an asset in the results of a Pipeline.

    To compute a pipeline in the context of a TradingAlgorithm, users must call
    ``attach_pipeline`` in their ``initialize`` function to register that the
    pipeline should be computed each trading day.  The outputs of a pipeline on
    a given day can be accessed by calling ``pipeline_output`` in
    ``handle_data`` or ``before_trading_start``.

    Parameters
    ----------
    columns : dict, optional
        Initial columns.  The dict is stored as-is (it is not copied), so
        later calls to ``add`` and ``remove`` mutate the dict that was passed
        in.
    screen : zipline.pipeline.term.Filter, optional
        Initial screen.
    """
    __slots__ = ('_columns', '_screen', '__weakref__')

    @expect_types(
        columns=optional(dict),
        screen=optional(Filter),
    )
    def __init__(self, columns=None, screen=None):
        # Build the default dict per instance rather than using a mutable
        # default argument, which would be shared across all pipelines.
        if columns is None:
            columns = {}
        self._columns = columns
        self._screen = screen

    @property
    def columns(self):
        """
        The columns registered with this pipeline.
        """
        return self._columns

    @property
    def screen(self):
        """
        The screen applied to the rows of this pipeline.
        """
        return self._screen

    @expect_types(term=Term, name=str)
    def add(self, term, name, overwrite=False):
        """
        Add a column.

        The results of computing `term` will show up as a column in the
        DataFrame produced by running this pipeline.

        Parameters
        ----------
        term : zipline.pipeline.Term
            A Filter, Factor, or Classifier to add to the pipeline.
        name : str
            Name of the column to add.
        overwrite : bool
            Whether to overwrite the existing entry if we already have a column
            named `name`.

        Raises
        ------
        KeyError
            If a column named `name` already exists and `overwrite` is false.
        """
        if name in self.columns:
            if not overwrite:
                raise KeyError("Column '{}' already exists.".format(name))
            # Route the replacement through ``remove`` so that overwriting
            # follows the same path as an explicit removal.
            self.remove(name)

        self._columns[name] = term

    @expect_types(name=str)
    def remove(self, name):
        """
        Remove a column.

        Parameters
        ----------
        name : str
            The name of the column to remove.

        Raises
        ------
        KeyError
            If `name` is not in self.columns.

        Returns
        -------
        removed : zipline.pipeline.term.Term
            The removed term.
        """
        return self.columns.pop(name)

    @expect_types(screen=Filter, overwrite=(bool, int))
    def set_screen(self, screen, overwrite=False):
        """
        Set a screen on this Pipeline.

        Parameters
        ----------
        screen : zipline.pipeline.Filter
            The filter to apply as a screen.
        overwrite : bool
            Whether to overwrite any existing screen.  If overwrite is False
            and self.screen is not None, we raise an error.

        Raises
        ------
        ValueError
            If a screen is already set and `overwrite` is false.
        """
        if self._screen is not None and not overwrite:
            raise ValueError(
                "set_screen() called with overwrite=False and screen already "
                "set.\n"
                "If you want to apply multiple filters as a screen use "
                "set_screen(filter1 & filter2 & ...).\n"
                "If you want to replace the previous screen with a new one, "
                "use set_screen(new_filter, overwrite=True)."
            )
        self._screen = screen

    def to_graph(self, screen_name, default_screen):
        """
        Compile into a TermGraph.

        Parameters
        ----------
        screen_name : str
            Name to supply for self.screen.
        default_screen : zipline.pipeline.term.Term
            Term to use as a screen if self.screen is None.

        Returns
        -------
        graph : TermGraph
            Graph built from a copy of this pipeline's columns plus the
            screen, stored under `screen_name`.
        """
        # Work on a copy so that adding the screen entry does not leak into
        # the pipeline's own columns.
        columns = self.columns.copy()
        screen = self.screen
        if screen is None:
            screen = default_screen
        columns[screen_name] = screen

        return TermGraph(columns)

    def show_graph(self, format='svg'):
        """
        Render this Pipeline as a DAG.

        Parameters
        ----------
        format : {'svg', 'png', 'jpeg'}, optional
            Image format to render with.  Default is 'svg'.

        Raises
        ------
        ValueError
            If `format` is not one of 'svg', 'png', or 'jpeg'.

        Returns
        -------
        rendered
            The ``svg``, ``png``, or ``jpeg`` attribute of the compiled
            graph, according to `format`.
        """
        # Reject unknown formats up front instead of silently falling back to
        # JPEG, which hid typos such as 'jpg' or 'pgn' from the caller.
        if format not in ('svg', 'png', 'jpeg'):
            raise ValueError(
                "Unknown graph format {!r}. "
                "Expected one of 'svg', 'png', or 'jpeg'.".format(format)
            )

        g = self.to_graph('', AssetExists())
        if format == 'svg':
            return g.svg
        elif format == 'png':
            return g.png
        return g.jpeg
165