FSinfo   A
last analyzed

Complexity

Total Complexity 23

Size/Duplication

Total Lines 127
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 23
dl 0
loc 127
c 0
b 0
f 0
rs 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A getRestartSofia() 0 4 1
A _execCmd() 0 16 2
A _connect() 0 12 3
A getReloadGateway() 0 4 1
A getReloadACL() 0 4 1
B _execShowCmd() 0 19 7
A getChannelCount() 0 7 1
A _execShowCountCmd() 0 14 3
A __del__() 0 4 2
B __init__() 0 25 2
1
# Copyright 2013 Mathias WOLFF
2
# This file is part of pyfreebilling.
3
#
4
# pyfreebilling is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# pyfreebilling is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with pyfreebilling. If not, see <http://www.gnu.org/licenses/>
16
17
import re
18
import ESL
19
20
21
# Default connection settings for the FreeSWITCH Event Socket (ESL) interface.
# They are used as fallbacks when the caller passes no (or an empty) value.
defaultESLport = 8021          # TCP port used when no port is supplied
defaultESLsecret = 'ClueCon'   # ESL password used when no secret is supplied

# Timeout value; not referenced by the code visible in this module.
# NOTE(review): presumably seconds, consumed elsewhere -- confirm before use.
conn_timeout = 5
25
26
27
class FSinfo:
    """Class that establishes connection to FreeSWITCH ESL Interface
    to retrieve statistics on operation and to trigger reloads.

    All commands are sent through a single ESL connection object stored in
    ``self._eslconn``.

    """

    def __init__(self, host='127.0.0.1', port=defaultESLport, secret="ClueCon",
                 autoInit=True):
        """Initialize connection to FreeSWITCH ESL Interface.

        @param host:     FreeSWITCH Host (falls back to '127.0.0.1' if empty).
        @param port:     FreeSWITCH ESL Port (falls back to defaultESLport
                         if empty; converted with int()).
        @param secret:   FreeSWITCH ESL Secret (falls back to
                         defaultESLsecret if empty).
        @param autoInit: If True connect to FreeSWITCH ESL Interface on
                         instantiation; otherwise the connection is opened
                         on the first command.
        @raise Exception: If autoInit is True and the connection fails.

        Example (requires a reachable FreeSWITCH instance)::

            fs = FSinfo(host='127.0.0.1', port=defaultESLport, secret="ClueCon")
            fs.getChannelCount()
            fs.getReloadGateway("external")

        """
        # Set Connection Parameters
        self._eslconn = None
        self._eslhost = host or '127.0.0.1'
        self._eslport = int(port or defaultESLport)
        self._eslpass = secret or defaultESLsecret

        # Level 0 is passed to the ESL library's log-level setter.
        ESL.eslSetLogLevel(0)
        if autoInit:
            self._connect()

    def __del__(self):
        """Cleanup: drop the reference to the ESL connection object."""
        # getattr() keeps this safe for instances whose __init__ never ran
        # or failed before the attribute was assigned.
        if getattr(self, '_eslconn', None) is not None:
            self._eslconn = None

    def _connect(self):
        """Connect to FreeSWITCH ESL Interface.

        @raise Exception: If the connection object cannot be created or
                          reports that it is not connected.

        """
        try:
            self._eslconn = ESL.ESLconnection(self._eslhost,
                                              str(self._eslport),
                                              self._eslpass)
        except Exception:
            # A failing constructor is reported through the same descriptive
            # error as a refused connection instead of an AttributeError.
            self._eslconn = None
        if self._eslconn is None or not self._eslconn.connected():
            raise Exception(
                "Connection to FreeSWITCH ESL Interface on host %s and port %d failed."
                % (self._eslhost, self._eslport)
                )

    def _execCmd(self, cmd, args):
        """Execute command and return result body as list of lines.

            @param cmd:  Command string.
            @param args: Command arguments string.
            @return:     List of body lines, or None if the reply is missing
                         or has an empty body.
            @raise Exception: If no connection exists yet and connecting
                              fails.

        """
        # With autoInit=False no connection exists yet; open it on demand.
        if self._eslconn is None:
            self._connect()
        command = cmd + " " + args
        # Only encode text that is not already a native str (i.e. Python 2
        # unicode objects); encoding a Python 2 byte string would first
        # decode it as ASCII and fail on non-ASCII characters.
        if not isinstance(command, str):
            command = command.encode('utf-8')
        output = self._eslconn.sendRecv(command)
        if output is None:
            # No reply object was returned for the command.
            return None
        body = output.getBody()
        if body:
            return body.splitlines()
        return None

    def _execShowCmd(self, showcmd):
        """Execute 'show' command and return result dictionary.

            @param showcmd: Argument string for the 'show' command.
            @return: Dictionary with 'keys' (the comma-split first line) and
                     'items' (a list of comma-split following lines up to
                     the first empty line), or None if the reply has fewer
                     than two lines or its first line is empty or starts
                     with '-'.

        """
        # Sent as "api show ..." to match _execShowCountCmd, which goes
        # through the same sendRecv() path.
        lines = self._execCmd("api show", showcmd)
        if not lines or len(lines) < 2:
            return None
        header = lines[0]
        # NOTE(review): a leading '-' presumably marks an error reply
        # (e.g. "-ERR ..."); such replies are not parsed.
        if header == '' or header[0] == '-':
            return None
        items = []
        for line in lines[1:]:
            if line == '':
                # The first empty line ends the table.
                break
            items.append(line.split(','))
        return {'keys': header.split(','), 'items': items}

    def _execShowCountCmd(self, showcmd):
        """Execute 'show ... count' command and return the reported total.

            @param showcmd: Argument string for the 'show' command.
            @return: Integer parsed from the first line matching
                     '<number> total', or None if no line matches or the
                     reply has no body.

        """
        lines = self._execCmd("api show", showcmd + " count")
        # _execCmd returns None for an empty reply; iterate nothing then.
        for line in lines or ():
            mobj = re.match(r'\s*(\d+)\s+total', line)
            if mobj:
                return int(mobj.group(1))
        return None

    def getChannelCount(self):
        """Get number of active channels from FreeSWITCH.

        @return: Integer or None.

        """
        return self._execShowCountCmd("channels")

    def getReloadACL(self):
        """Reload ACL by sending 'bgapi reloadacl'.

        @return: List of reply body lines, or None if the reply has no body.

        """
        return self._execCmd("bgapi", "reloadacl")

    def getReloadGateway(self, profile_name):
        """Reload sofia's gateway ('sofia profile <name> rescan reloadxml').

        @param profile_name: Name of the sofia profile.
        @return: List of reply body lines, or None if the reply has no body.

        """
        return self._execCmd("bgapi", "sofia profile " + profile_name + " rescan reloadxml")

    def getRestartSofia(self, profile_name):
        """Restart sofia profile ('sofia profile <name> restart').

        @param profile_name: Name of the sofia profile.
        @return: List of reply body lines, or None if the reply has no body.

        """
        return self._execCmd("bgapi", "sofia profile " + profile_name + " restart")
154
        
155
        
156
# fs = FSinfo(host='127.0.0.1', port=defaultESLport, secret="ClueCon")
157
# print fs.getChannelCount()
158
# print fs.getReloadGateway("external")
159