URRobot   F
last analyzed

Complexity

Total Complexity 84

Size/Duplication

Total Lines 423
Duplicated Lines 0 %

Importance

Changes 46
Bugs 3 Features 10
Metric Value
wmc 84
c 46
b 3
f 10
dl 0
loc 423
rs 1.5789

49 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 18 2
A __repr__() 0 2 1
A __str__() 0 2 1
A get_digital_in_bits() 0 5 1
A movex() 0 13 4
A get_analog_in() 0 5 1
A get_digital_in() 0 5 1
A send_program() 0 8 1
A _format_move() 0 6 2
A get_digital_out_bits() 0 5 1
A get_force() 0 10 2
A movep() 0 5 1
A servoc() 0 5 1
A getl() 0 10 3
A set_payload() 0 13 2
A movec() 0 12 4
A movej() 0 12 4
A __exit__() 0 2 1
A movexs() 0 19 4
A _get_dist() 0 5 2
A _get_lin_dist() 0 9 3
A is_program_running() 0 7 1
A set_digital_out() 0 9 2
A set_gravity() 0 6 1
A __enter__() 0 2 1
A is_running() 0 6 1
A _get_joints_dist() 0 6 2
A speedx() 0 6 2
A movel() 0 5 1
A stopj() 0 2 1
A send_message() 0 6 1
A set_analog_out() 0 6 1
A set_tool_voltage() 0 6 1
A movels() 0 8 1
A stop() 0 2 1
A get_digital_out() 0 5 1
A stopl() 0 2 1
A close() 0 8 2
A get_analog_inputs() 0 5 1
A set_tcp() 0 6 1
A get_tcp_force() 0 6 1
A getj() 0 6 1
D _wait_for_move() 0 29 8
A get_realtime_monitor() 0 11 2
A down() 0 5 1
A translate() 0 9 1
A up() 0 7 1
A set_freedrive() 0 12 2
A set_simulation() 0 5 2

How to fix   Complexity   

Complex Class

Complex classes like URRobot often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Python library to control an UR robot through its TCP/IP interface
3
Documentation from universal robots:
4
http://support.universal-robots.com/URRobot/RemoteAccess
5
"""
6
7
import logging
8
9
from urx import urrtmon
10
from urx import ursecmon
11
12
__author__ = "Olivier Roulet-Dubonnet"
13
__copyright__ = "Copyright 2011-2015, Sintef Raufoss Manufacturing"
14
__license__ = "LGPLv3"
15
16
17
class RobotException(Exception):
18
    pass
19
20
21
class URRobot(object):
22
23
    """
24
    Python interface to socket interface of UR robot.
25
    programs are send to port 3002
26
    data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation)
27
    Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default
28
    The RT interfaces is only used for the get_force related methods
29
    Rmq: A program sent to the robot i executed immendiatly and any running program is stopped
30
    """
31
32
    def __init__(self, host, use_rt=False):
33
        self.logger = logging.getLogger("urx")
34
        self.host = host
35
        self.csys = None
36
37
        self.logger.debug("Opening secondary monitor socket")
38
        self.secmon = ursecmon.SecondaryMonitor(self.host)  # data from robot at 10Hz
39
40
        self.rtmon = None
41
        if use_rt:
42
            self.rtmon = self.get_realtime_monitor()
43
        # precision of joint movem used to wait for move completion
44
        # the value must be conservative! otherwise we may wait forever
45
        self.joinEpsilon = 0.01
46
        # It seems URScript is  limited in the character length of floats it accepts
47
        self.max_float_length = 6  # FIXME: check max length!!!
48
49
        self.secmon.wait()  # make sure we get data from robot before letting clients access our methods
50
51
    def __repr__(self):
52
        return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"])
53
54
    def __str__(self):
55
        return self.__repr__()
56
57
    def __enter__(self):
58
        return self
59
60
    def __exit__(self, exc_type, exc_value, traceback):
61
        self.close()
62
63
    def is_running(self):
64
        """
65
        Return True if robot is running (not
66
        necessary running a program, it might be idle)
67
        """
68
        return self.secmon.running
69
70
    def is_program_running(self):
71
        """
72
        check if program is running.
73
        Warning!!!!!:  After sending a program it might take several 10th of
74
        a second before the robot enters the running state
75
        """
76
        return self.secmon.is_program_running()
77
78
    def send_program(self, prog):
79
        """
80
        send a complete program using urscript to the robot
81
        the program is executed immediatly and any runnning
82
        program is interrupted
83
        """
84
        self.logger.info("Sending program: " + prog)
85
        self.secmon.send_program(prog)
86
87
    def get_tcp_force(self, wait=True):
88
        """
89
        return measured force in TCP
90
        if wait==True, waits for next packet before returning
91
        """
92
        return self.rtmon.getTCFForce(wait)
93
94
    def get_force(self, wait=True):
95
        """
96
        length of force vector returned by get_tcp_force
97
        if wait==True, waits for next packet before returning
98
        """
99
        tcpf = self.get_tcp_force(wait)
100
        force = 0
101
        for i in tcpf:
102
            force += i**2
103
        return force**0.5
104
105
    def set_tcp(self, tcp):
106
        """
107
        set robot flange to tool tip transformation
108
        """
109
        prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp)
110
        self.send_program(prog)
111
112
    def set_payload(self, weight, cog=None):
113
        """
114
        set payload in Kg
115
        cog is a vector x,y,z
116
        if cog is not specified, then tool center point is used
117
        """
118
        if cog:
119
            cog = list(cog)
120
            cog.insert(0, weight)
121
            prog = "set_payload({}, ({},{},{}))".format(*cog)
122
        else:
123
            prog = "set_payload(%s)" % weight
124
        self.send_program(prog)
125
126
    def set_gravity(self, vector):
127
        """
128
        set direction of gravity
129
        """
130
        prog = "set_gravity(%s)" % list(vector)
131
        self.send_program(prog)
132
133
    def send_message(self, msg):
134
        """
135
        send message to the GUI log tab on the robot controller
136
        """
137
        prog = "textmsg(%s)" % msg
138
        self.send_program(prog)
139
140
    def set_digital_out(self, output, val):
141
        """
142
        set digital output. val is a bool
143
        """
144
        if val in (True, 1):
145
            val = "True"
146
        else:
147
            val = "False"
148
        self.send_program('digital_out[%s]=%s' % (output, val))
149
150
    def get_analog_inputs(self):
151
        """
152
        get analog input
153
        """
154
        return self.secmon.get_analog_inputs()
155
156
    def get_analog_in(self, nb, wait=False):
157
        """
158
        get analog input
159
        """
160
        return self.secmon.get_analog_in(nb, wait=wait)
161
162
    def get_digital_in_bits(self):
163
        """
164
        get digital output
165
        """
166
        return self.secmon.get_digital_in_bits()
167
168
    def get_digital_in(self, nb, wait=False):
169
        """
170
        get digital output
171
        """
172
        return self.secmon.get_digital_in(nb, wait)
173
174
    def get_digital_out(self, val, wait=False):
175
        """
176
        get digital output
177
        """
178
        return self.secmon.get_digital_out(val, wait=wait)
179
180
    def get_digital_out_bits(self, wait=False):
181
        """
182
        get digital output as a byte
183
        """
184
        return self.secmon.get_digital_out_bits(wait=wait)
185
186
    def set_analog_out(self, output, val):
187
        """
188
        set analog output, val is a float
189
        """
190
        prog = "set_analog_out(%s, %s)" % (output, val)
191
        self.send_program(prog)
192
193
    def set_tool_voltage(self, val):
194
        """
195
        set voltage to be delivered to the tool, val is 0, 12 or 24
196
        """
197
        prog = "set_tool_voltage(%s)" % (val)
198
        self.send_program(prog)
199
200
    def _wait_for_move(self, target, threshold=None, timeout=5, joints=False):
201
        """
202
        wait for a move to complete. Unfortunately there is no good way to know when a move has finished
203
        so for every received data from robot we compute a dist equivalent and when it is lower than
204
        'threshold' we return.
205
        if threshold is not reached within timeout, an exception is raised
206
        """
207
        self.logger.debug("Waiting for move completion using threshold %s and target %s", threshold, target)
208
        start_dist = self._get_dist(target, joints)
209
        if threshold is None:
210
            threshold = start_dist * 0.8
211
            if threshold < 0.001:  # roboten precision is limited
212
                threshold = 0.001
213
            self.logger.debug("No threshold set, setting it to %s", threshold)
214
        count = 0
215
        while True:
216
            if not self.is_running():
217
                raise RobotException("Robot stopped")
218
            dist = self._get_dist(target, joints)
219
            self.logger.debug("distance to target is: %s, target dist is %s", dist, threshold)
220
            if not self.secmon.is_program_running():
221
                if dist < threshold:
222
                    self.logger.debug("we are threshold(%s) close to target, move has ended", threshold)
223
                    return
224
                count += 1
225
                if count > timeout * 10:
226
                    raise RobotException("Goal not reached but no program has been running for {} seconds. dist is {}, threshold is {}, target is {}, current pose is {}".format(timeout, dist, threshold, target, URRobot.getl(self)))
227
            else:
228
                count = 0
229
230
    def _get_dist(self, target, joints=False):
231
        if joints:
232
            return self._get_joints_dist(target)
233
        else:
234
            return self._get_lin_dist(target)
235
236
    def _get_lin_dist(self, target):
237
        # FIXME: we have an issue here, it seems sometimes the axis angle received from robot
238
        pose = URRobot.getl(self, wait=True)
239
        dist = 0
240
        for i in range(3):
241
            dist += (target[i] - pose[i]) ** 2
242
        for i in range(3, 6):
243
            dist += ((target[i] - pose[i]) / 5) ** 2  # arbitraty length like
244
        return dist ** 0.5
245
246
    def _get_joints_dist(self, target):
247
        joints = self.getj(wait=True)
248
        dist = 0
249
        for i in range(6):
250
            dist += (target[i] - joints[i]) ** 2
251
        return dist ** 0.5
252
253
    def getj(self, wait=False):
254
        """
255
        get joints position
256
        """
257
        jts = self.secmon.get_joint_data(wait)
258
        return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]]
259
260
    def speedx(self, command, velocities, acc, min_time):
261
        vels = [round(i, self.max_float_length) for i in velocities]
262
        vels.append(acc)
263
        vels.append(min_time)
264
        prog = "{}([{},{},{},{},{},{}], a={}, t_min={})".format(command, *vels)
265
        self.send_program(prog)
266
267
    def movej(self, joints, acc=0.1, vel=0.05, wait=True, relative=False, threshold=None):
268
        """
269
        move in joint space
270
        """
271
        if relative:
272
            l = self.getj()
273
            joints = [v + l[i] for i, v in enumerate(joints)]
274
        prog = self._format_move("movej", joints, acc, vel)
275
        self.send_program(prog)
276
        if wait:
277
            self._wait_for_move(joints[:6], threshold=threshold, joints=True)
278
            return self.getj()
279
280
    def movel(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None):
281
        """
282
        Send a movel command to the robot. See URScript documentation.
283
        """
284
        return self.movex("movel", tpose, acc=acc, vel=vel, wait=wait, relative=relative, threshold=threshold)
285
286
    def movep(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None):
287
        """
288
        Send a movep command to the robot. See URScript documentation.
289
        """
290
        return self.movex("movep", tpose, acc=acc, vel=vel, wait=wait, relative=relative, threshold=threshold)
291
292
    def servoc(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None):
293
        """
294
        Send a servoc command to the robot. See URScript documentation.
295
        """
296
        return self.movex("servoc", tpose, acc=acc, vel=vel, wait=wait, relative=relative, threshold=threshold)
297
298
    def _format_move(self, command, tpose, acc, vel, radius=0, prefix=""):
299
        tpose = [round(i, self.max_float_length) for i in tpose]
300
        tpose.append(acc)
301
        tpose.append(vel)
302
        tpose.append(radius)
303
        return "{}({}[{},{},{},{},{},{}], a={}, v={}, r={})".format(command, prefix, *tpose)
304
305
    def movex(self, command, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None):
306
        """
307
        Send a move command to the robot. since UR robotene have several methods this one
308
        sends whatever is defined in 'command' string
309
        """
310
        if relative:
311
            l = self.getl()
312
            tpose = [v + l[i] for i, v in enumerate(tpose)]
313
        prog = self._format_move(command, tpose, acc, vel, prefix="p")
314
        self.send_program(prog)
315
        if wait:
316
            self._wait_for_move(tpose[:6], threshold=threshold)
317
            return self.getl()
318
319
    def getl(self, wait=False, _log=True):
320
        """
321
        get TCP position
322
        """
323
        pose = self.secmon.get_cartesian_info(wait)
324
        if pose:
325
            pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]]
326
        if _log:
327
            self.logger.debug("Received pose from robot: %s", pose)
328
        return pose
329
330
    def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, wait=True, threshold=None):
331
        """
332
        Move Circular: Move to position (circular in tool-space)
333
        see UR documentation
334
        """
335
        pose_via = [round(i, self.max_float_length) for i in pose_via]
336
        pose_to = [round(i, self.max_float_length) for i in pose_to]
337
        prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, "0")
338
        self.send_program(prog)
339
        if wait:
340
            self._wait_for_move(pose_to, threshold=threshold)
341
            return self.getl()
342
343
    def movels(self, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True, threshold=None):
344
        """
345
        Concatenate several movel commands and applies a blending radius
346
        pose_list is a list of pose.
347
        This method is usefull since any new command from python
348
        to robot make the robot stop
349
        """
350
        return self.movexs("movel", pose_list, acc, vel, radius, wait, threshold=threshold)
351
352
    def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True, threshold=None):
353
        """
354
        Concatenate several movex commands and applies a blending radius
355
        pose_list is a list of pose.
356
        This method is usefull since any new command from python
357
        to robot make the robot stop
358
        """
359
        header = "def myProg():\n"
360
        end = "end\n"
361
        prog = header
362
        for idx, pose in enumerate(pose_list):
363
            if idx == (len(pose_list) - 1):
364
                radius = 0
365
            prog += self._format_move(command, pose, acc, vel, radius, prefix="p") + "\n"
366
        prog += end
367
        self.send_program(prog)
368
        if wait:
369
            self._wait_for_move(target=pose_list[-1], threshold=threshold)
370
            return self.getl()
371
372
    def stopl(self, acc=0.5):
373
        self.send_program("stopl(%s)" % acc)
374
375
    def stopj(self, acc=1.5):
376
        self.send_program("stopj(%s)" % acc)
377
378
    def stop(self):
379
        self.stopj()
380
381
    def close(self):
382
        """
383
        close connection to robot and stop internal thread
384
        """
385
        self.logger.info("Closing sockets to robot")
386
        self.secmon.close()
387
        if self.rtmon:
388
            self.rtmon.stop()
389
390
    def set_freedrive(self, val, timeout=60):
391
        """
392
        set robot in freedrive/backdrive mode where an operator can jog
393
        the robot to wished pose.
394
395
        Freedrive will timeout at 60 seconds.
396
        """
397
        if val:
398
            self.send_program("def myProg():\n\tfreedrive_mode()\n\tsleep({})\nend".format(timeout))
399
        else:
400
            # This is a non-existant program, but running it will stop freedrive
401
            self.send_program("def myProg():\n\tend_freedrive_mode()\nend")
402
403
    def set_simulation(self, val):
404
        if val:
405
            self.send_program("set sim")
406
        else:
407
            self.send_program("set real")
408
409
    def get_realtime_monitor(self):
410
        """
411
        return a pointer to the realtime monitor object
412
        usefull to track robot position for example
413
        """
414
        if not self.rtmon:
415
            self.logger.info("Opening real-time monitor socket")
416
            self.rtmon = urrtmon.URRTMonitor(self.host)  # som information is only available on rt interface
417
            self.rtmon.start()
418
        self.rtmon.set_csys(self.csys)
419
        return self.rtmon
420
421
    def translate(self, vect, acc=0.01, vel=0.01, wait=True, command="movel"):
422
        """
423
        move tool in base coordinate, keeping orientation
424
        """
425
        p = self.getl()
426
        p[0] += vect[0]
427
        p[1] += vect[1]
428
        p[2] += vect[2]
429
        return self.movex(command, p, vel=vel, acc=acc, wait=wait)
430
431
    def up(self, z=0.05, acc=0.01, vel=0.01):
432
        """
433
        Move up in csys z
434
        """
435
        p = self.getl()
436
        p[2] += z
437
        self.movel(p, acc=acc, vel=vel)
438
439
    def down(self, z=0.05, acc=0.01, vel=0.01):
440
        """
441
        Move down in csys z
442
        """
443
        self.up(-z, acc, vel)
444