Total Complexity | 50 |
Total Lines | 250 |
Duplicated Lines | 0 % |
Complex classes like deepy.layers.NeuralLayer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python |
||
15 | class NeuralLayer(object): |
||
16 | |||
17 | def __init__(self, name="unknown"): |
||
18 | """ |
||
19 | Create a neural layer. |
||
20 | """ |
||
21 | self.name = name |
||
22 | self.input_dim = 0 |
||
23 | self.input_dims = [0] |
||
24 | self.output_dim = 0 |
||
25 | self.output_dims= [0] |
||
26 | |||
27 | self._linked_block = None |
||
28 | |||
29 | self.initialized = False |
||
30 | self.updates = [] |
||
31 | self.training_updates = [] |
||
32 | self.free_parameters = [] |
||
33 | self.parameters = [] |
||
34 | self.training_monitors = [] |
||
35 | self.testing_monitors = [] |
||
36 | self._registered_monitors = set() |
||
37 | self._registered_updates = set() |
||
38 | self._registered_training_updates = set() |
||
39 | self.external_inputs = [] |
||
40 | self.external_targets = [] |
||
41 | self.parameter_count = 0 |
||
42 | self.epoch_callbacks = [] |
||
43 | self.training_callbacks = [] |
||
44 | self.testing_callbacks = [] |
||
45 | |||
46 | def initialize(self, input_dim=0, input_dims=None, no_prepare=False): |
||
47 | """ |
||
48 | Initialize the layer. |
||
49 | :param no_prepare: avoid calling preparation function |
||
50 | """ |
||
51 | if self.initialized: |
||
52 | return |
||
53 | # configure input dimensions |
||
54 | if input_dims: |
||
55 | self.input_dims = input_dims |
||
56 | self.input_dim = input_dims[0] |
||
57 | else: |
||
58 | self.input_dim = input_dim |
||
59 | self.input_dims = [input_dims] |
||
60 | # set default output dimension |
||
61 | if self.output_dim == 0: |
||
62 | self.output_dim = self.input_dim |
||
63 | self.initialized = True |
||
64 | # call prepare |
||
65 | if not no_prepare: |
||
66 | self.prepare() |
||
67 | return self |
||
68 | |||
69 | def compute(self, *inputs, **kwargs): |
||
70 | """ |
||
71 | Compute based on NeuralVariable. |
||
72 | :type inputs: list of NeuralVariable |
||
73 | :return: NeuralVariable |
||
74 | """ |
||
75 | from var import NeuralVariable |
||
76 | if type(inputs[0]) != NeuralVariable: |
||
77 | raise SystemError("The input of `compute` must be NeuralVar") |
||
78 | |||
79 | dims = [t.dim() for t in inputs] |
||
80 | if len(inputs) == 1: |
||
81 | self.initialize(input_dim=dims[0]) |
||
82 | else: |
||
83 | self.initialize(input_dims=dims) |
||
84 | # convert kwargs |
||
85 | train_kwargs, test_kwargs, _, _ = convert_to_theano_var(kwargs) |
||
86 | |||
87 | output = self.compute_tensor(*[t.tensor for t in inputs], **train_kwargs) |
||
88 | test_output = self.compute_test_tesnor(*[t.test_tensor for t in inputs], **test_kwargs) |
||
89 | |||
90 | if type(output) != list: |
||
91 | return NeuralVariable(output, test_output, self.output_dim) |
||
92 | else: |
||
93 | return [NeuralVariable(*item) for item in zip(self.output_dims, output, test_output)] |
||
94 | |||
95 | def prepare(self): |
||
96 | """ |
||
97 | Prepare function will be called after connected. |
||
98 | """ |
||
99 | return self.setup() |
||
100 | |||
101 | def setup(self): |
||
102 | """ |
||
103 | !!! DEPRECATED !!! |
||
104 | Setup function will be called after connected. |
||
105 | """ |
||
106 | pass |
||
107 | |||
108 | @neural_computation_prefer_tensor |
||
109 | def compute_tensor(self, *args, **kwargs): |
||
110 | """ |
||
111 | Compute with tensors in Theano. |
||
112 | """ |
||
113 | raise NotImplementedError("output function of '%s' is not implemented" % self.name) |
||
114 | |||
115 | @neural_computation_prefer_tensor |
||
116 | def compute_test_tesnor(self, *args, **kwargs): |
||
117 | """ |
||
118 | Compute with tensors in Theano in test time. |
||
119 | """ |
||
120 | return self.compute_tensor(*args, **kwargs) |
||
121 | |||
122 | def compute_flexible_tensor(self, x, test=False): |
||
123 | """ |
||
124 | Deprecated. |
||
125 | Compute with tensors in Theano, with a parameter to switch test or not. |
||
126 | """ |
||
127 | if test: |
||
128 | return self.compute_test_tesnor(x) |
||
129 | else: |
||
130 | return self.compute_tensor(x) |
||
131 | |||
132 | def belongs_to(self, block): |
||
133 | """ |
||
134 | Let the given block or network manage the parameters of this layer. |
||
135 | :param block: Block or NeuralNetwork |
||
136 | :return: NeuralLayer |
||
137 | """ |
||
138 | if self._linked_block: |
||
139 | raise SystemError("One layer can not belong to two blocks") |
||
140 | self._linked_block = block |
||
141 | block.register_layer(self) |
||
142 | return self |
||
143 | |||
144 | def register(self, *layers): |
||
145 | """ |
||
146 | Register inner layers. |
||
147 | """ |
||
148 | self.register_inner_layers(*layers) |
||
149 | |||
150 | def register_inner_layers(self, *layers): |
||
151 | for layer in layers: |
||
152 | self.register_parameters(*layer.parameters) |
||
153 | self.register_updates(*layer.updates) |
||
154 | self.register_training_updates(*layer.training_updates) |
||
155 | |||
156 | def register_parameters(self, *parameters): |
||
157 | """ |
||
158 | Register parameters. |
||
159 | """ |
||
160 | for param in parameters: |
||
161 | self.parameter_count += np.prod(param.get_value().shape) |
||
162 | self.parameters.extend(parameters) |
||
163 | |||
164 | def register_free_parameters(self, *free_parameters): |
||
165 | """ |
||
166 | Register free parameters, which means their value will not be learned by trainer. |
||
167 | """ |
||
168 | return self.free_parameters.extend(free_parameters) |
||
169 | |||
170 | def register_updates(self, *updates): |
||
171 | """ |
||
172 | Register updates that will be executed in each iteration. |
||
173 | """ |
||
174 | for key, node in updates: |
||
175 | if key not in self._registered_updates: |
||
176 | self.updates.append((key, node)) |
||
177 | self._registered_updates.add(key) |
||
178 | |||
179 | def register_training_updates(self, *updates): |
||
180 | """ |
||
181 | Register updates that will only be executed in training phase. |
||
182 | """ |
||
183 | for key, node in updates: |
||
184 | if key not in self._registered_training_updates: |
||
185 | self.training_updates.append((key, node)) |
||
186 | self._registered_training_updates.add(key) |
||
187 | |||
188 | def register_monitors(self, *monitors): |
||
189 | """ |
||
190 | Register monitors they should be tuple of name and Theano variable. |
||
191 | """ |
||
192 | for key, node in monitors: |
||
193 | if key not in self._registered_monitors: |
||
194 | self.training_monitors.append((key, node)) |
||
195 | self.testing_monitors.append((key, node)) |
||
196 | self._registered_monitors.add(key) |
||
197 | |||
198 | def register_external_inputs(self, *variables): |
||
199 | """ |
||
200 | Register external input variables. |
||
201 | """ |
||
202 | self.external_inputs.extend(variables) |
||
203 | |||
204 | def register_external_targets(self, *variables): |
||
205 | """ |
||
206 | Register extenal target variables. |
||
207 | """ |
||
208 | self.external_targets.extend(variables) |
||
209 | |||
210 | def register_training_callbacks(self, *callbacks): |
||
211 | """ |
||
212 | Register callback for each iteration in the training. |
||
213 | """ |
||
214 | self.training_callbacks.extend(callbacks) |
||
215 | |||
216 | def register_testing_callbacks(self, *callbacks): |
||
217 | """ |
||
218 | Register callback for each iteration in the testing. |
||
219 | """ |
||
220 | self.testing_callbacks.extend(callbacks) |
||
221 | |||
222 | def register_epoch_callbacks(self, *callbacks): |
||
223 | """ |
||
224 | Register callback which will be called after epoch finished. |
||
225 | """ |
||
226 | self.epoch_callbacks.extend(callbacks) |
||
227 | |||
228 | def create_weight(self, input_n=1, output_n=1, suffix="", initializer=None, shape=None): |
||
229 | if not shape: |
||
230 | shape = (input_n, output_n) |
||
231 | |||
232 | if not initializer: |
||
233 | initializer = UniformInitializer() |
||
234 | |||
235 | weight = theano.shared(initializer.sample(shape).astype(FLOATX), name='W_{}'.format(suffix)) |
||
236 | |||
237 | logging.info('create weight W_%s: %s', suffix, str(shape)) |
||
238 | return weight |
||
239 | |||
240 | def create_bias(self, output_n=1, suffix="", value=0., shape=None): |
||
241 | if not shape: |
||
242 | shape = (output_n, ) |
||
243 | bs = np.ones(shape) |
||
244 | bs *= value |
||
245 | bias = theano.shared(bs.astype(FLOATX), name='B_{}'.format(suffix)) |
||
246 | logging.info('create bias B_%s: %s', suffix, str(shape)) |
||
247 | return bias |
||
248 | |||
249 | def create_vector(self, n, name, dtype=FLOATX): |
||
250 | bs = np.zeros(n) |
||
251 | v = theano.shared(bs.astype(dtype), name='{}'.format(name)) |
||
252 | |||
253 | logging.info('create vector %s: %d', name, n) |
||
254 | return v |
||
255 | |||
256 | def create_matrix(self, m, n, name): |
||
257 | |||
258 | matrix = theano.shared(np.zeros((m, n)).astype(FLOATX), name=name) |
||
259 | |||
260 | logging.info('create matrix %s: %d x %d', name, m, n) |
||
261 | return matrix |
||
262 | |||
263 | def callback_forward_propagation(self): |
||
264 | pass |
||
265 | |||
266 |