Passed
Push — master ( 871c09...3747b1 )
by Daniel
01:09
created

DatabaseTalker.connect_to_database()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 14
nop 4
dl 0
loc 14
rs 9.7
c 0
b 0
f 0
1
"""
DatabaseTalker - library to facilitate communication with database servers

This library allows connecting to SAP HANA or Oracle MySQL servers, executing queries
and loading the fetched result-sets into Pandas Data Frames
"""
7
# package helping out to work with SAP HANA
8
from hdbcli import dbapi
9
# package helping out to work with Oracle MySQL
10
import mysql.connector
11
from mysql.connector import errorcode
12
# package to facilitate time operations
13
from datetime import datetime, timedelta
14
# package facilitating Data Frames manipulation
15
import pandas as pd
16
17
18
class DatabaseTalker:
    """Helper that connects to a database server and turns query results into DataFrames.

    Two server kinds are handled, selected by ``connection_details['server-vendor-and-type']``:
    ``'SAP HANA'`` (through ``hdbcli.dbapi``) and ``'Oracle MySQL'`` (through
    ``mysql.connector``).

    Every public method receives a ``local_logger`` (an object exposing ``info`` and
    ``error``) and most receive a ``timered`` object exposing ``start`` and ``stop``;
    ``stop`` is always called once ``start`` has been called, even when an exception
    propagates, so the timer is never left running.
    """

    # connection object assigned by the connect_to_database_* methods on success
    conn = None

    @staticmethod
    def append_additional_columns_to_data_frame(local_logger, timered, data_frame, session_details):
        """Add the columns listed in ``session_details['additional-columns']`` to ``data_frame``.

        Nothing happens (the timer is not even started) when the
        ``'additional-columns'`` key is absent. Each entry is read through its
        ``'name'`` and ``'value'`` keys; the value ``'utcnow'`` is a special case
        replaced by the current UTC timestamp. ``data_frame`` is modified in place.
        """
        if 'additional-columns' not in session_details:
            return
        timered.start()
        try:
            for crt_column in session_details['additional-columns']:
                if crt_column['value'] == 'utcnow':  # special case
                    data_frame[crt_column['name']] = datetime.utcnow()
                else:
                    data_frame[crt_column['name']] = crt_column['value']
            local_logger.info('Additional column(s) have been added to Pandas DataFrame')
        finally:
            timered.stop()

    def connect_to_database(self, local_logger, timered, connection_details):
        """Log the connection attempt and dispatch to the vendor-specific connect method.

        ``connection_details`` is read through the keys ``'server-vendor-and-type'``,
        ``'server-layer'``, ``'ServerName'``, ``'ServerPort'``, ``'Username'`` and
        ``'Name'`` (plus whatever the vendor-specific method reads). An unsupported
        vendor is reported through ``local_logger.error`` and leaves ``self.conn``
        untouched.
        """
        timered.start()
        try:
            server_kind = connection_details['server-vendor-and-type']
            local_logger.info('I will attempt to connect to '
                              + server_kind + ' server, layer '
                              + connection_details['server-layer'] + ' which means ('
                              + 'server ' + connection_details['ServerName']
                              + ', port ' + str(connection_details['ServerPort'])
                              + ') using the username ' + connection_details['Username']
                              + ' (' + connection_details['Name'] + ')')
            if server_kind == 'SAP HANA':
                self.connect_to_database_hana(local_logger, connection_details)
            elif server_kind == 'Oracle MySQL':
                self.connect_to_database_mysql(local_logger, connection_details)
            else:
                # previously this case fell through silently, leaving no trace in the log
                local_logger.error('Unsupported server vendor and type: ' + str(server_kind))
        finally:
            timered.stop()

    def connect_to_database_hana(self, local_logger, connection_details):
        """Open a SAP HANA connection and store it in ``self.conn``.

        Driver errors (``hdbcli.dbapi.Error``) are logged through
        ``local_logger.error`` and not re-raised; ``self.conn`` is then left as it was.
        """
        try:
            self.conn = dbapi.connect(
                address=connection_details['ServerName'],
                port=connection_details['ServerPort'],
                user=connection_details['Username'],
                password=connection_details['Password'],
                compress='TRUE',
            )
            local_logger.info('Connecting to  ' + connection_details['server-vendor-and-type']
                              + ' server completed')
        except dbapi.Error as err:
            # catch the HANA driver's own exception type; the MySQL one is never raised here
            local_logger.error('Error connecting to SAP HANA with details: ')
            local_logger.error(err)

    def connect_to_database_mysql(self, local_logger, connection_details):
        """Open an Oracle MySQL connection (to the ``mysql`` schema) and store it in ``self.conn``.

        Driver errors (``mysql.connector.Error``) are logged through
        ``local_logger.error`` and not re-raised; ``self.conn`` is then left as it was.
        """
        try:
            self.conn = mysql.connector.connect(
                host=connection_details['ServerName'],
                port=connection_details['ServerPort'],
                user=connection_details['Username'],
                password=connection_details['Password'],
                database='mysql',
                compress=True,
                autocommit=True,
                use_unicode=True,
                charset='utf8mb4',
                collation='utf8mb4_0900_ai_ci',
                get_warnings=True,
            )
            local_logger.info('Connecting to  ' + connection_details['server-vendor-and-type']
                              + ' server completed')
        except mysql.connector.Error as err:
            local_logger.error('Error connecting to MySQL with details: ')
            local_logger.error(err)

    @staticmethod
    def execute_query(local_logger, timered, given_cursor, given_query):
        """Execute ``given_query`` on ``given_cursor`` and return the cursor.

        When the cursor exposes ``server_processing_time()`` its value is included
        in the success log line. A ``TypeError`` raised during execution is logged
        and ``None`` is returned; any other exception propagates to the caller
        after the timer has been stopped.
        """
        timered.start()
        try:
            given_cursor.execute(given_query)
            try:
                # NOTE(review): the reported value is divided by 1000 and then treated as
                # microseconds - confirm the unit returned by server_processing_time()
                pt = timedelta(microseconds=(given_cursor.server_processing_time() / 1000))
                local_logger.info('Query executed successfully ' + format(pt))
            except AttributeError:
                # cursor type without server_processing_time(): log without the duration
                local_logger.info('Query executed successfully')
            return given_cursor
        except TypeError as e:
            local_logger.error('Error running the query: ')
            local_logger.error(e)
            return None
        finally:
            timered.stop()

    @staticmethod
    def fetch_executed_query(local_logger, timered, given_cursor):
        """Return everything ``given_cursor.fetchall()`` yields, logging the row count."""
        timered.start()
        try:
            local_result_set = given_cursor.fetchall()
            local_logger.info('Result-set has been completely fetched and contains '
                              + str(len(local_result_set)) + ' rows')
        finally:
            timered.stop()
        return local_result_set

    @staticmethod
    def get_column_names(local_logger, timered, given_cursor):
        """Return the column names of the result-set held by ``given_cursor``.

        ``given_cursor.column_names`` is returned as-is when the attribute exists;
        otherwise the names are taken from the first field of every entry in
        ``given_cursor.description`` (an empty list when ``description`` is ``None``
        or empty).
        """
        timered.start()
        try:
            try:
                column_names = given_cursor.column_names
            except AttributeError:
                # only the first field is needed, so do not depend on the entry width
                column_names = [column[0] for column in (given_cursor.description or ())]
            local_logger.info('Result-set column name determination completed')
        finally:
            timered.stop()
        return column_names

    @staticmethod
    def result_set_to_data_frame(local_logger, timered, given_columns_name, given_result_set):
        """Build a Pandas DataFrame from ``given_result_set`` labelled by ``given_columns_name``."""
        timered.start()
        try:
            df = pd.DataFrame(data=given_result_set, index=None, columns=given_columns_name)
            local_logger.info('Result-set has been loaded into Pandas DataFrame')
        finally:
            timered.stop()
        return df
130