Issues (5)

src/rospy_utils/coroutines.py (1 issue)

1
# :version:      0.1.0
2
# :copyright:    Copyright (C) 2014 Universidad Carlos III de Madrid.
3
#                Todos los derechos reservados.
4
# :license       LASR_UC3M v1.0, ver LICENCIA.txt
5
6
# Este programa es software libre: puede redistribuirlo y/o modificarlo
7
# bajo los terminos de la Licencia Academica Social Robotics Lab - UC3M
8
# publicada por la Universidad Carlos III de Madrid, tanto en su version 1.0
9
# como en una version posterior.
10
11
# Este programa se distribuye con la intencion de que sea util,
12
# pero SIN NINGUNA GARANTIA. Para mas detalles, consulte la
13
# Licencia Academica Social Robotics Lab - UC3M version 1.0 o posterior.
14
15
# Usted ha recibido una copia de la Licencia Academica Social
16
# Robotics Lab - UC3M en el fichero LICENCIA.txt, que tambien se encuentra
17
# disponible en <URL a la LASR_UC3Mv1.0>.
18
19
"""
20
Coroutines that ease the data manipulation and communication in ROS.
21
22
:author: Victor Gonzalez ()
23
:maintainer: Victor Gonzalez
24
:version: 0.2
25
26
"""
27
from __future__ import print_function
28
29
import rospy
0 ignored issues
show
The import rospy could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
30
from collections import deque
31
from functools import wraps
32
33
# from decorator import decorator
34
35
36
# @decorator
37
def coroutine(func):
38
    """
39
    Decorator function that takes care of starting a coroutine automatically.
40
41
    Example
42
43
    >>> @coroutine
44
    >>> def grep(pattern):
45
    >>> print "Looking for %s" % pattern
46
    >>> while True:
47
    >>>     line = (yield)
48
    >>>     if pattern in line:
49
    >>>         print line,
50
    >>> g = grep("python")
51
    >>> # Notice how you don't need a next() call here
52
    >>> g.send("Yeah, but no, but yeah, but no")
53
    >>> g.send("A series of tubes")
54
    >>> g.send("python generators rock!")
55
56
    Unshamely taken from: http://dabeaz.com/coroutines/coroutine.py
57
    BTW: If you still havent't checked out his great tutorials on coroutines
58
    do it right now and you will learn some Python magic tricks, litte bastard:
59
    http://dabeaz.com/coroutines
60
    """
61
    @wraps(func)
62
    def start(*args, **kwargs):
63
        """Coroutine wrapper that sets up the coroutine."""
64
        cr = func(*args, **kwargs)
65
        next(cr)
66
        return cr
67
    return start
68
69
70
###############################################################################
71
# Filter Coroutines (intermediate steps in the data pipeline)
72
###############################################################################
73
74
75
@coroutine
76
def accumulator(num_items, target=None):
77
    """
78
    Accumulate items in a list to send it to target when len == num_items.
79
80
    It clears the list after it is sent.
81
    If you do not specify the target, you will have to send it later.
82
83
84
    :param int num_items: (int) Num of items to accumulate before each
85
    :param target: (default: None) Next coroutine in the data pipeline
86
        Note that if you don't specify instantiate the coroutine
87
        specifying the ``target`` you'll have to send it later using
88
        ``accumulator.send(target)`` method.
89
    :type target: coroutine or None
90
91
    Example
92
93
    >>> accum = accumulator(3, printer())
94
    >>> accum.send(1)     # Nothing sent to printer coroutine
95
    >>> accum.send(2)     # Still nothing sent
96
    >>> accum.send(3)
97
    [1, 2, 3]
98
    >>> accumf.send(4)
99
    >>> accumf.send(5)
100
    >>> accumf.send(6)
101
    [4, 5, 6]
102
103
104
    Note that you can call ``accumulator`` withouth specifying the target.
105
    If you don't specify the target, you'll have to send it manually:
106
107
    >>> accum = accumulator(3)
108
    >>> accum.send(printer())     # First you send the target
109
    >>> accum.send(1)             # And then you can send regular data
110
    >>> accum.send(2)
111
    >>> accum.send(3)
112
    [1, 2, 3]
113
    """
114
    items = list()
115
    if not target:
116
        target = (yield)
117
    while True:
118
        items.append((yield))
119
        if len(items) == num_items:
120
            target.send(items)
121
            del items[:]
122
123
124
@coroutine
125
def sliding_window(size, target=None):
126
    """
127
    Send the last size recived elements to target.
128
129
    :param int size: Size of the sliding window
130
    :param target: (default: None) Next coroutine in the data pipeline
131
        Note that if you don't specify instantiate the coroutine
132
        specifying the ``target`` you'll have to send it later using
133
        ``sliding_window.send(target)`` method.
134
    :type target: coroutine or None
135
136
137
    Example
138
139
    >>> window = sliding_window(3, printer())
140
    >>> for i in xrange(5):
141
    >>>     window.send(i)
142
    [0]
143
    [0, 1]
144
    [0, 1, 2]
145
    [1, 2, 3]
146
    [2, 3, 4]
147
    """
148
    window = deque([], size)
149
    if not target:
150
        target = (yield)
151
    while True:
152
        window.append((yield))
153
        target.send(list(window))
154
155
156
@coroutine
157
def do(f, target=None):
158
    """
159
    Apply f on x, send x to the next coroutine.
160
161
    Note that ``do`` does not send f(x) but x itself. That means that only
162
    side effects of ``f`` are relevant.
163
    This is a good coroutine for logging or publishing messages through
164
    the pipeline.
165
166
    :param callable f: Function with side effects to apply to incoming messages
167
    :param target: (default: None) Next coroutine in the data pipeline
168
        Note that if you don't specify instantiate the coroutine
169
        specifying the ``target`` you'll have to send it later using
170
        ``do.send(target)`` method.
171
    :type target: coroutine or None
172
173
    Example
174
175
    >>> def print_msg(msg):
176
    >>>     print msg
177
    >>> t = do(print_msg, transformer(lambda x: x+1, printer()))
178
    >>> t.send(1)
179
    1
180
    2
181
    >>> t.send(10)
182
    10
183
    11
184
    """
185
    if not target:
186
        target = (yield)
187
    while True:
188
        msg = (yield)
189
        f(msg)
190
        target.send(msg)
191
192
193
@coroutine
194
def transformer(f, target=None):
195
    """
196
    Apply f to incoming data and send the result to target coroutine.
197
198
    :param callable f: Function to apply to every incoming message
199
    :param target: (default: None) Next coroutine in the data pipeline
200
        Note that if you don't specify instantiate the coroutine
201
        specifying the ``target`` you'll have to send it later using
202
        ``transformer.send(target)`` method.
203
    :type target: coroutine or None
204
205
    Example
206
207
    >>> t = transformer(lambda x: x+1, printer())
208
    >>> t.send(1)
209
    2
210
    >>> t.send(10)
211
    11
212
    """
213
    if not target:
214
        target = (yield)
215
    while True:
216
        msg = f((yield))
217
        target.send(msg)
218
219
220
mapper = transformer   # alias
221
222
223
@coroutine
224
def startransformer(f, target=None, *fargs, **fkwargs):
225
    """
226
    Apply f (where f has multiple args) to incoming data. Send result to target.
227
228
    This is an extension of the transformer coroutine that uses functions
229
    that require more than one argument. Note that the first argument of f
230
    is received by the coroutine, while the remaining args are pre-set in
231
    fargs and fkwargs.
232
233
    :param callable f: Function to apply to every incoming message
234
    :param target: (default: None) Next coroutine in the data pipeline
235
        Note that if you don't specify instantiate the coroutine
236
        specifying the ``target`` you'll have to send it later using
237
        ``transformer.send(target)`` method.
238
    :type target: coroutine or None
239
    :param fargs: Extra arguments to pass to `f`
240
    :param fkwargs: keyword arguments to pass to `f`
241
242
    Example:
243
244
    >>> import operator as op
245
    >>> increment = transformer(op.add, printer(), 1)
246
    >>> increment.send(1)
247
    2
248
    >>> increment.send(2)
249
    3
250
    >>> increment.send(3)
251
    4
252
    >>> increment.send(10)
253
    11
254
255
    Note that if you want to use the curried version of startransformer
256
    (E.g. for their use in pipes), you mustset the target coroutine as `None`:
257
258
    >>> import operator as op
259
    >>> increment = transformer(op.add, None, 1)  # Explicitly say "No target".
260
    >>> increment.send(printer())          # Send the target first.
261
    >>> increment.send(1)                  # And then you can send regular data.
262
    2
263
    >>> increment.send(2)
264
    3
265
    >>> increment.send(3)
266
    4
267
    """
268
    if not target:
269
        target = (yield)
270
    while True:
271
        msg = f((yield), *fargs, **fkwargs)
272
        target.send(msg)
273
274
275
starmapper = startransformer   # alias
276
277
278
@coroutine
279
def filterer(pred, target=None):
280
    """
281
    Coroutine that Filters its messages with pred function.
282
283
    :param callable pred: Predicate that evaluates every incoming message
284
    :param target: (default: None) Next coroutine in the data pipeline
285
        Note that if you don't specify instantiate the coroutine
286
        specifying the ``target`` you'll have to send it later using
287
        ``filterer.send(target)`` method.
288
    :type target: coroutine or None
289
290
291
    Example
292
293
    >>> is_even = lambda x: x % 2 == 0
294
    >>> evens = filterer(is_even, printer())
295
    >>> for i in xrange(5):
296
    >>>     evens.send(i)
297
    0
298
    2
299
    4
300
301
    """
302
    if not target:
303
        target = (yield)
304
    while True:
305
        msg = (yield)
306
        if pred(msg):
307
            target.send(msg)
308
309
310
@coroutine
311
def starfilterer(pred, target=None, *fargs, **fkwargs):
312
    """
313
    Multiple argument version of filterer.
314
315
    This is an extension of the filterer coroutine which uses predicates
316
    that require more than one argument. Note that the first argument of f
317
    is received by the coroutine, while the remaining args are pre-set in
318
    fargs and fkwargs.
319
320
    Coroutine that Filters its messages with pred function.
321
322
    :param callable pred: Predicate that evaluates every incoming message
323
    :param target: (default: None) Next coroutine in the data pipeline
324
        Note that if you don't specify instantiate the coroutine
325
        specifying the ``target`` you'll have to send it later using
326
        ``filterer.send(target)`` method.
327
    :type target: coroutine or None
328
    :param fargs: Extra arguments to pass to `f`
329
    :param fkwargs: keyword arguments to pass to `f`
330
331
332
    Example
333
334
    >>> greater_than = lambda x, y: x > y
335
    >>> greater_than_two = filterer(greater_than, printer(), 2)
336
    >>> for i in xrange(5):
337
    >>>     greater_than_two.send(i)
338
    3
339
    4
340
341
    Note that if you want to use the curried version of starfilterer
342
    (E.g. for their use in pipes), you mustset the target coroutine as `None`:
343
344
    >>> greater_than = lambda x, y: x > y
345
    >>> greater_than_two = filterer(greater_than, None, 2)
346
    >>> greater_than_two.send(printer())   # First you set the target coroutine
347
    >>> for i in xrange(5):      # And then you can send it regular data
348
    >>>     greater_than_two.send(i)
349
    3
350
    4
351
352
    """
353
    if not target:
354
        target = (yield)
355
    while True:
356
        msg = (yield)
357
        if pred(msg, *fargs, **fkwargs):
358
            target.send(msg)
359
360
361
@coroutine
362
def splitter(*coroutines):
363
    """
364
    Send the data to all the passed coroutines.
365
366
    :param coroutines: coroutines at which the incoming data will be sent
367
368
    Example:
369
    >>> s = splitter(printer(),
370
                     printer(suffix='!!!'),
371
                     printer(suffix='World!'))
372
    >>> s.send('Hello')
373
    'Hello'
374
    'Hello!!!'
375
    'Hello World!'
376
377
    If you do not specify the coroutines at coroutine instantiation, you'll
378
    have to do it later by sending a ``tuple`` or a ``list`` using the
379
    ``send`` method.
380
381
    >>> s = splitter()   # You'll need to specify the coroutines later
382
    >>> targets = [printer(),
383
                   printer(suffix='!!!'),
384
                   printer(suffix='World!')]
385
    >>> s.send(targets)     # Now you can send regular data.
386
    >>> s.send('Hello')
387
    'Hello'
388
    'Hello!!!'
389
    'Hello World!'
390
    """
391
    if not coroutines:
392
        coroutines = (yield)
393
    while True:
394
        data = (yield)
395
        for c in coroutines:
396
            c.send(data)
397
398
399
@coroutine
400
def either(pred, targets=(None, None)):
401
    """
402
    Split an incoming message in two coroutintes according to a predicate.
403
404
    The predicate is evaluated against incoming data to decide to which
405
    coroutine resend the incoming message.
406
    If the predicate produces a ``True``, then the incoming message will be
407
    sent to ``targets[0]``. If produces ``False`` it will be sent to
408
    ``targets[1]``
409
410
    :param callable pred: Predicate to decide to which target send the data
411
    :param targets: A pair of coroutines to send the data.
412
              If you don't instantiate the coroutine with this param,
413
              you will need to send the targets afterwards prior to start
414
              sending it data.
415
    :type targets: tuple(coroutine, coroutine)
416
417
    A possible use of this coroutine is to send data to loggers if some
418
    preconditions fail:
419
420
    Example
421
422
    >>> data_processor = transformer(lambda x: x**2, printer())
423
    >>> error_logger = printer(prefix="ERROR: value too high!")
424
    >>> ei = either(lambda x: x > 10)
425
    >>> ei.send((data_processor, error_logger))
426
    >>> ei.send(2)
427
    4
428
    >>> ei.send(12)
429
    "ERROR: value too high!"
430
    >>> ei.send(5)
431
    25
432
    """
433
    if not all(targets):
434
        targets = (yield)
435
    trues, falses = targets
436
    while True:
437
        msg = (yield)
438
        if pred(msg):
439
            trues.send(msg)
440
        else:
441
            falses.send(msg)
442
443
444
@coroutine
445
def folder(binop, init_value, target=None):
446
    """
447
    The reduce equivalent for coroutines.
448
449
    Applies binop to each received value and the previous result
450
    and sends the result to target
451
452
    :param callable binop: binary operation to apply to each received msg
453
    :param init_value: value used the first time the coroutine receives data
454
    :param target: (default: None) Next coroutine in the data pipeline
455
        Note that if you don't specify instantiate the coroutine
456
        specifying the ``target`` you'll have to send it later using
457
        ``folder.send(target)`` method.
458
    :type target: coroutine or None
459
460
    Example:
461
462
    >>> import operator as op
463
    >>> mul = fold(op.mul, 1, printer())
464
    >>> for _ in xrange(5):
465
            mul.send(2)
466
    2
467
    4
468
    8
469
    16
470
    32
471
    """
472
    value = init_value
473
    target = target or (yield)
474
    # target = kwargs.get('target', (yield))
475
    # if not target:
476
    #     target = (yield)
477
    while True:
478
        value = binop(value, (yield))
479
        target.send(value)
480
481
482
@coroutine
483
def dropwhile(pred, target=None):
484
    """
485
    Drop received elements while pred is True.
486
487
    :param callable pred: Predicate that evaluates incoming data
488
    :param target: (default: None) Next coroutine in the data pipeline
489
        Note that if you don't specify instantiate the coroutine
490
        specifying the ``target`` you'll have to send it later using
491
        ``buffer.send(target)`` method.
492
    :type target: coroutine or None
493
494
    Example:
495
496
    >>> tw = dropwhile(lambda x: 0 <= x <= 1, printer())
497
    >>> tw.send(-1)     # Nothing is printed
498
    >>> tw.send(2)      # Nothing is printed
499
    >>> tw.send(0.2)
500
    0.2
501
    >>> tw.send(42)
502
    42
503
    """
504
    if not target:
505
        target = (yield)
506
    while True:
507
        value = (yield)
508
        if not pred(value):
509
            target.send(value)
510
            break
511
    while True:
512
        target.send((yield))
513
514
515
@coroutine
516
def takewhile(pred, target=None):
517
    """
518
    Send elements to target until pred returns False.
519
520
    :param callable pred: Predicate that evaluates incoming data
521
    :param target: (default: None) Next coroutine in the data pipeline
522
        Note that if you don't specify instantiate the coroutine
523
        specifying the ``target`` you'll have to send it later using
524
        ``buffer.send(target)`` method.
525
    :type target: coroutine or None
526
527
    Example:
528
    --------
529
    >>> tw = takewhile(lambda x: 0 <= x <= 1, printer())
530
    >>> tw.send(0.1)
531
    0.1
532
    >>> tw.send(0.5)
533
    0.5
534
    >>> tw.send(2)      # Nothing is printed anymore
535
    >>> tw.send(0.2)    # Nothing is printed
536
    """
537
    if not target:
538
        target = ((yield))
539
    while True:
540
        val = (yield)
541
        if pred(val):
542
            target.send(val)
543
        else:
544
            break
545
    while True:
546
        (yield)
547
548
###############################################################################
549
# Consumer Coroutines (sinks in the data pipeline)
550
###############################################################################
551
552
553
@coroutine
554
def publisher(topic, msg_type, *args, **kwargs):
555
    """
556
    A coroutine-based rospy.publisher.
557
558
    :topic: (str) Name of the topic to publish
559
    :msg_type: type of the message to publish
560
561
    Example of use:
562
563
    >>> from std_msgs import String
564
    >>> pub = publisher('/my_topic', String)
565
    >>> pub.send("Hello World!")
566
    >>> # At this point you would receive "Hello World!" in /my_topic
567
568
    See: rospy.publisher
569
    """
570
    pub = rospy.Publisher(topic, msg_type, *args, **kwargs)
571
    while True:
572
        pub.publish((yield))
573
574
575
@coroutine
576
def logger(logger_, prefix='', suffix=''):
577
    """
578
    Call logger_ on incoming data.
579
580
    :param callable logger_: Logger function that prints logging messages
581
    :param str prefix: (Default: '') Prefix to append to incoming data.
582
    :param str suffix: (Default: '') Suffix to append to incoming data.
583
584
585
    Example:
586
587
    >>> err_logger = logger(rospy.logerr, prefix="ERROR: ", suffix="!!!")
588
    >>> err_logger.send("This is an error message")
589
    "ERROR: This is an error message!!!"
590
    """
591
    while True:
592
        try:
593
            string = str((yield))
594
            logger_(''.join([prefix, string, suffix]))
595
        except StopIteration:
596
            pass
597
598
599
@coroutine
600
def printer(prefix='', suffix=''):
601
    """
602
    Print incoming data.
603
604
    :param str prefix: (Default: '') Prefix to append to incoming data.
605
    :param str suffix: (Default: '') Suffix to append to incoming data.
606
607
    Example:
608
609
    >>> p = printer()
610
    >>> p.send("Hello World")
611
    'Hello World'
612
    >>> p = printer(prefix='You said: ', suffix='!')
613
    >>> p.send("Hello World")
614
    'You said: Hello World"'
615
    """
616
    while True:
617
        try:
618
            item = (yield)
619
            print(''.join([prefix, item, suffix]))
620
        except StopIteration:
621
            pass
622
623
624
###############################################################################
625
# Data producers
626
###############################################################################
627
class PipedSubscriber(object):
628
629
    """
630
    Subscriber to a ROS topic to send/receive msgs to a coroutine or a pipe.
631
632
    A wrapper of the `rospy.Subscriber` class that connects the Subscriber
633
    directly with a coroutine (or a pipe) that processes the incoming msgs.
634
    In short it has the same api as `rospy.subscriber` but, instead of a
635
    callback you pass it a coroutine or a `pipe` to it.
636
637
    :param str topic_name: Name of the topic to Subscriber
638
    :param type msg_type: Type of the messages of `topic_name`
639
    :param generator target: The coroutine or
640
        ..mod:pipe where the incoming messages will be sent
641
642
    Here you can see an example of use:
643
644
    >>> ### This node subscribes to the 'my_topic' topic,
645
    >>> ### transforms incoming strings to uppercase and
646
    >>> ### publishes them into rospy.logerr
647
    >>> import rospy
648
    >>> pipe = pipe([transformer(lambda x: str(x).upper()),
649
                     logger(rospy.logwarn, prefix='Got: ')])
650
    >>> rospy.init_node('my_node')
651
    >>> rospy.loginfo("Node {} started".format(rospy.get_name()))
652
    >>> PipedSubscriber('my_topic', String, pipe)
653
    >>> # rospy.spin()
654
    """
655
656
    def __init__(self, topic_name, msg_type, target, *args, **kwargs):
657
        """ Class constructor. """
658
        rospy.Subscriber(topic_name, msg_type, target.send, *args, **kwargs)
659
660
661
###############################################################################
662
# Utilities
663
###############################################################################
664
def pipe(coroutines):
665
    """
666
    Chain several coroutines to create a data processing pipe.
667
668
    Chains several coroutines together and returns the first coroutine
669
    of the pipe so you can send messages through the whole pipe.
670
671
    Note: The pipe establishes the connections between coroutines,
672
    therefore, you do not need to establish the targets.
673
674
    Params
675
    :param list coroutines: list of coroutines to pipe
676
    :return: The first coroutine of the pipe
677
    :rtype: coroutine
678
679
    Example:
680
681
    >>> coroutines = (transformer(lambda x: x+1),
682
                      filterer(lambda x: x%2==0),
683
                      printer())
684
    >>> p = pipe(coroutines)
685
    >>> p.send(1)
686
    2
687
    >>> p.send(4)    # No output
688
    >>> p.send(-1)
689
    0
690
691
    Here you can see a more useful example where we calculate the mean
692
    of the last 3 received messages and print them on screen only if they
693
    meet certain conditions:
694
695
    Example
696
697
    >>> coroutines = [sliding_window(3),
698
                      transformer(np.mean),
699
                      filterer(lambda x: 0<= x <= 1),
700
                      printer(prefix="Result: ")]
701
    >>> pipe = pipe(coroutines)
702
    >>> pipe.send(3)    # No output since mean <= 1
703
    >>> pipe.send(1)    # No output since mean <= 1
704
    >>> pipe.send(1)    # No output since mean <= 1
705
    >>> pipe.send(1)
706
    1.0
707
    >>> pipe.send(1)
708
    1.0
709
    >>> pipe.send(0.1)
710
    0.7
711
    >>> pipe.send(0.1)
712
    0.4
713
    """
714
    _coroutines = list(reversed(coroutines))
715
    pairs = list(zip(_coroutines[:], _coroutines[1:]))
716
    for p in pairs:
717
        p[1].send(p[0])
718
    return coroutines[0]
719