ComputationGraph (rating: F)

Complexity

Total Complexity    66

Size/Duplication

Total Lines         283
Duplicated Lines    0 %

Importance

Changes             0

Metric    Value
dl        0
loc       283
rs        3.1913
c         0
b         0
f         0
wmc       66

14 Methods

Rating   Name                       Duplication   Size   Complexity
A        dict_of_inputs()           0             3      2
C        get_snapshot()             0             24     7
A        __iter__()                 0             2      1
B        replace()                  0             86     6
A        parameters()               0             4      3
A        scan_variables()           0             4      2
A        inputs()                   0             4      3
A        intermediary_variables()   0             5      4
F        _get_variables()           0             50     22
B        has_inputs()               0             19     6
A        get_theano_function()      0             15     2
A        shared_variables()         0             3      3
A        __init__()                 0             6      2
A        auxiliary_variables()      0             3      3

How to fix: Complexity

Complex Class

Complex classes like ComputationGraph often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring (see the sketch below). If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
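As a rough illustration of that advice (not part of the analyzed file), the sketch below extracts the Scan-related bookkeeping, i.e. the fields and methods sharing the "scan" prefix in the table above, into its own small collaborator. The names ScanIndex and SlimComputationGraph, and the simplified attributes, are hypothetical.

from itertools import chain


class ScanIndex(object):
    """Hypothetical extracted component that owns the Scan bookkeeping."""

    def __init__(self, scan_graphs):
        # Each entry stands in for the inner graph of one Scan op; for the
        # sketch it only needs to expose a `variables` list.
        self.scan_graphs = list(scan_graphs)

    @property
    def variables(self):
        # Flatten the variables of all inner graphs, mirroring
        # ComputationGraph.scan_variables in the listing below.
        return list(chain(*[g.variables for g in self.scan_graphs]))


class SlimComputationGraph(object):
    """Sketch of the original class after delegating to ScanIndex."""

    def __init__(self, variables, scan_graphs=()):
        self.variables = list(variables)
        self._scan_index = ScanIndex(scan_graphs)

    @property
    def scan_variables(self):
        # The public interface stays the same; the work now lives in ScanIndex.
        return self._scan_index.variables

The public API is unchanged, so callers of scan_variables are unaffected, while the extracted component can be tested and extended on its own.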

"""Annotated computation graph management."""
import logging
from collections import OrderedDict
from itertools import chain
import warnings

import numpy
import theano
from picklable_itertools.extras import equizip
from theano import Variable
from theano.gof import graph
from theano.sandbox.rng_mrg import MRG_RandomStreams
from theano.scan_module.scan_op import Scan
from toolz import unique

from ..config import config
from ..roles import (add_role, has_roles, AUXILIARY, PARAMETER, DROPOUT,
                     COLLECTED, COLLECTOR)
from ..utils import (is_graph_input, is_shared_variable, dict_union,
                     shared_floatx_zeros, shared_like)
from .annotations import add_annotation, Annotation  # noqa
from .bn import batch_normalization, apply_batch_normalization  # noqa
from .bn import get_batch_normalization_updates  # noqa

logger = logging.getLogger(__name__)


class ComputationGraph(object):
    r"""Encapsulates a managed Theano computation graph.

    This implies that it not only contains the variables required to
    compute the given outputs, but also all the auxiliary variables and
    updates that were attached to these variables through the annotation
    system.

    All variables are presented in topologically sorted order according to
    the apply nodes that they are an input to.

    Parameters
    ----------
    outputs : (list of) :class:`~tensor.TensorVariable`
        The output(s) of the computation graph.

    Attributes
    ----------
    inputs : list of :class:`~tensor.TensorVariable`
        The inputs of the computation graph. This does not include shared
        variables and constants.
    shared_variables : list of :class:`~tensor.TensorSharedVariable`
        All the shared variables in the graph.
    parameters : list of :class:`~tensor.TensorSharedVariable`
        All the shared variables which have the :const:`.PARAMETER` role.
    outputs : list of :class:`~tensor.TensorVariable`
        The outputs of the computation graph (as passed to the
        constructor).
    auxiliary_variables : list of :class:`~tensor.TensorVariable`
        All variables which have the :const:`.AUXILIARY` role.
    intermediary_variables : list of :class:`~tensor.TensorVariable`
        Any variable that is not part of :attr:`inputs` or :attr:`outputs`.
    variables : list of :class:`~tensor.TensorVariable`
        All variables (including auxiliary) in the managed graph.
    scans : list of :class:`~theano.scan_module.scan_op.Scan`
        All Scan ops used in this computation graph.
    scan_variables : list of :class:`~tensor.TensorVariable`
        All variables of the inner graphs of Scan ops.
    updates : :class:`~tensor.TensorSharedVariable` updates
        All the updates found attached to the annotations.

    """
    def __init__(self, outputs):
        if isinstance(outputs, Variable):
            outputs = [outputs]
        self.outputs = list(outputs)
        self._get_variables()
        self._has_inputs = {}

    def __iter__(self):
        return iter(self.variables)

    @property
    def inputs(self):
        """Inputs to the graph, excluding constants and shared variables."""
        return [var for var in self.variables if is_graph_input(var)]

    @property
    def intermediary_variables(self):
        return [var for var in self.variables if
                var not in self.inputs and
                var not in self.outputs]

    @property
    def shared_variables(self):
        return [var for var in self.variables if is_shared_variable(var)]

    @property
    def parameters(self):
        return [var for var in self.shared_variables
                if has_roles(var, [PARAMETER])]

    @property
    def auxiliary_variables(self):
        return [var for var in self.variables if has_roles(var, [AUXILIARY])]

    @property
    def scan_variables(self):
        """Variables of Scan ops."""
        return list(chain(*[g.variables for g in self._scan_graphs]))

    def _get_variables(self):
        """Collect variables, updates and auxiliary variables.

        In addition collects all :class:`.Scan` ops and recurses in the
        respective inner Theano graphs.

        """
        updates = OrderedDict()

        shared_outputs = [o for o in self.outputs if is_shared_variable(o)]
        usual_outputs = [o for o in self.outputs if not is_shared_variable(o)]
        variables = shared_outputs

        if usual_outputs:
            # Sort apply nodes topologically, get variables and remove
            # duplicates
            inputs = graph.inputs(self.outputs)
            sorted_apply_nodes = graph.io_toposort(inputs, usual_outputs)
            self.scans = list(unique([node.op for node in sorted_apply_nodes
                                     if isinstance(node.op, Scan)],
                                     key=lambda op: id(op)))
            self._scan_graphs = [ComputationGraph(scan.outputs)
                                 for scan in self.scans]

            seen = set()
            main_vars = (
                [var for var in list(chain(
                    *[apply_node.inputs for apply_node in sorted_apply_nodes]))
                 if not (var in seen or seen.add(var))] +
                [var for var in self.outputs if var not in seen])

            # While preserving order add auxiliary variables, and collect
            # updates
            seen = set()
            # Intermediate variables could be auxiliary
            seen_avs = set(main_vars)
            variables = []
            for var in main_vars:
                variables.append(var)
                for annotation in getattr(var.tag, 'annotations', []):
                    if annotation not in seen:
                        seen.add(annotation)
                        new_avs = [
                            av for av in annotation.auxiliary_variables
                            if not (av in seen_avs or seen_avs.add(av))]
                        variables.extend(new_avs)
                        updates = dict_union(updates, annotation.updates)

        self.variables = variables
        self.updates = updates

    def dict_of_inputs(self):
        """Return a mapping from an input name to the input."""
        return {var.name: var for var in self.inputs}

    def replace(self, replacements):
        """Replace certain variables in the computation graph.

        Parameters
        ----------
        replacements : dict
            The mapping from variables to be replaced to the corresponding
            substitutes.

        Examples
        --------
        >>> import theano
        >>> from theano import tensor, function
        >>> x = tensor.scalar('x')
        >>> y = x + 2
        >>> z = y + 3
        >>> a = z + 5

        Let's suppose we have dependent replacements like

        >>> replacements = {y: x * 2, z: y * 3}
        >>> cg = ComputationGraph([a])
        >>> theano.pprint(a)  # doctest: +NORMALIZE_WHITESPACE
        '(((x + TensorConstant{2}) + TensorConstant{3}) +
        TensorConstant{5})'
        >>> cg_new = cg.replace(replacements)
        >>> theano.pprint(
        ...     cg_new.outputs[0])  # doctest: +NORMALIZE_WHITESPACE
        '(((x * TensorConstant{2}) * TensorConstant{3}) +
        TensorConstant{5})'

        The first two sums turned into multiplications.

        >>> float(function(cg_new.inputs, cg_new.outputs)(3.)[0])
        23.0

        """
        # Due to Theano specifics we have to make one replacement at a time
        replacements = OrderedDict(replacements)

        outputs_cur = self.outputs

        # `replacements` with previous replacements applied. We have to track
        # variables in the new graph corresponding to original replacements.
        replacement_keys_cur = []
        replacement_vals_cur = []
        # Sort `replacements` in topological order
        # variables in self.variables are in topological order
        remaining_replacements = replacements.copy()
        for variable in self.variables:
            if variable in replacements:
                if has_roles(variable, [AUXILIARY]):
                    warnings.warn(
                        "replace method was asked to replace a variable ({}) "
                        "that is an auxiliary variable.".format(variable))
                replacement_keys_cur.append(variable)
                # self.variables should not contain duplicates,
                # otherwise pop() may fail.
                replacement_vals_cur.append(
                    remaining_replacements.pop(variable))

        # if remaining_replacements is not empty
        if remaining_replacements:
            warnings.warn(
                "replace method was asked to replace a variable(s) ({}) "
                "that is not a part of the computational "
                "graph.".format(str(remaining_replacements.keys())))

        # Replace step-by-step in topological order
        while replacement_keys_cur:
            replace_what = replacement_keys_cur[0]
            replace_by = replacement_vals_cur[0]
            # We also want to make changes in future replacements
            outputs_new = theano.clone(
                outputs_cur + replacement_keys_cur[1:] +
                replacement_vals_cur[1:],
                replace={replace_what: replace_by})
            # Reconstruct outputs, keys, and values
            outputs_cur = outputs_new[:len(outputs_cur)]
            replacement_keys_cur = outputs_new[len(outputs_cur):
                                               len(outputs_cur) +
                                               len(replacement_keys_cur) - 1]
            replacement_vals_cur = outputs_new[len(outputs_cur) +
                                               len(replacement_keys_cur):]

        return ComputationGraph(outputs_cur)

    def get_theano_function(self, additional_updates=None, **kwargs):
        r"""Create Theano function from the graph contained.

        Parameters
        ----------
        additional_updates : dict, optional
            Extra updates to apply on top of the updates already
            contained in the graph.
        \*\*kwargs : dict
            Keyword arguments to theano.function.
            Useful for specifying compilation modes or profiling.

        """
        updates = self.updates
        if additional_updates:
            updates = dict_union(updates, OrderedDict(additional_updates))
        return theano.function(self.inputs, self.outputs, updates=updates,
                               **kwargs)

    def get_snapshot(self, data):
        """Evaluate all role-carrying Theano variables on given data.

        Parameters
        ----------
        data : dict of (data source, data) pairs
            Data for input variables. The sources should match with the
            names of the input variables.

        Returns
        -------
        Dictionary of (variable, variable value on given data) pairs.

        """
        role_variables = [var for var in self.variables
                          if hasattr(var.tag, "roles") and
                          not is_shared_variable(var)]
        value_holders = [shared_like(var) for var in role_variables]
        function = self.get_theano_function(equizip(value_holders,
                                                    role_variables))
        function(*(data[input_.name] for input_ in self.inputs))
        return OrderedDict([(var, value_holder.get_value(borrow=True))
                            for var, value_holder in equizip(role_variables,
                                                             value_holders)])

    def has_inputs(self, variable):
        """Check if a variable depends on input variables.

        Returns
        -------
        bool
            ``True`` if the given variable depends on input variables,
            ``False`` otherwise.

        """
        if variable not in self._has_inputs:
            self._has_inputs[variable] = False
            if is_graph_input(variable):
                self._has_inputs[variable] = True
            elif getattr(variable, 'owner', None):
                for dependency in variable.owner.inputs:
                    if self.has_inputs(dependency):
                        self._has_inputs[variable] = True
        return self._has_inputs[variable]


def apply_noise(computation_graph, variables, level, seed=None):
    """Add Gaussian noise to certain variables of a computation graph.

    Parameters
    ----------
    computation_graph : instance of :class:`ComputationGraph`
        The computation graph.
    variables : :class:`~tensor.TensorVariable`
        Variables to add noise to.
    level : float
        Noise level.
    seed : int, optional
        The seed with which
        :class:`~theano.sandbox.rng_mrg.MRG_RandomStreams` is initialized;
        set to 1 by default.

    """
    if not seed:
        seed = config.default_seed
    rng = MRG_RandomStreams(seed)
    replace = {}
    for variable in variables:
        replace[variable] = (variable +
                             rng.normal(variable.shape, std=level))
    return computation_graph.replace(replace)


def collect_parameters(computation_graph, parameters):
    """Replace parameters with a single shared variable.

    This can be useful if you need to calculate the full Hessian of a
    computational graph. It replaces parameters with slices of a single
    large vector, like

    >>> from blocks.utils import shared_floatx
    >>> W1 = shared_floatx(numpy.random.rand(10, 10))
    >>> W2 = shared_floatx(numpy.random.rand(10, 10))
    >>> all_parameters = shared_floatx(numpy.concatenate(
    ...     [W1.get_value().flatten(), W2.get_value().flatten()]))
    >>> W1 = all_parameters[:W1.size]
    >>> W2 = all_parameters[W1.size:]

    Parameters
    ----------
    computation_graph : :class:`ComputationGraph` instance
        The managed Theano graph in which to collect parameters.
    parameters : list of Theano shared variables
        The parameters whose values should be collected.

    Returns
    -------
    ComputationGraph instance
        A new Theano graph which has all the given parameters collected
        into a single large shared variable.

    Notes
    -----
    Note that this replacement makes the training of the model
    significantly slower because of the large number of Theano's
    ``set_subtensor`` calls needed to train the model.

    Examples
    --------
    >>> from blocks.bricks import MLP, Logistic
    >>> from blocks.bricks.cost import SquaredError
    >>> from theano import tensor
    >>> x = tensor.matrix()
    >>> mlp = MLP(activations=[Logistic(), Logistic()],
    ...           dims=[784, 100, 784])
    >>> cost = SquaredError().apply(x, mlp.apply(x))
    >>> cg = ComputationGraph(cost)
    >>> new_cg = collect_parameters(cg, cg.shared_variables)

    The new graph only has a single shared variable. This variable receives
    the :const:`COLLECTOR` role.

    >>> new_cg.shared_variables
    [collected_parameters]

    The bricks' variables have been replaced with reshaped segments of this
    single shared variable. These replacements are given the
    :const:`.COLLECTED` role.

    >>> from blocks.filter import VariableFilter
    >>> from blocks.roles import PARAMETER
    >>> var_filter = VariableFilter(roles=[COLLECTED])
    >>> var_filter(new_cg.variables)  # doctest: +SKIP
    [Reshape{1}.0, Reshape{1}.0, Reshape{2}.0, Reshape{2}.0]

    """
    parameter_values, parameter_sizes, parameter_shapes = [], [], []
    for parameter in parameters:
        parameter_values.append(parameter.get_value(borrow=True))
        parameter_sizes.append(parameter_values[-1].size)
        parameter_shapes.append(parameter_values[-1].shape)

    new_parameters = shared_floatx_zeros(sum(parameter_sizes))
    new_parameters.set_value(numpy.concatenate([value.flatten()
                             for value in parameter_values]))
    new_parameters.name = 'collected_parameters'
    add_role(new_parameters, COLLECTOR)

    replacements = {}
    for parameter, shape, i, j in zip(parameters, parameter_shapes,
                                      numpy.cumsum([0] + parameter_sizes[:-1]),
                                      numpy.cumsum(parameter_sizes)):
        new_parameter = new_parameters[i:j].reshape(shape)
        new_parameter.replacement_of = parameter
        add_role(new_parameter, COLLECTED)
        replacements[parameter] = new_parameter
    return computation_graph.replace(replacements)


def apply_dropout(computation_graph, variables, drop_prob, rng=None,
                  seed=None, custom_divisor=None):
    """Apply dropout to specified variables in a graph.

    Parameters
    ----------
    computation_graph : instance of :class:`ComputationGraph`
        The computation graph.
    variables : list of :class:`~tensor.TensorVariable`
        Variables to be dropped out.
    drop_prob : float
        Probability of dropping out. If you want to apply the dropout
        with different probabilities for different layers, call it
        several times.
    rng : :class:`~theano.sandbox.rng_mrg.MRG_RandomStreams`
        Random number generator.
    seed : int
        Random seed to be used if `rng` was not specified.
    custom_divisor : float or None, optional
        Divide dropped variables by a given scalar value. If `None`,
        (default) dropped variables will be divided by `(1 - drop_prob)`
        which is equivalent to scaling by `(1 - drop_prob)` at test
        time as recommended in [DROPOUT]_.

    Returns
    -------
    dropped_computation_graph : instance of :class:`ComputationGraph`
        A new computation graph with dropout applied to the specified
        variables. In order to train with, or monitor, the outputs
        of the original computation graph with dropout applied, use
        the variables contained in `dropped_computation_graph.outputs`.

    Notes
    -----
    For more information, see [DROPOUT]_.

    .. [DROPOUT] Hinton et al. *Improving neural networks by preventing
       co-adaptation of feature detectors*, arXiv:1207.0580.

    Examples
    --------
    >>> import numpy
    >>> from theano import tensor, function
    >>> from blocks.bricks import MLP, Identity
    >>> from blocks.filter import VariableFilter
    >>> from blocks.initialization import Constant
    >>> from blocks.roles import INPUT
    >>> linear = MLP([Identity(), Identity()], [2, 10, 2],
    ...              weights_init=Constant(1), biases_init=Constant(2))
    >>> x = tensor.matrix('x')
    >>> y = linear.apply(x)
    >>> cg = ComputationGraph(y)

    We are going to drop out all the input variables

    >>> inputs = VariableFilter(roles=[INPUT])(cg.variables)

    Here we apply dropout with default settings to our computation graph

    >>> cg_dropout = apply_dropout(cg, inputs, 0.5)

    Dropped out variables have the role `DROPOUT` and carry a
    `replacement_of` tag. Let's filter these variables and check whether
    they have the links to the original ones.

    >>> dropped_out = VariableFilter(roles=[DROPOUT])(cg_dropout.variables)
    >>> inputs_referenced = [var.tag.replacement_of for var in dropped_out]
    >>> set(inputs) == set(inputs_referenced)
    True

    Compiling Theano functions to forward propagate in the original and
    dropped-out graphs

    >>> fprop = function(cg.inputs, cg.outputs[0])
    >>> fprop_dropout = function(cg_dropout.inputs, cg_dropout.outputs[0])

    Initialize the MLP and apply these functions

    >>> linear.initialize()
    >>> fprop(numpy.ones((3, 2),
    ...       dtype=theano.config.floatX))  # doctest:+ELLIPSIS
    array([[ 42.,  42.],
           [ 42.,  42.],
           [ 42.,  42.]]...
    >>> fprop_dropout(numpy.ones((3, 2),
    ...               dtype=theano.config.floatX))  # doctest:+ELLIPSIS
    array([[ 0.,  0.],
           [ 0.,  0.],
           [ 0.,  0.]]...

    And on the second run the answer is different

    >>> fprop_dropout(numpy.ones((3, 2),
    ...               dtype=theano.config.floatX))  # doctest:+ELLIPSIS
    array([[   0.,   52.],
           [ 100.,    0.],
           [   0.,    0.]]...

    """
    if not rng and not seed:
        seed = config.default_seed
    if not rng:
        rng = MRG_RandomStreams(seed)
    if custom_divisor is None:
        divisor = (1 - drop_prob)
    else:
        divisor = custom_divisor
    replacements = [(var, var *
                     rng.binomial(var.shape, p=1 - drop_prob,
                                  dtype=theano.config.floatX) /
                     divisor)
                    for var in variables]
    for variable, replacement in replacements:
        add_role(replacement, DROPOUT)
        replacement.tag.replacement_of = variable

    return computation_graph.replace(replacements)
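Of the three module-level helpers in this file, apply_noise is the only one without a doctest of its own. The following minimal usage sketch (not part of the analyzed file) assumes Theano and Blocks are installed; the variable names are illustrative only.

import numpy
from theano import shared, tensor

from blocks.graph import ComputationGraph, apply_noise

# A tiny graph with one shared weight matrix.
W = shared(numpy.ones((3, 3)), name='W')
x = tensor.vector('x')
cost = tensor.dot(x, W).sum()

cg = ComputationGraph([cost])
# Add zero-mean Gaussian noise with standard deviation 0.01 to W wherever
# it appears in the graph; the result is a new ComputationGraph.
noisy_cg = apply_noise(cg, [W], level=0.01, seed=1)
noisy_cost = noisy_cg.outputs[0]

apply_dropout and collect_parameters are used the same way: build a ComputationGraph, pass the variables to transform, and read the transformed outputs from the returned graph.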