| Total Complexity | 52 |
| Total Lines | 252 |
| Duplicated Lines | 0 % |
Complex classes like deepy.layers.NeuralLayer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | #!/usr/bin/env python |
||
| 14 | class NeuralLayer(object): |
||
| 15 | |||
| 16 | def __init__(self, name="unknown"): |
||
| 17 | """ |
||
| 18 | Create a neural layer. |
||
| 19 | """ |
||
| 20 | self.name = name |
||
| 21 | self.input_dim = 0 |
||
| 22 | self.input_dims = [0] |
||
| 23 | self.output_dim = 0 |
||
| 24 | |||
| 25 | self._linked_block = None |
||
| 26 | self._linked = False |
||
| 27 | |||
| 28 | self.connected = False |
||
| 29 | self.updates = [] |
||
| 30 | self.training_updates = [] |
||
| 31 | self.free_parameters = [] |
||
| 32 | self.parameters = [] |
||
| 33 | self.training_monitors = [] |
||
| 34 | self.testing_monitors = [] |
||
| 35 | self._registered_monitors = set() |
||
| 36 | self._registered_updates = set() |
||
| 37 | self._registered_training_updates = set() |
||
| 38 | self.external_inputs = [] |
||
| 39 | self.external_targets = [] |
||
| 40 | self.parameter_count = 0 |
||
| 41 | self.epoch_callbacks = [] |
||
| 42 | self.training_callbacks = [] |
||
| 43 | self.testing_callbacks = [] |
||
| 44 | |||
| 45 | def connect(self, input_dim=0, input_dims=None, previous_layer=None, network_config=None, no_prepare=False): |
||
| 46 | """ |
||
| 47 | Connect to a previous layer. |
||
| 48 | :param no_prepare: if avoid calling setup |
||
| 49 | """ |
||
| 50 | # configure input dimensions |
||
| 51 | if input_dims: |
||
| 52 | self.input_dims = input_dims |
||
| 53 | self.input_dim = input_dims[0] |
||
| 54 | else: |
||
| 55 | self.input_dim = input_dim |
||
| 56 | self.input_dims = [input_dims] |
||
| 57 | # set default output dimension |
||
| 58 | if self.output_dim == 0: |
||
| 59 | self.output_dim = self.input_dim |
||
| 60 | self.previous_layer = previous_layer |
||
| 61 | self.network_config = network_config |
||
| 62 | self.connected = True |
||
| 63 | # call prepare |
||
| 64 | if not no_prepare: |
||
| 65 | self.prepare() |
||
| 66 | return self |
||
| 67 | |||
| 68 | def compute_raw(self, inputs, dims): |
||
| 69 | """ |
||
| 70 | Compute on raw Theano tensors. |
||
| 71 | :type inputs: list of TensorLayer |
||
| 72 | :type dims: list of int |
||
| 73 | """ |
||
| 74 | from variable import NeuralVar |
||
| 75 | tensors = [NeuralVar(d, t) for d, t in zip(dims, inputs)] |
||
| 76 | return self.compute(*tensors) |
||
| 77 | |||
| 78 | def compute(self, *inputs): |
||
| 79 | """ |
||
| 80 | Take a TensorLayer or tensor as input, compute the result. |
||
| 81 | Dimension must be given is the input is a tensor. |
||
| 82 | :type inputs: list of TensorLayer |
||
| 83 | :return: TensorLayer |
||
| 84 | """ |
||
| 85 | from variable import NeuralVar |
||
| 86 | if type(inputs[0]) != NeuralVar: |
||
| 87 | raise SystemError("The input of `compute` must be TensorLayer") |
||
| 88 | |||
| 89 | dims = [t.dim() for t in inputs] |
||
| 90 | if len(inputs) == 1: |
||
| 91 | self.connect(input_dim=dims[0]) |
||
| 92 | else: |
||
| 93 | self.connect(input_dims=dims) |
||
| 94 | if self._linked_block and not self._linked: |
||
| 95 | self._linked = True |
||
| 96 | self._linked_block.register_layer(self) |
||
| 97 | return NeuralVar(self.output_dim, self.output(*[t.tensor for t in inputs]), self.test_output(*[t.test_tensor for t in inputs])) |
||
| 98 | |||
| 99 | def prepare(self): |
||
| 100 | """ |
||
| 101 | Prepare function will be called after connected. |
||
| 102 | """ |
||
| 103 | return self.setup() |
||
| 104 | |||
| 105 | def setup(self): |
||
| 106 | """ |
||
| 107 | !!! DEPRECATED !!! |
||
| 108 | Setup function will be called after connected. |
||
| 109 | """ |
||
| 110 | pass |
||
| 111 | |||
| 112 | def output(self, x): |
||
| 113 | """ |
||
| 114 | Output function. |
||
| 115 | """ |
||
| 116 | raise NotImplementedError("output function of '%s' is not implemented" % self.name) |
||
| 117 | |||
| 118 | def test_output(self, x): |
||
| 119 | """ |
||
| 120 | Output function in test time. |
||
| 121 | """ |
||
| 122 | return self.output(x) |
||
| 123 | |||
| 124 | def call(self, x, test=False): |
||
| 125 | """ |
||
| 126 | Call this layer, with a parameter to switch test or not. |
||
| 127 | """ |
||
| 128 | if test: |
||
| 129 | return self.test_output(x) |
||
| 130 | else: |
||
| 131 | return self.output(x) |
||
| 132 | |||
| 133 | def link(self, block): |
||
| 134 | """ |
||
| 135 | Let the given block or network manage the parameters of this layer. |
||
| 136 | :param block: Block or NeuralNetwork |
||
| 137 | :return: NeuralLayer |
||
| 138 | """ |
||
| 139 | if self._linked_block: |
||
| 140 | raise SystemError("One layer can not be linked twice") |
||
| 141 | self._linked_block = block |
||
| 142 | if self.connected: |
||
| 143 | self._linked = True |
||
| 144 | block.register_layer(self) |
||
| 145 | return self |
||
| 146 | |||
| 147 | def register(self, *layers): |
||
| 148 | """ |
||
| 149 | Register inner layers. |
||
| 150 | """ |
||
| 151 | self.register_inner_layers(*layers) |
||
| 152 | |||
| 153 | def register_inner_layers(self, *layers): |
||
| 154 | for layer in layers: |
||
| 155 | self.register_parameters(*layer.parameters) |
||
| 156 | |||
| 157 | def register_parameters(self, *parameters): |
||
| 158 | """ |
||
| 159 | Register parameters. |
||
| 160 | """ |
||
| 161 | for param in parameters: |
||
| 162 | self.parameter_count += np.prod(param.get_value().shape) |
||
| 163 | self.parameters.extend(parameters) |
||
| 164 | |||
| 165 | def register_free_parameters(self, *free_parameters): |
||
| 166 | """ |
||
| 167 | Register free parameters, which means their value will not be learned by trainer. |
||
| 168 | """ |
||
| 169 | return self.free_parameters.extend(free_parameters) |
||
| 170 | |||
| 171 | def register_updates(self, *updates): |
||
| 172 | """ |
||
| 173 | Register updates that will be executed in each iteration. |
||
| 174 | """ |
||
| 175 | for key, node in updates: |
||
| 176 | if key not in self._registered_updates: |
||
| 177 | self.updates.append((key, node)) |
||
| 178 | self._registered_updates.add(key) |
||
| 179 | |||
| 180 | def register_training_updates(self, *updates): |
||
| 181 | """ |
||
| 182 | Register updates that will only be executed in training phase. |
||
| 183 | """ |
||
| 184 | for key, node in updates: |
||
| 185 | if key not in self._registered_training_updates: |
||
| 186 | self.training_updates.append((key, node)) |
||
| 187 | self._registered_training_updates.add(key) |
||
| 188 | |||
| 189 | def register_monitors(self, *monitors): |
||
| 190 | """ |
||
| 191 | Register monitors they should be tuple of name and Theano variable. |
||
| 192 | """ |
||
| 193 | for key, node in monitors: |
||
| 194 | if key not in self._registered_monitors: |
||
| 195 | self.training_monitors.append((key, node)) |
||
| 196 | self.testing_monitors.append((key, node)) |
||
| 197 | self._registered_monitors.add(key) |
||
| 198 | |||
| 199 | def register_external_inputs(self, *variables): |
||
| 200 | """ |
||
| 201 | Register external input variables. |
||
| 202 | """ |
||
| 203 | self.external_inputs.extend(variables) |
||
| 204 | |||
| 205 | def register_external_targets(self, *variables): |
||
| 206 | """ |
||
| 207 | Register extenal target variables. |
||
| 208 | """ |
||
| 209 | self.external_targets.extend(variables) |
||
| 210 | |||
| 211 | def register_training_callbacks(self, *callbacks): |
||
| 212 | """ |
||
| 213 | Register callback for each iteration in the training. |
||
| 214 | """ |
||
| 215 | self.training_callbacks.extend(callbacks) |
||
| 216 | |||
| 217 | def register_testing_callbacks(self, *callbacks): |
||
| 218 | """ |
||
| 219 | Register callback for each iteration in the testing. |
||
| 220 | """ |
||
| 221 | self.testing_callbacks.extend(callbacks) |
||
| 222 | |||
| 223 | def register_epoch_callbacks(self, *callbacks): |
||
| 224 | """ |
||
| 225 | Register callback which will be called after epoch finished. |
||
| 226 | """ |
||
| 227 | self.epoch_callbacks.extend(callbacks) |
||
| 228 | |||
| 229 | def create_weight(self, input_n=1, output_n=1, suffix="", initializer=None, shape=None): |
||
| 230 | if not shape: |
||
| 231 | shape = (input_n, output_n) |
||
| 232 | |||
| 233 | if not initializer: |
||
| 234 | initializer = UniformInitializer() |
||
| 235 | |||
| 236 | weight = theano.shared(initializer.sample(shape).astype(FLOATX), name='W_{}'.format(suffix)) |
||
| 237 | |||
| 238 | logging.info('create weight W_%s: %s', suffix, str(shape)) |
||
| 239 | return weight |
||
| 240 | |||
| 241 | def create_bias(self, output_n=1, suffix="", value=0., shape=None): |
||
| 242 | if not shape: |
||
| 243 | shape = (output_n, ) |
||
| 244 | bs = np.ones(shape) |
||
| 245 | bs *= value |
||
| 246 | bias = theano.shared(bs.astype(FLOATX), name='B_{}'.format(suffix)) |
||
| 247 | logging.info('create bias B_%s: %s', suffix, str(shape)) |
||
| 248 | return bias |
||
| 249 | |||
| 250 | def create_vector(self, n, name, dtype=FLOATX): |
||
| 251 | bs = np.zeros(n) |
||
| 252 | v = theano.shared(bs.astype(dtype), name='{}'.format(name)) |
||
| 253 | |||
| 254 | logging.info('create vector %s: %d', name, n) |
||
| 255 | return v |
||
| 256 | |||
| 257 | def create_matrix(self, m, n, name): |
||
| 258 | |||
| 259 | matrix = theano.shared(np.zeros((m, n)).astype(FLOATX), name=name) |
||
| 260 | |||
| 261 | logging.info('create matrix %s: %d x %d', name, m, n) |
||
| 262 | return matrix |
||
| 263 | |||
| 264 | def callback_forward_propagation(self): |
||
| 265 | pass |
||
| 266 | |||
| 267 |