ChainWith.__init__()   A

Metric                  Value
Conditions (cc)         1
Total lines (loc)       13
Duplicated lines (dl)   0
Duplication ratio       0 %
Changes (c)             1
Bugs (b)                0
Features (f)            0
rs                      9.4285

#!/usr/bin/env python

# :copyright:    Copyright (C) 2015 Universidad Carlos III de Madrid.
#                All rights reserved.
# :license       LASR_UC3M v1.0, see LICENCIA.txt

# This program is free software: you can redistribute it and/or modify it
# under the terms of the Licencia Academica Social Robotics Lab - UC3M
# published by Universidad Carlos III de Madrid, either version 1.0
# or any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY. For more details, see the
# Licencia Academica Social Robotics Lab - UC3M version 1.0 or later.

# You have received a copy of the Licencia Academica Social
# Robotics Lab - UC3M in the file LICENCIA.txt, which is also
# available at <URL a la LASR_UC3Mv1.0>.

"""Utilities to ease the subscription and publication of ROS topics."""

# import roslib
# roslib.load_manifest('monarch_multimodal_fusion')
# import argparse
# import rospy

import rospy_utils as rpyu
from rospy_utils import coroutines as co


def get_msg_type(msg_typename):
    """
    Translate a message typename into the actual message type.

    :param str msg_typename: The typename of the msg.
        Example: 'std_msgs/String'
    :return: The type of the message specified as parameter.
    :rtype: type

    Example:

        >>> msg = get_msg_type("std_msgs/String")
        >>> msg(data='Hello World!')
        data: Hello World!
    """
    # 'std_msgs/String' -> 'std_msgs.msg.String'
    msg_package, msg_name = msg_typename.split('/')
    msg_full_typename = '.'.join([msg_package, 'msg', msg_name])
    return rpyu.load_class(msg_full_typename)


class ChainWith(object):

    """Chains two topics with a coroutine."""

    def __init__(self, coroutine, in_topic, in_typename,
                 out_topic, out_typename, *args, **kwargs):
        """Subscribe to in_topic and publish the coroutine's output."""
        super(ChainWith, self).__init__()
        self.coroutine = coroutine
        self.in_topic = in_topic
        self.in_typename = in_typename
        self.out_topic = out_topic
        self.out_typename = out_typename
        # in_type and out_type are read below but were never assigned.
        # Assumption: they are the message classes named by the typenames.
        self.in_type = get_msg_type(in_typename)
        self.out_type = get_msg_type(out_typename)

        pipe = co.pipe([coroutine(*args, **kwargs),
                        co.publisher(self.out_topic, self.out_type)])
        co.PipedSubscriber(self.in_topic, self.in_type, pipe)
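
The chaining that ChainWith sets up can be tried without a ROS installation. The sketch below is only an illustration of the pattern, assuming generator-based coroutines that receive items through send(). The names coroutine, transformer, collector, and ChainSketch are hypothetical and are not part of rospy_utils; the list sink stands in for a publisher, and callback stands in for what a subscriber would invoke on each message.

```python
import functools


def coroutine(func):
    """Prime a generator-based coroutine so it can receive values at once."""
    @functools.wraps(func)
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # advance to the first yield
        return gen
    return primed


@coroutine
def transformer(func, target):
    """Apply func to every received item and forward the result to target."""
    while True:
        item = yield
        target.send(func(item))


@coroutine
def collector(sink):
    """Terminal stage: appends to a list where a publisher would publish."""
    while True:
        sink.append((yield))


class ChainSketch(object):

    """Hypothetical ROS-free analogue of ChainWith."""

    def __init__(self, func, sink):
        # Build the pipe back to front: transformer -> collector.
        self.pipe = transformer(func, collector(sink))

    def callback(self, msg):
        """Push one incoming message through the pipe."""
        self.pipe.send(msg)


received = []
chain = ChainSketch(str.upper, received)
for text in ("hello", "world"):
    chain.callback(text)
print(received)  # ['HELLO', 'WORLD']
```

Each stage holds a reference to the next one, so sending a message into the first stage drives the whole chain. This is the same shape as the pipe built in ChainWith.__init__, where the last stage is a publisher instead of a list.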