db_extractor.DatabaseTalker   A
last analyzed

Complexity

Total Complexity 23

Size/Duplication

Total Lines 185
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 161
dl 0
loc 185
rs 10
c 0
b 0
f 0
wmc 23

9 Methods

Rating   Name   Duplication   Size   Complexity  
A DatabaseTalker.connect_to_database() 0 24 4
A DatabaseTalker.connect_to_database_mysql() 0 25 2
A DatabaseTalker.get_column_names() 0 13 3
A DatabaseTalker.connect_to_database_hana() 0 23 2
A DatabaseTalker.result_set_to_data_frame() 0 6 1
A DatabaseTalker.append_additional_columns_to_df() 0 18 4
A DatabaseTalker.__init__() 0 8 1
A DatabaseTalker.fetch_executed_query() 0 13 2
A DatabaseTalker.execute_query() 0 22 4
1
"""
2
DatabaseTalker - library to facilitate database communication
3
"""
4
# package to facilitate time operations
5
from datetime import datetime, timedelta
6
# package to add support for multi-language (i18n)
7
import gettext
8
# package helping out to work with SAP HANA
9
from hdbcli import dbapi
10
# package helping out to work with Oracle MySQL
11
import mysql.connector
12
import mysql.connector.errors
13
# package to handle files/folders and related metadata/operations
14
import os
15
# package facilitating Data Frames manipulation
16
import pandas as pd
17
# package to bring ability to check hostname availability
18
import socket
19
20
21
class DatabaseTalker:
    """
    Thin wrapper around DB-API connections to SAP HANA and MySQL/MariaDB servers.

    Every public method receives the logger and the timer it has to use, logs
    localized messages about what it did and brackets its work between
    ``timer.start()`` and ``timer.stop()``.
    """
    # connection object assigned by connect_to_database_hana/_mysql; None until then
    connection = None
    # gettext translations object, assigned per instance in __init__
    locale = None

    def __init__(self, in_language='en_US'):
        """
        Load the message catalog used for all log messages.

        The gettext domain is this module's file name without extension and the
        catalog is looked up under ``<two folders above this file>/project_locale/<domain>``.
        With ``fallback=True`` a missing catalog yields untranslated messages
        instead of an error.

        :param in_language: language code handed to ``gettext.translation``
        """
        # os.path helpers are used instead of splitting on os.path.altsep,
        # because altsep is None on POSIX systems and made this constructor crash
        module_path = os.path.normpath(os.path.abspath(__file__))
        locale_domain = os.path.splitext(os.path.basename(module_path))[0]
        locale_folder = os.path.normpath(os.path.join(
            os.path.dirname(os.path.dirname(module_path)), 'project_locale', locale_domain))
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
                                          languages=[in_language], fallback=True)

    def append_additional_columns_to_df(self, local_logger, timer, data_frame, session_details):
        """
        Add the columns listed in ``session_details['additional-columns']``.

        Each entry is a dict with a ``name`` and a ``value``; the values
        ``'utcnow'`` and ``'now'`` are replaced by the current UTC/local
        timestamp, anything else is stored as given.

        :param local_logger: logger receiving the summary message
        :param timer: object exposing ``start()`` and ``stop()``
        :param data_frame: Pandas Data Frame to extend (modified in place)
        :param session_details: dict holding the ``additional-columns`` list
        :return: the very same Data Frame object that was passed in
        """
        # local import: only this method needs it and the module header stays untouched
        from datetime import timezone
        # deliberately no copy: the original contract is to extend the given frame
        resulted_data_frame = data_frame
        additional_columns = session_details['additional-columns']
        timer.start()
        for crt_column in additional_columns:
            if crt_column['value'] == 'utcnow':
                # naive UTC timestamp, same value the deprecated datetime.utcnow() gave
                resulted_data_frame[crt_column['name']] = \
                    datetime.now(timezone.utc).replace(tzinfo=None)
            elif crt_column['value'] == 'now':
                resulted_data_frame[crt_column['name']] = datetime.now()
            else:
                resulted_data_frame[crt_column['name']] = crt_column['value']
        local_logger.info(self.locale.ngettext(
            'Additional {additional_columns_counted} column added to Pandas Data Frame',
            'Additional {additional_columns_counted} columns added to Pandas Data Frame',
                          len(additional_columns))
                          .replace('{additional_columns_counted}',
                                   str(len(additional_columns))))
        timer.stop()
        return resulted_data_frame

    def connect_to_database(self, local_logger, timer, connection_details):
        """
        Resolve the server host name and open a connection for the given vendor.

        On success the connection is stored in ``self.connection``; nothing is
        returned. An unresolvable host name or an unsupported vendor is logged
        as an error and leaves ``self.connection`` untouched.

        :param local_logger: logger receiving progress and error messages
        :param timer: object exposing ``start()`` and ``stop()``
        :param connection_details: dict with the keys ``server-vendor-and-type``,
            ``server-layer``, ``ServerName``, ``ServerPort``, ``Username``,
            ``Name`` and ``Password``
        """
        timer.start()
        local_logger.info(self.locale.gettext(
            'Connection to {server_vendor_and_type} server, layer {server_layer} '
            + 'which means (server {server_name}, port {server_port}) '
            + 'using the username {username} ({name_of_user})')
                          .replace('{server_vendor_and_type}',
                                   connection_details['server-vendor-and-type'])
                          .replace('{server_layer}', connection_details['server-layer'])
                          .replace('{server_name}', connection_details['ServerName'])
                          .replace('{server_port}', str(connection_details['ServerPort']))
                          .replace('{username}', connection_details['Username'])
                          .replace('{name_of_user}', connection_details['Name']))
        try:
            # cheap availability check before handing over to the database driver
            socket.gethostbyname(connection_details['ServerName'])
            if connection_details['server-vendor-and-type'] == 'SAP HANA':
                self.connect_to_database_hana(local_logger, connection_details)
            elif connection_details['server-vendor-and-type'] in ('MariaDB Foundation MariaDB',
                                                                  'Oracle MySQL'):
                self.connect_to_database_mysql(local_logger, connection_details)
            else:
                # previously an unknown vendor was skipped without any trace
                local_logger.error(self.locale.gettext(
                    'Server vendor and type {server_vendor_and_type} is not supported, '
                    + 'connection will not be established')
                                   .replace('{server_vendor_and_type}',
                                            str(connection_details['server-vendor-and-type'])))
        except socket.gaierror as err:
            local_logger.error(self.locale.gettext(
                'Hostname not found, connection will not be established'))
            local_logger.error(err)
        finally:
            # stop the timer even when a driver error propagates to the caller
            timer.stop()

    def connect_to_database_hana(self, local_logger, connection_details):
        """
        Open a SAP HANA connection through ``hdbcli`` and keep it in ``self.connection``.

        A failed attempt is logged as an error and ``self.connection`` keeps
        its previous value.

        :param local_logger: logger receiving the outcome
        :param connection_details: dict with ``ServerName``, ``ServerPort``,
            ``Username``, ``Password`` and ``server-vendor-and-type``
        """
        try:
            self.connection = dbapi.connect(
                address=connection_details['ServerName'],
                port=connection_details['ServerPort'],
                user=connection_details['Username'],
                password=connection_details['Password'],
                prefetch='FALSE',
                chopBlanks='TRUE',
                compress='TRUE',
                connDownRollbackError='TRUE',
                statementCacheSize=10,
            )
            local_logger.info(self.locale.gettext(
                'Connection to {server_vendor_and_type} server completed')
                              .replace('{server_vendor_and_type}',
                                       connection_details['server-vendor-and-type']))
        # hdbcli signals failures through its own DB-API exception hierarchy, so
        # dbapi.Error has to be caught for this handler to ever run;
        # ConnectionError is kept for backward compatibility
        except (dbapi.Error, ConnectionError) as err:
            local_logger.error(self.locale.gettext(
                'Error connecting to {server_vendor_and_type} server with details')
                              .replace('{server_vendor_and_type}',
                                       connection_details['server-vendor-and-type']))
            local_logger.error(err)

    def connect_to_database_mysql(self, local_logger, connection_details):
        """
        Open a MySQL/MariaDB connection and keep it in ``self.connection``.

        The connection always targets the ``mysql`` schema with auto-commit
        enabled. A failed attempt is logged as an error and ``self.connection``
        keeps its previous value.

        :param local_logger: logger receiving the outcome
        :param connection_details: dict with ``ServerName``, ``ServerPort``,
            ``Username``, ``Password`` and ``server-vendor-and-type``
        """
        try:
            self.connection = mysql.connector.connect(
                host=connection_details['ServerName'],
                port=connection_details['ServerPort'],
                user=connection_details['Username'],
                password=connection_details['Password'],
                database='mysql',
                compress=True,
                autocommit=True,
                use_unicode=True,
                charset='utf8mb4',
                collation='utf8mb4_unicode_ci',
                get_warnings=True,
            )
            local_logger.info(self.locale.gettext(
                'Connection to {server_vendor_and_type} server completed')
                              .replace('{server_vendor_and_type}',
                                       connection_details['server-vendor-and-type']))
        except mysql.connector.Error as err:
            local_logger.error(self.locale.gettext(
                'Error connecting to {server_vendor_and_type} server with details')
                              .replace('{server_vendor_and_type}',
                                       connection_details['server-vendor-and-type']))
            local_logger.error(err)

    def execute_query(self, local_logger, timer, in_cursor, in_query, in_counted_parameters,
                      in_tuple_parameters):
        """
        Run a query on the given cursor.

        :param local_logger: logger receiving the outcome
        :param timer: object exposing ``start()`` and ``stop()``
        :param in_cursor: DB-API cursor to execute on
        :param in_query: SQL text; when parameters are given they are merged in
            with the ``%`` string operator before execution
        :param in_counted_parameters: number of parameters, 0 means none
        :param in_tuple_parameters: values merged into ``in_query``
        :return: ``in_cursor`` on success, ``None`` when the driver raised
            ``dbapi.ProgrammingError``
        """
        timer.start()
        try:
            if in_counted_parameters > 0:
                # NOTE(review): values are interpolated into the SQL text, which is
                # open to SQL injection if they ever come from untrusted input;
                # kept as is because existing queries rely on %-formatting
                in_cursor.execute(in_query % in_tuple_parameters)
            else:
                in_cursor.execute(in_query)
            try:
                # value is divided by 1000 and interpreted as microseconds
                # NOTE(review): confirm the unit reported by the driver
                processing_tm = timedelta(microseconds=(in_cursor.server_processing_time() / 1000))
                local_logger.info(self.locale.gettext(
                    'Query executed successfully '
                    + 'having a server processing time of {processing_time}')
                                  .replace('{processing_time}', format(processing_tm)))
            except AttributeError:
                # cursor does not expose server_processing_time()
                local_logger.info(self.locale.gettext('Query executed successfully'))
            return in_cursor
        # NOTE(review): only the SAP HANA error class is handled here, errors
        # raised by mysql.connector still propagate to the caller
        except dbapi.ProgrammingError as e:
            local_logger.error(self.locale.gettext('Error running the query:'))
            local_logger.error(e)
            return None
        finally:
            # single exit point for the timer, also reached on propagating errors
            timer.stop()

    def fetch_executed_query(self, local_logger, timer, given_cursor):
        """
        Fetch all rows of an already executed query.

        :param local_logger: logger receiving the outcome
        :param timer: object exposing ``start()`` and ``stop()``
        :param given_cursor: cursor on which a query has been executed
        :return: the rows returned by ``fetchall()`` or ``None`` when a
            ``ConnectionError`` interrupted the fetch
        """
        timer.start()
        local_result_set = None
        try:
            local_result_set = given_cursor.fetchall()
            local_logger.info(self.locale.gettext(
                'Result-set has been completely fetched and contains {rows_counted} rows')
                              .replace('{rows_counted}', str(len(local_result_set))))
        except ConnectionError as e:
            # a lost connection is a failure, hence reported at error level
            local_logger.error(self.locale.gettext('Connection problem encountered: '))
            local_logger.error(e)
        timer.stop()
        return local_result_set

    def get_column_names(self, local_logger, timer, given_cursor):
        """
        Determine the column names of the result-set behind a cursor.

        The cursor's ``column_names`` attribute is used when present, otherwise
        the first field of every ``description`` entry is taken.

        :param local_logger: logger receiving the determined names
        :param timer: object exposing ``start()`` and ``stop()``
        :param given_cursor: cursor on which a query has been executed
        :return: ``column_names`` as provided by the cursor, or a list built
            from ``description`` (empty when ``description`` is ``None``)
        """
        timer.start()
        try:
            column_names = given_cursor.column_names
        except AttributeError:
            # only the first field (the name) is needed, so entries are indexed
            # rather than unpacked into a fixed number of variables
            column_names = [column[0] for column in (given_cursor.description or [])]
        local_logger.info(self.locale.gettext(
            'Result-set column name determination completed: {columns_name}')
                          .replace('{columns_name}', str(column_names)))
        timer.stop()
        return column_names

    def result_set_to_data_frame(self, local_logger, timer, given_columns_name, given_result_set):
        """
        Load a fetched result-set into a Pandas Data Frame.

        :param local_logger: logger receiving the confirmation message
        :param timer: object exposing ``start()`` and ``stop()``
        :param given_columns_name: column labels for the Data Frame
        :param given_result_set: rows to load
        :return: the newly built Data Frame
        """
        timer.start()
        df = pd.DataFrame(data=given_result_set, index=None, columns=given_columns_name)
        local_logger.info(self.locale.gettext('Result-set has been loaded into Pandas Data Frame'))
        timer.stop()
        return df
185