| Total Complexity | 50 |
| Total Lines | 250 |
| Duplicated Lines | 0 % |
Complex classes like deepy.layers.NeuralLayer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | #!/usr/bin/env python |
||
| 15 | class NeuralLayer(object): |
||
| 16 | |||
| 17 | def __init__(self, name="unknown"): |
||
| 18 | """ |
||
| 19 | Create a neural layer. |
||
| 20 | """ |
||
| 21 | self.name = name |
||
| 22 | self.input_dim = 0 |
||
| 23 | self.input_dims = [0] |
||
| 24 | self.output_dim = 0 |
||
| 25 | self.output_dims= [0] |
||
| 26 | |||
| 27 | self._linked_block = None |
||
| 28 | |||
| 29 | self.initialized = False |
||
| 30 | self.updates = [] |
||
| 31 | self.training_updates = [] |
||
| 32 | self.free_parameters = [] |
||
| 33 | self.parameters = [] |
||
| 34 | self.training_monitors = [] |
||
| 35 | self.testing_monitors = [] |
||
| 36 | self._registered_monitors = set() |
||
| 37 | self._registered_updates = set() |
||
| 38 | self._registered_training_updates = set() |
||
| 39 | self.external_inputs = [] |
||
| 40 | self.external_targets = [] |
||
| 41 | self.parameter_count = 0 |
||
| 42 | self.epoch_callbacks = [] |
||
| 43 | self.training_callbacks = [] |
||
| 44 | self.testing_callbacks = [] |
||
| 45 | |||
| 46 | def initialize(self, input_dim=0, input_dims=None, no_prepare=False): |
||
| 47 | """ |
||
| 48 | Initialize the layer. |
||
| 49 | :param no_prepare: avoid calling preparation function |
||
| 50 | """ |
||
| 51 | if self.initialized: |
||
| 52 | return |
||
| 53 | # configure input dimensions |
||
| 54 | if input_dims: |
||
| 55 | self.input_dims = input_dims |
||
| 56 | self.input_dim = input_dims[0] |
||
| 57 | else: |
||
| 58 | self.input_dim = input_dim |
||
| 59 | self.input_dims = [input_dims] |
||
| 60 | # set default output dimension |
||
| 61 | if self.output_dim == 0: |
||
| 62 | self.output_dim = self.input_dim |
||
| 63 | self.initialized = True |
||
| 64 | # call prepare |
||
| 65 | if not no_prepare: |
||
| 66 | self.prepare() |
||
| 67 | return self |
||
| 68 | |||
| 69 | def compute(self, *inputs, **kwargs): |
||
| 70 | """ |
||
| 71 | Compute based on NeuralVariable. |
||
| 72 | :type inputs: list of NeuralVariable |
||
| 73 | :return: NeuralVariable |
||
| 74 | """ |
||
| 75 | from var import NeuralVariable |
||
| 76 | if type(inputs[0]) != NeuralVariable: |
||
| 77 | raise SystemError("The input of `compute` must be NeuralVar") |
||
| 78 | |||
| 79 | dims = [t.dim() for t in inputs] |
||
| 80 | if len(inputs) == 1: |
||
| 81 | self.initialize(input_dim=dims[0]) |
||
| 82 | else: |
||
| 83 | self.initialize(input_dims=dims) |
||
| 84 | # convert kwargs |
||
| 85 | train_kwargs, test_kwargs, _, _ = convert_to_theano_var(kwargs) |
||
| 86 | |||
| 87 | output = self.compute_tensor(*[t.tensor for t in inputs], **train_kwargs) |
||
| 88 | test_output = self.compute_test_tesnor(*[t.test_tensor for t in inputs], **test_kwargs) |
||
| 89 | |||
| 90 | if type(output) != list: |
||
| 91 | return NeuralVariable(output, test_output, self.output_dim) |
||
| 92 | else: |
||
| 93 | return [NeuralVariable(*item) for item in zip(self.output_dims, output, test_output)] |
||
| 94 | |||
| 95 | def prepare(self): |
||
| 96 | """ |
||
| 97 | Prepare function will be called after connected. |
||
| 98 | """ |
||
| 99 | return self.setup() |
||
| 100 | |||
| 101 | def setup(self): |
||
| 102 | """ |
||
| 103 | !!! DEPRECATED !!! |
||
| 104 | Setup function will be called after connected. |
||
| 105 | """ |
||
| 106 | pass |
||
| 107 | |||
| 108 | @neural_computation_prefer_tensor |
||
| 109 | def compute_tensor(self, *args, **kwargs): |
||
| 110 | """ |
||
| 111 | Compute with tensors in Theano. |
||
| 112 | """ |
||
| 113 | raise NotImplementedError("output function of '%s' is not implemented" % self.name) |
||
| 114 | |||
| 115 | @neural_computation_prefer_tensor |
||
| 116 | def compute_test_tesnor(self, *args, **kwargs): |
||
| 117 | """ |
||
| 118 | Compute with tensors in Theano in test time. |
||
| 119 | """ |
||
| 120 | return self.compute_tensor(*args, **kwargs) |
||
| 121 | |||
| 122 | def compute_flexible_tensor(self, x, test=False): |
||
| 123 | """ |
||
| 124 | Deprecated. |
||
| 125 | Compute with tensors in Theano, with a parameter to switch test or not. |
||
| 126 | """ |
||
| 127 | if test: |
||
| 128 | return self.compute_test_tesnor(x) |
||
| 129 | else: |
||
| 130 | return self.compute_tensor(x) |
||
| 131 | |||
| 132 | def belongs_to(self, block): |
||
| 133 | """ |
||
| 134 | Let the given block or network manage the parameters of this layer. |
||
| 135 | :param block: Block or NeuralNetwork |
||
| 136 | :return: NeuralLayer |
||
| 137 | """ |
||
| 138 | if self._linked_block: |
||
| 139 | raise SystemError("One layer can not belong to two blocks") |
||
| 140 | self._linked_block = block |
||
| 141 | block.register_layer(self) |
||
| 142 | return self |
||
| 143 | |||
| 144 | def register(self, *layers): |
||
| 145 | """ |
||
| 146 | Register inner layers. |
||
| 147 | """ |
||
| 148 | self.register_inner_layers(*layers) |
||
| 149 | |||
| 150 | def register_inner_layers(self, *layers): |
||
| 151 | for layer in layers: |
||
| 152 | self.register_parameters(*layer.parameters) |
||
| 153 | self.register_updates(*layer.updates) |
||
| 154 | self.register_training_updates(*layer.training_updates) |
||
| 155 | |||
| 156 | def register_parameters(self, *parameters): |
||
| 157 | """ |
||
| 158 | Register parameters. |
||
| 159 | """ |
||
| 160 | for param in parameters: |
||
| 161 | self.parameter_count += np.prod(param.get_value().shape) |
||
| 162 | self.parameters.extend(parameters) |
||
| 163 | |||
| 164 | def register_free_parameters(self, *free_parameters): |
||
| 165 | """ |
||
| 166 | Register free parameters, which means their value will not be learned by trainer. |
||
| 167 | """ |
||
| 168 | return self.free_parameters.extend(free_parameters) |
||
| 169 | |||
| 170 | def register_updates(self, *updates): |
||
| 171 | """ |
||
| 172 | Register updates that will be executed in each iteration. |
||
| 173 | """ |
||
| 174 | for key, node in updates: |
||
| 175 | if key not in self._registered_updates: |
||
| 176 | self.updates.append((key, node)) |
||
| 177 | self._registered_updates.add(key) |
||
| 178 | |||
| 179 | def register_training_updates(self, *updates): |
||
| 180 | """ |
||
| 181 | Register updates that will only be executed in training phase. |
||
| 182 | """ |
||
| 183 | for key, node in updates: |
||
| 184 | if key not in self._registered_training_updates: |
||
| 185 | self.training_updates.append((key, node)) |
||
| 186 | self._registered_training_updates.add(key) |
||
| 187 | |||
| 188 | def register_monitors(self, *monitors): |
||
| 189 | """ |
||
| 190 | Register monitors they should be tuple of name and Theano variable. |
||
| 191 | """ |
||
| 192 | for key, node in monitors: |
||
| 193 | if key not in self._registered_monitors: |
||
| 194 | self.training_monitors.append((key, node)) |
||
| 195 | self.testing_monitors.append((key, node)) |
||
| 196 | self._registered_monitors.add(key) |
||
| 197 | |||
| 198 | def register_external_inputs(self, *variables): |
||
| 199 | """ |
||
| 200 | Register external input variables. |
||
| 201 | """ |
||
| 202 | self.external_inputs.extend(variables) |
||
| 203 | |||
| 204 | def register_external_targets(self, *variables): |
||
| 205 | """ |
||
| 206 | Register extenal target variables. |
||
| 207 | """ |
||
| 208 | self.external_targets.extend(variables) |
||
| 209 | |||
| 210 | def register_training_callbacks(self, *callbacks): |
||
| 211 | """ |
||
| 212 | Register callback for each iteration in the training. |
||
| 213 | """ |
||
| 214 | self.training_callbacks.extend(callbacks) |
||
| 215 | |||
| 216 | def register_testing_callbacks(self, *callbacks): |
||
| 217 | """ |
||
| 218 | Register callback for each iteration in the testing. |
||
| 219 | """ |
||
| 220 | self.testing_callbacks.extend(callbacks) |
||
| 221 | |||
| 222 | def register_epoch_callbacks(self, *callbacks): |
||
| 223 | """ |
||
| 224 | Register callback which will be called after epoch finished. |
||
| 225 | """ |
||
| 226 | self.epoch_callbacks.extend(callbacks) |
||
| 227 | |||
| 228 | def create_weight(self, input_n=1, output_n=1, suffix="", initializer=None, shape=None): |
||
| 229 | if not shape: |
||
| 230 | shape = (input_n, output_n) |
||
| 231 | |||
| 232 | if not initializer: |
||
| 233 | initializer = UniformInitializer() |
||
| 234 | |||
| 235 | weight = theano.shared(initializer.sample(shape).astype(FLOATX), name='W_{}'.format(suffix)) |
||
| 236 | |||
| 237 | logging.info('create weight W_%s: %s', suffix, str(shape)) |
||
| 238 | return weight |
||
| 239 | |||
| 240 | def create_bias(self, output_n=1, suffix="", value=0., shape=None): |
||
| 241 | if not shape: |
||
| 242 | shape = (output_n, ) |
||
| 243 | bs = np.ones(shape) |
||
| 244 | bs *= value |
||
| 245 | bias = theano.shared(bs.astype(FLOATX), name='B_{}'.format(suffix)) |
||
| 246 | logging.info('create bias B_%s: %s', suffix, str(shape)) |
||
| 247 | return bias |
||
| 248 | |||
| 249 | def create_vector(self, n, name, dtype=FLOATX): |
||
| 250 | bs = np.zeros(n) |
||
| 251 | v = theano.shared(bs.astype(dtype), name='{}'.format(name)) |
||
| 252 | |||
| 253 | logging.info('create vector %s: %d', name, n) |
||
| 254 | return v |
||
| 255 | |||
| 256 | def create_matrix(self, m, n, name): |
||
| 257 | |||
| 258 | matrix = theano.shared(np.zeros((m, n)).astype(FLOATX), name=name) |
||
| 259 | |||
| 260 | logging.info('create matrix %s: %d x %d', name, m, n) |
||
| 261 | return matrix |
||
| 262 | |||
| 263 | def callback_forward_propagation(self): |
||
| 264 | pass |
||
| 265 | |||
| 266 |