Completed
Pull Request — master (#858)
by Eddie
02:03
created

zipline.gens.AlgorithmSimulator._process_snapshot()   F

Complexity

Conditions 34

Size

Total Lines 158

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 34
dl 0
loc 158
rs 2

2 Methods

Rating   Name   Duplication   Size   Complexity  
A zipline.gens.AlgorithmSimulator._get_daily_message() 0 8 1
A zipline.gens.AlgorithmSimulator._get_minute_message() 0 14 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

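Commonly applied refactorings include Extract Method and, when a method has many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.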

Complexity

Complex classes, such as the one containing zipline.gens.AlgorithmSimulator._process_snapshot(), often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright 2015 Quantopian, Inc.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from logbook import Logger, Processor
16
from pandas.tslib import normalize_date
17
from zipline.protocol import BarData
18
from zipline.utils.api_support import ZiplineAPI
19
20
from zipline.gens.sim_engine import (
21
    DATA_AVAILABLE,
22
    ONCE_A_DAY,
23
    CALC_DAILY_PERFORMANCE,
24
    CALC_MINUTE_PERFORMANCE
25
)
26
27
28
# Module-level logbook logger, registered under the name 'Trade Simulation'.
log = Logger('Trade Simulation')
29
30
31
class AlgorithmSimulator(object):
    """
    Steps an algorithm through the (dt, action) events produced by a
    clock and yields the performance messages generated along the way.
    """

    EMISSION_TO_PERF_KEY_MAP = {
        'minute': 'minute_perf',
        'daily': 'daily_perf'
    }

    def __init__(self, algo, sim_params, data_portal, clock, benchmark_source):
        """
        Store the collaborators needed to run the simulation.

        Parameters
        ----------
        algo
            The algorithm being simulated.  This class reads its
            ``trading_environment``, ``event_manager``, ``perf_tracker``,
            ``blotter`` and ``recorded_vars`` attributes and calls its
            ``on_dt_changed`` and ``before_trading_start`` hooks.
        sim_params
            Simulation parameters; only ``first_open`` is read here.
        data_portal
            Data access object handed to the algorithm, the position
            tracker and the blotter.
        clock
            Iterable yielding ``(dt, action)`` pairs, consumed by
            ``transform``.
        benchmark_source
            Object whose ``get_value(dt)`` supplies the benchmark return
            recorded at each performance calculation.
        """
        # ==============
        # Simulation
        # Param Setup
        # ==============
        self.sim_params = sim_params
        self.env = algo.trading_environment
        self.data_portal = data_portal

        # ==============
        # Algo Setup
        # ==============
        self.algo = algo
        self.algo_start = normalize_date(self.sim_params.first_open)

        # ==============
        # Snapshot Setup
        # ==============

        # The algorithm's data as of our most recent event.
        # We want an object that will have empty objects as default
        # values on missing keys.
        self.current_data = BarData(data_portal=self.data_portal)

        # We don't have a datetime for the current snapshot until we
        # receive a message.
        self.simulation_dt = None

        self.clock = clock

        self.benchmark_source = benchmark_source

        # =============
        # Logging Setup
        # =============

        # Processor function for injecting the algo_dt into
        # user prints/logs.  An 'algo_dt' already present on the record
        # is left untouched.
        def inject_algo_dt(record):
            if 'algo_dt' not in record.extra:
                record.extra['algo_dt'] = self.simulation_dt
        self.processor = Processor(inject_algo_dt)

    def transform(self):
        """
        Main generator work loop.

        Iterates over ``self.clock`` and dispatches on each action:
        DATA_AVAILABLE runs one bar of the algorithm, ONCE_A_DAY runs the
        start-of-day work, and the two CALC_*_PERFORMANCE actions record
        the benchmark return and yield perf messages.  Once the clock is
        exhausted, the result of ``perf_tracker.handle_simulation_end()``
        is yielded as the final message.
        """
        algo = self.algo
        algo.data_portal = self.data_portal
        handle_data = algo.event_manager.handle_data
        current_data = self.current_data

        data_portal = self.data_portal

        # can't cache a pointer to algo.perf_tracker because we're not
        # guaranteed that the algo doesn't swap out perf trackers during
        # its lifetime.
        # likewise, we can't cache a pointer to the blotter.

        algo.perf_tracker.position_tracker.data_portal = data_portal

        def inner_loop(dt_to_use):
            # called every tick (minute or day).

            data_portal.current_dt = dt_to_use
            self.simulation_dt = dt_to_use
            algo.on_dt_changed(dt_to_use)

            blotter = algo.blotter
            perf_tracker = algo.perf_tracker

            # handle any transactions and commissions coming out new orders
            # placed in the last bar
            new_transactions, new_commissions = \
                blotter.get_transactions(data_portal)

            for transaction in new_transactions:
                perf_tracker.process_transaction(transaction)

                # since this order was modified, record it
                order = blotter.orders[transaction.order_id]
                perf_tracker.process_order(order)

            # the truthiness guard is kept on purpose: it also tolerates a
            # blotter that reports "no commissions" as None.
            if new_commissions:
                for commission in new_commissions:
                    perf_tracker.process_commission(commission)

            handle_data(algo, current_data, dt_to_use)

            # grab any new orders from the blotter, then clear the list.
            # this includes cancelled orders.
            new_orders = blotter.new_orders
            blotter.new_orders = []

            # if we have any new orders, record them so that we know
            # in what perf period they were placed.
            if new_orders:
                for new_order in new_orders:
                    perf_tracker.process_order(new_order)

        def once_a_day(midnight_dt):
            # set all the timestamps
            self.simulation_dt = midnight_dt
            algo.on_dt_changed(midnight_dt)
            data_portal.current_day = midnight_dt

            # call before trading start
            algo.before_trading_start(current_data)

            perf_tracker = algo.perf_tracker

            # handle any splits that impact any positions or any open orders.
            # the union de-duplicates sids that are both held and on order.
            held_sids = perf_tracker.position_tracker.positions.keys()
            sids_we_care_about = list(
                set(held_sids).union(algo.blotter.open_orders.keys())
            )

            if sids_we_care_about:
                splits = data_portal.get_splits(sids_we_care_about,
                                                midnight_dt)
                # len() rather than truthiness: the container type returned
                # by get_splits is not visible from here.
                if len(splits) > 0:
                    algo.blotter.process_splits(splits)
                    perf_tracker.position_tracker.handle_splits(splits)

        with self.processor, ZiplineAPI(self.algo):
            for dt, action in self.clock:
                if action == DATA_AVAILABLE:
                    inner_loop(dt)
                elif action == ONCE_A_DAY:
                    once_a_day(dt)
                elif action == CALC_DAILY_PERFORMANCE:
                    self._record_benchmark_return(dt, algo)
                    yield self._get_daily_message(dt, algo, algo.perf_tracker)
                elif action == CALC_MINUTE_PERFORMANCE:
                    self._record_benchmark_return(dt, algo)
                    minute_msg, daily_msg = \
                        self._get_minute_message(dt, algo, algo.perf_tracker)

                    yield minute_msg

                    if daily_msg:
                        yield daily_msg

        risk_message = algo.perf_tracker.handle_simulation_end()
        yield risk_message

    def _record_benchmark_return(self, dt, algo):
        """
        Store the benchmark value for ``dt`` on the algo's perf tracker.
        """
        # perf_tracker is looked up on the algo at assignment time (not
        # cached) because the algo may swap out its perf tracker.
        algo.perf_tracker.all_benchmark_returns[dt] = \
            self.benchmark_source.get_value(dt)

    @staticmethod
    def _get_daily_message(dt, algo, perf_tracker):
        """
        Get the daily perf message for the given datetime.

        The message comes from ``perf_tracker.handle_market_close_daily``
        and has the algo's recorded vars attached under
        ``['daily_perf']['recorded_vars']``.
        """
        perf_message = perf_tracker.handle_market_close_daily(dt)
        perf_message['daily_perf']['recorded_vars'] = algo.recorded_vars
        return perf_message

    @staticmethod
    def _get_minute_message(dt, algo, perf_tracker):
        """
        Get the minute perf message, and the daily one if there is one,
        for the given datetime.

        Returns a ``(minute_message, daily_message)`` pair as produced by
        ``perf_tracker.handle_minute_close``.  The algo's recorded vars are
        attached to the minute message and, when the daily message is
        truthy, to the daily message as well.
        """
        # read before handle_minute_close is called, as the original did.
        rvars = algo.recorded_vars

        minute_message, daily_message = perf_tracker.handle_minute_close(dt)
        minute_message['minute_perf']['recorded_vars'] = rvars

        if daily_message:
            daily_message["daily_perf"]["recorded_vars"] = rvars

        return minute_message, daily_message
209