Completed
Push — master ( 55df2f...e17347 )
by Daniel
16s queued 12s
created

DatabaseTalker.get_column_names()   A

Complexity

Conditions 3

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 4
dl 0
loc 13
rs 9.75
c 0
b 0
f 0
1
"""
2
DatabaseTalker - library to facilitate database communication
3
"""
4
# package to facilitate time operations
5
from datetime import datetime, timedelta
6
# package to add support for multi-language (i18n)
7
import gettext
8
# package helping out to work with SAP HANA
9
from hdbcli import dbapi
10
# package helping out to work with Oracle MySQL
11
import mysql.connector
12
import mysql.connector.errors
13
# package to handle files/folders and related metadata/operations
14
import os
15
# package facilitating Data Frames manipulation
16
import pandas as pd
17
# package to bring ability to check hostname availability
18
import socket
19
20
21
class DatabaseTalker:
22
    connection = None
23
    locale = None
24
25
    def __init__(self, in_language='en_US'):
26
        current_script = os.path.basename(__file__).replace('.py', '')
27
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
28
        self.locale = gettext.translation(current_script, lang_folder, languages=[in_language])
29
30
    def append_additional_columns_to_df(self, local_logger, timer, data_frame, session_details):
31
        resulted_data_frame = data_frame
32
        timer.start()
33
        for crt_column in session_details['additional-columns']:
34
            if crt_column['value'] == 'utcnow':
35
                resulted_data_frame[crt_column['name']] = datetime.utcnow()
36
            elif crt_column['value'] == 'now':
37
                resulted_data_frame[crt_column['name']] = datetime.now()
38
            else:
39
                resulted_data_frame[crt_column['name']] = crt_column['value']
40
        local_logger.info(self.locale.ngettext(
41
            'Additional {additional_columns_counted} column added to Pandas Data Frame',
42
            'Additional {additional_columns_counted} columns added to Pandas Data Frame',
43
                          len(session_details['additional-columns']))
44
                          .replace('{additional_columns_counted}',
45
                                   str(len(session_details['additional-columns']))))
46
        timer.stop()
47
        return resulted_data_frame
48
49
    def connect_to_database(self, local_logger, timer, connection_details):
50
        timer.start()
51
        local_logger.info(self.locale.gettext(
52
            'Connection to {server_vendor_and_type} server, layer {server_layer} '
53
            + 'which means (server {server_name}, port {server_port}) '
54
            + 'using the username {username} ({name_of_user})')
55
                          .replace('{server_vendor_and_type}',
56
                                   connection_details['server-vendor-and-type'])
57
                          .replace('{server_layer}', connection_details['server-layer'])
58
                          .replace('{server_name}', connection_details['ServerName'])
59
                          .replace('{server_port}', str(connection_details['ServerPort']))
60
                          .replace('{username}', connection_details['Username'])
61
                          .replace('{name_of_user}', connection_details['Name']))
62
        try:
63
            socket.gethostbyname(connection_details['ServerName'])
64
            if connection_details['server-vendor-and-type'] == 'SAP HANA':
65
                self.connect_to_database_hana(local_logger, connection_details)
66
            elif connection_details['server-vendor-and-type'] in ('MariaDB Foundation MariaDB',
67
                                                                  'Oracle MySQL'):
68
                self.connect_to_database_mysql(local_logger, connection_details)
69
        except socket.gaierror as err:
70
            local_logger.error('Hostname not found, connection will not be established')
71
            local_logger.error(err)
72
        timer.stop()
73
74
    def connect_to_database_hana(self, local_logger, connection_details):
75
        try:
76
            self.connection = dbapi.connect(
77
                address=connection_details['ServerName'],
78
                port=connection_details['ServerPort'],
79
                user=connection_details['Username'],
80
                password=connection_details['Password'],
81
                prefetch='FALSE',
82
                chopBlanks='TRUE',
83
                compress='TRUE',
84
                connDownRollbackError='TRUE',
85
                statementCacheSize=10,
86
            )
87
            local_logger.info(self.locale.gettext(
88
                'Connection to {server_vendor_and_type} server completed')
89
                              .replace('{server_vendor_and_type}',
90
                                       connection_details['server-vendor-and-type']))
91
        except ConnectionError as err:
92
            local_logger.error(self.locale.gettext(
93
                'Error connecting to {server_vendor_and_type} server with details')
94
                              .replace('{server_vendor_and_type}',
95
                                       connection_details['server-vendor-and-type']))
96
            local_logger.error(err)
97
98
    def connect_to_database_mysql(self, local_logger, connection_details):
99
        try:
100
            self.connection = mysql.connector.connect(
101
                host=connection_details['ServerName'],
102
                port=connection_details['ServerPort'],
103
                user=connection_details['Username'],
104
                password=connection_details['Password'],
105
                database='mysql',
106
                compress=True,
107
                autocommit=True,
108
                use_unicode=True,
109
                charset='utf8mb4',
110
                collation='utf8mb4_unicode_ci',
111
                get_warnings=True,
112
            )
113
            local_logger.info(self.locale.gettext(
114
                'Connection to {server_vendor_and_type} server completed')
115
                              .replace('{server_vendor_and_type}',
116
                                       connection_details['server-vendor-and-type']))
117
        except mysql.connector.Error as err:
118
            local_logger.error(self.locale.gettext(
119
                'Error connecting to {server_vendor_and_type} server with details')
120
                              .replace('{server_vendor_and_type}',
121
                                       connection_details['server-vendor-and-type']))
122
            local_logger.error(err)
123
124
    def execute_query(self, local_logger, timer, in_cursor, in_query, in_counted_parameters,
125
                      in_tuple_parameters):
126
        try:
127
            timer.start()
128
            if in_counted_parameters > 0:
129
                in_cursor.execute(in_query % in_tuple_parameters)
130
            else:
131
                in_cursor.execute(in_query)
132
            try:
133
                processing_tm = timedelta(microseconds=(in_cursor.server_processing_time() / 1000))
134
                local_logger.info(self.locale.gettext(
135
                    'Query executed successfully '
136
                    + 'having a server processing time of {processing_time}')
137
                                  .replace('{processing_time}', format(processing_tm)))
138
            except AttributeError:
139
                local_logger.info(self.locale.gettext('Query executed successfully'))
140
            timer.stop()
141
            return in_cursor
142
        except dbapi.ProgrammingError as e:
143
            local_logger.error(self.locale.gettext('Error running the query:'))
144
            local_logger.error(e)
145
            timer.stop()
146
147
    def fetch_executed_query(self, local_logger, timer, given_cursor):
148
        timer.start()
149
        local_result_set = None
150
        try:
151
            local_result_set = given_cursor.fetchall()
152
            local_logger.info(self.locale.gettext(
153
                'Result-set has been completely fetched and contains {rows_counted} rows')
154
                              .replace('{rows_counted}', str(len(local_result_set))))
155
        except ConnectionError as e:
156
            local_logger.info(self.locale.gettext('Connection problem encountered: '))
157
            local_logger.info(e)
158
        timer.stop()
159
        return local_result_set
160
161
    def get_column_names(self, local_logger, timer, given_cursor):
162
        timer.start()
163
        try:
164
            column_names = given_cursor.column_names
165
        except AttributeError:
166
            column_names = []
167
            for column_name, col2, col3, col4, col5, col6, col7 in given_cursor.description:
168
                column_names.append(column_name)
169
        local_logger.info(self.locale.gettext(
170
            'Result-set column name determination completed: {columns_name}')
171
                          .replace('{columns_name}', str(column_names)))
172
        timer.stop()
173
        return column_names
174
175
    def result_set_to_data_frame(self, local_logger, timer, given_columns_name, given_result_set):
176
        timer.start()
177
        df = pd.DataFrame(data=given_result_set, index=None, columns=given_columns_name)
178
        local_logger.info(self.locale.gettext('Result-set has been loaded into Pandas Data Frame'))
179
        timer.stop()
180
        return df
181