Passed
Push — development/test ( 6383cd...222d55 )
by Daniel
02:57
created

DatabaseTalker.get_column_names()   A

Complexity

Conditions 3

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 4
dl 0
loc 13
rs 9.75
c 0
b 0
f 0
1
"""
2
DatabaseTalker - library to facilitate database communication
3
"""
4
# package to facilitate time operations
5
from datetime import datetime, timedelta
6
# package to add support for multi-language (i18n)
7
import gettext
8
# package helping out to work with SAP HANA
9
from hdbcli import dbapi
10
# package helping out to work with Oracle MySQL
11
import mysql.connector
12
import mysql.connector.errors
13
# package to handle files/folders and related metadata/operations
14
import os
15
# package facilitating Data Frames manipulation
16
import pandas as pd
17
18
19
class DatabaseTalker:
    """
    Helper that opens a database connection (SAP HANA or MySQL/MariaDB), runs queries,
    fetches result-sets and turns them into Pandas Data Frames.

    Every public method receives the logger and the timer to use, so the class itself
    holds no logging or timing state. Log messages go through the gettext catalogue
    stored in ``lcl``.
    """

    # connection object returned by the database driver; stays None until a connect succeeds
    conn = None
    # gettext translations object used to localize every log message
    lcl = None

    def __init__(self, default_language='en_US'):
        """
        Load the translation catalogue that belongs to this module.

        The gettext domain is this module's file name without its extension and the
        catalogue is searched in the sibling folder named ``<module name>_Locale``.

        :param default_language: language code handed to ``gettext.translation``
        :raises OSError: when no catalogue is found (no fallback is requested)
        """
        # splitext removes only the trailing extension, unlike replace('.py', '')
        current_script = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])

    def _log_connection_success(self, local_logger, connection_details):
        """Log (info) that the connection to the configured server type was established."""
        local_logger.info(
            self.lcl.gettext('Connection to {server_vendor_and_type} server completed')
            .replace('{server_vendor_and_type}', connection_details['server-vendor-and-type']))

    def _log_connection_failure(self, local_logger, connection_details, err):
        """Log (error) that the connection attempt failed, followed by the driver error."""
        local_logger.error(
            self.lcl.gettext('Error connecting to {server_vendor_and_type} server with details')
            .replace('{server_vendor_and_type}', connection_details['server-vendor-and-type']))
        local_logger.error(err)

    def append_additional_columns_to_df(self, local_logger, timered, data_frame, session_details):
        """
        Add the columns listed under ``session_details['additional-columns']`` to the frame.

        Each entry provides a ``name`` and a ``value``; the values ``'utcnow'`` and ``'now'``
        are replaced by the current naive UTC / local timestamp, anything else is used as is.
        The given frame is modified in place and the very same object is returned.

        :param local_logger: object exposing ``info``
        :param timered: object exposing ``start`` and ``stop``
        :param data_frame: Pandas Data Frame to extend
        :param session_details: dictionary that may contain the key ``additional-columns``
        :return: the (possibly extended) ``data_frame``
        """
        resulted_data_frame = data_frame
        if 'additional-columns' in session_details:
            timered.start()
            additional_columns = session_details['additional-columns']
            for crt_column in additional_columns:
                if crt_column['value'] == 'utcnow':
                    resulted_data_frame[crt_column['name']] = datetime.utcnow()
                elif crt_column['value'] == 'now':
                    resulted_data_frame[crt_column['name']] = datetime.now()
                else:
                    resulted_data_frame[crt_column['name']] = crt_column['value']
            columns_counted = len(additional_columns)
            # ngettext needs the count to choose between the singular and the plural form
            local_logger.info(
                self.lcl.ngettext(
                    'Additional {additional_columns_counted} column added to Pandas Data Frame',
                    'Additional {additional_columns_counted} columns added to Pandas Data Frame',
                    columns_counted)
                .replace('{additional_columns_counted}', str(columns_counted)))
            timered.stop()
        return resulted_data_frame

    def connect_to_database(self, local_logger, timered, connection_details):
        """
        Log the connection attempt and dispatch to the vendor specific connect method.

        :param local_logger: object exposing ``info`` and ``error``
        :param timered: object exposing ``start`` and ``stop``
        :param connection_details: dictionary with the keys ``server-vendor-and-type``,
            ``server-layer``, ``ServerName``, ``ServerPort``, ``Username``, ``Name`` and
            (read by the vendor specific methods) ``Password``
        """
        timered.start()
        try:
            server_type = connection_details['server-vendor-and-type']
            local_logger.info(
                self.lcl.gettext(
                    'Connection to {server_vendor_and_type} server, layer {server_layer} '
                    'which means (server {server_name}, port {server_port}) '
                    'using the username {username} ({name_of_user})')
                .replace('{server_vendor_and_type}', server_type)
                .replace('{server_layer}', connection_details['server-layer'])
                .replace('{server_name}', connection_details['ServerName'])
                # the port may be configured as a number and str.replace accepts only text
                .replace('{server_port}', str(connection_details['ServerPort']))
                .replace('{username}', connection_details['Username'])
                .replace('{name_of_user}', connection_details['Name']))
            if server_type == 'SAP HANA':
                self.connect_to_database_hana(local_logger, connection_details)
            elif server_type in ('MariaDB Foundation MariaDB', 'Oracle MySQL'):
                self.connect_to_database_mysql(local_logger, connection_details)
            else:
                # make an unknown vendor visible instead of returning without any connection
                local_logger.error(
                    self.lcl.gettext('Unsupported server vendor and type: '
                                     '{server_vendor_and_type}')
                    .replace('{server_vendor_and_type}', str(server_type)))
        finally:
            timered.stop()

    def connect_to_database_hana(self, local_logger, connection_details):
        """
        Open a SAP HANA connection and keep it in ``self.conn``.

        A failed attempt is logged as error and ``self.conn`` is left untouched.

        :param local_logger: object exposing ``info`` and ``error``
        :param connection_details: dictionary with ``ServerName``, ``ServerPort``,
            ``Username``, ``Password`` and ``server-vendor-and-type``
        """
        try:
            self.conn = dbapi.connect(
                address=connection_details['ServerName'],
                port=connection_details['ServerPort'],
                user=connection_details['Username'],
                password=connection_details['Password'],
                prefetch='FALSE',
                chopBlanks='TRUE',
                compress='TRUE',
                connDownRollbackError='TRUE',
                statementCacheSize=10,
            )
            self._log_connection_success(local_logger, connection_details)
        # the driver reports failures through its own DB-API Error hierarchy,
        # the builtin ConnectionError is kept for backward compatibility
        except (dbapi.Error, ConnectionError) as err:
            self._log_connection_failure(local_logger, connection_details, err)

    def connect_to_database_mysql(self, local_logger, connection_details):
        """
        Open a MySQL/MariaDB connection (schema ``mysql``) and keep it in ``self.conn``.

        A failed attempt is logged as error and ``self.conn`` is left untouched.

        :param local_logger: object exposing ``info`` and ``error``
        :param connection_details: dictionary with ``ServerName``, ``ServerPort``,
            ``Username``, ``Password`` and ``server-vendor-and-type``
        """
        try:
            self.conn = mysql.connector.connect(
                host=connection_details['ServerName'],
                port=connection_details['ServerPort'],
                user=connection_details['Username'],
                password=connection_details['Password'],
                database='mysql',
                compress=True,
                autocommit=True,
                use_unicode=True,
                charset='utf8mb4',
                collation='utf8mb4_unicode_ci',
                get_warnings=True,
            )
            self._log_connection_success(local_logger, connection_details)
        except mysql.connector.Error as err:
            self._log_connection_failure(local_logger, connection_details, err)

    def execute_query(self, local_logger, timered, in_cursor, in_query, in_counted_parameters,
                      in_tuple_parameters):
        """
        Execute a query on the given cursor and log the outcome.

        :param local_logger: object exposing ``info`` and ``error``
        :param timered: object exposing ``start`` and ``stop``
        :param in_cursor: cursor exposing ``execute`` (and optionally ``server_processing_time``)
        :param in_query: query text, holding ``%`` placeholders when parameters are given
        :param in_counted_parameters: number of parameters; 0 runs the query text as is
        :param in_tuple_parameters: values formatted into ``in_query`` with the ``%`` operator
        :return: ``in_cursor`` on success, ``None`` when a ``TypeError`` was raised
        """
        timered.start()
        try:
            if in_counted_parameters > 0:
                # NOTE(review): the values are formatted into the SQL text, not bound by the
                # driver, so only trusted and already escaped values must be passed here
                in_cursor.execute(in_query % in_tuple_parameters)
            else:
                in_cursor.execute(in_query)
            try:
                processing_tm = timedelta(microseconds=(in_cursor.server_processing_time() / 1000))
                local_logger.info(
                    self.lcl.gettext('Query executed successfully '
                                     'having a server processing time of {processing_time}')
                    .replace('{processing_time}', format(processing_tm)))
            except AttributeError:
                # this cursor type does not report the server processing time
                local_logger.info(self.lcl.gettext('Query executed successfully'))
            return in_cursor
        except TypeError as err:
            local_logger.error(self.lcl.gettext('Error running the query:'))
            local_logger.error(err)
            return None
        finally:
            # the timer is stopped on every path, including exceptions that propagate
            timered.stop()

    def fetch_executed_query(self, local_logger, timered, given_cursor):
        """
        Fetch every row of an already executed query.

        :param local_logger: object exposing ``info`` and ``error``
        :param timered: object exposing ``start`` and ``stop``
        :param given_cursor: cursor exposing ``fetchall``
        :return: the fetched rows, or ``None`` when a ``ConnectionError`` was raised
        """
        timered.start()
        local_result_set = None
        try:
            local_result_set = given_cursor.fetchall()
            local_logger.info(
                self.lcl.gettext(
                    'Result-set has been completely fetched and contains {rows_counted} rows')
                .replace('{rows_counted}', str(len(local_result_set))))
        except ConnectionError as err:
            # a failure, so it is reported on the error level like every other failure here
            local_logger.error(self.lcl.gettext('Connection problem encountered: '))
            local_logger.error(err)
        finally:
            timered.stop()
        return local_result_set

    def get_column_names(self, local_logger, timered, given_cursor):
        """
        Determine the column names of the result-set held by the given cursor.

        The cursor attribute ``column_names`` is returned unchanged when present; otherwise
        the first field of every entry of ``description`` is collected into a list
        (an empty list when ``description`` is ``None``).

        :param local_logger: object exposing ``info``
        :param timered: object exposing ``start`` and ``stop``
        :param given_cursor: cursor exposing ``column_names`` or ``description``
        :return: the column names
        """
        timered.start()
        try:
            column_names = given_cursor.column_names
        except AttributeError:
            # only the first field (the name) is needed, whatever the length of each entry
            column_names = [column[0] for column in (given_cursor.description or ())]
        local_logger.info(
            self.lcl.gettext('Result-set column name determination completed: {columns_name}')
            .replace('{columns_name}', str(column_names)))
        timered.stop()
        return column_names

    def result_set_to_data_frame(self, local_logger, timered, given_columns_name, given_result_set):
        """
        Load a fetched result-set into a Pandas Data Frame.

        :param local_logger: object exposing ``info``
        :param timered: object exposing ``start`` and ``stop``
        :param given_columns_name: column labels for the frame
        :param given_result_set: rows to load
        :return: the newly built Data Frame
        """
        timered.start()
        df = pd.DataFrame(data=given_result_set, index=None, columns=given_columns_name)
        local_logger.info(self.lcl.gettext('Result-set has been loaded into Pandas Data Frame'))
        timered.stop()
        return df
174